Skip to content

fastflowtransform.contracts.runtime.base

ContractExecutor

Bases: Protocol

Minimal surface that runtime contracts are allowed to use on an executor.

Every engine that wants runtime contract support should conform to this.

Source code in src/fastflowtransform/contracts/runtime/base.py
17
18
19
20
21
22
23
24
25
26
27
28
class ContractExecutor(Protocol):
    """
    Minimal surface that runtime contracts are allowed to use on an executor.

    Every engine that wants runtime contract support should conform to this.
    """

    ENGINE_NAME: str

    def _execute_sql(self, sql: str, *args: Any, **kwargs: Any) -> Any: ...
    def introspect_column_physical_type(self, table: str, column: str) -> str | None: ...
    def introspect_table_physical_schema(self, table: str) -> dict[str, str]: ...

BaseRuntimeContracts

Base class for engine-specific runtime contract implementations.

Executors use this via composition: `self.runtime_contracts = ...`.

Source code in src/fastflowtransform/contracts/runtime/base.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
class BaseRuntimeContracts[E: ContractExecutor]:
    """
    Base class for engine-specific runtime contract implementations.

    Executors use this via composition: `self.runtime_contracts = ...`.
    """

    executor: E

    def __init__(self, executor: E):
        self.executor = executor

    # ------------------------------------------------------------------ #
    #  Context builder used by the run-engine                            #
    # ------------------------------------------------------------------ #

    def build_context(
        self,
        *,
        node: Node,
        relation: str,
        physical_table: str,
        contract: ContractsFileModel | None,
        project_contracts: ProjectContractsModel | None,
        is_incremental: bool = False,
    ) -> RuntimeContractContext:
        """
        Build a RuntimeContractContext with the correct RuntimeContractConfig.

        The caller (run-engine) decides which contract applies and passes:
          - node:          the fft Node being built
          - relation:      logical name (typically node.name)
          - physical_table: fully-qualified identifier used in SQL
          - contract:      per-table ContractsFileModel, or None
          - project_contracts: parsed project-level contracts.yml, or None
        """
        # Use the contract's declared table name if present, otherwise fall
        # back to the logical relation name for project-level overrides.
        table_key = contract.table if contract is not None else relation

        cfg = resolve_runtime_contract_config(
            table_name=table_key,
            contract=contract,
            project_contracts=project_contracts,
        )

        return RuntimeContractContext(
            node=node,
            relation=relation,
            physical_table=physical_table,
            contract=contract,
            project_contracts=project_contracts,
            config=cfg,
            is_incremental=is_incremental,
        )

    # --- Hooks used by the run-engine ----------------------------

    def apply_sql_contracts(
        self,
        *,
        ctx: RuntimeContractContext,
        select_body: str,
    ) -> None:
        """
        Entry point for SQL models.

        Engines override this to implement verify/cast mode. The default
        implementation just does a plain CTAS (no enforcement).
        """
        # Default = "off" / do nothing special:
        self.executor._execute_sql(f"create or replace table {ctx.physical_table} as {select_body}")

    def verify_after_materialization(self, *, ctx: RuntimeContractContext) -> None:
        """
        Optional second step (e.g. verify mode).

        Called after the model has been materialized. Default is no-op.
        """
        return

    def coerce_frame_schema(self, df: Any, ctx: RuntimeContractContext) -> Any:
        """
        Optional hook for Python models: given a DataFrame-like object and the
        RuntimeContractContext, return a new frame whose column types have been
        coerced to match the expected physical schema (where reasonable).

        Default implementation is a no-op. Engine-specific subclasses may
        override this (e.g. DuckDB + pandas).
        """
        return df

    def materialize_python(
        self,
        *,
        ctx: RuntimeContractContext,
        df: Any,
    ) -> bool:
        """
        Optional hook for Python models.

        Engines override this to take over materialization for Python
        models (e.g. to enforce contracts via explicit CASTs).

        Return True if you fully materialized ctx.physical_table yourself.
        Return False to let the executor use its normal path
        (_materialize_relation / _materialize_incremental).
        """
        return False

build_context

build_context(*, node, relation, physical_table, contract, project_contracts, is_incremental=False)

Build a RuntimeContractContext with the correct RuntimeContractConfig.

The caller (run-engine) decides which contract applies and passes:

- node: the fft Node being built
- relation: logical name (typically node.name)
- physical_table: fully-qualified identifier used in SQL
- contract: per-table ContractsFileModel, or None
- project_contracts: parsed project-level contracts.yml, or None

