Skip to content

dag_builder

Chain #

Bases: nx.DiGraph

Chain class.

Source code in src/dag_builder/chain_based_builder.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class Chain(nx.DiGraph):
    """Directed graph holding a single chain.

    A chain is one main sequence of consecutively numbered nodes plus,
    optionally, sub sequences that branch off main-sequence nodes.
    """

    def __init__(self, start_idx: int) -> None:
        """Create an empty chain whose head will be ``start_idx``.

        Parameters
        ----------
        start_idx : int
            Index of chain head.

        """
        super().__init__()
        # Annotation-only declarations: both are bound later by build_chain().
        self.main_tail: int
        self.end_idx: int
        self.start_idx: int = start_idx

    @property
    def head(self) -> int:
        """Index of the first node of the main sequence."""
        return self.start_idx

    @property
    def sub_sequence_tails(self) -> List[int]:
        """Nodes reported by ``Util.get_exit_nodes`` minus the main tail."""
        exit_nodes = Util.get_exit_nodes(self)
        exit_nodes.remove(self.main_tail)
        return exit_nodes

    def build_chain(
        self,
        main_sequence_length: int,
        number_of_sub_sequence: Optional[int] = None,
    ) -> None:
        """Populate the chain with its main sequence and sub sequences.

        The main sequence uses the indices ``start_idx`` to
        ``start_idx + main_sequence_length - 1``. Each sub sequence starts at
        the next free index, hangs off a randomly chosen main-sequence node
        other than the tail, and contains as many nodes as remain on the main
        sequence after that node, so the main sequence stays the longest.

        Parameters
        ----------
        main_sequence_length : int
            Main sequence length
        number_of_sub_sequence : Optional[int], optional
            Number of sub sequence, by default None

        """

        def add_path(first: int, length: int) -> None:
            # Register the first node explicitly so a one-node path still exists.
            self.add_node(first)
            for node in range(first, first + length - 1):
                self.add_edge(node, node + 1)

        # Main sequence
        head = self.start_idx
        main_nodes = list(range(head, head + main_sequence_length))
        self.main_tail = self.end_idx = main_nodes[-1]
        add_path(head, main_sequence_length)

        # Sub sequences (optional)
        if not number_of_sub_sequence:
            return

        branch_points = main_nodes[:-1]  # the tail never spawns a sub sequence
        for _ in range(number_of_sub_sequence):
            branch = random.choice(branch_points)
            # Number of main-sequence nodes located after the branch point.
            length = main_sequence_length - (branch - head) - 1
            first = self.end_idx + 1
            add_path(first, length)
            self.add_edge(branch, first)
            self.end_idx = first + length - 1

__init__(start_idx) #

Constructor.

Parameters:

Name Type Description Default
start_idx int

Index of chain head.

required
Source code in src/dag_builder/chain_based_builder.py
15
16
17
18
19
20
21
22
23
24
25
26
27
def __init__(self, start_idx: int) -> None:
    """Create an empty chain whose head will be ``start_idx``.

    Parameters
    ----------
    start_idx : int
        Index of chain head.

    """
    super().__init__()
    # Annotation-only declarations: no value is bound to these two here.
    self.main_tail: int
    self.end_idx: int
    self.start_idx: int = start_idx

build_chain(main_sequence_length, number_of_sub_sequence=None) #

Build chain.

Build the chain so that the main sequence is the longest.

Parameters:

Name Type Description Default
main_sequence_length int

Main sequence length

required
number_of_sub_sequence Optional[int], optional

Number of sub sequence, by default None

None
Source code in src/dag_builder/chain_based_builder.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def build_chain(
    self,
    main_sequence_length: int,
    number_of_sub_sequence: Optional[int] = None,
) -> None:
    """Populate the chain with its main sequence and sub sequences.

    The main sequence uses the indices ``start_idx`` to
    ``start_idx + main_sequence_length - 1``. Each sub sequence starts at
    the next free index, hangs off a randomly chosen main-sequence node
    other than the tail, and contains as many nodes as remain on the main
    sequence after that node, so the main sequence stays the longest.

    Parameters
    ----------
    main_sequence_length : int
        Main sequence length
    number_of_sub_sequence : Optional[int], optional
        Number of sub sequence, by default None

    """

    def add_path(first: int, length: int) -> None:
        # Register the first node explicitly so a one-node path still exists.
        self.add_node(first)
        for node in range(first, first + length - 1):
            self.add_edge(node, node + 1)

    # Main sequence
    head = self.start_idx
    main_nodes = list(range(head, head + main_sequence_length))
    self.main_tail = self.end_idx = main_nodes[-1]
    add_path(head, main_sequence_length)

    # Sub sequences (optional)
    if not number_of_sub_sequence:
        return

    branch_points = main_nodes[:-1]  # the tail never spawns a sub sequence
    for _ in range(number_of_sub_sequence):
        branch = random.choice(branch_points)
        # Number of main-sequence nodes located after the branch point.
        length = main_sequence_length - (branch - head) - 1
        first = self.end_idx + 1
        add_path(first, length)
        self.add_edge(branch, first)
        self.end_idx = first + length - 1

ChainBasedBuilder #

Bases: DAGBuilderBase

Chain-based class.

