Skip to content

fastflowtransform.lineage

infer_sql_lineage

infer_sql_lineage(rendered_sql, ref_map=None)
Infer column-level lineage for SQL
  • CTE-aware (WITH ... AS (...))
  • tracks simple transforms (lower/cast/trim/upper/etc.)
  • expands CTE edges to base relations
  • does NOT emit placeholder unknown edges; if ambiguous/unresolved -> no edge
Source code in src/fastflowtransform/lineage.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def infer_sql_lineage(rendered_sql: str, ref_map: dict[str, str] | None = None) -> LineageMap:
    """
    Derive a column-level lineage map from a rendered SQL string.

    Behaviour:
      - CTE-aware (WITH ... AS (...)): every CTE body is analysed recursively
      - tracks simple transforms (lower/cast/trim/upper/etc.)
      - edges that point at a CTE are expanded down to base relations
      - no placeholder "unknown" edges: ambiguous/unresolved columns yield no edge

    Args:
        rendered_sql: SQL text to analyse; ``None``/blank input yields ``{}``.
        ref_map: optional mapping forwarded unchanged to the SELECT inference
            helper and to the recursive CTE calls.

    Returns:
        The lineage map of the last statement in ``rendered_sql``, or ``{}``
        when nothing parseable is found.
    """
    query_text = (rendered_sql or "").strip()
    if not query_text:
        return {}

    parsed = sqlparse.parse(_strip_to_query(query_text))
    if not parsed:
        return {}

    # Only the final statement is analysed (CREATE VIEW ...; SELECT ...; etc.)
    statement = parsed[-1]

    # Still not query-like (e.g. CREATE ... AS SELECT ...)? Strip once more,
    # this time starting from the statement's own text, and parse again.
    if not _contains_select(statement):
        reparsed = sqlparse.parse(_strip_to_query(str(statement)))
        if not reparsed:
            return {}
        statement = reparsed[-1]

    cte_defs, body_stmt = _split_ctes(statement)

    # Resolve CTEs in declaration order so a CTE may reference earlier ones.
    resolved_ctes: dict[str, LineageMap] = {}
    for cte_name, cte_body in cte_defs:
        raw_edges = infer_sql_lineage(cte_body, ref_map=ref_map)
        # Rewrite edges that point at previously resolved CTEs (chained CTEs).
        resolved_ctes[cte_name] = _expand_cte_edges(raw_edges, resolved_ctes)

    # Main statement last; its CTE references are expanded the same way.
    main_edges = _infer_select_stmt(body_stmt, ref_map=ref_map)
    return _expand_cte_edges(main_edges, resolved_ctes)

parse_sql_lineage_overrides

parse_sql_lineage_overrides(rendered_sql)

Parse inline overrides from SQL comments.

Supported

-- lineage: out_col <- relation.column
-- lineage: out_col <- relation.column xform
-- lineage: out_col <- relation.column, other_rel.other_col
/* lineage:
      out_col <- relation.column xform
      other   <- rel.col
   */

Also supports JSON

-- lineage-json: {"out_col":[{"from_relation":"t","from_column":"c","transformed":true}]}
/* lineage-json: {...} */

Source code in src/fastflowtransform/lineage.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def parse_sql_lineage_overrides(rendered_sql: str) -> LineageMap:
    """
    Collect lineage overrides declared inline in SQL comments.

    Text form:
      -- lineage: out_col <- relation.column
      -- lineage: out_col <- relation.column xform
      -- lineage: out_col <- relation.column, other_rel.other_col
      /* lineage:
            out_col <- relation.column xform
            other   <- rel.col
         */

    JSON form:
      -- lineage-json: {"out_col":[{"from_relation":"t","from_column":"c","transformed":true}]}
      /* lineage-json: {...} */

    The payload extractor is also asked for the ``fft-lineage`` and
    ``fft-lineage-json`` keys. JSON payloads are merged before text payloads;
    a JSON payload that fails to decode or normalize is skipped.

    Returns:
        The merged override map, or ``{}`` for ``None``/blank input.
    """
    text = rendered_sql or ""
    if not text.strip():
        return {}

    collected: LineageMap = {}

    # Pass 1: JSON payloads. Best-effort: a broken block must not abort parsing.
    for raw_json in _extract_comment_payloads(text, keys=("lineage-json", "fft-lineage-json")):
        try:
            decoded = json.loads(raw_json)
            if not isinstance(decoded, dict):
                continue
            collected = merge_lineage(collected, _normalize_lineage_map(decoded))
        except Exception:
            # malformed JSON override block -> ignored
            continue

    # Pass 2: plain-text "out <- rel.col" payloads.
    for raw_text in _extract_comment_payloads(text, keys=("lineage", "fft-lineage")):
        collected = merge_lineage(collected, _parse_lineage_text_block(raw_text))

    return collected

