Skip to content

fastflowtransform.artifacts.files

build_manifest

build_manifest(project_dir)

Build the manifest.json payload with minimal compatibility. The payload contains: nodes (each with name, path, deps, materialized, relation, kind); macros (a name -> path mapping); sources (REGISTRY.sources, verbatim); and a generated_at timestamp (stored under metadata).

Source code in src/fastflowtransform/artifacts/files.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def build_manifest(project_dir: Path) -> dict[str, Any]:
    """
    Assemble the manifest.json payload (minimal-compatibility layout).

    Sections of the returned dict:
      - metadata: tool name plus the generation timestamp
      - nodes: per-node {name, kind, path, deps, materialized, relation}
      - macros: {macro name -> path}
      - sources: REGISTRY.sources passed through as-is ({} when falsy)

    Nodes and macros are emitted in name order.
    """
    root = Path(project_dir)

    def _describe(node_name: str, node: Any) -> dict[str, Any]:
        # One manifest entry for a single registered node.
        return {
            "name": node_name,
            "kind": node.kind,
            # Stubbed nodes used in tests may not carry a path at all.
            "path": _rel_safe(getattr(node, "path", None), root),
            "deps": sorted(node.deps or []),
            # "table" is the fallback when meta does not set a materialization.
            "materialized": (node.meta or {}).get("materialized", "table"),
            "relation": relation_for(node_name),
        }

    node_entries = {
        node_name: _describe(node_name, node)
        for node_name, node in sorted(REGISTRY.nodes.items(), key=lambda pair: pair[0])
    }

    macro_entries = {
        macro_name: _rel_safe(macro_path, root)
        for macro_name, macro_path in sorted(REGISTRY.macros.items(), key=lambda pair: pair[0])
    }

    return {
        "metadata": {
            "tool": "fastflowtransform",
            "generated_at": _iso_now(),
        },
        "nodes": node_entries,
        "macros": macro_entries,
        "sources": REGISTRY.sources or {},
    }

build_run_results

build_run_results(project_dir, *, started_at, finished_at, node_results, budgets=None)

Build run_results.json payload containing run envelope and per-node results. Optionally includes a 'budgets' summary block.

