72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def infer_sql_lineage(rendered_sql: str, ref_map: dict[str, str] | None = None) -> LineageMap:
    """
    Infer a best-effort mapping from output column -> upstream sources.

    Only the select list captured by ``_SEL_RE`` (group 1) is inspected. Each
    select expression is classified with simple regexes:

    - ``<alias>.<col> AS <out>``        -> direct, relation looked up via alias
    - ``<col> AS <out>``                -> direct, relation unknown (``"?"``)
    - ``FUNC(<alias>.<col>) AS <out>``  -> ``transformed=True``
    - bare ``<alias>.<col>`` / ``<col>`` -> output name defaults to ``<col>``
    - any other expression ``AS <out>`` -> ``transformed=True`` with relation
      and column both ``"?"``

    Expressions for which no output column name can be determined (a complex
    expression without an ``AS`` alias) and ``*`` are skipped.

    Args:
        rendered_sql: SQL text to analyse. Empty/falsy input yields an empty map.
        ref_map: Optional alias -> relation mapping. When it is falsy (``None``
            or empty), the mapping is derived from the SQL itself via
            ``_alias_map_from_sql``.

    Returns:
        The lineage map populated through ``_append_lineage``; empty when no
        select clause is found.
    """
    lineage: LineageMap = {}
    if not rendered_sql:
        return lineage

    alias_map = ref_map or _alias_map_from_sql(rendered_sql)
    m = _SEL_RE.search(rendered_sql)
    if not m:
        return lineage

    # Patterns (compiled once, outside the per-expression loop).
    # DOTALL lets an aliased expression span several lines, e.g. a multi-line
    # CASE ... END AS flag; without it the alias would be silently lost.
    as_pat = re.compile(
        r"^(?P<expr>.+?)\s+as\s+(?P<alias>[a-zA-Z_][\w\$]*)$",
        re.IGNORECASE | re.DOTALL,
    )
    qual_col = re.compile(r"^(?P<a>[a-zA-Z_]\w*)\.(?P<c>[a-zA-Z_]\w*)$")
    func_of_qual = re.compile(
        r"^[a-zA-Z_]\w*\s*\(\s*(?P<a>[a-zA-Z_]\w*)\.(?P<c>[a-zA-Z_]\w*)\s*\)\s*$", re.IGNORECASE
    )
    bare_col = re.compile(r"^[a-zA-Z_]\w*$")

    for raw in _split_select_list(m.group(1)):
        expr = raw.strip()
        if not expr or expr == "*":
            continue

        # Split off a trailing "AS <out>" if present.
        out_col: str | None = None
        expr_only = expr
        m_as = as_pat.match(expr)
        if m_as:
            out_col = m_as.group("alias")
            expr_only = m_as.group("expr").strip()

        # Classify the expression; precedence: func(alias.col), alias.col, col.
        m_func = func_of_qual.match(expr_only)
        m_src = m_func or qual_col.match(expr_only)
        src_col: str | None
        if m_src:
            src_col = m_src.group("c")
            item = {
                "from_relation": alias_map.get(m_src.group("a")) or "?",
                "from_column": src_col,
                "transformed": m_func is not None,
            }
        elif bare_col.match(expr_only):
            # Plain column (no qualifier): the relation cannot be resolved.
            src_col = expr_only
            item = {"from_relation": "?", "from_column": src_col, "transformed": False}
        else:
            # func(col) or complex expression: unknown relation/column.
            src_col = None
            item = {"from_relation": "?", "from_column": "?", "transformed": True}

        # Without an explicit alias, fall back to the source column name.
        target = out_col if out_col is not None else src_col
        if target is None:
            # Complex expression with no alias: there is no output column name
            # to record lineage under, so skip it rather than keying on None.
            continue
        _append_lineage(lineage, target, item)

    return lineage
|
Parse optional SQL comment directives
-- @lineage email_upper: users.email (transformed)