Skip to content

fastflowtransform.config.project

IncrementalModelConfig

Bases: BaseModel

Per-model incremental overlay from project.yml, for example:

models:
  incremental:
    fct_events_sql_inline:
      incremental: true
      unique_key: ["event_id"]
      updated_at: "updated_at"
      delta_sql: |
        select ...
      schema_sync: append_new_columns

    fct_events_py_incremental:
      incremental:
        enabled: true
        strategy: merge
        unique_key: ["event_id"]
        updated_at_column: "updated_at"

This is intentionally compatible with the fields on ModelConfig.

Source code in src/fastflowtransform/config/project.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class IncrementalModelConfig(BaseModel):
    """
    Per-model incremental overlay from project.yml, for example:

        models:
          incremental:
            fct_events_sql_inline:
              incremental: true
              unique_key: ["event_id"]
              updated_at: "updated_at"
              delta_sql: |
                select ...
              schema_sync: append_new_columns

            fct_events_py_incremental:
              incremental:
                enabled: true
                strategy: merge
                unique_key: ["event_id"]
                updated_at_column: "updated_at"

    This is intentionally compatible with the fields on ModelConfig.

    Unknown keys are rejected (``extra="forbid"``). The list-valued key fields
    (``unique_key``, ``primary_key``, ``updated_at_columns``, ``timestamp_columns``)
    also accept a single string, which is wrapped into a one-element list.
    """

    model_config = ConfigDict(extra="forbid")

    # Master switch / structured config
    incremental: bool | IncrementalConfig | None = None

    # Shortcuts (later merged into ModelConfig)
    unique_key: list[str] | None = None
    primary_key: list[str] | None = None

    updated_at: str | None = None
    updated_at_column: str | None = None
    updated_at_columns: list[str] | None = None
    timestamp_columns: list[str] | None = None

    delta_sql: str | None = None
    delta_config: str | None = None
    delta_python: str | None = None

    schema_sync: Literal["none", "append_new_columns", "sync_all_columns"] | None = None

    @field_validator(
        "unique_key",
        "primary_key",
        "updated_at_columns",
        "timestamp_columns",
        mode="before",
    )
    @classmethod
    def _normalize_key_lists(cls, v: Any) -> list[str] | None:
        """
        Coerce a key/column spec into a list of strings before type validation.

        - ``None`` stays ``None``.
        - A single string becomes a one-element list.
        - Any other non-string sequence becomes a list, each element passed
          through ``str()``.

        Raises:
            ValueError: for any other input (numbers, mappings, sets, bytes, ...).
                Pydantic v2 only turns ValueError/AssertionError raised inside a
                validator into a ValidationError. A TypeError would escape as a
                raw exception without the offending field's location.
        """
        if v is None:
            return None
        if isinstance(v, str):
            return [v]
        # bytes is also a Sequence, but iterating it yields ints, so exclude it.
        if isinstance(v, Sequence) and not isinstance(v, (str, bytes)):
            return [str(x) for x in v]
        raise ValueError("must be a string or a sequence of strings")

ModelsBlock

Bases: BaseModel

project.yml:

models:
  storage:
    users:
      path: ".local/spark/users"
      format: parquet
    ...

  incremental:
    my_model:
      incremental: true
      unique_key: ["id"]
      updated_at: "updated_at"
Source code in src/fastflowtransform/config/project.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
class ModelsBlock(BaseModel):
    """
    The ``models:`` section of project.yml.

    project.yml:

        models:
          storage:
            users:
              path: ".local/spark/users"
              format: parquet
            ...

          incremental:
            my_model:
              incremental: true
              unique_key: ["id"]
              updated_at: "updated_at"

    Both sub-sections are optional and default to empty mappings. Unknown keys
    are rejected.
    """

    model_config = ConfigDict(extra="forbid")

    # Per-model storage settings, keyed by model name (e.g. "users" above)
    storage: dict[str, StorageConfig] = Field(default_factory=dict)
    # Per-model incremental overlays, keyed by model name (e.g. "my_model" above)
    incremental: dict[str, IncrementalModelConfig] = Field(default_factory=dict)

SeedsBlock

Bases: BaseModel

project.yml:

seeds:
  storage:
    seed_users:
      path: ".local/spark/seed_users"
      format: parquet