Source code in src/dag_builder/chain_based_builder.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
class ChainBasedBuilder(DAGBuilderBase):
    """Chain-based class.

    Builds each DAG from several chains which are then optionally linked
    vertically and/or merged.
    """

    def __init__(self, config: Config) -> None:
        """Constructor.

        Parameters
        ----------
        config : Config
            Config, forwarded unchanged to the base class constructor.

        """
        super().__init__(config)

    def _validate_config(self, config: Config):
        """Validate config.

        Parameters
        ----------
        config : Config
            Inputted config.

        Raises
        ------
        InfeasibleConfigError
            An infeasible parameter was entered.

        """
        # A sub sequence needs a main-sequence node other than the tail to branch from.
        main_sequence_length = Util.get_option_max(config.main_sequence_length)
        number_of_sub_sequences = config.number_of_sub_sequences
        if main_sequence_length == 1 and number_of_sub_sequences:
            raise InfeasibleConfigError(
                "Since the length of 'Main sequence length' is 1, "
                "the sub-sequence cannot be built."
            )

        if config.vertically_link_chains:
            # At least one kind of tail must be allowed as a link source.
            main_sequence_tail = config.main_sequence_tail
            sub_sequence_tail = config.sub_sequence_tail
            if main_sequence_tail is False and sub_sequence_tail is False:
                raise InfeasibleConfigError(
                    "Either 'Main sequence tail' or 'Sub sequence tail' must be set to True."
                )

            number_of_chains = Util.get_option_max(config.number_of_chains)
            number_of_entry_nodes = Util.get_option_min(config.number_of_entry_nodes)
            if number_of_entry_nodes and number_of_chains < number_of_entry_nodes:  # type: ignore
                raise InfeasibleConfigError("'Number of chains' < 'Number of entry nodes.'")

        if config.merge_chains:
            # At least one kind of node must be allowed as a merge target.
            middle_of_chain = config.middle_of_chain
            exit_node = config.exit_node
            if middle_of_chain is False and exit_node is False:
                raise InfeasibleConfigError(
                    "Either 'Middle of chain' or 'Exit node' must be set to True."
                )

            # Upper bound on available exit nodes: every chain has its main
            # tail plus one tail per sub sequence.
            number_of_chains = Util.get_option_max(config.number_of_chains)
            number_of_exit_nodes = Util.get_option_min(config.number_of_exit_nodes)
            number_of_sub_sequences = Util.get_option_min(config.number_of_sub_sequences) or 0
            if number_of_chains * (number_of_sub_sequences + 1) < number_of_exit_nodes:
                raise InfeasibleConfigError(
                    "'Number of chains' * 'Number of sub sequence' < 'Number of exit nodes.'"
                )

    def build(self) -> Generator[nx.DiGraph, None, None]:
        """Build DAG using chain-based method.

        Each DAG is attempted up to ``self._max_try`` times. An attempt only
        fails when ``merge_chains`` raises ``BuildFailedError``; when merging
        is disabled the first attempt is always accepted.

        Yields
        ------
        Generator
            DAG generator.

        Raises
        ------
        BuildFailedError
            The number of build failures exceeded the maximum number of attempts.

        """
        for _ in range(self._config.number_of_dags):
            for try_i in range(1, self._max_try + 1):
                # Build each chain; node indices continue across chains.
                chains: List[Chain] = []
                start_idx = 0
                for _ in range(self._config.number_of_chains):
                    chain = Chain(start_idx)
                    main_sequence_length = Util.random_choice(self._config.main_sequence_length)
                    number_of_sub_sequence = (
                        Util.random_choice(self._config.number_of_sub_sequences)
                        if self._config.number_of_sub_sequences
                        else None
                    )
                    chain.build_chain(main_sequence_length, number_of_sub_sequence)
                    chains.append(chain)
                    start_idx = chain.end_idx + 1

                # Create chain-based DAG
                chain_based_dag = ChainBasedDAG(chains)

                # Vertically link chains (optional)
                if self._config.vertically_link_chains:
                    chain_based_dag.vertically_link_chains(
                        Util.random_choice(self._config.number_of_entry_nodes),
                        self._config.main_sequence_tail,
                        self._config.sub_sequence_tail,
                    )

                # Without merging nothing can fail, so accept this attempt
                # instead of rebuilding the DAG ``_max_try`` times.
                if not self._config.merge_chains:
                    break

                # Merge chains (optional); a failed merge triggers a retry.
                try:
                    chain_based_dag.merge_chains(
                        Util.random_choice(self._config.number_of_exit_nodes),
                        self._config.middle_of_chain,
                        self._config.exit_node,
                    )
                except BuildFailedError as err:
                    if try_i == self._max_try:
                        raise BuildFailedError(
                            f"A DAG could not be built in {self._max_try} tries."
                        ) from err
                else:
                    break

            yield chain_based_dag

build() #

Build DAG using chain-based method.

Yields:

Type Description
Generator

DAG generator.

Raises:

Type Description
BuildFailedError

The number of build failures exceeded the maximum number of attempts.

