class DuckExecutor(BaseExecutor[pd.DataFrame]):
ENGINE_NAME = "duckdb"
def __init__(self, db_path: str = ":memory:"):
if db_path and db_path != ":memory:" and "://" not in db_path:
with suppress(Exception):
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
self.db_path = db_path
self.con = duckdb.connect(db_path)
    def clone(self) -> DuckExecutor:
        """
        Create a new executor instance with its own connection for a worker thread.

        Note: for ":memory:" this opens a fresh, empty database, so workers
        that must share state need a file-based db_path.
        """
        return DuckExecutor(self.db_path)
def _exec_many(self, sql: str) -> None:
"""
Execute multiple SQL statements separated by ';' on the same connection.
DuckDB normally accepts one statement per execute(), so we split here.
"""
        # Naive splitter: good enough for the SQL this executor emits, though
        # it would mis-split on a ';' inside a string literal.
for stmt in (part.strip() for part in sql.split(";")):
if not stmt:
continue
self.con.execute(stmt)
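
    # Illustration (hypothetical call): the string below would be split into
    # two statements and executed in order on this connection:
    #
    #     self._exec_many("create table t (i integer); insert into t values (1);")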
# ---- Frame hooks ----
def _read_relation(self, relation: str, node: Node, deps: Iterable[str]) -> pd.DataFrame:
try:
return self.con.table(relation).df()
except CatalogException as e:
existing = [
r[0]
for r in self.con.execute(
"select table_name from information_schema.tables "
"where table_schema in ('main','temp')"
).fetchall()
]
raise RuntimeError(
f"Dependency table not found: '{relation}'\n"
f"Deps: {list(deps)}\nExisting tables: {existing}\n"
"Hinweis: gleiche Datei-DB/Connection für Seeding & Run verwenden."
) from e
def _materialize_relation(self, relation: str, df: pd.DataFrame, node: Node) -> None:
tmp = "_ff_py_out"
try:
self.con.register(tmp, df)
self.con.execute(f'create or replace table "{relation}" as select * from "{tmp}"')
        finally:
            # Best effort: detach the temporary DataFrame registration; fall
            # back to dropping the implicit view if unregister() fails.
try:
self.con.unregister(tmp)
except Exception:
self.con.execute(f'drop view if exists "{tmp}"')
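
    # Illustration (hypothetical): materializing a DataFrame registers it
    # under a temporary name and copies it into the target table:
    #
    #     self._materialize_relation("dim_users", pd.DataFrame({"id": [1, 2]}), node)
    #     # -> create or replace table "dim_users" as select * from "_ff_py_out"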
def _create_or_replace_view_from_table(
self, view_name: str, backing_table: str, node: Node
) -> None:
self.con.execute(f'create or replace view "{view_name}" as select * from "{backing_table}"')
def _frame_name(self) -> str:
return "pandas"
# ---- SQL hooks ----
def _format_relation_for_ref(self, name: str) -> str:
return _q(relation_for(name))
def _format_source_reference(
self, cfg: dict[str, Any], source_name: str, table_name: str
) -> str:
location = cfg.get("location")
if location:
raise NotImplementedError("DuckDB executor does not support path-based sources yet.")
identifier = cfg.get("identifier")
if not identifier:
raise KeyError(f"Source {source_name}.{table_name} missing identifier")
parts = [
p
for p in (
cfg.get("catalog") or cfg.get("database"),
cfg.get("schema"),
identifier,
)
if p
]
if not parts:
parts = [identifier]
return ".".join(_q(str(part)) for part in parts)
def _create_or_replace_view(self, target_sql: str, select_body: str, node: Node) -> None:
self.con.execute(f"create or replace view {target_sql} as {select_body}")
def _create_or_replace_table(self, target_sql: str, select_body: str, node: Node) -> None:
self.con.execute(f"create or replace table {target_sql} as {select_body}")
# ---- Meta hook ----
def on_node_built(self, node: Node, relation: str, fingerprint: str) -> None:
"""
After successful materialization, ensure the meta table exists and upsert the row.
"""
# Best-effort: do not let meta errors break the run
try:
ensure_meta_table(self)
upsert_meta(self, node.name, relation, fingerprint, "duckdb")
except Exception:
pass
# ── Incremental API ────────────────────────────────────────────────────
def exists_relation(self, relation: str) -> bool:
sql = """
select 1
from information_schema.tables
where table_schema in ('main','temp')
and lower(table_name) = lower(?)
limit 1
"""
res = self.con.execute(sql, [relation]).fetchone()
return bool(res)
def create_table_as(self, relation: str, select_sql: str) -> None:
# Use only the SELECT body and strip trailing semicolons for safety.
body = self._first_select_body(select_sql).strip().rstrip(";\n\t ")
self.con.execute(f"create table {relation} as {body}")
def incremental_insert(self, relation: str, select_sql: str) -> None:
# Ensure the inner SELECT is clean (no trailing semicolon; SELECT body only).
body = self._first_select_body(select_sql).strip().rstrip(";\n\t ")
self.con.execute(f"insert into {relation} {body}")
def incremental_merge(self, relation: str, select_sql: str, unique_key: list[str]) -> None:
"""
Fallback strategy for DuckDB:
- DELETE collisions via DELETE ... USING (<select>) s
- INSERT all rows via INSERT ... SELECT * FROM (<select>)
We intentionally do NOT use a CTE here, because we execute two separate
statements and DuckDB won't see the CTE from the previous statement.
"""
# 1) clean inner SELECT
body = self._first_select_body(select_sql).strip().rstrip(";\n\t ")
# 2) predicate for DELETE
keys_pred = " AND ".join([f"t.{k}=s.{k}" for k in unique_key]) or "FALSE"
# 3) first: delete collisions
delete_sql = f"delete from {relation} t using ({body}) s where {keys_pred}"
self.con.execute(delete_sql)
# 4) then: insert fresh rows
insert_sql = f"insert into {relation} select * from ({body}) src"
self.con.execute(insert_sql)
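
    # Illustration: for relation "orders" and unique_key=["id"], with <body>
    # the cleaned inner SELECT, the two statements emitted are:
    #
    #     delete from orders t using (<body>) s where t.id=s.id
    #     insert into orders select * from (<body>) src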
def alter_table_sync_schema(
self, relation: str, select_sql: str, *, mode: str = "append_new_columns"
) -> None:
"""
Best-effort: add new columns with inferred type.
"""
# Probe: empty projection from the SELECT (cleaned to avoid parser issues).
body = self._first_select_body(select_sql).strip().rstrip(";\n\t ")
probe = self.con.execute(f"select * from ({body}) as q limit 0")
cols = [c[0] for c in probe.description or []]
        # columns already present on the target table
existing = {
r[0]
for r in self.con.execute(
"select column_name from information_schema.columns "
+ "where lower(table_name)=lower(?)",
[relation],
).fetchall()
}
add = [c for c in cols if c not in existing]
        for c in add:
            # No type inference yet: every new column is added as VARCHAR.
            self.con.execute(f'alter table {relation} add column "{c}" varchar')