Skip to content

fastflowtransform.utest

UnitInput

Bases: BaseModel

Single relation input: either inline rows or a CSV file.

Source code in src/fastflowtransform/utest.py
33
34
35
36
37
38
39
class UnitInput(BaseModel):
    """
    Single relation input: either inline rows or a CSV file.

    Both fields are optional and default to ``None``; this class itself does
    not check that exactly one of them is provided. Unknown keys are rejected.
    """

    # Reject any key that is not declared below.
    model_config = ConfigDict(extra="forbid")

    # Inline data: one dict per row (presumably column name -> value; confirm
    # against the loader that consumes it).
    rows: list[dict[str, Any]] | None = None
    # Reference to a CSV file (presumably a path; how it is resolved is not
    # visible in this class).
    csv: str | None = None

UnitExpect

Bases: BaseModel

Expected result configuration for a unit-test case.

Extra keys are forbidden so YAML specs are tightly validated.

Source code in src/fastflowtransform/utest.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class UnitExpect(BaseModel):
    """
    Expected result configuration for a unit-test case.

    Extra keys are forbidden so YAML specs are tightly validated.

    Every field has a default, so an empty ``expect`` block is valid. The
    comparison logic that interprets these options lives outside this class;
    the per-field notes below are hedged where the meaning is inferred from
    the name only.
    """

    # Reject any key that is not declared below.
    model_config = ConfigDict(extra="forbid")

    # Optional relation name (presumably the relation to compare against;
    # TODO confirm what None falls back to).
    relation: str | None = None
    # Expected rows, one dict per row; defaults to a fresh empty list.
    rows: list[dict[str, Any]] = Field(default_factory=list)
    # Optional column list (presumably used to sort before comparing; confirm).
    order_by: list[str] | None = None
    # Presumably: compare rows without regard to order; confirm in comparer.
    any_order: bool = False
    # Mapping of str -> float (presumably column -> tolerance for approximate
    # comparison; confirm whether it is absolute or relative).
    approx: dict[str, float] | None = None
    # Optional column list (presumably excluded from comparison; confirm).
    ignore_columns: list[str] | None = None
    # Presumably: expected rows only need to be a subset of the actual rows;
    # confirm in comparer.
    subset: bool = False

UnitDefaults

Bases: BaseModel

Defaults that apply to all cases in a spec unless overridden.

Source code in src/fastflowtransform/utest.py
60
61
62
63
64
65
66
class UnitDefaults(BaseModel):
    """
    Defaults that apply to all cases in a spec unless overridden.

    Mirrors the ``inputs`` / ``expect`` fields of ``UnitCase``. Both fields
    fall back to empty values when omitted.
    """

    # Reject any key that is not declared below.
    model_config = ConfigDict(extra="forbid")

    # Default inputs keyed by name (str -> UnitInput); empty dict when omitted.
    inputs: dict[str, UnitInput] = Field(default_factory=dict)
    # Default expectation; a UnitExpect with all-default fields when omitted.
    expect: UnitExpect = Field(default_factory=UnitExpect)

UnitCase

Bases: BaseModel

A single unit-test case within a spec.

Source code in src/fastflowtransform/utest.py
69
70
71
72
73
74
75
76
class UnitCase(BaseModel):
    """
    A single unit-test case within a spec.

    Only ``name`` is required; ``inputs`` and ``expect`` fall back to empty
    values when omitted.
    """

    # Reject any key that is not declared below.
    model_config = ConfigDict(extra="forbid")

    # Required case name.
    name: str
    # Inputs for this case keyed by name (str -> UnitInput); empty when omitted.
    inputs: dict[str, UnitInput] = Field(default_factory=dict)
    # Expectation for this case; all-default UnitExpect when omitted.
    expect: UnitExpect = Field(default_factory=UnitExpect)

UnitSpec

Bases: BaseModel

Top-level unit-test specification loaded from YAML.

`path` and `project_dir` are runtime-only and are not populated from YAML (we set them in discovery).

