Skip to content

fastflowtransform.snapshots

SnapshotConfigResolved dataclass

Normalised snapshot configuration usable by executors.

Supports both
  • legacy nested config: snapshot={strategy=..., updated_at=..., check_cols=...}
  • flattened config: strategy=..., updated_at=..., check_cols=...
Source code in src/fastflowtransform/snapshots.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@dataclass
class SnapshotConfigResolved:
    """
    Normalised snapshot configuration usable by executors.

    Instances are built by ``resolve_snapshot_config``, which accepts either
    way of declaring snapshot options and collapses them into this one shape.

    Supports both:
      - legacy nested config: snapshot={strategy=..., updated_at=..., check_cols=...}
      - flattened config:     strategy=..., updated_at=..., check_cols=...
    """

    # Change-detection mode; the resolver only ever emits "timestamp" or "check".
    strategy: SnapshotStrategy
    # Key column name(s); the resolver rejects configs where this is empty.
    unique_key: list[str]
    # Name of the updated-at column. The resolver requires it for the
    # "timestamp" strategy; it may be None (or unset) for "check".
    updated_at: str | None
    # Columns listed under check_cols/check_columns; empty list when none given.
    # The resolver requires at least one entry for the "check" strategy.
    check_cols: list[str]

resolve_snapshot_config

resolve_snapshot_config(node, meta)

Resolve and validate snapshot configuration from a model's meta dict.

Accepted shapes:

    {{ config(
        materialized='snapshot',
        snapshot={
          'strategy': 'timestamp',
          'updated_at': 'updated_at',
          'check_cols': ['col1', 'col2'],
        },
        unique_key='id',
      ) }}

OR (flattened)

    {{ config(
        materialized='snapshot',
        strategy='timestamp',
        updated_at='updated_at',
        check_cols=['col1', 'col2'],
        unique_key='id',
      ) }}

Source code in src/fastflowtransform/snapshots.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def resolve_snapshot_config(node: Node, meta: Mapping[str, Any]) -> SnapshotConfigResolved:
    """
    Build a validated ``SnapshotConfigResolved`` from a model's meta mapping.

    Snapshot options may live in a nested ``snapshot={...}`` block or directly
    at the top level of ``meta``. For every option the nested block is consulted
    first, and within each location the canonical key wins over its alias.
    Falsy values (``None``, ``''``, ``[]``) are treated as "not provided".

    Accepted shapes:
      {{ config(
          materialized='snapshot',
          snapshot={
            'strategy': 'timestamp',
            'updated_at': 'updated_at',
            'check_cols': ['col1', 'col2'],
          },
          unique_key='id',
        ) }}

      OR (flattened)

      {{ config(
          materialized='snapshot',
          strategy='timestamp',
          updated_at='updated_at',
          check_cols=['col1', 'col2'],
          unique_key='id',
        ) }}

    Keys consulted:
      - unique_key: top-level 'unique_key', falling back to 'primary_key'.
      - strategy:   'strategy' (case-insensitive); defaults to 'timestamp'.
      - updated_at: 'updated_at', falling back to 'updated_at_column'.
      - check_cols: 'check_cols', falling back to 'check_columns'.

    Args:
      node: The model node; only ``node.path`` is read, to prefix error messages.
      meta: The model's config/meta mapping (``None`` or empty is tolerated and
        then fails the unique-key requirement).

    Returns:
      The resolved configuration.

    Raises:
      TypeError: If 'snapshot' is present but is not a mapping.
      ValueError: If no unique key is configured, the strategy is unknown,
        a 'timestamp' snapshot lacks 'updated_at', or a 'check' snapshot
        lacks 'check_cols'.
    """
    top_level = dict(meta or {})

    # Pull out the optional nested block, insisting on a mapping when present.
    nested_raw = top_level.get("snapshot")
    if nested_raw is None:
        nested = {}
    elif isinstance(nested_raw, Mapping):
        nested = dict(nested_raw or {})
    else:
        raise TypeError(
            f"{node.path}: snapshot configuration must be a mapping (snapshot={{...}})."
        )

    # The unique key is only read from the top level (never the nested block).
    key_spec = top_level.get("unique_key") or top_level.get("primary_key")
    unique_key = _normalize_unique_key(key_spec)
    if not unique_key:
        raise ValueError(
            f"{node.path}: snapshot models require 'unique_key' (string or list of strings)."
        )

    # Strategy: nested beats flattened; absent means 'timestamp'.
    raw_strategy = nested.get("strategy") or top_level.get("strategy")
    if not raw_strategy:
        raw_strategy = "timestamp"
    lowered = str(raw_strategy).lower()
    # Assign literal values branch by branch so type-checkers see the
    # Literal["timestamp", "check"] type rather than a plain str.
    strategy: SnapshotStrategy
    if lowered == "timestamp":
        strategy = "timestamp"
    elif lowered == "check":
        strategy = "check"
    else:
        raise ValueError(
            f"{node.path}: snapshot 'strategy' must be 'timestamp' or 'check', "
            f"got {raw_strategy!r}."
        )

    # Both remaining options share one lookup order: nested block, then top level.
    lookup_order = (nested, top_level)

    # First truthy candidate wins; if none is truthy keep the last one looked up.
    updated_at_candidates = [
        source.get(key)
        for source in lookup_order
        for key in ("updated_at", "updated_at_column")
    ]
    updated_at = next(
        (value for value in updated_at_candidates if value),
        updated_at_candidates[-1],
    )

    check_cols_candidates = [
        source.get(key)
        for source in lookup_order
        for key in ("check_cols", "check_columns")
    ]
    raw_cols = next((value for value in check_cols_candidates if value), None)
    check_cols = []
    if raw_cols:
        # Same string-or-list normalisation that is applied to the unique key.
        check_cols = _normalize_unique_key(raw_cols)

    # Strategy-specific requirements (a second line of defence after ModelConfig).
    if strategy == "timestamp":
        if not updated_at:
            raise ValueError(
                f"{node.path}: strategy='timestamp' snapshots require 'updated_at' column name."
            )
    elif not check_cols:
        raise ValueError(
            f"{node.path}: strategy='check' snapshots require non-empty "
            "'check_cols' (string or list)."
        )

    return SnapshotConfigResolved(
        strategy=strategy,
        unique_key=unique_key,
        updated_at=updated_at,
        check_cols=check_cols,
    )