Source code in src/fastflowtransform/config/project.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class SeedsBlock(BaseModel):
    """
    The ``seeds:`` section of project.yml.

    project.yml:

        seeds:
          storage:
            seed_users:
              path: ".local/spark/seed_users"
              format: parquet

    ``storage`` is optional and defaults to an empty mapping. Unknown keys are
    rejected.
    """

    model_config = ConfigDict(extra="forbid")

    # Per-seed storage settings, keyed by seed name (e.g. "seed_users" above)
    storage: dict[str, StorageConfig] = Field(default_factory=dict)

DocsConfig

Bases: BaseModel

Optional documentation-related configuration.

Example:

docs:
  dag_dir: "site/dag"

Source code in src/fastflowtransform/config/project.py
135
136
137
138
139
140
141
142
143
144
145
146
147
class DocsConfig(BaseModel):
    """
    Optional documentation-related configuration (the ``docs:`` section).

    Example:

    docs:
      dag_dir: "site/dag"

    Unknown keys are rejected.
    """

    model_config = ConfigDict(extra="forbid")

    # Directory setting for DAG docs, e.g. "site/dag"; None when not configured.
    # Presumably the output location of the generated DAG pages; confirm in the docs generator.
    dag_dir: str | None = None

HookSpec

Bases: BaseModel

One hook entry from project.yml -> hooks.* lists. Example:

- name: audit_run_start
  kind: sql
  sql: "insert into ..."

- name: python_banner
  kind: python
  callable: "hooks_demo.hooks.notify:on_run_start"
  select: "tag:example:hooks_demo"
Source code in src/fastflowtransform/config/project.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
class HookSpec(BaseModel):
    """
    One hook entry from project.yml -> hooks.* lists.
    Example:
      - name: audit_run_start
        kind: sql
        sql: "insert into ..."

      - name: python_banner
        kind: python
        callable: "hooks_demo.hooks.notify:on_run_start"
        select: "tag:example:hooks_demo"

    Only ``kind`` is required. All other fields are optional, and unknown keys
    are rejected.
    """

    model_config = ConfigDict(extra="forbid")

    # Optional label for the hook
    name: str | None = None
    # Selects which body field applies: "sql" -> `sql`, "python" -> `callable`
    kind: Literal["sql", "python"]

    # NOTE(review): no validator enforces that `sql` is set for kind == "sql", or
    # that `callable` is set for kind == "python". Confirm the hook runner reports
    # a missing body clearly.

    # SQL hook body (for kind == "sql")
    sql: str | None = None

    # Python callable path (for kind == "python"), "pkg.mod:func" or "pkg.mod.func"
    # (this field name shadows the builtin `callable` inside the class body only)
    callable: str | None = None

    # Optional selection filter (for before_model / after_model)
    select: str | None = None

    # Optional free-form params if you want them later
    params: Mapping[str, Any] | None = None

    # Presumably restrict the hook to the listed engines / environments; confirm in the runner.
    engines: list[str] | None = None  # e.g. ["duckdb", "databricks_spark"]
    envs: list[str] | None = None  # e.g. ["dev_duckdb", "prod_duckdb"]

HooksConfig

Bases: BaseModel

Top-level hooks section in project.yml.

Source code in src/fastflowtransform/config/project.py
190
191
192
193
194
195
196
197
198
199
200
201
202
class HooksConfig(BaseModel):
    """
    Top-level hooks section in project.yml.

    Each key holds a list of HookSpec entries and defaults to an empty list.
    Unknown keys are rejected.
    """

    model_config = ConfigDict(extra="forbid")

    # Run-level hooks
    on_run_start: list[HookSpec] = Field(default_factory=list)
    on_run_end: list[HookSpec] = Field(default_factory=list)

    # Per-model hooks are optional but allowed
    # (HookSpec.select is documented as the filter for these two lists)
    before_model: list[HookSpec] = Field(default_factory=list)
    after_model: list[HookSpec] = Field(default_factory=list)

BaseProjectTestConfig

Bases: BaseModel

Common fields for all project-level tests declared in project.yml under tests:.

NOTE
  • For table/column-level tests (not_null, unique, ...), table and/or column are required in the concrete subclasses.
  • For reconciliation tests, table and column are optional and used only for display/grouping in summaries.