Source code in src/dag_builder/chain_based_builder.py
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
def build(self) -> Generator[nx.DiGraph, None, None]:
    """Build DAG using chain-based method.

    Each DAG is attempted up to ``self._max_try`` times. An attempt only
    fails when ``merge_chains`` raises ``BuildFailedError``; when merging
    is disabled the first attempt is always accepted.

    Yields
    ------
    Generator
        DAG generator.

    Raises
    ------
    BuildFailedError
        The number of build failures exceeded the maximum number of attempts.

    """
    for _ in range(self._config.number_of_dags):
        for try_i in range(1, self._max_try + 1):
            # Build each chain; node indices continue across chains.
            chains: List[Chain] = []
            start_idx = 0
            for _ in range(self._config.number_of_chains):
                chain = Chain(start_idx)
                main_sequence_length = Util.random_choice(self._config.main_sequence_length)
                number_of_sub_sequence = (
                    Util.random_choice(self._config.number_of_sub_sequences)
                    if self._config.number_of_sub_sequences
                    else None
                )
                chain.build_chain(main_sequence_length, number_of_sub_sequence)
                chains.append(chain)
                start_idx = chain.end_idx + 1

            # Create chain-based DAG
            chain_based_dag = ChainBasedDAG(chains)

            # Vertically link chains (optional)
            if self._config.vertically_link_chains:
                chain_based_dag.vertically_link_chains(
                    Util.random_choice(self._config.number_of_entry_nodes),
                    self._config.main_sequence_tail,
                    self._config.sub_sequence_tail,
                )

            # Without merging nothing can fail, so accept this attempt
            # instead of rebuilding the DAG ``_max_try`` times.
            if not self._config.merge_chains:
                break

            # Merge chains (optional); a failed merge triggers a retry.
            try:
                chain_based_dag.merge_chains(
                    Util.random_choice(self._config.number_of_exit_nodes),
                    self._config.middle_of_chain,
                    self._config.exit_node,
                )
            except BuildFailedError as err:
                if try_i == self._max_try:
                    raise BuildFailedError(
                        f"A DAG could not be built in {self._max_try} tries."
                    ) from err
            else:
                break

        yield chain_based_dag

ChainBasedDAG #

Bases: nx.DiGraph

Chain-based DAG class.

Source code in src/dag_builder/chain_based_builder.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
class ChainBasedDAG(nx.DiGraph):
    """Chain-based DAG class.

    Holds the union of the nodes and edges of all given chains and keeps a
    reference to the chains themselves.
    """

    def __init__(self, chains: List[Chain]) -> None:
        """Constructor.

        Parameters
        ----------
        chains : List[Chain]
            Chains whose nodes and edges are copied into this graph.

        """
        super().__init__()
        self.chains = chains
        for chain in self.chains:
            self.add_nodes_from(chain.nodes)
            self.add_edges_from(chain.edges)

    @property
    def chain_heads(self) -> List[int]:
        """Head index of every chain, in chain order."""
        return [chain.head for chain in self.chains]

    def vertically_link_chains(
        self, number_of_entry_nodes: int, link_main_tail: bool, link_sub_tail: bool
    ) -> None:
        """Vertically link chains.

        ``number_of_entry_nodes`` chains are sampled as link sources. The head
        of every other chain receives one incoming edge from a tail of a
        source chain.

        Parameters
        ----------
        number_of_entry_nodes : int
            Number of entry nodes.
        link_main_tail : bool
            Allow link in main sequence tails.
        link_sub_tail : bool
            Allow link in sub sequence tails.

        """
        # Determine source option.
        # Ordered lists are used for iteration: iterating a set of Chain
        # objects depends on their identity hashes and would make the result
        # differ between runs even with a fixed random seed.
        src_chains = random.sample(self.chains, number_of_entry_nodes)
        src_option = []
        for chain in src_chains:
            if link_main_tail:
                src_option.append(chain.main_tail)
            if link_sub_tail:
                src_option += chain.sub_sequence_tails

        # Determine targets: heads of the chains that were not sampled.
        sampled = set(src_chains)
        targets = [chain.head for chain in self.chains if chain not in sampled]

        # Add edges
        for tgt_i in targets:
            src_i = Util.get_min_out_node(self, src_option)
            self.add_edge(src_i, tgt_i)

    def merge_chains(
        self, number_of_exit_nodes: int, merge_middle: bool, merge_exit: bool
    ) -> None:
        """Merge chains.

        ``number_of_exit_nodes`` of the current exit nodes are sampled to be
        kept. Every other exit node gets one outgoing edge to a permitted
        target that is not one of its ancestors.

        Parameters
        ----------
        number_of_exit_nodes : int
            Number of exit nodes.
        merge_middle : bool
            Allow merge to middle nodes.
        merge_exit : bool
            Allow merge to exit nodes.

        Raises
        ------
        BuildFailedError
            No merging is possible.

        """
        exit_nodes = Util.get_exit_nodes(self)
        selected_exits = set(random.sample(exit_nodes, number_of_exit_nodes))
        sources = set(exit_nodes) - selected_exits

        # Determine target option
        tgt_option = set(self.nodes()) - set(Util.get_entry_nodes(self)) - sources
        if not merge_exit:
            tgt_option -= selected_exits
        if not merge_middle:
            # Only the kept exit nodes remain as targets.
            tgt_option = selected_exits

        # Add edges
        copy_dag = nx.DiGraph()  # Use a copy because of a bug in nx.ancestor function.
        copy_dag.add_nodes_from(self.nodes)
        copy_dag.add_edges_from(self.edges)
        for src_i in sources:
            # An edge into an ancestor of src_i would close a cycle.
            _tgt_option = tgt_option - nx.ancestors(copy_dag, src_i)
            if not _tgt_option:
                raise BuildFailedError("No merging is possible.")
            tgt_i = Util.get_min_in_node(self, _tgt_option)
            self.add_edge(src_i, tgt_i)
            copy_dag.add_edge(src_i, tgt_i)

merge_chains(number_of_exit_nodes, merge_middle, merge_exit) #

Merge chains.

Parameters:

Name Type Description Default
number_of_exit_nodes int

Number of exit nodes.

required
merge_middle bool

Allow merge to middle nodes.