merge_lineage

merge_lineage(base, overlay)

Union-merge two lineage maps with dedupe.

Source code in src/fastflowtransform/lineage.py
119
120
121
122
123
124
125
126
127
128
129
130
131
def merge_lineage(base: LineageMap, overlay: LineageMap) -> LineageMap:
    """
    Combine two lineage maps into a new one (union per column, deduplicated).

    Every column of ``base`` is copied into a fresh list. For each column of
    ``overlay`` whose value is a list, the overlay items are appended to the
    base items and the result is passed through ``_dedupe_items``; overlay
    values that are not lists are skipped. Columns that end up with no items
    are omitted. ``None`` is accepted for either argument.
    """
    merged: LineageMap = {}
    for column, entries in (base or {}).items():
        merged[column] = list(entries)

    for column, additions in (overlay or {}).items():
        if isinstance(additions, list):
            merged[column] = _dedupe_items(merged.get(column, []) + additions)

    # Columns without any edge carry no information; leave them out.
    return {column: entries for column, entries in merged.items() if entries}

infer_py_lineage

infer_py_lineage(py_source, ref_map=None)
Minimal Python lineage:
  • If the file defines __lineage__ = {...}, LINEAGE = {...} or lineage = {...}, we literal-eval it.
  • Otherwise return {}.

This keeps python models supported without forcing heavy AST/dataframe analysis.

Source code in src/fastflowtransform/lineage.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def infer_py_lineage(py_source: str, ref_map: dict[str, str] | None = None) -> LineageMap:
    """
    Minimal python lineage:
      - If the file defines a module-level ``__lineage__``, ``LINEAGE`` or
        ``lineage`` variable whose value is a literal dict, we literal-eval it.
        Both plain (``LINEAGE = {...}``) and annotated
        (``LINEAGE: dict = {...}``) assignments are recognised.
      - Otherwise return {}.

    This keeps python models supported without forcing heavy AST/dataframe analysis.

    Args:
        py_source: Python source text; ``None``/blank input yields ``{}``.
        ref_map: optional mapping; when truthy it is applied to the normalized
            map via ``_apply_ref_map``.

    Returns:
        The normalized lineage map of the first matching assignment that
        evaluates to a dict, or ``{}`` if the source cannot be parsed or no
        such assignment exists.
    """
    src = py_source or ""
    if not src.strip():
        return {}

    try:
        tree = ast.parse(src)
    except (SyntaxError, ValueError):
        # ValueError covers sources ast.parse rejects before tokenizing
        # (e.g. embedded null bytes on older interpreters).
        return {}

    lineage_names = {"__lineage__", "LINEAGE", "lineage"}

    # Only top-level statements are inspected; nested scopes are ignored.
    for node in tree.body:
        if isinstance(node, ast.Assign):
            targets = node.targets
        elif isinstance(node, ast.AnnAssign) and node.value is not None:
            # Annotated assignment has exactly one target; a bare
            # declaration without a value (``LINEAGE: dict``) is skipped.
            targets = [node.target]
        else:
            continue

        if not any(isinstance(t, ast.Name) and t.id in lineage_names for t in targets):
            continue

        try:
            val = ast.literal_eval(node.value)
            if isinstance(val, dict):
                lm = _normalize_lineage_map(val)
                # Optional: apply ref_map rewriting of relations
                if ref_map:
                    lm = _apply_ref_map(lm, ref_map)
                return lm
        except Exception:
            # Best-effort: a non-literal or malformed declaration is ignored
            # and later top-level assignments are still considered.
            continue
    return {}