Source code in src/fastflowtransform/artifacts/files.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def build_run_results(
    project_dir: Path,
    *,
    started_at: str,
    finished_at: str,
    node_results: list[RunNodeResult],
    budgets: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """
    Assemble the run_results.json payload: a run envelope plus one entry per node.

    Per-node results are converted with ``asdict`` and ordered by node name.
    A 'budgets' block is appended only when ``budgets`` is not None.
    ``project_dir`` is accepted but not used here.
    """
    envelope_meta = {"tool": "fastflowtransform", "generated_at": _iso_now()}
    ordered = sorted(node_results, key=lambda res: res.name)

    payload = {
        "metadata": envelope_meta,
        "run_started_at": started_at,
        "run_finished_at": finished_at,
        "results": [asdict(res) for res in ordered],
    }

    if budgets is None:
        return payload

    payload["budgets"] = budgets
    return payload

build_catalog

build_catalog(project_dir, executor)

Build the catalog.json payload. It contains relations: a map of relation -> {columns: [{name, dtype, nullable}]}.

Source code in src/fastflowtransform/artifacts/files.py
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
def build_catalog(project_dir: Path, executor: Any) -> dict[str, Any]:
    """
    Assemble the catalog.json payload.

    The 'relations' section maps each relation (one per registered node,
    visited in sorted order) to ``{"columns": [...]}``. A relation whose
    column lookup raises gets an empty column list.
    """
    # Normalised for parity with the other builders; not used further here.
    project_dir = Path(project_dir)

    def _columns_or_empty(relation: str) -> Any:
        # Introspection failures must never prevent the artifact from being emitted.
        try:
            return _try_columns_for(executor, relation)
        except Exception:
            return []

    catalog: dict[str, Any] = {}
    for relation in sorted(relation_for(node_name) for node_name in REGISTRY.nodes):
        catalog[relation] = {"columns": _columns_or_empty(relation)}

    return {
        "metadata": {"tool": "fastflowtransform", "generated_at": _iso_now()},
        "relations": catalog,
    }

load_last_run_durations

load_last_run_durations(project_dir, *, artifacts_mode=None, artifacts_store=None, env_name=None, model_engine=None)

Best-effort reader for the last run_results.json.

Returns: { model_name: duration_in_seconds }. On any error or missing file: {}.

Source code in src/fastflowtransform/artifacts/files.py
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
def load_last_run_durations(
    project_dir: Path,
    *,
    artifacts_mode: str | None = None,
    artifacts_store: Any | None = None,
    env_name: str | None = None,
    model_engine: str | None = None,
) -> dict[str, float]:
    """
    Best-effort reader for the last run_results.json.

    Returns: { model_name: duration_in_seconds }.
    On any error or missing file: {}.
    """
    mode = (artifacts_mode or "files").strip().lower()
    raw: dict[str, Any] | None = None

    if mode == "db" and artifacts_store is not None:
        try:
            raw = artifacts_store.get_latest_artifact("run_results", env_name, model_engine)
        except Exception:
            return {}

    if raw is None:
        path = _target_dir(project_dir)
        if not path.exists():
            return {}
        try:
            raw = json.loads(path.read_text(encoding="utf-8"))
        except Exception:
            return {}

    # tolerate a few possible shapes
    items: list[dict[str, Any]] = (
        raw.get("results") or raw.get("node_results") or raw.get("nodes") or []
    )

    out: dict[str, float] = {}
    for item in items:
        if not isinstance(item, dict):
            continue
        name = item.get("name")
        dur_ms = item.get("duration_ms")
        if isinstance(name, str) and isinstance(dur_ms, (int, float)):
            out[name] = float(dur_ms) / 1000.0
    return out

build_test_results

build_test_results(project_dir, *, started_at, finished_at, results)

Build test_results.json payload containing a run envelope + individual test outcomes.

Source code in src/fastflowtransform/artifacts/files.py
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
def build_test_results(
    project_dir: Path,
    *,
    started_at: str,
    finished_at: str,
    results: list[TestResult],
) -> dict[str, Any]:
    """
    Assemble the test_results.json payload: run envelope plus each test outcome.

    Outcomes are converted with ``asdict`` and kept in the order given.
    ``project_dir`` is accepted but not used here.
    """
    envelope_meta = {"tool": "fastflowtransform", "generated_at": _iso_now()}
    outcomes = [asdict(outcome) for outcome in results]
    return {
        "metadata": envelope_meta,
        "test_started_at": started_at,
        "test_finished_at": finished_at,
        "results": outcomes,
    }

build_utest_results

build_utest_results(project_dir, *, started_at, finished_at, failures, results, engine=None)

Build utest_results.json payload containing a run envelope + per-case results.

Source code in src/fastflowtransform/artifacts/files.py
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
def build_utest_results(
    project_dir: Path,
    *,
    started_at: str,
    finished_at: str,
    failures: int,
    results: list[UTestResult],
    engine: str | None = None,
) -> dict[str, Any]:
    """
    Assemble the utest_results.json payload: run envelope plus per-case results.

    A falsy ``engine`` is written as "" and a falsy ``failures`` as 0.
    Case results are converted with ``asdict`` and kept in the order given.
    ``project_dir`` is accepted but not used here.
    """
    envelope_meta = {"tool": "fastflowtransform", "generated_at": _iso_now()}
    engine_label = engine if engine else ""
    failure_count = int(failures if failures else 0)
    cases = [asdict(case) for case in results]
    return {
        "metadata": envelope_meta,
        "utest_started_at": started_at,
        "utest_finished_at": finished_at,
        "engine": engine_label,
        "failures": failure_count,
        "results": cases,
    }