inputs: entweder ein DataFrame (bei 1 Dep) oder dict[str, DataFrame] (bei >1 Deps)
requires: Mapping physische_relations_name -> set[columns]
Source code in src/fastflowtransform/validation.py
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42 | def validate_required_columns(node_name: str, inputs: Any, requires: dict[str, set[str]]) -> None:
"""
inputs: entweder ein DataFrame (bei 1 Dep) oder dict[str, DataFrame] (bei >1 Deps)
requires: Mapping physische_relations_name -> set[columns]
"""
if not requires:
return
errors = []
if isinstance(inputs, pd.DataFrame):
# 1 Dep - requires muss genau 1 key haben
need = next(iter(requires.values())) if requires else set()
have = set(inputs.columns)
miss = need - have
if miss:
errors.append(f"- missing columns: {sorted(miss)} | have={sorted(have)}")
else:
# >1 Deps
for rel, need in requires.items():
if rel not in inputs:
errors.append(f"- missing dependency key '{rel}' in inputs dict")
continue
have = set(inputs[rel].columns)
miss = need - have
if miss:
errors.append(f"- [{rel}] missing columns: {sorted(miss)} | have={sorted(have)}")
if errors:
raise ValueError(
"Required columns check failed for Python model "
f"'{node_name}'.\n"
+ "\n".join(errors)
+ "\nHint: define/adjust `require=` in @model or fix upstream models/seeds."
)
|