Skip to content

fastflowtransform.dag

levels

levels(nodes)

Returns a level-wise topological ordering. - Each inner list contains nodes with no prerequisites inside the remaining graph (i.e. eligible to run in parallel). - Ordering within a level is lexicographically stable. - Validation for missing deps/cycles matches topo_sort.

Source code in src/fastflowtransform/dag.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def levels(nodes: dict[str, Node]) -> list[list[str]]:
    """
    Returns a level-wise topological ordering.
    - Each inner list contains nodes with no prerequisites inside the remaining
      graph (i.e. eligible to run in parallel).
    - Ordering within a level is lexicographically stable.
    - Validation for missing deps/cycles matches topo_sort.
    """
    # Fehlende Deps einsammeln (nur Modell-Refs; sources sind keine Nodes)
    missing = {
        n.name: sorted({d for d in (n.deps or []) if d not in nodes})
        for n in nodes.values()
        if any(d not in nodes for d in (n.deps or []))
    }
    if missing:
        raise DependencyNotFoundError(missing)

    indeg = {k: 0 for k in nodes}
    out: dict[str, set[str]] = defaultdict(set)
    for n in nodes.values():
        for d in set(n.deps or []):
            out[d].add(n.name)
            indeg[n.name] += 1

    # Start-Level: alle 0-Indegree
    current = sorted([k for k, deg in indeg.items() if deg == 0])
    lvls: list[list[str]] = []
    seen_count = 0

    while current:
        lvls.append(current)
        next_zero: set[str] = set()
        for u in current:
            seen_count += 1
            for v in sorted(out.get(u, ())):
                indeg[v] -= 1
                if indeg[v] == 0:
                    next_zero.add(v)
        current = sorted(next_zero)

    if seen_count != len(nodes):
        cyclic = [k for k, deg in indeg.items() if deg > 0]
        raise ModelCycleError(f"Cycle detected among nodes: {', '.join(sorted(cyclic))}")
    return lvls

spa_graph

spa_graph(nodes, *, sources_by_key=None, model_source_refs=None, direction='LR', rank_spacing=280, node_spacing=84, padding=24)

Build a lightweight graph payload for the SPA: - Layout computed here (no JS graph libs needed) - Browser renders SVG + pan/zoom + navigation

sources_by_key values may be SourceDoc or dict-like; we only access: .source_name, .table_name, .relation

Source code in src/fastflowtransform/dag.py
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
def spa_graph(
    nodes: dict[str, "Node"],
    *,
    sources_by_key: dict[tuple[str, str], Any] | None = None,
    model_source_refs: dict[str, list[tuple[str, str]]] | None = None,
    direction: str = "LR",  # LR or TB
    rank_spacing: int = 280,
    node_spacing: int = 84,
    padding: int = 24,
) -> dict[str, Any]:
    """
    Build a lightweight graph payload for the SPA:
    - Layout computed here (no JS graph libs needed)
    - Browser renders SVG + pan/zoom + navigation

    sources_by_key values may be SourceDoc or dict-like; we only access:
      .source_name, .table_name, .relation
    """
    sources_by_key = sources_by_key or {}
    model_source_refs = model_source_refs or {}

    # Levels for models only (sources aren't Nodes)
    lvls = levels(nodes)  # raises on cycles/missing deps like topo_sort

    source_keys = _collect_source_keys(
        sources_by_key=sources_by_key,
        model_source_refs=model_source_refs,
    )
    has_sources = bool(source_keys)
    model_rank_offset = 1 if has_sources else 0

    parents = _build_parents(nodes)
    ordered_levels = _build_ordered_levels(
        lvls=lvls,
        parents=parents,
        model_rank_offset=model_rank_offset,
        source_keys=source_keys,
    )
    max_count = max((len(v) for v in ordered_levels.values()), default=1)

    out_nodes = _emit_nodes(
        nodes=nodes,
        sources_by_key=sources_by_key,
        ordered_levels=ordered_levels,
        max_count=max_count,
        rank_spacing=rank_spacing,
        node_spacing=node_spacing,
        padding=padding,
    )
    out_edges = _emit_edges(
        nodes=nodes,
        sources_by_key=sources_by_key,
        model_source_refs=model_source_refs,
    )

    normalized_direction = _apply_direction(direction, out_nodes)
    bounds = _bounds(out_nodes, padding)

    return {
        "direction": normalized_direction,
        "nodes": out_nodes,
        "edges": out_edges,
        "bounds": bounds,
    }