Skip to content

fastflowtransform.utest

UnitInput

Bases: BaseModel

Single relation input: either inline rows or a CSV file.

Source code in src/fastflowtransform/utest.py
32
33
34
35
36
37
38
class UnitInput(BaseModel):
    """
    Single relation input: either inline rows or a CSV file.

    Both fields default to ``None``.
    NOTE(review): nothing in this class enforces that exactly one of `rows` /
    `csv` is provided — confirm whether the input loader validates that.
    """

    # Unknown keys are rejected at validation time instead of being ignored.
    model_config = ConfigDict(extra="forbid")

    # Inline rows, one dict per row (presumably column name -> value; confirm with the loader).
    rows: list[dict[str, Any]] | None = None
    # Presumably a path to a CSV file with the rows; how it is resolved is not visible here.
    csv: str | None = None

UnitExpect

Bases: BaseModel

Expected result configuration for a unit-test case.

Extra keys are forbidden so YAML specs are tightly validated.

Source code in src/fastflowtransform/utest.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class UnitExpect(BaseModel):
    """
    Expected result configuration for a unit-test case.

    Extra keys are forbidden so YAML specs are tightly validated.

    Every field has a default, so an empty mapping is a valid expectation.
    `UnitSpec._merge_expect` relies on pydantic's "fields set" tracking to decide
    which of these a case explicitly overrides.
    """

    # Unknown keys raise a validation error rather than being dropped.
    model_config = ConfigDict(extra="forbid")

    # Target relation override; when unset, run_unit_specs falls back to
    # relation_for(spec.model).
    relation: str | None = None
    # Expected rows, one dict per row; defaults to a fresh empty list per instance.
    rows: list[dict[str, Any]] = Field(default_factory=list)
    # Presumably column names used to sort rows before comparing — TODO confirm in the assertion code.
    order_by: list[str] | None = None
    # Presumably makes the comparison order-insensitive — TODO confirm.
    any_order: bool = False
    # Presumably column name -> numeric tolerance for approximate comparison — TODO confirm.
    approx: dict[str, float] | None = None
    # Presumably columns left out of the comparison — TODO confirm.
    ignore_columns: list[str] | None = None
    # Presumably allows the expected rows to be a subset of the actual rows — TODO confirm.
    subset: bool = False

UnitDefaults

Bases: BaseModel

Defaults that apply to all cases in a spec unless overridden.

Source code in src/fastflowtransform/utest.py
59
60
61
62
63
64
65
class UnitDefaults(BaseModel):
    """
    Defaults that apply to all cases in a spec unless overridden.

    `UnitSpec.merged_case` combines these with each case: inputs are merged per
    key (the case entry wins), and expect fields are replaced only where the
    case set them explicitly.
    """

    # Unknown keys raise a validation error rather than being dropped.
    model_config = ConfigDict(extra="forbid")

    # Default inputs keyed by relation; empty dict when the spec declares none.
    inputs: dict[str, UnitInput] = Field(default_factory=dict)
    # Default expectation; an all-defaults UnitExpect when the spec declares none.
    expect: UnitExpect = Field(default_factory=UnitExpect)

UnitCase

Bases: BaseModel

A single unit-test case within a spec.

Source code in src/fastflowtransform/utest.py
68
69
70
71
72
73
74
75
class UnitCase(BaseModel):
    """
    A single unit-test case within a spec.

    Only `name` is required; `inputs` and `expect` fall back to empty values and
    are combined with the spec-level defaults by `UnitSpec.merged_case`.
    """

    # Unknown keys raise a validation error rather than being dropped.
    model_config = ConfigDict(extra="forbid")

    # Case identifier; run_unit_specs prints it and matches it against `only_case`.
    name: str
    # Case-specific inputs keyed by relation; these win over spec-level defaults.
    inputs: dict[str, UnitInput] = Field(default_factory=dict)
    # Case-specific expectation; only explicitly set fields override the defaults.
    expect: UnitExpect = Field(default_factory=UnitExpect)

UnitSpec

Bases: BaseModel

Top-level unit-test specification loaded from YAML.

path and project_dir are runtime-only and are not populated from YAML (we set them in discovery).

Source code in src/fastflowtransform/utest.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class UnitSpec(BaseModel):
    """
    Root object of a unit-test specification parsed from YAML.

    `path` and `project_dir` are runtime-only attributes: they are excluded from
    serialization and are not populated from YAML (discovery sets them).
    """

    model_config = ConfigDict(extra="forbid")

    model: str
    engine: str | None = None
    defaults: UnitDefaults = Field(default_factory=UnitDefaults)
    cases: list[UnitCase] = Field(default_factory=list)

    path: Path | None = Field(default=None, exclude=True)
    project_dir: Path | None = Field(default=None, exclude=True)

    # ---- defaults merging helpers -------------------------------------
    def _merge_expect(self, case_expect: UnitExpect) -> UnitExpect:
        """
        Build the effective expectation for a case.

        Starts from a dump of `defaults.expect` and replaces only those fields
        that were explicitly set on `case_expect`; the combined mapping is then
        validated into a new UnitExpect.
        """
        explicit = {
            key: getattr(case_expect, key) for key in case_expect.model_fields_set
        }
        return UnitExpect(**{**self.defaults.expect.model_dump(), **explicit})

    def _merge_inputs(self, case_inputs: dict[str, UnitInput]) -> dict[str, UnitInput]:
        """
        Combine `defaults.inputs` with the case inputs into a new dict.

        On a key clash the case entry replaces the default one. A falsy
        `case_inputs` yields a copy of the defaults.
        """
        return {**self.defaults.inputs, **(case_inputs or {})}

    def merged_case(self, case: UnitCase) -> UnitCase:
        """
        Produce a fresh UnitCase with spec-level defaults applied.

        The name is carried over unchanged; inputs and expect are the merged views.
        """
        effective_inputs = self._merge_inputs(case.inputs)
        effective_expect = self._merge_expect(case.expect)
        return UnitCase(
            name=case.name,
            inputs=effective_inputs,
            expect=effective_expect,
        )

merged_case

merged_case(case)

Return a new UnitCase where defaults have been applied (inputs + expect).

Source code in src/fastflowtransform/utest.py
119
120
121
122
123
124
125
126
127
def merged_case(self, case: UnitCase) -> UnitCase:
    """
    Produce a fresh UnitCase with spec-level defaults applied.

    The name is carried over unchanged; inputs and expect are the merged views.
    """
    effective_inputs = self._merge_inputs(case.inputs)
    effective_expect = self._merge_expect(case.expect)
    return UnitCase(
        name=case.name,
        inputs=effective_inputs,
        expect=effective_expect,
    )

run_unit_specs

run_unit_specs(specs, executor, jenv, only_case=None, *, cache_mode='off', reuse_meta=False)

Execute discovered unit-test specs. Returns the number of failed cases.

Parameters:

Name Type Description Default
cache_mode str

'off' | 'ro' | 'rw'. Default 'off' for deterministic runs.

'off'
reuse_meta bool

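When False (default), stored metadata for the node is deleted (best effort) before each case runs; when True, existing metadata is left in place.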

False
Source code in src/fastflowtransform/utest.py
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
def run_unit_specs(
    specs: list[UnitSpec],
    executor: Any,
    jenv: Any,
    only_case: str | None = None,
    *,
    cache_mode: str = "off",
    reuse_meta: bool = False,
) -> int:
    """
    Execute discovered unit-test specs. Returns the accumulated failure count.

    Specs pinned to an engine other than the executor's are skipped. A spec whose
    model is not in the registry counts as one failure. For each selected case the
    spec defaults are merged in, inputs are loaded, the model is executed (unless
    the cache allows skipping) and the result is asserted. Once a case's inputs
    have loaded, they are cleaned up however the case ends, including when
    execution or assertion raises.

    Args:
        specs: specs to run.
        executor: engine executor; the engine name is detected from it.
        jenv: stored on the run context for the helpers that execute the model.
        only_case: when set, only cases with exactly this name are run.
        cache_mode: 'off' | 'ro' | 'rw'. Default 'off' for deterministic runs.
            Computed fingerprints are persisted to the cache only in 'rw'.
        reuse_meta: when False (default), stored metadata for the node is deleted
            (best effort, errors ignored) before each case; when True it is kept.

    Returns:
        The value of the run context's failure counter after all specs ran. It
        includes missing models and input-load failures; the execute/assert
        helpers presumably add to it as well.
    """
    cache_mode = _normalize_cache_mode(cache_mode)

    project_dir = _get_project_dir_safe()
    engine_name = _detect_engine_name(executor)
    env_ctx = _make_env_ctx(engine_name)
    cache = _make_cache(project_dir, engine_name)

    ctx = UtestCtx(
        executor=executor,
        jenv=jenv,
        engine_name=engine_name,
        env_ctx=env_ctx,
        cache=cache,
        cache_mode=cache_mode,
    )

    for spec in specs:
        # Specs pinned to another engine are silently skipped (not a failure).
        if spec.engine and spec.engine != engine_name:
            continue

        node = REGISTRY.nodes.get(spec.model)
        if not node:
            print(f"⚠️  Model '{spec.model}' not found (in {spec.path})")
            ctx.failures += 1
            continue

        for raw_case in spec.cases:
            # Filter before merging: merged_case() carries the name over unchanged,
            # so there is no need to build merged views for cases we skip.
            if only_case and raw_case.name != only_case:
                continue

            # Apply spec.defaults to the case (merged view)
            case = spec.merged_case(raw_case)
            print(f"→ {spec.model} :: {case.name}")

            if not reuse_meta:
                # Deliberately best effort: a failed metadata delete must not abort the run.
                with suppress(Exception):
                    delete_meta_for_node(executor, node.name)

            cand_fp = _fingerprint_case(node, spec, case, ctx)

            before_failures = ctx.failures
            ctx.failures += _load_inputs_for_case(executor, spec, case, node)

            # If any input failed to load, skip execution & assertion for this case.
            if ctx.failures > before_failures:
                print("   ⚠️ skipping execution due to input load failure")
                continue

            # From here on the inputs are loaded; the finally clause guarantees they
            # are cleaned up on every exit path, including exceptions.
            try:
                if not _maybe_skip_by_cache(node, cand_fp, ctx):
                    target_rel_cfg = getattr(case, "expect", None)
                    if isinstance(target_rel_cfg, UnitExpect):
                        target_rel = target_rel_cfg.relation or relation_for(spec.model)
                    elif isinstance(target_rel_cfg, Mapping):
                        target_rel = target_rel_cfg.get("relation") or relation_for(spec.model)
                    else:
                        target_rel = relation_for(spec.model)

                    _reset_utest_relation(executor, target_rel)

                    # A falsy result means there is nothing to assert for this case.
                    if not _execute_and_update_cache(node, cand_fp, ctx):
                        continue

                _read_and_assert(spec, case, ctx)
            finally:
                _cleanup_inputs_for_case(executor, case)

    if ctx.cache and ctx.computed_fps and ctx.cache_mode == "rw":  # pragma: no cover
        ctx.cache.update_many(ctx.computed_fps)
        ctx.cache.save()

    return ctx.failures