fastflowtransform.cli

FingerprintCache dataclass

Lightweight, project-scoped fingerprint store.

The cache is persisted under:

<project>/.fastflowtransform/cache/<profile>-<engine>.json

Schema: { "version": 1, "engine": "<engine>", "profile": "<profile>", "entries": { "<node_name>": "<sha256-hex>", ... } }

Source code in src/fastflowtransform/cache.py
@dataclass
class FingerprintCache:
    """
    Lightweight, project-scoped fingerprint store.

    The cache is persisted under:
        <project>/.fastflowtransform/cache/<profile>-<engine>.json

    Schema:
    {
      "version": 1,
      "engine": "<engine>",
      "profile": "<profile>",
      "entries": { "<node_name>": "<sha256-hex>", ... }
    }
    """

    project_dir: Path
    profile: str
    engine: str
    version: int = 1
    entries: dict[str, str] = field(default_factory=dict)

    @property
    def path(self) -> Path:
        base = self.project_dir / ".fastflowtransform" / "cache"
        base.mkdir(parents=True, exist_ok=True)
        filename = f"{self.profile}-{self.engine}.json"
        return base / filename

    def load(self) -> None:
        """Load cache file if present; silently do nothing when missing or corrupt."""
        try:
            raw = json.loads(self.path.read_text(encoding="utf-8"))
            if isinstance(raw, dict) and raw.get("version") == self.version:
                self.entries = dict(raw.get("entries") or {})
        except Exception:
            # On any error, start with an empty cache
            self.entries = {}

    def save(self) -> None:
        """Persist cache atomically."""
        payload = {
            "version": self.version,
            "engine": self.engine,
            "profile": self.profile,
            "entries": self.entries,
        }
        tmp_fd, tmp_name = tempfile.mkstemp(prefix=".ff-cache-", dir=str(self.path.parent))
        try:
            with os.fdopen(tmp_fd, "w", encoding="utf-8") as fh:
                json.dump(payload, fh, ensure_ascii=False, sort_keys=True, indent=2)
            os.replace(tmp_name, self.path)
        finally:
            try:
                if os.path.exists(tmp_name):
                    os.remove(tmp_name)
            except Exception:
                pass

    def get(self, node_name: str) -> str | None:
        """Return cached fingerprint for a node or None."""
        return self.entries.get(node_name)

    def set(self, node_name: str, fingerprint: str) -> None:
        """Set cached fingerprint for a node name."""
        self.entries[node_name] = fingerprint

    def update_many(self, fps: Mapping[str, str]) -> None:
        """Bulk update cache entries."""
        for k, v in fps.items():
            self.entries[k] = v

    # ------------------------ read-only fingerprint computation ------------------------
    def _env_ctx_blob(self) -> str:
        """
        Build a stable JSON blob for environment context used in the fingerprint:
          - engine (from cache instance)
          - profile (from cache instance)
          - all FF_* environment variables (key+value)
          - normalized sources.yml (best-effort)
        """
        ff_env = {k: v for k, v in os.environ.items() if k.startswith("FF_")}
        try:
            src_norm = yaml.safe_dump(REGISTRY.sources or {}, sort_keys=True)
        except Exception:
            src_norm = ""
        payload = {
            "engine": self.engine,
            "profile": self.profile,
            "ff_env": ff_env,
            "sources": src_norm,
        }
        return json.dumps(payload, sort_keys=True)

    def compute_current(self, env: Environment, executor: Any) -> dict[str, str]:
        """
        Compute CURRENT fingerprints for all registered nodes (read-only).
        Uses the documented formula:
          - SQL: rendered SQL (via executor.render_sql to mirror real run)
          - Python: function source (inspect.getsource) with file-content fallback
          - env_ctx blob (engine/profile/FF_* vars/sources.yml)
          - dependency fingerprints chained downstream
        Does NOT write to disk.
        """
        env_ctx_blob = self._env_ctx_blob()

        # Preload sources for SQL / Python
        sql_render: dict[str, str] = {}
        py_src: dict[str, str] = {}

        for name, node in REGISTRY.nodes.items():
            if node.kind == "sql":
                try:
                    # Render with same substitutions as in run()
                    sql_render[name] = executor.render_sql(node, env)
                except Exception:
                    # Fallback: raw template content to still capture file changes
                    try:
                        raw = node.path.read_text(encoding="utf-8") if node.path else ""
                    except Exception:
                        raw = ""
                    sql_render[name] = raw
            else:
                func = REGISTRY.py_funcs.get(name)
                src = ""
                if func is not None:
                    try:
                        src = inspect.getsource(func)
                    except Exception:
                        try:
                            src = node.path.read_text(encoding="utf-8") if node.path else ""
                        except Exception:
                            src = ""
                py_src[name] = src

        def _hash(parts: list[str]) -> str:
            h = hashlib.sha256()
            for part in parts:
                h.update(part.encode("utf-8"))
                h.update(b"\x00")
            return h.hexdigest()

        current: dict[str, str] = {}
        order = topo_sort(REGISTRY.nodes)
        for name in order:
            node = REGISTRY.nodes[name]
            dep_fps = [current[d] for d in (node.deps or []) if d in current]
            if node.kind == "sql":
                blob = ["sql", name, sql_render.get(name, ""), env_ctx_blob, *dep_fps]
            else:
                blob = ["py", name, py_src.get(name, ""), env_ctx_blob, *dep_fps]
            current[name] = _hash(blob)
        return current

    def modified_set(self, env: Environment, executor: Any) -> builtins.set[str]:
        """
        Return the set of nodes whose CURRENT fingerprint differs from saved cache.
        Missing saved entries count as modified.
        """
        # Ensure we have the saved entries loaded
        if not self.entries:
            self.load()
        current = self.compute_current(env, executor)
        modified = {n for n, fp in current.items() if self.entries.get(n) != fp}
        return modified

load

load()

Load cache file if present; silently do nothing when missing or corrupt.

Source code in src/fastflowtransform/cache.py
def load(self) -> None:
    """Load cache file if present; silently do nothing when missing or corrupt."""
    try:
        raw = json.loads(self.path.read_text(encoding="utf-8"))
        if isinstance(raw, dict) and raw.get("version") == self.version:
            self.entries = dict(raw.get("entries") or {})
    except Exception:
        # On any error, start with an empty cache
        self.entries = {}

save

save()

Persist cache atomically.

Source code in src/fastflowtransform/cache.py
def save(self) -> None:
    """Persist cache atomically."""
    payload = {
        "version": self.version,
        "engine": self.engine,
        "profile": self.profile,
        "entries": self.entries,
    }
    tmp_fd, tmp_name = tempfile.mkstemp(prefix=".ff-cache-", dir=str(self.path.parent))
    try:
        with os.fdopen(tmp_fd, "w", encoding="utf-8") as fh:
            json.dump(payload, fh, ensure_ascii=False, sort_keys=True, indent=2)
        os.replace(tmp_name, self.path)
    finally:
        try:
            if os.path.exists(tmp_name):
                os.remove(tmp_name)
        except Exception:
            pass

get

get(node_name)

Return cached fingerprint for a node or None.

Source code in src/fastflowtransform/cache.py
def get(self, node_name: str) -> str | None:
    """Return cached fingerprint for a node or None."""
    return self.entries.get(node_name)

set

set(node_name, fingerprint)

Set cached fingerprint for a node name.

Source code in src/fastflowtransform/cache.py
def set(self, node_name: str, fingerprint: str) -> None:
    """Set cached fingerprint for a node name."""
    self.entries[node_name] = fingerprint

update_many

update_many(fps)

Bulk update cache entries.

Source code in src/fastflowtransform/cache.py
def update_many(self, fps: Mapping[str, str]) -> None:
    """Bulk update cache entries."""
    for k, v in fps.items():
        self.entries[k] = v

compute_current

compute_current(env, executor)

Compute CURRENT fingerprints for all registered nodes (read-only). Uses the documented formula:

- SQL: rendered SQL (via executor.render_sql to mirror real run)
- Python: function source (inspect.getsource) with file-content fallback
- env_ctx blob (engine/profile/FF_* vars/sources.yml)
- dependency fingerprints chained downstream

Does NOT write to disk.

Source code in src/fastflowtransform/cache.py
def compute_current(self, env: Environment, executor: Any) -> dict[str, str]:
    """
    Compute CURRENT fingerprints for all registered nodes (read-only).
    Uses the documented formula:
      - SQL: rendered SQL (via executor.render_sql to mirror real run)
      - Python: function source (inspect.getsource) with file-content fallback
      - env_ctx blob (engine/profile/FF_* vars/sources.yml)
      - dependency fingerprints chained downstream
    Does NOT write to disk.
    """
    env_ctx_blob = self._env_ctx_blob()

    # Preload sources for SQL / Python
    sql_render: dict[str, str] = {}
    py_src: dict[str, str] = {}

    for name, node in REGISTRY.nodes.items():
        if node.kind == "sql":
            try:
                # Render with same substitutions as in run()
                sql_render[name] = executor.render_sql(node, env)
            except Exception:
                # Fallback: raw template content to still capture file changes
                try:
                    raw = node.path.read_text(encoding="utf-8") if node.path else ""
                except Exception:
                    raw = ""
                sql_render[name] = raw
        else:
            func = REGISTRY.py_funcs.get(name)
            src = ""
            if func is not None:
                try:
                    src = inspect.getsource(func)
                except Exception:
                    try:
                        src = node.path.read_text(encoding="utf-8") if node.path else ""
                    except Exception:
                        src = ""
            py_src[name] = src

    def _hash(parts: list[str]) -> str:
        h = hashlib.sha256()
        for part in parts:
            h.update(part.encode("utf-8"))
            h.update(b"\x00")
        return h.hexdigest()

    current: dict[str, str] = {}
    order = topo_sort(REGISTRY.nodes)
    for name in order:
        node = REGISTRY.nodes[name]
        dep_fps = [current[d] for d in (node.deps or []) if d in current]
        if node.kind == "sql":
            blob = ["sql", name, sql_render.get(name, ""), env_ctx_blob, *dep_fps]
        else:
            blob = ["py", name, py_src.get(name, ""), env_ctx_blob, *dep_fps]
        current[name] = _hash(blob)
    return current
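
To make the formula concrete, here is a standalone sketch (not part of the library) that mirrors the hashing scheme above for a two-node chain; the SQL strings and the env_ctx blob are made up:

import hashlib

def fingerprint(parts: list[str]) -> str:
    # Same scheme as _hash above: NUL-separated parts, hashed with sha256.
    h = hashlib.sha256()
    for part in parts:
        h.update(part.encode("utf-8"))
        h.update(b"\x00")
    return h.hexdigest()

env_ctx = '{"engine": "duckdb", "ff_env": {}, "profile": "dev", "sources": ""}'

fp_raw = fingerprint(["sql", "raw_users", "SELECT * FROM src_users", env_ctx])
fp_stg = fingerprint(["sql", "stg_users", "SELECT id FROM raw_users", env_ctx, fp_raw])

# Changing the upstream SQL changes fp_raw and therefore fp_stg as well,
# so downstream nodes are flagged as modified even if their own SQL is unchanged.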

modified_set

modified_set(env, executor)

Return the set of nodes whose CURRENT fingerprint differs from saved cache. Missing saved entries count as modified.

Source code in src/fastflowtransform/cache.py
def modified_set(self, env: Environment, executor: Any) -> builtins.set[str]:
    """
    Return the set of nodes whose CURRENT fingerprint differs from saved cache.
    Missing saved entries count as modified.
    """
    # Ensure we have the saved entries loaded
    if not self.entries:
        self.load()
    current = self.compute_current(env, executor)
    modified = {n for n, fp in current.items() if self.entries.get(n) != fp}
    return modified
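
A sketch of the typical incremental-selection flow around this method (env, executor and project_dir stand for whatever the CLI already wires up; this is not a verbatim excerpt):

cache = FingerprintCache(project_dir=project_dir, profile="dev", engine="duckdb")
cache.load()

to_run = cache.modified_set(env, executor)      # changed or new nodes
# ... execute only the nodes in to_run; downstream nodes are already included
#     because dependency fingerprints are chained ...

current = cache.compute_current(env, executor)  # read-only recomputation
cache.update_many(current)                      # remember this run's fingerprints
cache.save()                                    # persist atomically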

seed

seed(project='.', env_name='dev', engine=None, vars=None)

High-level entry to run seeding for a project:

1) Prepare the runtime context and executor.
2) Resolve per-file targets using seeds/schema.yml.
3) Materialize each seed via the engine-specific path.

Source code in src/fastflowtransform/cli/seed_cmd.py
def seed(
    project: ProjectArg = ".",
    env_name: EnvOpt = "dev",
    engine: EngineOpt = None,
    vars: VarsOpt = None,
) -> None:
    """
    High-level entry to run seeding for a project:
      1) Prepare the runtime context and executor.
      2) Resolve per-file targets using seeds/schema.yml.
      3) Materialize each seed via the engine-specific path.
    """
    ctx = _prepare_context(project, env_name, engine, vars)
    execu, _, _ = ctx.make_executor()

    # You can still pass a global default schema; per-file CFG will override it.
    default_schema: str | None = None
    if getattr(ctx.profile, "engine", None) == "postgres":
        default_schema = getattr(getattr(ctx.profile, "postgres", None), "db_schema", None)

    n = seed_project(ctx.project, execu, default_schema)
    echo(f"✓ Seeded {_human_int(n)} table(s)")

dag_levels

dag_levels(nodes)

Returns a level-wise topological ordering.

- Each inner list contains nodes with no prerequisites inside the remaining graph (i.e. eligible to run in parallel).
- Ordering within a level is lexicographically stable.
- Validation for missing deps/cycles matches topo_sort.

Source code in src/fastflowtransform/dag.py
def levels(nodes: dict[str, Node]) -> list[list[str]]:
    """
    Returns a level-wise topological ordering.
    - Each inner list contains nodes with no prerequisites inside the remaining
      graph (i.e. eligible to run in parallel).
    - Ordering within a level is lexicographically stable.
    - Validation for missing deps/cycles matches topo_sort.
    """
    # Collect missing deps (model refs only; sources are not nodes)
    missing = {
        n.name: sorted({d for d in (n.deps or []) if d not in nodes})
        for n in nodes.values()
        if any(d not in nodes for d in (n.deps or []))
    }
    if missing:
        raise DependencyNotFoundError(missing)

    indeg = {k: 0 for k in nodes}
    out: dict[str, set[str]] = defaultdict(set)
    for n in nodes.values():
        for d in set(n.deps or []):
            out[d].add(n.name)
            indeg[n.name] += 1

    # Starting level: all nodes with zero indegree
    current = sorted([k for k, deg in indeg.items() if deg == 0])
    lvls: list[list[str]] = []
    seen_count = 0

    while current:
        lvls.append(current)
        next_zero: set[str] = set()
        for u in current:
            seen_count += 1
            for v in sorted(out.get(u, ())):
                indeg[v] -= 1
                if indeg[v] == 0:
                    next_zero.add(v)
        current = sorted(next_zero)

    if seen_count != len(nodes):
        cyclic = [k for k, deg in indeg.items() if deg > 0]
        raise ModelCycleError(f"Cycle detected among nodes: {', '.join(sorted(cyclic))}")
    return lvls
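
For illustration, a standalone sketch of the same level-wise peeling on plain dicts (it skips the missing-dep and cycle validation above):

from collections import defaultdict

def levels_of(deps: dict[str, list[str]]) -> list[list[str]]:
    # Same idea as levels(): peel off zero-indegree nodes level by level.
    indeg = {k: 0 for k in deps}
    out: dict[str, set[str]] = defaultdict(set)
    for name, ds in deps.items():
        for d in set(ds):
            out[d].add(name)
            indeg[name] += 1
    current = sorted(k for k, deg in indeg.items() if deg == 0)
    lvls: list[list[str]] = []
    while current:
        lvls.append(current)
        nxt: set[str] = set()
        for u in current:
            for v in out.get(u, ()):
                indeg[v] -= 1
                if indeg[v] == 0:
                    nxt.add(v)
        current = sorted(nxt)
    return lvls

# Diamond graph: both stg_* models depend on raw, mart depends on both stg_* models.
print(levels_of({
    "raw": [],
    "stg_orders": ["raw"],
    "stg_users": ["raw"],
    "mart": ["stg_orders", "stg_users"],
}))
# [['raw'], ['stg_orders', 'stg_users'], ['mart']]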

schedule

schedule(levels, jobs, fail_policy, run_node, before=None, on_error=None, logger=None, engine_abbr='', name_width=28)

Run levels sequentially; within a level run up to jobs nodes in parallel.

Source code in src/fastflowtransform/run_executor.py
def schedule(
    levels: list[list[str]],
    jobs: int,
    fail_policy: FailPolicy,
    run_node: Callable[[str], None],
    before: Callable[..., None] | None = None,
    on_error: Callable[[str, BaseException], None] | None = None,
    logger: LogQueue | None = None,
    engine_abbr: str = "",
    name_width: int = 28,
) -> ScheduleResult:
    """Run levels sequentially; within a level run up to `jobs` nodes in parallel."""
    per_node: dict[str, float] = {}
    failed: dict[str, BaseException] = {}
    per_node_lock = threading.Lock()
    t_total0 = perf_counter()

    for lvl_idx, lvl in enumerate(levels, start=1):
        had_error, _, _, _ = _run_level(
            lvl_idx=lvl_idx,
            names=lvl,
            jobs=jobs,
            fail_policy=fail_policy,
            before_cb=before,
            run_node=run_node,
            per_node=per_node,
            per_node_lock=per_node_lock,
            failed=failed,
            logger=logger,
            engine_abbr=engine_abbr,
            name_width=name_width,
        )
        if had_error:
            if on_error:
                # already reported per node on a best-effort basis; no aggregate message here
                pass
            break

    total = perf_counter() - t_total0
    return ScheduleResult(per_node_s=per_node, total_s=total, failed=failed)
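
A hedged usage sketch (only parameters visible in the signature are used; the fail-policy value and the node runner are placeholders):

lvls = levels(REGISTRY.nodes)           # level-wise order from dag.py

def run_node(name: str) -> None:
    # Placeholder: render and materialize a single node here.
    print(f"running {name}")

result = schedule(
    levels=lvls,
    jobs=4,                             # run up to 4 nodes of a level concurrently
    fail_policy=fail_policy,            # a FailPolicy chosen by the caller (placeholder)
    run_node=run_node,
)
print(result.total_s, result.failed)    # ScheduleResult: total time plus failed nodes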