Source code in src/fastflowtransform/contracts/runtime/base.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def build_context(
    self,
    *,
    node: Node,
    relation: str,
    physical_table: str,
    contract: ContractsFileModel | None,
    project_contracts: ProjectContractsModel | None,
    is_incremental: bool = False,
) -> RuntimeContractContext:
    """
    Assemble the RuntimeContractContext for one model build.

    The run-engine has already decided which contract applies and supplies:
      - node:              the fft Node being built
      - relation:          logical name (typically node.name)
      - physical_table:    fully-qualified identifier used in SQL
      - contract:          per-table ContractsFileModel, or None
      - project_contracts: parsed project-level contracts.yml, or None
      - is_incremental:    forwarded unchanged onto the context

    The RuntimeContractConfig stored on the context is resolved here from
    the contract and the project-level contracts.
    """
    # Project-level overrides are keyed by table name: take the name the
    # contract declares, or the logical relation when there is no contract.
    if contract is None:
        lookup_name = relation
    else:
        lookup_name = contract.table

    resolved_config = resolve_runtime_contract_config(
        table_name=lookup_name,
        contract=contract,
        project_contracts=project_contracts,
    )

    return RuntimeContractContext(
        node=node,
        relation=relation,
        physical_table=physical_table,
        contract=contract,
        project_contracts=project_contracts,
        config=resolved_config,
        is_incremental=is_incremental,
    )

apply_sql_contracts

apply_sql_contracts(*, ctx, select_body)

Entry point for SQL models.

Engines override this to implement verify/cast mode. The default implementation just does a plain CTAS (no enforcement).

Source code in src/fastflowtransform/contracts/runtime/base.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
def apply_sql_contracts(
    self,
    *,
    ctx: RuntimeContractContext,
    select_body: str,
) -> None:
    """
    Materialize a SQL model.

    Engine subclasses override this to implement verify/cast mode. This
    base version enforces nothing: it runs a plain CTAS of `select_body`
    into `ctx.physical_table` through the executor.
    """
    # Enforcement "off": a plain create-or-replace of the target table.
    statement = f"create or replace table {ctx.physical_table} as {select_body}"
    self.executor._execute_sql(statement)

verify_after_materialization

verify_after_materialization(*, ctx)

Optional second step (e.g. verify mode).

Called after the model has been materialized. Default is no-op.

Source code in src/fastflowtransform/contracts/runtime/base.py
236
237
238
239
240
241
242
def verify_after_materialization(self, *, ctx: RuntimeContractContext) -> None:
    """
    Optional follow-up step run once the model has been materialized
    (e.g. for verify mode).

    The base implementation does nothing.
    """
    return None

coerce_frame_schema

coerce_frame_schema(df, ctx)

Optional hook for Python models: given a DataFrame-like object and the RuntimeContractContext, return a new frame whose column types have been coerced to match the expected physical schema (where reasonable).

Default implementation is a no-op. Engine-specific subclasses may override this (e.g. DuckDB + pandas).

Source code in src/fastflowtransform/contracts/runtime/base.py
244
245
246
247
248
249
250
251
252
253
def coerce_frame_schema(self, df: Any, ctx: RuntimeContractContext) -> Any:
    """
    Optional hook for Python models.

    Receives a DataFrame-like object together with the
    RuntimeContractContext and returns a frame whose column types have been
    coerced to the expected physical schema (where reasonable).

    The base implementation returns `df` untouched; engine-specific
    subclasses may override it (e.g. DuckDB + pandas).
    """
    return df

materialize_python

materialize_python(*, ctx, df)

Optional hook for Python models.

Engines override this to take over materialization for Python models (e.g. to enforce contracts via explicit CASTs).

Return True if you fully materialized ctx.physical_table yourself. Return False to let the executor use its normal path (_materialize_relation / _materialize_incremental).

Source code in src/fastflowtransform/contracts/runtime/base.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
def materialize_python(
    self,
    *,
    ctx: RuntimeContractContext,
    df: Any,
) -> bool:
    """
    Optional hook letting an engine take over materialization of a Python
    model (e.g. to enforce contracts via explicit CASTs).

    Return True when the override has fully materialized
    ctx.physical_table itself; return False to let the executor follow its
    normal path (_materialize_relation / _materialize_incremental).

    The base implementation always returns False.
    """
    return False

expected_physical_schema

expected_physical_schema(*, executor, contract)

Build {column_name: expected_physical_type} for the given executor, using the per-table ContractsFileModel.

Source code in src/fastflowtransform/contracts/runtime/base.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def expected_physical_schema(
    *,
    executor: ContractExecutor,
    contract: ContractsFileModel | None,
) -> dict[str, str]:
    """
    Map each contract column to the physical type expected on this engine.

    Returns {column_name: expected_physical_type}, built from the per-table
    ContractsFileModel. The result is empty when there is no contract, and
    a column is left out whenever its physical type does not resolve for
    the executor's engine or its canonical form comes back empty.
    """
    schema: dict[str, str] = {}
    if contract is None:
        return schema

    # A missing or falsy ENGINE_NAME is treated as the empty engine name.
    engine_name = getattr(executor, "ENGINE_NAME", "") or ""

    for column, model in (contract.columns or {}).items():
        declared = _resolve_physical_type_for_engine(model.physical, engine_name)
        if not declared:
            continue
        canonical = _canonicalize_physical_type(engine_name, declared)
        if not canonical:
            continue
        schema[column] = canonical

    return schema