Skip to content

fastflowtransform.schema_loader

TestSpec dataclass

Normalisierte Test-Spezifikation für die CLI. Beispiel: not_null(users_enriched.email), unique(users_enriched.id), accepted_values(users_enriched.email, values=[...]) …

Source code in src/fastflowtransform/schema_loader.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@dataclass(frozen=True)
class TestSpec:
    """
    Normalisierte Test-Spezifikation für die CLI.
    Beispiel: not_null(users_enriched.email), unique(users_enriched.id),
              accepted_values(users_enriched.email, values=[...]) …
    """

    type: str  # "not_null" | "unique" | "accepted_values" | …
    table: str
    column: str | None = None
    params: dict[str, Any] = field(default_factory=dict)
    severity: Severity = "error"
    tags: list[str] = field(default_factory=list)

    def __post_init__(self):
        if self.params is None:
            object.__setattr__(self, "params", {})
        if self.tags is None:
            object.__setattr__(self, "tags", [])

load_schema_tests

load_schema_tests(project_dir)

Lädt dbt-ähnliche Schema-YAMLs (version: 2) unter models/**.yml (& schema.yml), und gibt normalisierte TestSpec-Objekte zurück.

Source code in src/fastflowtransform/schema_loader.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def load_schema_tests(project_dir: Path) -> list[TestSpec]:
    """
    Lädt dbt-ähnliche Schema-YAMLs (version: 2) unter models/**.yml (& schema.yml),
    und gibt normalisierte TestSpec-Objekte zurück.
    """
    project_dir = Path(project_dir)
    models_dir = project_dir / "models"
    if not models_dir.exists():
        return []

    files: list[Path] = []
    for p in models_dir.rglob("*.yml"):
        files.append(p)
    # Deduplicate
    files = sorted(set(files))

    specs: list[TestSpec] = []
    version = 2
    for yml in files:
        try:
            data = yaml.safe_load(yml.read_text(encoding="utf-8")) or {}
        except Exception:
            continue
        if (data or {}).get("version") != version:
            continue
        for model in data.get("models") or []:
            name = str(model.get("name") or "").strip()
            if not name:
                continue
            table = name[:-3] if name.endswith(".ff") else name
            # Column-Tests
            for col in model.get("columns") or []:
                col_name = str(col.get("name") or "").strip()
                if not col_name:
                    continue
                for test_entry in col.get("tests") or []:
                    _expand_test_entry(
                        specs, table, col_name, test_entry, default_tags=model.get("tags") or []
                    )
            # Model-level tests (optional
            for test_entry in model.get("tests") or []:
                _expand_test_entry(
                    specs, table, None, test_entry, default_tags=model.get("tags") or []
                )
    return specs