
fastflowtransform.executors.duckdb_exec

DuckExecutor

Bases: BaseExecutor[DataFrame]

Source code in src/fastflowtransform/executors/duckdb_exec.py
class DuckExecutor(BaseExecutor[pd.DataFrame]):
    ENGINE_NAME = "duckdb"

    def __init__(self, db_path: str = ":memory:"):
        if db_path and db_path != ":memory:" and "://" not in db_path:
            with suppress(Exception):
                Path(db_path).parent.mkdir(parents=True, exist_ok=True)
        self.db_path = db_path
        self.con = duckdb.connect(db_path)

    def clone(self) -> DuckExecutor:
        """
        Create a new executor instance with its own connection, for use by a thread worker.
        """
        return DuckExecutor(self.db_path)

    def _exec_many(self, sql: str) -> None:
        """
        Execute multiple SQL statements separated by ';' on the same connection.
        DuckDB normally accepts one statement per execute(), so we split here.
        """
        # Very simple splitter; good enough for the SQL we emit in this executor.
        for stmt in (part.strip() for part in sql.split(";")):
            if not stmt:
                continue
            self.con.execute(stmt)

    # ---- Frame hooks ----
    def _read_relation(self, relation: str, node: Node, deps: Iterable[str]) -> pd.DataFrame:
        try:
            return self.con.table(relation).df()
        except CatalogException as e:
            existing = [
                r[0]
                for r in self.con.execute(
                    "select table_name from information_schema.tables "
                    "where table_schema in ('main','temp')"
                ).fetchall()
            ]
            raise RuntimeError(
                f"Dependency table not found: '{relation}'\n"
                f"Deps: {list(deps)}\nExisting tables: {existing}\n"
                "Hinweis: gleiche Datei-DB/Connection für Seeding & Run verwenden."
            ) from e

    def _materialize_relation(self, relation: str, df: pd.DataFrame, node: Node) -> None:
        tmp = "_ff_py_out"
        try:
            self.con.register(tmp, df)
            self.con.execute(f'create or replace table "{relation}" as select * from "{tmp}"')
        finally:
            try:
                self.con.unregister(tmp)
            except Exception:
                self.con.execute(f'drop view if exists "{tmp}"')

    def _create_or_replace_view_from_table(
        self, view_name: str, backing_table: str, node: Node
    ) -> None:
        self.con.execute(f'create or replace view "{view_name}" as select * from "{backing_table}"')

    def _frame_name(self) -> str:
        return "pandas"

    # ---- SQL hooks ----
    def _format_relation_for_ref(self, name: str) -> str:
        return _q(relation_for(name))

    def _format_source_reference(
        self, cfg: dict[str, Any], source_name: str, table_name: str
    ) -> str:
        location = cfg.get("location")
        if location:
            raise NotImplementedError("DuckDB executor does not support path-based sources yet.")

        identifier = cfg.get("identifier")
        if not identifier:
            raise KeyError(f"Source {source_name}.{table_name} missing identifier")

        parts = [
            p
            for p in (
                cfg.get("catalog") or cfg.get("database"),
                cfg.get("schema"),
                identifier,
            )
            if p
        ]
        if not parts:
            parts = [identifier]

        return ".".join(_q(str(part)) for part in parts)

    def _create_or_replace_view(self, target_sql: str, select_body: str, node: Node) -> None:
        self.con.execute(f"create or replace view {target_sql} as {select_body}")

    def _create_or_replace_table(self, target_sql: str, select_body: str, node: Node) -> None:
        self.con.execute(f"create or replace table {target_sql} as {select_body}")

    # ---- Meta hook ----
    def on_node_built(self, node: Node, relation: str, fingerprint: str) -> None:
        """
        After successful materialization, ensure the meta table exists and upsert the row.
        """
        # Best-effort: do not let meta errors break the run
        try:
            ensure_meta_table(self)
            upsert_meta(self, node.name, relation, fingerprint, "duckdb")
        except Exception:
            pass

    # ── Incremental API ────────────────────────────────────────────────────
    def exists_relation(self, relation: str) -> bool:
        sql = """
          select 1
          from information_schema.tables
          where table_schema in ('main','temp')
            and lower(table_name) = lower(?)
          limit 1
        """
        res = self.con.execute(sql, [relation]).fetchone()
        return bool(res)

    def create_table_as(self, relation: str, select_sql: str) -> None:
        # Use only the SELECT body and strip trailing semicolons for safety.
        body = self._first_select_body(select_sql).strip().rstrip(";\n\t ")
        self.con.execute(f"create table {relation} as {body}")

    def incremental_insert(self, relation: str, select_sql: str) -> None:
        # Ensure the inner SELECT is clean (no trailing semicolon; SELECT body only).
        body = self._first_select_body(select_sql).strip().rstrip(";\n\t ")
        self.con.execute(f"insert into {relation} {body}")

    def incremental_merge(self, relation: str, select_sql: str, unique_key: list[str]) -> None:
        """
        Fallback strategy for DuckDB:
        - DELETE collisions via DELETE ... USING (<select>) s
        - INSERT all rows via INSERT ... SELECT * FROM (<select>)
        We intentionally do NOT use a CTE here, because we execute two separate
        statements and DuckDB won't see the CTE from the previous statement.
        """
        # 1) clean inner SELECT
        body = self._first_select_body(select_sql).strip().rstrip(";\n\t ")

        # 2) predicate for DELETE
        keys_pred = " AND ".join([f"t.{k}=s.{k}" for k in unique_key]) or "FALSE"

        # 3) first: delete collisions
        delete_sql = f"delete from {relation} t using ({body}) s where {keys_pred}"
        self.con.execute(delete_sql)

        # 4) then: insert fresh rows
        insert_sql = f"insert into {relation} select * from ({body}) src"
        self.con.execute(insert_sql)

    def alter_table_sync_schema(
        self, relation: str, select_sql: str, *, mode: str = "append_new_columns"
    ) -> None:
        """
        Best-effort: add new columns with inferred type.
        """
        # Probe: empty projection from the SELECT (cleaned to avoid parser issues).
        body = self._first_select_body(select_sql).strip().rstrip(";\n\t ")
        probe = self.con.execute(f"select * from ({body}) as q limit 0")
        cols = [c[0] for c in probe.description or []]
        # existing columns
        existing = {
            r[0]
            for r in self.con.execute(
                "select column_name from information_schema.columns "
                + "where lower(table_name)=lower(?)",
                [relation],
            ).fetchall()
        }
        add = [c for c in cols if c not in existing]
        for c in add:
            # Best-effort typing: ideally derive the type from the SELECT expression;
            # for now every new column is simply added as VARCHAR.
            try:
                self.con.execute(f"alter table {relation} add column {c} varchar")
            except Exception:
                self.con.execute(f"alter table {relation} add column {c} varchar")

clone

clone()

Create a new executor instance with its own connection, for use by a thread worker.

Source code in src/fastflowtransform/executors/duckdb_exec.py
def clone(self) -> DuckExecutor:
    """
    Create a new executor instance with its own connection, for use by a thread worker.
    """
    return DuckExecutor(self.db_path)
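
A minimal sketch of the intended use, assuming a recent DuckDB build where several connections within one process can open the same database file: each worker thread clones the executor so it gets its own connection.

from concurrent.futures import ThreadPoolExecutor
from fastflowtransform.executors.duckdb_exec import DuckExecutor

base = DuckExecutor("pipeline.duckdb")  # file-backed so clones share state

def build(stmt: str) -> None:
    worker = base.clone()  # one connection per thread
    worker.con.execute(stmt)

with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(build, [
        "create or replace table a as select 1 as x",
        "create or replace table b as select 2 as y",
    ]))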

on_node_built

on_node_built(node, relation, fingerprint)

After successful materialization, ensure the meta table exists and upsert the row.

Source code in src/fastflowtransform/executors/duckdb_exec.py
def on_node_built(self, node: Node, relation: str, fingerprint: str) -> None:
    """
    After successful materialization, ensure the meta table exists and upsert the row.
    """
    # Best-effort: do not let meta errors break the run
    try:
        ensure_meta_table(self)
        upsert_meta(self, node.name, relation, fingerprint, "duckdb")
    except Exception:
        pass
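
ensure_meta_table and upsert_meta are helpers defined elsewhere in the package; the snippet below is only a rough sketch of the general pattern they implement, with a hypothetical ff_meta table, using DuckDB's INSERT ... ON CONFLICT:

import duckdb

con = duckdb.connect(":memory:")
con.execute("""
    create table if not exists ff_meta(        -- hypothetical meta table
        node_name   varchar primary key,
        relation    varchar,
        fingerprint varchar,
        engine      varchar
    )
""")
con.execute(
    """
    insert into ff_meta values (?, ?, ?, ?)
    on conflict (node_name) do update set
        relation    = excluded.relation,
        fingerprint = excluded.fingerprint,
        engine      = excluded.engine
    """,
    ["users", "analytics.users", "abc123", "duckdb"],
)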

incremental_merge

incremental_merge(relation, select_sql, unique_key)

Fallback strategy for DuckDB: DELETE collisions via DELETE ... USING (<select>) s, then INSERT all rows via INSERT ... SELECT * FROM (<select>). We intentionally do NOT use a CTE here, because we execute two separate statements and DuckDB won't see the CTE from the previous statement.

Source code in src/fastflowtransform/executors/duckdb_exec.py
def incremental_merge(self, relation: str, select_sql: str, unique_key: list[str]) -> None:
    """
    Fallback strategy for DuckDB:
    - DELETE collisions via DELETE ... USING (<select>) s
    - INSERT all rows via INSERT ... SELECT * FROM (<select>)
    We intentionally do NOT use a CTE here, because we execute two separate
    statements and DuckDB won't see the CTE from the previous statement.
    """
    # 1) clean inner SELECT
    body = self._first_select_body(select_sql).strip().rstrip(";\n\t ")

    # 2) predicate for DELETE
    keys_pred = " AND ".join([f"t.{k}=s.{k}" for k in unique_key]) or "FALSE"

    # 3) first: delete collisions
    delete_sql = f"delete from {relation} t using ({body}) s where {keys_pred}"
    self.con.execute(delete_sql)

    # 4) then: insert fresh rows
    insert_sql = f"insert into {relation} select * from ({body}) src"
    self.con.execute(insert_sql)
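
To make the two-statement semantics concrete, a minimal sketch (table names illustrative): rows whose unique key collides are deleted first, then every incoming row is inserted, so the incoming version wins.

from fastflowtransform.executors.duckdb_exec import DuckExecutor

ex = DuckExecutor(":memory:")
ex.con.execute("create table tgt(id integer, v varchar)")
ex.con.execute("insert into tgt values (1, 'old'), (2, 'keep')")
ex.con.execute("create table src(id integer, v varchar)")
ex.con.execute("insert into src values (1, 'new'), (3, 'fresh')")

ex.incremental_merge("tgt", "select * from src", unique_key=["id"])
# tgt now holds (1, 'new'), (2, 'keep'), (3, 'fresh')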

alter_table_sync_schema

alter_table_sync_schema(relation, select_sql, *, mode='append_new_columns')

Best-effort: add new columns with inferred type.

Source code in src/fastflowtransform/executors/duckdb_exec.py
def alter_table_sync_schema(
    self, relation: str, select_sql: str, *, mode: str = "append_new_columns"
) -> None:
    """
    Best-effort: add new columns with inferred type.
    """
    # Probe: empty projection from the SELECT (cleaned to avoid parser issues).
    body = self._first_select_body(select_sql).strip().rstrip(";\n\t ")
    probe = self.con.execute(f"select * from ({body}) as q limit 0")
    cols = [c[0] for c in probe.description or []]
    # existing columns
    existing = {
        r[0]
        for r in self.con.execute(
            "select column_name from information_schema.columns "
            + "where lower(table_name)=lower(?)",
            [relation],
        ).fetchall()
    }
    add = [c for c in cols if c not in existing]
    for c in add:
        # Best-effort typing: ideally derive the type from the SELECT expression;
        # for now every new column is simply added as VARCHAR.
        try:
            self.con.execute(f"alter table {relation} add column {c} varchar")
        except Exception:
            self.con.execute(f"alter table {relation} add column {c} varchar")

run_sql

run_sql(node, env)
Orchestrate SQL models:

1) Render Jinja (ref/source/this) and strip leading {{ config(...) }}.
2) If the SQL is full DDL (CREATE …), execute it verbatim (passthrough).
3) Otherwise, normalize to CREATE OR REPLACE {TABLE|VIEW} AS <body>; the body is CTE-aware (keeps WITH … SELECT … intact).

On failure, raise ModelExecutionError with a helpful snippet.

Source code in src/fastflowtransform/executors/base.py
def run_sql(self, node: Node, env: Environment) -> None:
    """
    Orchestrate SQL models:
      1) Render Jinja (ref/source/this) and strip leading {{ config(...) }}.
      2) If the SQL is full DDL (CREATE …), execute it verbatim (passthrough).
      3) Otherwise, normalize to CREATE OR REPLACE {TABLE|VIEW} AS <body>.
         The body is CTE-aware (keeps WITH … SELECT … intact).
    On failure, raise ModelExecutionError with a helpful snippet.
    """
    sql_rendered = self.render_sql(
        node,
        env,
        ref_resolver=lambda name: self._resolve_ref(name, env),
        source_resolver=self._resolve_source,
    )
    sql = self._strip_leading_config(sql_rendered).strip()

    materialization = (node.meta or {}).get("materialized", "table")
    if materialization == "ephemeral":
        return

    # 1) Direct DDL passthrough (CREATE [OR REPLACE] {TABLE|VIEW} …)
    if self._looks_like_direct_ddl(sql):
        try:
            self._execute_sql_direct(sql, node)
            return
        except NotImplementedError:
            # Engine doesn't implement direct DDL → fall back to normalized materialization.
            pass
        except Exception as e:
            raise ModelExecutionError(
                node_name=node.name,
                relation=relation_for(node.name),
                message=str(e),
                sql_snippet=sql,
            ) from e

    # 2) Normalized materialization path (CTE-safe body)
    body = self._selectable_body(sql).rstrip(" ;\n\t")
    target_sql = self._format_relation_for_ref(node.name)

    # Centralized SQL preview logging (applies to ALL engines)
    preview = (
        f"=== MATERIALIZE ===\n"
        f"-- model: {node.name}\n"
        f"-- materialized: {materialization}\n"
        f"-- target: {target_sql}\n"
        f"{body}\n"
    )
    echo_debug(preview)

    try:
        self._apply_sql_materialization(node, target_sql, body, materialization)
    except Exception as e:
        preview = f"-- materialized={materialization}\n-- target={target_sql}\n{body}"
        raise ModelExecutionError(
            node_name=node.name,
            relation=relation_for(node.name),
            message=str(e),
            sql_snippet=preview,
        ) from e
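
Spelling out step 3 by hand for a body that begins with a CTE (model and target names hypothetical): the WITH … SELECT block is kept intact and wrapped as a whole.

from fastflowtransform.executors.duckdb_exec import DuckExecutor

ex = DuckExecutor(":memory:")
ex.con.execute("create table events(id integer, ts date)")
ex.con.execute("insert into events values (1, date '2024-06-01')")

body = """
with recent as (
    select * from events where ts >= date '2024-01-01'
)
select id, count(*) as n from recent group by id
"""
# What the normalized path effectively executes for materialized='table':
ex.con.execute(f'create or replace table "stg_recent_counts" as {body}')
print(ex.con.table("stg_recent_counts").df())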