Source code in src/fastflowtransform/config/project.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
class BaseProjectTestConfig(BaseModel):
    """
    Common fields for all project-level tests declared in project.yml under `tests:`.

    NOTE:
      - For table/column-level tests (not_null, unique, ...), `table` and/or `column`
        are required in the concrete subclasses.
      - For reconciliation tests, `table` and `column` are optional and used only
        for display/grouping in summaries.
      - This base class declares neither `table` nor `column`; each subclass that
        accepts them declares them itself. Because `extra="forbid"` is inherited,
        a subclass that neither declares them nor overrides `model_config`
        rejects those keys.
    """

    model_config = ConfigDict(extra="forbid")

    type: str  # discriminated in concrete subclasses

    # One of "error" (default) or "warn"
    severity: Literal["error", "warn"] = "error"
    # Free-form tags attached to the test; defaults to an empty list
    tags: list[str] = Field(default_factory=list)

    # Optional human-readable label, especially for reconciliations
    name: str | None = None

NotNullTestConfig

Bases: BaseProjectTestConfig

not_null test: assert that a column contains no NULL values.

Source code in src/fastflowtransform/config/project.py
232
233
234
235
236
237
238
239
240
241
242
243
244
class NotNullTestConfig(BaseProjectTestConfig):
    """
    not_null test: assert that a column contains no NULL values.

    Requires `table` and `column`. An optional `where` predicate narrows the
    rows that are checked.
    """

    type: Literal["not_null"]

    # required for this test
    table: str
    column: str

    # optional WHERE predicate
    where: str | None = None

UniqueTestConfig

Bases: BaseProjectTestConfig

unique test: detect duplicate values within a column.

Source code in src/fastflowtransform/config/project.py
247
248
249
250
251
252
253
254
255
256
257
class UniqueTestConfig(BaseProjectTestConfig):
    """
    unique test: detect duplicate values within a column.

    Requires `table` and `column`. An optional `where` predicate narrows the
    rows that are checked.
    """

    type: Literal["unique"]

    # required for this test
    table: str
    column: str

    # optional WHERE predicate
    where: str | None = None

AcceptedValuesTestConfig

Bases: BaseProjectTestConfig

accepted_values test: ensure all non-NULL values are inside an allowed set.

Behaviour
  • If values is None or an empty list, the test is treated as a no-op (always passes), but still appears in summaries.
Source code in src/fastflowtransform/config/project.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
class AcceptedValuesTestConfig(BaseProjectTestConfig):
    """
    accepted_values test: ensure all non-NULL values are inside an allowed set.

    Behaviour:
      - If `values` is None or an empty list, the test is treated as a no-op
        (always passes), but still appears in summaries.

    Requires `table` and `column`. An optional `where` predicate narrows the
    rows that are checked.
    """

    type: Literal["accepted_values"]

    # required for this test
    table: str
    column: str

    # allowed literals (strings, numbers, ...)
    values: list[Any] | None = None
    # optional WHERE predicate
    where: str | None = None

GreaterEqualTestConfig

Bases: BaseProjectTestConfig

greater_equal test: require all values to be >= threshold.

Source code in src/fastflowtransform/config/project.py
279
280
281
282
283
284
285
286
287
288
289
class GreaterEqualTestConfig(BaseProjectTestConfig):
    """
    greater_equal test: require all values to be >= threshold.

    Requires `table` and `column`. `threshold` defaults to 0.0, so by default
    this checks that values are non-negative.
    """

    type: Literal["greater_equal"]

    # required for this test
    table: str
    column: str

    # lower bound that values are compared against with >=
    threshold: float = 0.0

NonNegativeSumTestConfig

Bases: BaseProjectTestConfig

non_negative_sum test: validate that SUM(column) is not negative.

Source code in src/fastflowtransform/config/project.py
292
293
294
295
296
297
298
299
300
class NonNegativeSumTestConfig(BaseProjectTestConfig):
    """
    non_negative_sum test: validate that SUM(column) is not negative.

    Requires `table` and `column`. Unlike not_null/unique, no `where` filter is
    accepted.
    """

    type: Literal["non_negative_sum"]

    # required for this test
    table: str
    column: str

RowCountBetweenTestConfig

Bases: BaseProjectTestConfig

