Skip to content

fastflowtransform.snapshots.runtime.duckdb

DuckSnapshotRuntime

Bases: BaseSnapshotRuntime[DuckSnapshotExecutor]

Snapshot runtime for DuckDB, extracted from the old SnapshotSqlMixin.

Source code in src/fastflowtransform/snapshots/runtime/duckdb.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class DuckSnapshotRuntime(BaseSnapshotRuntime[DuckSnapshotExecutor]):
    """
    Snapshot runtime for DuckDB, extracted from the old SnapshotSqlMixin.
    """

    # ---- Engine hooks -----------------------------------------------------
    def _snapshot_target_identifier(self, rel_name: str) -> str:
        return self.executor._qualified(rel_name)

    def _snapshot_current_timestamp(self) -> str:
        return "current_timestamp"

    def _snapshot_null_timestamp(self) -> str:
        return "cast(null as timestamp)"

    def _snapshot_null_hash(self) -> str:
        return "cast(null as varchar)"

    def _snapshot_hash_expr(self, check_cols: list[str], src_alias: str) -> str:
        concat_expr = self._snapshot_concat_expr(check_cols, src_alias)
        return f"cast(md5({concat_expr}) as varchar)"

    def _snapshot_cast_as_string(self, expr: str) -> str:
        return f"cast({expr} as varchar)"

    def _snapshot_source_ref(
        self, rel_name: str, select_body: str
    ) -> tuple[str, Callable[[], None]]:
        src_view_name = f"__ff_snapshot_src_{rel_name}".replace(".", "_")
        src_quoted = self.executor._quote_identifier(src_view_name)
        self.executor._execute_sql(f"create or replace temp view {src_quoted} as {select_body}")

        def _cleanup() -> None:
            self.executor._execute_sql(f"drop view if exists {src_quoted}")

        return src_quoted, _cleanup

snapshot_prune

snapshot_prune(relation, unique_key, keep_last, *, dry_run=False)

Delete older snapshot versions while keeping the most recent keep_last rows per business key (including the current row).

Source code in src/fastflowtransform/snapshots/runtime/base.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
    def snapshot_prune(
        self,
        relation: str,
        unique_key: list[str],
        keep_last: int,
        *,
        dry_run: bool = False,
    ) -> None:
        """
        Delete older snapshot versions while keeping the most recent `keep_last`
        rows per business key (including the current row).
        """
        ex = self.executor

        if keep_last <= 0:
            return

        keys = [k for k in unique_key if k]
        if not keys:
            return

        target = self._snapshot_target_identifier(relation)
        vf = self.SNAPSHOT_VALID_FROM_COL

        key_select = ", ".join(keys)
        part_by = ", ".join(keys)

        ranked_sql = f"""
SELECT
  {key_select},
  {vf},
  ROW_NUMBER() OVER (
    PARTITION BY {part_by}
    ORDER BY {vf} DESC
  ) AS rn
FROM {target}
"""

        if dry_run:
            sql = f"""
WITH ranked AS (
  {ranked_sql}
)
SELECT COUNT(*) AS rows_to_delete
FROM ranked
WHERE rn > {int(keep_last)}
"""
            res = ex._execute_sql(sql)
            count = self._snapshot_fetch_count(res)
            echo(
                f"[DRY-RUN] snapshot_prune({relation}): would delete {count} row(s) "
                f"(keep_last={keep_last})"
            )
            return

        join_pred = " AND ".join([f"t.{k} = r.{k}" for k in keys])
        delete_sql = f"""
DELETE FROM {target} t
USING (
  {ranked_sql}
) r
WHERE
  r.rn > {int(keep_last)}
  AND {join_pred}
  AND t.{vf} = r.{vf}
"""
        ex._execute_sql(delete_sql)