required
merge_exit bool

Allow merge to exit nodes.

required

Raises:

Type Description
BuildFailedError

No merging is possible.

Source code in src/dag_builder/chain_based_builder.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def merge_chains(
    self, number_of_exit_nodes: int, merge_middle: bool, merge_exit: bool
) -> None:
    """Merge chains.

    ``number_of_exit_nodes`` of the current exit nodes are sampled to be
    kept. Every other exit node gets one outgoing edge to a permitted
    target that is not one of its ancestors.

    Parameters
    ----------
    number_of_exit_nodes : int
        Number of exit nodes.
    merge_middle : bool
        Allow merge to middle nodes.
    merge_exit : bool
        Allow merge to exit nodes.

    Raises
    ------
    BuildFailedError
        No merging is possible.

    """
    exit_nodes = Util.get_exit_nodes(self)
    selected_exits = set(random.sample(exit_nodes, number_of_exit_nodes))
    sources = set(exit_nodes) - selected_exits

    # Determine target option
    tgt_option = set(self.nodes()) - set(Util.get_entry_nodes(self)) - sources
    if not merge_exit:
        tgt_option -= selected_exits
    if not merge_middle:
        # Only the kept exit nodes remain as targets.
        tgt_option = selected_exits

    # Add edges
    copy_dag = nx.DiGraph()  # Use a copy because of a bug in nx.ancestor function.
    copy_dag.add_nodes_from(self.nodes)
    copy_dag.add_edges_from(self.edges)
    for src_i in sources:
        # An edge into an ancestor of src_i would close a cycle.
        _tgt_option = tgt_option - nx.ancestors(copy_dag, src_i)
        if not _tgt_option:
            raise BuildFailedError("No merging is possible.")
        tgt_i = Util.get_min_in_node(self, _tgt_option)
        self.add_edge(src_i, tgt_i)
        copy_dag.add_edge(src_i, tgt_i)

vertically_link_chains(number_of_entry_nodes, link_main_tail, link_sub_tail) #

Vertically link chains.

Parameters:

Name Type Description Default
number_of_entry_nodes int

Number of entry nodes.

required
link_main_tail bool

Allow link in main sequence tails.

required
link_sub_tail bool

Allow link in sub sequence tails.

required
Source code in src/dag_builder/chain_based_builder.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def vertically_link_chains(
    self, number_of_entry_nodes: int, link_main_tail: bool, link_sub_tail: bool
) -> None:
    """Vertically link chains.

    ``number_of_entry_nodes`` chains are sampled as link sources. The head
    of every other chain receives one incoming edge from a tail of a
    source chain.

    Parameters
    ----------
    number_of_entry_nodes : int
        Number of entry nodes.
    link_main_tail : bool
        Allow link in main sequence tails.
    link_sub_tail : bool
        Allow link in sub sequence tails.

    """
    # Determine source option.
    # Ordered lists are used for iteration: iterating a set of Chain
    # objects depends on their identity hashes and would make the result
    # differ between runs even with a fixed random seed.
    src_chains = random.sample(self.chains, number_of_entry_nodes)
    src_option = []
    for chain in src_chains:
        if link_main_tail:
            src_option.append(chain.main_tail)
        if link_sub_tail:
            src_option += chain.sub_sequence_tails

    # Determine targets: heads of the chains that were not sampled.
    sampled = set(src_chains)
    targets = [chain.head for chain in self.chains if chain not in sampled]

    # Add edges
    for tgt_i in targets:
        src_i = Util.get_min_out_node(self, src_option)
        self.add_edge(src_i, tgt_i)

DAGBuilderBase #

DAG builder base class.