row_count_between test: ensure row count is between [min_rows, max_rows].

  • min_rows defaults to 1.
  • max_rows is optional (open-ended upper bound).
Source code in src/fastflowtransform/config/project.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
class RowCountBetweenTestConfig(BaseProjectTestConfig):
    """
    row_count_between test: ensure row count is between [min_rows, max_rows].

    - `min_rows` defaults to 1.
    - `max_rows` is optional (open-ended upper bound).

    A configuration whose `min_rows` exceeds `max_rows` is rejected when the
    model is validated.
    """

    type: Literal["row_count_between"]

    table: str

    min_rows: int = 1
    max_rows: int | None = None

    @model_validator(mode="after")
    def validate_bounds(self) -> RowCountBetweenTestConfig:
        """
        Reject an inverted range. When an upper bound is given, min_rows must not exceed it.
        """
        lower, upper = self.min_rows, self.max_rows
        # No upper bound, or a well-ordered range: nothing to complain about.
        if upper is None or lower <= upper:
            return self
        raise ValueError(
            f"row_count_between: min_rows ({lower}) "
            f"must be less than or equal to max_rows ({upper})."
        )

validate_bounds

validate_bounds()

Ensure that min_rows is less than or equal to max_rows when both are set.

Source code in src/fastflowtransform/config/project.py
318
319
320
321
322
323
324
325
326
327
328
@model_validator(mode="after")
def validate_bounds(self) -> RowCountBetweenTestConfig:
    """
    Reject an inverted range. When an upper bound is given, min_rows must not exceed it.
    """
    lower, upper = self.min_rows, self.max_rows
    # No upper bound, or a well-ordered range: nothing to complain about.
    if upper is None or lower <= upper:
        return self
    raise ValueError(
        f"row_count_between: min_rows ({lower}) "
        f"must be less than or equal to max_rows ({upper})."
    )

FreshnessTestConfig

Bases: BaseProjectTestConfig

freshness test: warn or fail when latest timestamp is older than max_delay_minutes.

Source code in src/fastflowtransform/config/project.py
331
332
333
334
335
336
337
338
339
340
341
342
class FreshnessTestConfig(BaseProjectTestConfig):
    """
    freshness test: warn or fail when latest timestamp is older
    than `max_delay_minutes`.

    `table`, `column` and `max_delay_minutes` are all required.
    """

    type: Literal["freshness"]

    table: str
    column: str  # timestamp column

    # maximum allowed age of the latest timestamp, in minutes (required)
    max_delay_minutes: int

ReconcileExprSide

Bases: BaseModel

Expression-based reconciliation side (left/right):

left/right:
  table: str
  expr:  str
  where: optional filter condition

Source code in src/fastflowtransform/config/project.py
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
class ReconcileExprSide(BaseModel):
    """
    Expression-based reconciliation side (left/right):

      left/right:
        table: str
        expr:  str
        where: optional filter condition

    Unknown keys are rejected.
    """

    model_config = ConfigDict(extra="forbid")

    # table this side reads from
    table: str
    # expression for this side, e.g. "sum(x)" in the reconcile examples
    expr: str
    # optional filter condition for this side
    where: str | None = None

ReconcileKeySide

Bases: BaseModel

Key-based reconciliation side for coverage checks:

source/target:
  table: str
  key:   str

Source code in src/fastflowtransform/config/project.py
362
363
364
365
366
367
368
369
370
371
372
373
374
class ReconcileKeySide(BaseModel):
    """
    Key-based reconciliation side for coverage checks:

      source/target:
        table: str
        key:   str

    Unknown keys are rejected; filters live on the owning test
    (`source_where` / `target_where`), not here.
    """

    model_config = ConfigDict(extra="forbid")

    # table this side reads from
    table: str
    # key column used for the coverage comparison
    key: str

ReconcileEqualTestConfig

Bases: BaseProjectTestConfig

reconcile_equal test: compare two scalar expressions with optional tolerances.

Attributes:

- left (ReconcileExprSide): Left-hand expression (table, expr, optional where).
- right (ReconcileExprSide): Right-hand expression.
- abs_tolerance (float | None): Maximum absolute difference allowed.
- rel_tolerance_pct (float | None): Maximum relative difference (percent).

Notes

