Returns a level-wise topological ordering.
- Each inner list contains nodes with no prerequisites inside the remaining
graph (i.e. eligible to run in parallel).
- Ordering within a level is lexicographically stable.
- Validation for missing deps/cycles matches topo_sort.
Source code in src/fastflowtransform/dag.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86 | def levels(nodes: dict[str, Node]) -> list[list[str]]:
"""
Returns a level-wise topological ordering.
- Each inner list contains nodes with no prerequisites inside the remaining
graph (i.e. eligible to run in parallel).
- Ordering within a level is lexicographically stable.
- Validation for missing deps/cycles matches topo_sort.
"""
# Fehlende Deps einsammeln (nur Modell-Refs; sources sind keine Nodes)
missing = {
n.name: sorted({d for d in (n.deps or []) if d not in nodes})
for n in nodes.values()
if any(d not in nodes for d in (n.deps or []))
}
if missing:
raise DependencyNotFoundError(missing)
indeg = {k: 0 for k in nodes}
out: dict[str, set[str]] = defaultdict(set)
for n in nodes.values():
for d in set(n.deps or []):
out[d].add(n.name)
indeg[n.name] += 1
# Start-Level: alle 0-Indegree
current = sorted([k for k, deg in indeg.items() if deg == 0])
lvls: list[list[str]] = []
seen_count = 0
while current:
lvls.append(current)
next_zero: set[str] = set()
for u in current:
seen_count += 1
for v in sorted(out.get(u, ())):
indeg[v] -= 1
if indeg[v] == 0:
next_zero.add(v)
current = sorted(next_zero)
if seen_count != len(nodes):
cyclic = [k for k, deg in indeg.items() if deg > 0]
raise ModelCycleError(f"Cycle detected among nodes: {', '.join(sorted(cyclic))}")
return lvls
|