Skip to content

fastflowtransform.artifacts

write_manifest

write_manifest(project_dir)

Write manifest.json with minimal compatibility: - nodes: {name, path, deps, materialized, relation, kind} - macros: {name -> path} - sources: verbatim REGISTRY.sources - generated_at

Source code in src/fastflowtransform/artifacts.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def write_manifest(project_dir: Path) -> Path:
    """
    Write manifest.json with minimal compatibility:
      - nodes: {name, path, deps, materialized, relation, kind}
      - macros: {name -> path}
      - sources: verbatim REGISTRY.sources
      - generated_at
    """
    project_dir = Path(project_dir)
    out_dir = _target_dir(project_dir)
    manifest_path = out_dir / "manifest.json"

    nodes = {}
    for name, node in sorted(REGISTRY.nodes.items(), key=lambda x: x[0]):
        nodes[name] = {
            "name": name,
            "kind": node.kind,
            # Be resilient to stubbed Nodes in tests (path may be None)
            "path": _rel_safe(getattr(node, "path", None), project_dir),
            "deps": sorted(list(node.deps or [])),
            "materialized": (node.meta or {}).get("materialized", "table"),
            "relation": relation_for(name),
        }

    macros = {}
    for mname, mpath in sorted(REGISTRY.macros.items(), key=lambda x: x[0]):
        macros[mname] = _rel_safe(mpath, project_dir)

    data = {
        "metadata": {
            "tool": "fastflowtransform",
            "generated_at": _iso_now(),
        },
        "nodes": nodes,
        "macros": macros,
        "sources": REGISTRY.sources or {},
    }
    _json_dump(manifest_path, data)
    return manifest_path

write_run_results

write_run_results(project_dir, *, started_at, finished_at, node_results)

Write run_results.json containing run envelope and per-node results.

Source code in src/fastflowtransform/artifacts.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def write_run_results(
    project_dir: Path,
    *,
    started_at: str,
    finished_at: str,
    node_results: list[RunNodeResult],
) -> Path:
    """
    Write run_results.json containing run envelope and per-node results.
    """
    project_dir = Path(project_dir)
    out_dir = _target_dir(project_dir)
    results_path = out_dir / "run_results.json"

    data = {
        "metadata": {"tool": "fastflowtransform", "generated_at": _iso_now()},
        "run_started_at": started_at,
        "run_finished_at": finished_at,
        "results": [asdict(nr) for nr in sorted(node_results, key=lambda r: r.name)],
    }
    _json_dump(results_path, data)
    return results_path

write_catalog

write_catalog(project_dir, executor)

Write catalog.json: - relations: map of relation -> {columns:[{name,dtype,nullable}]}

Source code in src/fastflowtransform/artifacts.py
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
def write_catalog(project_dir: Path, executor: Any) -> Path:
    """
    Write catalog.json:
      - relations: map of relation -> {columns:[{name,dtype,nullable}]}
    """
    project_dir = Path(project_dir)
    out_dir = _target_dir(project_dir)
    catalog_path = out_dir / "catalog.json"

    relations: dict[str, Any] = {}
    rel_names = sorted([relation_for(n) for n in REGISTRY.nodes])
    for rel in rel_names:
        # Per-relation guard: introspection must never break artifact emission
        try:
            cols = _try_columns_for(executor, rel)
        except Exception:
            cols = []
        relations[rel] = {"columns": cols}

    data = {
        "metadata": {"tool": "fastflowtransform", "generated_at": _iso_now()},
        "relations": relations,
    }
    _json_dump(catalog_path, data)
    return catalog_path