The top-level table/column fields are optional and only used for display in summaries.

Example (YAML):

- type: reconcile_equal
  left:  { table: a_tbl, expr: "sum(x)" }
  right: { table: b_tbl, expr: "sum(y)", where: "dt >= current_date - interval '7 days'" }
  abs_tolerance: 0.01
  rel_tolerance_pct: 1.0
Source code in src/fastflowtransform/config/project.py
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
class ReconcileEqualTestConfig(BaseProjectTestConfig):
    """
    `reconcile_equal` test: compare two scalar expressions with optional tolerances.

    Attributes:
        left (ReconcileExprSide): Left-hand expression (`table`, `expr`, optional `where`).
        right (ReconcileExprSide): Right-hand expression.
        abs_tolerance (float | None): Maximum absolute difference allowed.
        rel_tolerance_pct (float | None): Maximum relative difference (percent).
        table (str | None): Optional display label for summaries.
        column (str | None): Optional display label for summaries.

    Notes:
        The top-level `table`/`column` fields are optional and only used for display in summaries.
        They are declared explicitly because the inherited `extra="forbid"` would
        otherwise reject them.

    Example (YAML):

    ```yaml
    - type: reconcile_equal
      left:  { table: a_tbl, expr: "sum(x)" }
      right: { table: b_tbl, expr: "sum(y)", where: "dt >= current_date - interval '7 days'" }
      abs_tolerance: 0.01
      rel_tolerance_pct: 1.0
    ```
    """

    type: Literal["reconcile_equal"]

    left: ReconcileExprSide
    right: ReconcileExprSide

    abs_tolerance: float | None = None
    rel_tolerance_pct: float | None = None

    # Optional display/grouping labels (see Notes). Without these declarations the
    # inherited extra="forbid" rejects them, contradicting the documented contract.
    table: str | None = None
    column: str | None = None

ReconcileRatioWithinTestConfig

Bases: BaseProjectTestConfig

reconcile_ratio_within test: constrain the ratio left/right within [min_ratio, max_ratio].

Attributes:

- left (ReconcileExprSide): Left-hand expression.
- right (ReconcileExprSide): Right-hand expression.
- min_ratio (float): Minimum allowed ratio.
- max_ratio (float): Maximum allowed ratio.

Example (YAML):

- type: reconcile_ratio_within
  left:  { table: orders, expr: "sum(amount)" }
  right: { table: payments, expr: "sum(value)" }
  min_ratio: 0.98
  max_ratio: 1.02
Source code in src/fastflowtransform/config/project.py
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
class ReconcileRatioWithinTestConfig(BaseProjectTestConfig):
    """
    `reconcile_ratio_within` test: constrain the ratio `left/right` within `[min_ratio, max_ratio]`.

    Attributes:
        left (ReconcileExprSide): Left-hand expression.
        right (ReconcileExprSide): Right-hand expression.
        min_ratio (float): Minimum allowed ratio.
        max_ratio (float): Maximum allowed ratio.
        table (str | None): Optional display label for summaries.
        column (str | None): Optional display label for summaries.

    Notes:
        `min_ratio` must be less than or equal to `max_ratio`; an inverted range
        is rejected at validation time because no ratio could ever satisfy it.

    Example (YAML):

    ```yaml
    - type: reconcile_ratio_within
      left:  { table: orders, expr: "sum(amount)" }
      right: { table: payments, expr: "sum(value)" }
      min_ratio: 0.98
      max_ratio: 1.02
    ```
    """

    type: Literal["reconcile_ratio_within"]

    left: ReconcileExprSide
    right: ReconcileExprSide

    min_ratio: float
    max_ratio: float

    # Optional display/grouping labels for reconciliation tests. Declared so the
    # inherited extra="forbid" does not reject them.
    table: str | None = None
    column: str | None = None

    @model_validator(mode="after")
    def validate_bounds(self) -> ReconcileRatioWithinTestConfig:
        """
        Ensure that min_ratio is less than or equal to max_ratio.

        Mirrors the bound check of row_count_between. A range with
        min_ratio > max_ratio can never be satisfied, so it is reported as a
        config error rather than as a test that always fails.
        """
        if self.min_ratio > self.max_ratio:
            raise ValueError(
                f"reconcile_ratio_within: min_ratio ({self.min_ratio}) "
                f"must be less than or equal to max_ratio ({self.max_ratio})."
            )
        return self