Source code in src/dag_builder/dag_builder_base.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
class DAGBuilderBase(metaclass=ABCMeta):
    """DAG builder base class.

    Validates and stores the config, and provides graph helpers shared by
    the concrete builders.
    """

    def __init__(self, config: Config, max_try: int = 100) -> None:
        """Constructor.

        Parameters
        ----------
        config : Config
            Config.
        max_try : int, optional
            Maximum number of build attempts for a single DAG, by default 100

        """
        # Validate first so an infeasible config is rejected before it is stored.
        self._validate_config(config)
        self._config = config
        self._max_try = max_try

    @abstractmethod
    def build(self) -> Generator[nx.DiGraph, None, None]:
        """Yield the built DAGs; implemented by each concrete builder."""
        raise NotImplementedError

    @abstractmethod
    def _validate_config(self, config: Config):
        """Check ``config`` for feasibility; implemented by each concrete builder."""
        raise NotImplementedError

    @staticmethod
    def _force_create_entry_nodes(G: nx.DiGraph, number_of_entry_nodes: int) -> None:
        """Create entry nodes forcibly.

        Add 'number_of_entry_nodes' new nodes to the DAG
        and make them entry nodes.
        The new nodes are connected to the original entry nodes
        with a minimum number of edges.

        Parameters
        ----------
        G : nx.DiGraph
            DAG.
        number_of_entry_nodes : int
            Number of entry nodes.

        """
        original_entries = Util.get_entry_nodes(G)
        # New indices start at the current node count.
        new_entries = [G.number_of_nodes() + i for i in range(number_of_entry_nodes)]
        G.add_nodes_from(new_entries)
        DAGBuilderBase._add_minimum_edges(new_entries, original_entries, G)

    @staticmethod
    def _force_create_exit_nodes(G: nx.DiGraph, number_of_exit_nodes: int) -> None:
        """Create exit nodes forcibly.

        Add 'number_of_exit_nodes' new nodes to the DAG
        and make them exit nodes.
        The original exit nodes are connected to the new nodes
        with a minimum number of edges.

        Parameters
        ----------
        G : nx.DiGraph
            DAG.
        number_of_exit_nodes : int
            Number of exit nodes.

        """
        original_exits = Util.get_exit_nodes(G)
        # New indices start at the current node count.
        new_exits = [G.number_of_nodes() + i for i in range(number_of_exit_nodes)]
        G.add_nodes_from(new_exits)
        DAGBuilderBase._add_minimum_edges(original_exits, new_exits, G)

    @staticmethod
    def _add_minimum_edges(src_layer: List[int], tgt_layer: List[int], G: nx.DiGraph) -> None:
        """Add minimum edges.

        Connects the source and target layers with a minimum number of edges.
        The function terminates
        when the out-degree of every source-layer node is greater than or equal to 1
        and the in-degree of every target-layer node is greater than or equal to 1.

        Parameters
        ----------
        src_layer : List[int]
            Indices of nodes that are the source of the edge.
        tgt_layer : List[int]
            Indices of nodes that are the target of the edge.
        G : nx.DiGraph
            DAG.

        """

        def is_finish() -> bool:
            return all(G.out_degree(src_node_i) != 0 for src_node_i in src_layer) and all(
                G.in_degree(tgt_node_i) != 0 for tgt_node_i in tgt_layer
            )

        while not is_finish():
            min_out_src_i = Util.get_min_out_node(G, src_layer)
            min_in_tgt_i = Util.get_min_in_node(G, tgt_layer)
            G.add_edge(min_out_src_i, min_in_tgt_i)

    @staticmethod
    def _ensure_weakly_connected(G: nx.DiGraph, keep_num_entry: bool, keep_num_exit: bool) -> None:
        """Ensure weakly connected.

        Every weakly connected component except the largest one gets one
        edge into the largest component.

        Parameters
        ----------
        G : nx.DiGraph
            DAG.
        keep_num_entry : bool
            Keep the number of entry nodes.
        keep_num_exit : bool
            Keep the number of exit nodes.

        Raises
        ------
        BuildFailedError
            The number of entry nodes and the number of exit nodes
            cannot be kept because of the size 1 component.

        """
        comps = list(nx.weakly_connected_components(G))
        # Nothing to connect for an empty or already connected graph.
        if len(comps) <= 1:
            return

        comps.sort(key=len)
        tgt_comp = comps.pop(-1)  # Largest component

        entry_nodes = set(Util.get_entry_nodes(G))
        exit_nodes = set(Util.get_exit_nodes(G))
        # A node that is both entry and exit cannot receive or emit an edge
        # without changing one of the two counts.
        if keep_num_entry and keep_num_exit and (entry_nodes & exit_nodes):
            raise BuildFailedError(
                "The number of entry nodes and the number of exit nodes "
                "cannot be maintained because of the size 1 component."
            )

        # The target candidates do not change while components are linked.
        tgt_option = tgt_comp - entry_nodes if keep_num_entry else tgt_comp
        for src_comp in comps:
            src_option = src_comp - exit_nodes if keep_num_exit else src_comp
            src_i = Util.get_min_out_node(G, src_option)
            tgt_i = Util.get_min_in_node(G, tgt_option)
            G.add_edge(src_i, tgt_i)

__init__(config, max_try=100) #

Constructor.

Parameters:

Name Type Description Default
config Config

Config.

required
max_try int, optional

Maximum number of build attempts for a single DAG, by default 100

100
Source code in src/dag_builder/dag_builder_base.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def __init__(self, config: Config, max_try: int = 100) -> None:
    """Validate the config, then store it with the retry limit.

    Parameters
    ----------
    config : Config
        Config.
    max_try : int, optional
        Maximum number of build attempts for a single DAG, by default 100

    """
    # Validation runs first, so a rejected config is never stored.
    self._validate_config(config)
    self._config, self._max_try = config, max_try

DAGBuilderFactory #

DAG builder factory class.

Source code in src/dag_builder/dag_builder_factory.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class DAGBuilderFactory:
    """DAG builder factory class."""

    @staticmethod
    def create_instance(config: Config) -> DAGBuilderBase:
        """Create DAG builder instance.

        The builder is selected by comparing ``config.generation_method``
        with each supported name through ``Util.ambiguous_equals``.

        Currently supported generation methods:
        - Fan-in/fan-out method (see https://hal.archives-ouvertes.fr/hal-00471255/file/ggen.pdf)
        - G(n, p) method (see https://hal.archives-ouvertes.fr/hal-00471255/file/ggen.pdf)
        - Chain-based method

        Parameters
        ----------
        config : Config
            Config.

        Returns
        -------
        DAGBuilderBase
            DAG builder.

        Raises
        ------
        NotImplementedError
            The requested generation method is not supported.

        """
        generation_method = config.generation_method
        if Util.ambiguous_equals(generation_method, "fan-in/fan-out"):
            return FanInFanOutBuilder(config)
        if Util.ambiguous_equals(generation_method, "g(n, p)"):
            return GNPBuilder(config)
        if Util.ambiguous_equals(generation_method, "chain-based"):
            return ChainBasedBuilder(config)

        # Name the offending value so a typo in the config is easy to spot.
        raise NotImplementedError(f"Unsupported generation method: {generation_method!r}")

create_instance(config) staticmethod #

Create DAG builder instance.

Currently supported generation methods:

- Fan-in/fan-out method (see https://hal.archives-ouvertes.fr/hal-00471255/file/ggen.pdf)
- G(n, p) method (see https://hal.archives-ouvertes.fr/hal-00471255/file/ggen.pdf)
- Chain-based method

Parameters:

Name Type Description Default
config Config

Config.

required

Returns:

Type Description
DAGBuilderBase

DAG builder.

Raises:

Type Description
NotImplementedError

The requested generation method is not implemented.

Source code in src/dag_builder/dag_builder_factory.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@staticmethod
def create_instance(config: Config) -> DAGBuilderBase:
    """Create DAG builder instance.

    The builder is selected by comparing ``config.generation_method``
    with each supported name through ``Util.ambiguous_equals``.

    Currently supported generation methods:
    - Fan-in/fan-out method (see https://hal.archives-ouvertes.fr/hal-00471255/file/ggen.pdf)
    - G(n, p) method (see https://hal.archives-ouvertes.fr/hal-00471255/file/ggen.pdf)
    - Chain-based method

    Parameters
    ----------
    config : Config
        Config.

    Returns
    -------
    DAGBuilderBase
        DAG builder.

    Raises
    ------
    NotImplementedError
        The requested generation method is not supported.

    """
    generation_method = config.generation_method
    if Util.ambiguous_equals(generation_method, "fan-in/fan-out"):
        return FanInFanOutBuilder(config)
    if Util.ambiguous_equals(generation_method, "g(n, p)"):
        return GNPBuilder(config)
    if Util.ambiguous_equals(generation_method, "chain-based"):
        return ChainBasedBuilder(config)

    # Name the offending value so a typo in the config is easy to spot.
    raise NotImplementedError(f"Unsupported generation method: {generation_method!r}")

FanInFanOutBuilder #

Bases: DAGBuilderBase

Fan-in/fan-out class.

Source code in src/dag_builder/fan_in_fan_out_builder.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
class FanInFanOutBuilder(DAGBuilderBase):
    """DAG builder implementing the fan-in/fan-out method."""

    def __init__(self, config: Config) -> None:
        super().__init__(config)
        # 'out_degree' / 'in_degree' may be a single value or a list of
        # candidates; only the largest value is needed as the degree cap.
        out_degree = self._config.out_degree
        self._max_out = max(out_degree) if isinstance(out_degree, list) else out_degree
        in_degree = self._config.in_degree
        self._max_in = max(in_degree) if isinstance(in_degree, list) else in_degree

    def _validate_config(self, config: Config):
        """Reject configs whose entry/exit node counts cannot fit.

        Parameters
        ----------
        config : Config
            Inputted config.

        Raises
        ------
        InfeasibleConfigError
            The smallest 'Number of entry nodes' plus the smallest
            'Number of exit nodes' (1 when unset) exceeds the largest
            'Number of nodes'.

        """
        min_entry = Util.get_option_min(config.number_of_entry_nodes)
        min_exit = Util.get_option_min(config.number_of_exit_nodes) or 1
        max_nodes = Util.get_option_max(config.number_of_nodes)
        required = min_entry + min_exit  # type: ignore
        if required <= max_nodes:  # type: ignore
            return

        raise InfeasibleConfigError(
            "'Number of entry nodes' + 'Number of exit nodes' > 'Number of nodes'"
        )

    def build(self) -> Generator:
        """Generate DAGs with the fan-in/fan-out method.

        See https://hal.archives-ouvertes.fr/hal-00471255/file/ggen.pdf.

        The graph starts from the entry nodes and grows by randomly chosen
        fan-out or fan-in steps until it holds exactly the target number of
        nodes. A step that overshoots the target counts as a failed build
        and restarts the graph from the entry nodes.

        Yields
        ------
        Generator
            DAG generator.

        Raises
        ------
        BuildFailedError
            The number of build failures exceeded the maximum number of attempts.

        """
        for _ in range(self._config.number_of_dags):
            failures = 0

            # Target size of the growth loop; exit nodes are appended after
            # the loop, so their count is reserved here.
            target_size = Util.random_choice(self._config.number_of_nodes)
            num_exit = self._config.number_of_exit_nodes
            if num_exit:
                num_exit = Util.random_choice(num_exit)
                target_size -= num_exit

            num_entry = Util.random_choice(self._config.number_of_entry_nodes)
            dag = self._init_dag(num_entry)

            while dag.number_of_nodes() != target_size:
                if Util.true_or_false():
                    # Fan-out: hang 1..room new children below the chosen node.
                    hub, room = self._search_max_diff_node(dag)
                    child_count = random.randint(1, room)
                    first_new = dag.number_of_nodes()
                    children = list(range(first_new, first_new + child_count))
                    nx.add_star(dag, [hub, *children])
                else:
                    # Fan-in: one new node fed by up to `wanted` existing
                    # nodes that still have spare out-degree.
                    wanted = random.randint(1, self._max_in)
                    candidates = list(dag.nodes())
                    random.shuffle(candidates)
                    parents = []
                    for candidate in candidates:
                        if dag.out_degree(candidate) >= self._max_out:
                            continue
                        parents.append(candidate)
                        if len(parents) == wanted:
                            break

                    new_node = dag.number_of_nodes()
                    dag.add_node(new_node)
                    dag.add_edges_from((parent, new_node) for parent in parents)

                if dag.number_of_nodes() <= target_size:
                    continue

                # Overshot the target: count the failure and start over.
                failures += 1
                if failures == self._max_try:
                    raise BuildFailedError(
                        "A DAG satisfying 'Number of nodes' could not be built "
                        f"in {self._max_try} tries."
                    )
                dag = self._init_dag(num_entry)

            # Add exit nodes (Optional)
            if num_exit:
                self._force_create_exit_nodes(dag, num_exit)

            # Ensure weakly connected (Optional)
            if self._config.ensure_weakly_connected:
                self._ensure_weakly_connected(dag, True, bool(num_exit))

            yield dag

    def _search_max_diff_node(self, G: nx.DiGraph) -> Tuple[int, int]:
        """Find the node with the most unused out-degree.

        The node is the one returned by ``Util.get_min_out_node``; the
        difference is measured against the max value of the 'out-degree'
        parameter.

        Parameters
        ----------
        G : nx.DiGraph
            Graph to search.

        Returns
        -------
        Tuple[int, int]
            - Index of node with the biggest difference
            - Difference size

        """
        node_i = Util.get_min_out_node(G, G.nodes)
        return node_i, self._max_out - G.out_degree(node_i)

    def _init_dag(self, num_entry: int) -> nx.DiGraph:
        """Return a new edgeless graph holding nodes ``0 .. num_entry - 1``."""
        dag = nx.DiGraph()
        dag.add_nodes_from(range(num_entry))
        return dag

build() #

Build DAG using fan-in/fan-out method.

See https://hal.archives-ouvertes.fr/hal-00471255/file/ggen.pdf.

Yields:

Type Description
Generator

DAG generator.

Raises:

Type Description
BuildFailedError

The number of build failures exceeded the maximum number of attempts.

Source code in src/dag_builder/fan_in_fan_out_builder.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def build(self) -> Generator:
    """Generate DAGs with the fan-in/fan-out method.

    See https://hal.archives-ouvertes.fr/hal-00471255/file/ggen.pdf.

    The graph starts from the entry nodes and grows by randomly chosen
    fan-out or fan-in steps until it holds exactly the target number of
    nodes. A step that overshoots the target counts as a failed build
    and restarts the graph from the entry nodes.

    Yields
    ------
    Generator
        DAG generator.

    Raises
    ------
    BuildFailedError
        The number of build failures exceeded the maximum number of attempts.

    """
    for _ in range(self._config.number_of_dags):
        failures = 0

        # Target size of the growth loop; exit nodes are appended after
        # the loop, so their count is reserved here.
        target_size = Util.random_choice(self._config.number_of_nodes)
        num_exit = self._config.number_of_exit_nodes
        if num_exit:
            num_exit = Util.random_choice(num_exit)
            target_size -= num_exit

        num_entry = Util.random_choice(self._config.number_of_entry_nodes)
        dag = self._init_dag(num_entry)

        while dag.number_of_nodes() != target_size:
            if Util.true_or_false():
                # Fan-out: hang 1..room new children below the chosen node.
                hub, room = self._search_max_diff_node(dag)
                child_count = random.randint(1, room)
                first_new = dag.number_of_nodes()
                children = list(range(first_new, first_new + child_count))
                nx.add_star(dag, [hub, *children])
            else:
                # Fan-in: one new node fed by up to `wanted` existing
                # nodes that still have spare out-degree.
                wanted = random.randint(1, self._max_in)
                candidates = list(dag.nodes())
                random.shuffle(candidates)
                parents = []
                for candidate in candidates:
                    if dag.out_degree(candidate) >= self._max_out:
                        continue
                    parents.append(candidate)
                    if len(parents) == wanted:
                        break

                new_node = dag.number_of_nodes()
                dag.add_node(new_node)
                dag.add_edges_from((parent, new_node) for parent in parents)

            if dag.number_of_nodes() <= target_size:
                continue

            # Overshot the target: count the failure and start over.
            failures += 1
            if failures == self._max_try:
                raise BuildFailedError(
                    "A DAG satisfying 'Number of nodes' could not be built "
                    f"in {self._max_try} tries."
                )
            dag = self._init_dag(num_entry)

        # Add exit nodes (Optional)
        if num_exit:
            self._force_create_exit_nodes(dag, num_exit)

        # Ensure weakly connected (Optional)
        if self._config.ensure_weakly_connected:
            self._ensure_weakly_connected(dag, True, bool(num_exit))

        yield dag

GNPBuilder #

Bases: DAGBuilderBase

G(n, p) class.

Source code in src/dag_builder/g_n_p_builder.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
class GNPBuilder(DAGBuilderBase):
    """G(n, p) class."""

    def __init__(self, config: Config) -> None:
        super().__init__(config)

    def _validate_config(self, config: Config):
        """Validate config.

        Parameters
        ----------
        config : Config
            Inputted config.

        Raises
        ------
        InfeasibleConfigError
            The smallest 'Number of entry nodes' plus the smallest
            'Number of exit nodes' (each counted as 1 when unset) exceeds
            the largest 'Number of nodes'.

        """
        number_of_entry_nodes = Util.get_option_min(config.number_of_entry_nodes) or 1
        number_of_exit_nodes = Util.get_option_min(config.number_of_exit_nodes) or 1
        number_of_nodes = Util.get_option_max(config.number_of_nodes)
        if number_of_entry_nodes + number_of_exit_nodes > number_of_nodes:  # type: ignore
            raise InfeasibleConfigError(
                "'Number of entry nodes' + 'Number of exit nodes' > 'Number of nodes'"
            )

        # A probability above 1.0 is only reported, not rejected.
        if Util.get_option_max(config.probability_of_edge) > 1.0:  # type: ignore
            logger.warning("'Probability of edge' > 1.0")

    def build(self) -> Generator[nx.DiGraph, None, None]:
        """Build DAG using G(n, p) method.

        See https://hal.archives-ouvertes.fr/hal-00471255/file/ggen.pdf.

        For every pair of node indices ``i < j`` the edge ``i -> j`` is added
        with probability 'Probability of edge'. Edges only ever point from a
        lower to a higher index, so the result is acyclic.

        Yields
        ------
        nx.DiGraph
            One generated DAG per requested 'Number of DAGs'.

        Raises
        ------
        BuildFailedError
            The number of build failures exceeded the maximum number of attempts.

        """
        for _ in range(self._config.number_of_dags):
            for try_i in range(1, self._max_try + 1):
                # Determine number_of_nodes; forced entry/exit nodes are
                # created separately below, so their counts are reserved here.
                num_nodes = Util.random_choice(self._config.number_of_nodes)
                num_entry = self._config.number_of_entry_nodes
                if num_entry:
                    num_entry = Util.random_choice(num_entry)
                    num_nodes -= num_entry
                num_exit = self._config.number_of_exit_nodes
                if num_exit:
                    num_exit = Util.random_choice(num_exit)
                    num_nodes -= num_exit

                # Initialize DAG with nodes 0 .. num_nodes - 1
                G = nx.DiGraph()
                G.add_nodes_from(range(num_nodes))

                # Add edge. random.random() is uniform on [0.0, 1.0), so the
                # test succeeds with probability exactly prob_edge (always
                # for 1.0, never for 0.0). Only pairs with i < j are visited.
                prob_edge = Util.random_choice(self._config.probability_of_edge)
                for i in range(num_nodes):
                    for j in range(i + 1, num_nodes):
                        if random.random() < prob_edge:
                            G.add_edge(i, j)

                # Add entry nodes (Optional)
                if num_entry:
                    self._force_create_entry_nodes(G, num_entry)

                # Add exit nodes (Optional)
                if num_exit:
                    self._force_create_exit_nodes(G, num_exit)

                # Without the connectivity requirement nothing can fail, so
                # the first graph is accepted instead of being rebuilt on
                # every remaining try.
                if not self._config.ensure_weakly_connected:
                    break

                # Ensure weakly connected (Optional)
                try:
                    self._ensure_weakly_connected(G, bool(num_entry), bool(num_exit))
                except BuildFailedError as err:
                    if try_i == self._max_try:
                        raise BuildFailedError(
                            f"A DAG could not be built in {self._max_try} tries."
                        ) from err
                else:
                    break

            yield G

build() #

Build DAG using G(n, p) method.

See https://hal.archives-ouvertes.fr/hal-00471255/file/ggen.pdf.

Yields:

Type Description
Generator

DAG generator.

Raises:

Type Description
BuildFailedError

The number of build failures exceeded the maximum number of attempts.

Source code in src/dag_builder/g_n_p_builder.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def build(self) -> Generator[nx.DiGraph, None, None]:
    """Build DAG using G(n, p) method.

    See https://hal.archives-ouvertes.fr/hal-00471255/file/ggen.pdf.

    For every pair of node indices ``i < j`` the edge ``i -> j`` is added
    with probability 'Probability of edge'. Edges only ever point from a
    lower to a higher index, so the result is acyclic.

    Yields
    ------
    nx.DiGraph
        One generated DAG per requested 'Number of DAGs'.

    Raises
    ------
    BuildFailedError
        The number of build failures exceeded the maximum number of attempts.

    """
    for _ in range(self._config.number_of_dags):
        for try_i in range(1, self._max_try + 1):
            # Determine number_of_nodes; forced entry/exit nodes are
            # created separately below, so their counts are reserved here.
            num_nodes = Util.random_choice(self._config.number_of_nodes)
            num_entry = self._config.number_of_entry_nodes
            if num_entry:
                num_entry = Util.random_choice(num_entry)
                num_nodes -= num_entry
            num_exit = self._config.number_of_exit_nodes
            if num_exit:
                num_exit = Util.random_choice(num_exit)
                num_nodes -= num_exit

            # Initialize DAG with nodes 0 .. num_nodes - 1
            G = nx.DiGraph()
            G.add_nodes_from(range(num_nodes))

            # Add edge. random.random() is uniform on [0.0, 1.0), so the
            # test succeeds with probability exactly prob_edge (always
            # for 1.0, never for 0.0). Only pairs with i < j are visited.
            prob_edge = Util.random_choice(self._config.probability_of_edge)
            for i in range(num_nodes):
                for j in range(i + 1, num_nodes):
                    if random.random() < prob_edge:
                        G.add_edge(i, j)

            # Add entry nodes (Optional)
            if num_entry:
                self._force_create_entry_nodes(G, num_entry)

            # Add exit nodes (Optional)
            if num_exit:
                self._force_create_exit_nodes(G, num_exit)

            # Without the connectivity requirement nothing can fail, so
            # the first graph is accepted instead of being rebuilt on
            # every remaining try.
            if not self._config.ensure_weakly_connected:
                break

            # Ensure weakly connected (Optional)
            try:
                self._ensure_weakly_connected(G, bool(num_entry), bool(num_exit))
            except BuildFailedError as err:
                if try_i == self._max_try:
                    raise BuildFailedError(
                        f"A DAG could not be built in {self._max_try} tries."
                    ) from err
            else:
                break

        yield G