Skip to content

fastflowtransform.decorators

model

model(name=None, deps=None, require=None, *, tags=None, kind='python', materialized=None, meta=None)

Decorator to register a Python model.

Parameters:

Name Type Description Default
name str | None

Logical node name in the DAG (defaults to function name).

None
deps Sequence[str] | None

Upstream node names (e.g., ['users.ff']).

None
require Any | None
Required columns per upstream dependency, given in one of two forms:
  • Single dependency: Iterable[str] of required columns from that dependency.
  • Multiple dependencies: Mapping[dep_name, Iterable[str]] (dep_name = logical name or physical relation).
None
tags Sequence[str] | None

Optional tags for selection (e.g. ['demo','env']).

None
kind str

Logical kind; defaults to 'python' (useful for selectors kind:python).

'python'
materialized str | None

Shorthand for meta['materialized']; mirrors config(materialized='...').

None
meta Mapping[str, Any] | None

Arbitrary metadata for executors/docs (merged with materialized if provided).

None
Source code in src/fastflowtransform/decorators.py
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def model(
    name: str | None = None,
    deps: Sequence[str] | None = None,
    require: Any | None = None,
    *,
    tags: Sequence[str] | None = None,
    kind: str = "python",
    materialized: str | None = None,
    meta: Mapping[str, Any] | None = None,
) -> Callable[[Callable[P, R_co]], HasFFMeta[P, R_co]]:
    """
    Decorator to register a Python model.

    The decorated function is returned unchanged apart from the ``__ff_*__``
    attributes attached to it, and is recorded in ``REGISTRY.py_funcs`` and
    ``REGISTRY.py_requires`` under its logical name.

    Args:
        name: Logical node name in the DAG (defaults to function name).
        deps: Upstream node names (e.g., ['users.ff']).
        require: Required columns per upstream dependency, passed through
            ``_normalize_require`` together with ``deps``:
            - Single dependency: Iterable[str] of required columns from that dependency.
            - Multiple dependencies: Mapping[dep_name, Iterable[str]]
              (dep_name = logical name or physical relation).
        tags: Optional tags for selection (e.g. ['demo','env']).
        kind: Logical kind; defaults to 'python' (useful for selectors kind:python).
        materialized: Shorthand for meta['materialized']; mirrors config(materialized='...').
            When given, it overrides any 'materialized' key already present in ``meta``.
        meta: Arbitrary metadata for executors/docs (merged with materialized if provided).

    Returns:
        A decorator that annotates and registers the function, then returns it.

    Raises:
        ModuleLoadError: If the source file of the decorated object cannot be
            determined (e.g. it is a builtin or otherwise has no source file).
    """

    def deco(func: Callable[P, R_co]) -> HasFFMeta[P, R_co]:
        f_any = cast(Any, func)

        fname = name or f_any.__name__
        fdeps = list(deps) if deps is not None else []

        # Do all work that can fail *before* mutating the function or the
        # registry, so a failure never leaves a half-registered model behind.
        req_norm = _normalize_require(fdeps, require)

        # Determine the source path. inspect.getsourcefile() itself raises
        # (TypeError) for objects without a source file, so it must live inside
        # the try block for the friendlier ModuleLoadError to be reachable.
        try:
            src: str | None = inspect.getsourcefile(func)
            if src is None:
                src = inspect.getfile(func)
        except Exception as e:
            raise ModuleLoadError(
                f"Cannot determine source path for model '{fname}': {e}"
            ) from e
        src_path = Path(src).resolve()

        # The copy keeps the caller's mapping untouched when 'materialized' is merged in.
        metadata = dict(meta) if meta else {}
        if materialized is not None:
            metadata["materialized"] = materialized

        # Attach metadata to the function (keeps backward compatibility)
        f_any.__ff_name__ = fname
        f_any.__ff_deps__ = fdeps
        f_any.__ff_require__ = req_norm  # useful for tooling/loaders
        f_any.__ff_tags__ = list(tags) if tags else []
        f_any.__ff_kind__ = kind or "python"
        f_any.__ff_meta__ = metadata
        f_any.__ff_path__ = src_path

        # Register the function and its normalized requirements
        REGISTRY.py_requires[fname] = req_norm  # executors read this directly
        REGISTRY.py_funcs[fname] = func
        return cast(HasFFMeta[P, R_co], func)

    return deco