ReconcileDiffWithinTestConfig

Bases: BaseProjectTestConfig

reconcile_diff_within test: limit the absolute difference between two aggregates.

Attributes:

- left (ReconcileExprSide): Left-hand expression.
- right (ReconcileExprSide): Right-hand expression.
- max_abs_diff (float): Maximum allowed absolute difference.

Example (YAML):

- type: reconcile_diff_within
  left:  { table: a, expr: "count(*)" }
  right: { table: b, expr: "count(*)" }
  max_abs_diff: 10
Source code in src/fastflowtransform/config/project.py
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
class ReconcileDiffWithinTestConfig(BaseProjectTestConfig):
    """
    `reconcile_diff_within` test: limit the absolute difference between two aggregates.

    Attributes:
        left (ReconcileExprSide): Left-hand expression.
        right (ReconcileExprSide): Right-hand expression.
        max_abs_diff (float): Maximum allowed absolute difference.
        table (str | None): Optional display label for summaries.
        column (str | None): Optional display label for summaries.

    Example (YAML):

    ```yaml
    - type: reconcile_diff_within
      left:  { table: a, expr: "count(*)" }
      right: { table: b, expr: "count(*)" }
      max_abs_diff: 10
    ```
    """

    type: Literal["reconcile_diff_within"]

    left: ReconcileExprSide
    right: ReconcileExprSide

    max_abs_diff: float

    # Optional display/grouping labels for reconciliation tests. Declared so the
    # inherited extra="forbid" does not reject them.
    table: str | None = None
    column: str | None = None

ReconcileCoverageTestConfig

Bases: BaseProjectTestConfig

reconcile_coverage test: ensure all keys from source exist in target.

Attributes:

- source (ReconcileKeySide): Source side (table, key).
- target (ReconcileKeySide): Target side (table, key).
- source_where (str | None): Optional filter predicate applied to the source.
- target_where (str | None): Optional filter predicate applied to the target.

Example (YAML):

- type: reconcile_coverage
  source: { table: crm_users, key: "user_id" }
  target: { table: fact_orders, key: "user_id" }
  source_where: "status = 'active'"
  target_where: "dt >= current_date - interval '30 days'"
Source code in src/fastflowtransform/config/project.py
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
class ReconcileCoverageTestConfig(BaseProjectTestConfig):
    """
    `reconcile_coverage` test: ensure all keys from `source` exist in `target`.

    Attributes:
        source (ReconcileKeySide): Source side (`table`, `key`).
        target (ReconcileKeySide): Target side (`table`, `key`).
        source_where (str | None): Optional filter predicate applied to the source.
        target_where (str | None): Optional filter predicate applied to the target.
        table (str | None): Optional display label for summaries.
        column (str | None): Optional display label for summaries.

    Example (YAML):

    ```yaml
    - type: reconcile_coverage
      source: { table: crm_users, key: "user_id" }
      target: { table: fact_orders, key: "user_id" }
      source_where: "status = 'active'"
      target_where: "dt >= current_date - interval '30 days'"
    ```
    """

    type: Literal["reconcile_coverage"]

    source: ReconcileKeySide
    target: ReconcileKeySide

    source_where: str | None = None
    target_where: str | None = None

    # Optional display/grouping labels for reconciliation tests. Declared so the
    # inherited extra="forbid" does not reject them.
    table: str | None = None
    column: str | None = None

CustomProjectTestConfig

Bases: BaseProjectTestConfig

Catch-all config for user-defined tests declared in project.yml under tests:.

  • type: any non-empty string (must NOT match a built-in test type if you want this class to be used; built-ins still win first).
  • table / column: optional; for tests that don't need them, you can omit.
  • Extra keys are allowed and preserved (e.g. threshold, pattern, window_days).