Source code in src/fastflowtransform/utest.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
class UnitSpec(BaseModel):
    """
    Root model of a unit-test specification loaded from YAML.

    Holds the name of the model under test, an optional engine name, the
    spec-wide defaults, and the list of cases.

    `path` and `project_dir` are runtime-only and are not populated from YAML
    (we set them in discovery); both are excluded from serialization.
    """

    model_config = ConfigDict(extra="forbid")

    model: str
    engine: str | None = None
    defaults: UnitDefaults = Field(default_factory=UnitDefaults)
    cases: list[UnitCase] = Field(default_factory=list)

    path: Path | None = Field(default=None, exclude=True)
    project_dir: Path | None = Field(default=None, exclude=True)

    # ---- defaults merging helpers -------------------------------------
    def _merge_expect(self, case_expect: UnitExpect) -> UnitExpect:
        """
        Combine `defaults.expect` with the `expect` block of one case.

        The dumped spec-level defaults form the starting point; only the
        fields the case set explicitly (its `model_fields_set`) replace them.
        """
        merged = self.defaults.expect.model_dump()
        merged.update(
            {name: getattr(case_expect, name) for name in case_expect.model_fields_set}
        )
        return UnitExpect(**merged)

    def _merge_inputs(self, case_inputs: dict[str, UnitInput]) -> dict[str, UnitInput]:
        """
        Combine `defaults.inputs` with the inputs of one case.

        The result is a new dict; on a key clash the case's entry replaces
        the default one. A falsy `case_inputs` contributes nothing.
        """
        return {**self.defaults.inputs, **(case_inputs or {})}

    def merged_case(self, case: UnitCase) -> UnitCase:
        """
        Build a fresh UnitCase with the spec defaults applied to `case`.

        The name is carried over unchanged; inputs and expect are the merged
        results of the helpers above.
        """
        effective_inputs = self._merge_inputs(case.inputs)
        effective_expect = self._merge_expect(case.expect)
        return UnitCase(name=case.name, inputs=effective_inputs, expect=effective_expect)

merged_case

merged_case(case)

Return a new UnitCase where defaults have been applied (inputs + expect).

Source code in src/fastflowtransform/utest.py
120
121
122
123
124
125
126
127
128
def merged_case(self, case: UnitCase) -> UnitCase:
    """
    Build a fresh UnitCase with the spec defaults applied to `case`.

    The name is carried over unchanged; inputs and expect come from merging
    the spec-level defaults with the case's own values.
    """
    effective_inputs = self._merge_inputs(case.inputs)
    effective_expect = self._merge_expect(case.expect)
    return UnitCase(name=case.name, inputs=effective_inputs, expect=effective_expect)

run_unit_specs

run_unit_specs(specs, executor, jenv, only_case=None, *, cache_mode='off', reuse_meta=False, results_out=None)

Execute discovered unit-test specs. Returns the number of failed cases.

Parameters:

- cache_mode (str, default: 'off'): 'off' | 'ro' | 'rw'. Default 'off' for deterministic runs.
- reuse_meta (bool, default: False): reserved (no-op).

Source code in src/fastflowtransform/utest.py
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
def run_unit_specs(
    specs: list[UnitSpec],
    executor: Any,
    jenv: Any,
    only_case: str | None = None,
    *,
    cache_mode: str = "off",
    reuse_meta: bool = False,
    results_out: list[dict[str, Any]] | None = None,
) -> int:
    """
    Run every case of the given unit-test specs and report the failure count.

    Specs that name an engine different from the context's engine are skipped.
    A spec whose model is not in the registry is recorded as "model not found"
    and none of its cases run. The cache is finalized once after all specs.

    Args:
        specs: Discovered specs to execute.
        executor: Passed to the unit-test context builder.
        jenv: Passed to the unit-test context builder.
        only_case: Forwarded to the case iterator (presumably restricts the
            run to the case with this name; confirm in `_iter_cases`).
        cache_mode: 'off' | 'ro' | 'rw'. Default 'off' for deterministic runs.
        reuse_meta: reserved (no-op); forwarded unchanged to each case run.
        results_out: Optional list handed to the per-case and not-found
            recorders (presumably appended to with result dicts; confirm).

    Returns:
        The context's failure counter after all specs have been processed.
    """
    ctx = _build_utest_ctx(executor, jenv, _normalize_cache_mode(cache_mode))

    for spec in specs:
        # A spec pinned to another engine is silently skipped.
        if spec.engine and spec.engine != ctx.engine_name:
            continue

        node = REGISTRY.nodes.get(spec.model)
        if node:
            for case in _iter_cases(spec, only_case):
                _run_one_case(
                    ctx,
                    spec=spec,
                    case=case,
                    node=node,
                    reuse_meta=reuse_meta,
                    results_out=results_out,
                )
        else:
            _record_model_not_found(ctx, spec=spec, results_out=results_out)

    _finalize_cache(ctx)
    return ctx.failures