Skip to content

fastflowtransform.validation

validate_required_columns

validate_required_columns(node_name, inputs, requires)

inputs: either a single DataFrame (one dependency) or a dict[str, DataFrame] (more than one dependency). requires: mapping of physical relation name -> set of required column names.

Source code in src/fastflowtransform/validation.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def _as_column_set(need: Any) -> set:
    """Normalize one ``requires`` value to a set of column names."""
    # A bare string is one column name, not an iterable of characters.
    if isinstance(need, str):
        return {need}
    return set(need)


def _sorted_labels(labels: set) -> list:
    """Return ``labels`` in a stable order for display in the error message."""
    try:
        return sorted(labels)
    except TypeError:
        # Labels of mixed types (e.g. 0 and "a") are not mutually comparable;
        # order them by their string form instead of crashing while building
        # the error report.
        return sorted(labels, key=str)


def validate_required_columns(node_name: str, inputs: Any, requires: dict[str, set[str]]) -> None:
    """Check that the input frame(s) provide every required column.

    Args:
        node_name: Name of the model; used only in the error message.
        inputs: Either a single ``pd.DataFrame`` (one dependency) or a
            mapping of relation name to an object exposing ``.columns``
            (more than one dependency).
        requires: Mapping of physical relation name to the column names that
            relation must provide. Each value may be a set, any other
            iterable of names, or a single name given as a string.

    Raises:
        ValueError: If a required column is missing, or (mapping form) a
            relation named in ``requires`` is not a key of ``inputs``. All
            problems found are listed in one message.
    """
    if not requires:
        return

    errors = []

    if isinstance(inputs, pd.DataFrame):
        # Single-frame form: `requires` is expected to hold exactly one entry.
        # NOTE(review): only the first entry is checked here; any additional
        # entries are ignored - confirm callers never pass more than one.
        need = _as_column_set(next(iter(requires.values())))
        have = set(inputs.columns)
        miss = need - have
        if miss:
            errors.append(
                f"- missing columns: {_sorted_labels(miss)} | have={_sorted_labels(have)}"
            )
    else:
        # Mapping form: every relation in `requires` must be present in
        # `inputs` and carry its required columns.
        for rel, need in requires.items():
            if rel not in inputs:
                errors.append(f"- missing dependency key '{rel}' in inputs dict")
                continue
            have = set(inputs[rel].columns)
            miss = _as_column_set(need) - have
            if miss:
                errors.append(
                    f"- [{rel}] missing columns: {_sorted_labels(miss)} | have={_sorted_labels(have)}"
                )

    if errors:
        raise ValueError(
            "Required columns check failed for Python model "
            f"'{node_name}'.\n"
            + "\n".join(errors)
            + "\nHint: define/adjust `require=` in @model or fix upstream models/seeds."
        )