Source code in src/fastflowtransform/config/project.py
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
class CustomProjectTestConfig(BaseProjectTestConfig):
    """
    Catch-all config for user-defined tests declared in project.yml under `tests:`.

    - `type`: any non-empty string (must NOT match a built-in test type if you want
      this class to be used; built-ins still win first). An empty string is
      rejected at validation time.
    - `table` / `column`: optional; for tests that don't need them, you can omit.
    - Extra keys are allowed and preserved (e.g. threshold, pattern, window_days).
    """

    # Allow arbitrary extra keys; they'll be visible in model_dump().
    model_config = ConfigDict(extra="allow")

    # Enforce the documented "non-empty" contract; a bare `str` would accept "".
    # Field() without a default keeps the field required.
    type: str = Field(min_length=1)
    table: str | None = None
    column: str | None = None

ProjectConfig

Bases: BaseModel

Strict representation of project.yml.

Example:

name: duckdb_api_demo
version: "0.1"

vars: {}

models:
  storage: { ... }
  incremental: { ... }

seeds:
  storage: { ... }

tests:
  - type: not_null
    table: mart_users_join
    column: user_id
    tags: [batch]
Source code in src/fastflowtransform/config/project.py
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
class ProjectConfig(BaseModel):
    """
    Strict representation of project.yml.

    Example:

        name: duckdb_api_demo
        version: "0.1"

        vars: {}

        models:
          storage: { ... }
          incremental: { ... }

        seeds:
          storage: { ... }

        tests:
          - type: not_null
            table: mart_users_join
            column: user_id
            tags: [batch]

    Only `name` and `version` are required. Every other section has a default,
    and unknown top-level keys are rejected.
    """

    model_config = ConfigDict(extra="forbid")

    # Required project identity
    name: str
    # NOTE(review): YAML reads an unquoted `version: 0.1` as a float, which
    # neither `str` nor `int` appears to accept under pydantic's default
    # coercion. Quote versions as in the example above; confirm.
    version: str | int

    # Models directory (in case you want this configurable)
    models_dir: str = "models"

    # Arbitrary variables that can be accessed via var('key') in Jinja
    vars: dict[str, Any] = Field(default_factory=dict)

    # `models:` / `seeds:` sections; an empty block is built when omitted
    models: ModelsBlock = Field(default_factory=ModelsBlock)
    seeds: SeedsBlock = Field(default_factory=SeedsBlock)

    # Project-level tests (`tests:`); empty list when omitted
    tests: list[ProjectTestConfig] = Field(default_factory=list)

    # Optional `docs:` section; None when omitted
    docs: DocsConfig | None = None

    # Optional `hooks:` section; None when omitted
    hooks: HooksConfig | None = None

parse_project_yaml_config

parse_project_yaml_config(project_dir)

Read project.yml under project_dir and validate it strictly using Pydantic.

Typical usage inside core._load_project_yaml:

from fastflowtransform.config.project import parse_project_yaml_config

proj_cfg = parse_project_yaml_config(project_dir)
self.project_vars = dict(proj_cfg.vars or {})

# models.storage → storage.set_model_storage(...)
# seeds.storage  → storage.set_seed_storage(...)
# models.incremental → self.incremental_models = ...
Source code in src/fastflowtransform/config/project.py
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
def parse_project_yaml_config(project_dir: Path | str) -> ProjectConfig:
    """
    Read project.yml under `project_dir` and validate it strictly using Pydantic.

    Typical usage inside core._load_project_yaml:

        from fastflowtransform.config.project import parse_project_yaml_config

        proj_cfg = parse_project_yaml_config(project_dir)
        self.project_vars = dict(proj_cfg.vars or {})

        # models.storage → storage.set_model_storage(...)
        # seeds.storage  → storage.set_seed_storage(...)
        # models.incremental → self.incremental_models = ...

    Args:
        project_dir: Project root directory containing `project.yml`. A plain
            string path is accepted too and converted with `pathlib.Path`.

    Returns:
        The validated ProjectConfig.

    Raises:
        OSError: If `project.yml` cannot be read (e.g. FileNotFoundError).
        yaml.YAMLError: If the file is not valid YAML.
        pydantic.ValidationError: If the content does not match the schema.
            An empty file is validated as `{}` and therefore fails on the
            required `name` / `version` fields.
    """
    # Path(...) accepts str and leaves an existing Path equivalent.
    cfg_path = Path(project_dir) / "project.yml"
    # safe_load: project.yml is user-edited; never construct arbitrary Python objects.
    raw = yaml.safe_load(cfg_path.read_text(encoding="utf-8")) or {}
    return ProjectConfig.model_validate(raw)