Skip to content

fastflowtransform.config.sources

FreshnessWindow

Bases: BaseModel

Time window: e.g. {count: 12, period: 'hour'}.

Source code in src/fastflowtransform/config/sources.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
class FreshnessWindow(BaseModel):
    """
    A freshness time window, e.g. ``{count: 12, period: 'hour'}``.

    ``count`` must be a strictly positive integer; unknown keys are rejected.
    """

    model_config = ConfigDict(extra="forbid")

    count: int
    period: FreshnessPeriod

    @field_validator("count")
    @classmethod
    def _count_positive(cls, value: int) -> int:
        # A window of zero or a negative number of periods cannot express an age limit.
        if value > 0:
            return value
        raise ValueError("freshness.count must be > 0")

SourceFreshnessConfig

Bases: BaseModel

Freshness configuration for a source or table.

Example:

    freshness:
      loaded_at_field: my_ts_col
      warn_after:  {count: 12, period: hour}
      error_after: {count: 24, period: hour}

Source code in src/fastflowtransform/config/sources.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
class SourceFreshnessConfig(BaseModel):
    """
    Freshness configuration for a source or table.

    Example (``sources.yml``)::

      freshness:
        loaded_at_field: my_ts_col
        warn_after:  {count: 12, period: hour}
        error_after: {count: 24, period: hour}

    Unknown keys are rejected.
    """

    model_config = ConfigDict(extra="forbid")

    loaded_at_field: str | None = None
    warn_after: FreshnessWindow | None = None
    error_after: FreshnessWindow | None = None

    def merged_with(self, other: SourceFreshnessConfig | None) -> SourceFreshnessConfig:
        """
        Return a new config where ``self`` overrides ``other``.

        Useful for table-level override over source-level defaults: every field
        set on ``self`` wins, otherwise the value from ``other`` is used. An empty
        ``loaded_at_field`` string counts as unset.

        Args:
            other: Lower-priority config (e.g. source-level defaults), or ``None``.

        Returns:
            A new ``SourceFreshnessConfig`` instance, never ``self``. Reassigning
            fields on the result does not affect this instance; nested
            ``FreshnessWindow`` objects are shared, not deep-copied.
        """
        if other is None:
            # Copy instead of returning `self`, so the "new config" contract holds
            # and callers cannot accidentally modify the table-level config.
            return self.model_copy()
        return SourceFreshnessConfig(
            loaded_at_field=self.loaded_at_field or other.loaded_at_field,
            warn_after=self.warn_after if self.warn_after is not None else other.warn_after,
            error_after=self.error_after if self.error_after is not None else other.error_after,
        )

merged_with

merged_with(other)

Return a new config where self overrides 'other'. Useful for table-level override over source-level defaults.

Source code in src/fastflowtransform/config/sources.py
165
166
167
168
169
170
171
172
173
174
175
176
def merged_with(self, other: SourceFreshnessConfig | None) -> SourceFreshnessConfig:
    """
    Return a new config where ``self`` overrides ``other``.

    Useful for table-level override over source-level defaults: every field
    set on ``self`` wins, otherwise the value from ``other`` is used. An empty
    ``loaded_at_field`` string counts as unset.

    Args:
        other: Lower-priority config (e.g. source-level defaults), or ``None``.

    Returns:
        A new ``SourceFreshnessConfig`` instance, never ``self``. Reassigning
        fields on the result does not affect this instance; nested
        ``FreshnessWindow`` objects are shared, not deep-copied.
    """
    if other is None:
        # Copy instead of returning `self`, so the "new config" contract holds
        # and callers cannot accidentally modify the table-level config.
        return self.model_copy()
    return SourceFreshnessConfig(
        loaded_at_field=self.loaded_at_field or other.loaded_at_field,
        warn_after=self.warn_after if self.warn_after is not None else other.warn_after,
        error_after=self.error_after if self.error_after is not None else other.error_after,
    )

SourceTableConfig

Bases: BaseModel

Schema for an individual table entry under a source group.

We allow extra keys so that future metadata (e.g. owner) doesn't break existing configurations, but only the known fields below are used when normalizing.

Source code in src/fastflowtransform/config/sources.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
class SourceTableConfig(BaseModel):
    """
    Schema for an individual table entry under a source group.

    We allow extra keys so that future metadata (e.g. ``owner``) doesn't break
    existing configurations, but only the known fields below are *used* when
    normalizing.
    """

    model_config = ConfigDict(extra="allow")

    name: str
    identifier: str | None = None

    # core location fields
    # The YAML key is "schema"; the attribute is `schema_` (presumably to avoid
    # shadowing `BaseModel.schema`).
    schema_: str | None = Field(default=None, alias="schema")
    database: str | None = None
    catalog: str | None = None
    project: str | None = None
    dataset: str | None = None
    location: str | None = None
    format: str | None = None
    options: dict[str, Any] | None = None

    overrides: dict[str, dict[str, Any]] | None = None

    # metadata
    description: str | None = None
    columns: Any | None = None
    meta: dict[str, Any] | None = None

    freshness: SourceFreshnessConfig | None = None

    @field_validator("options", mode="before")
    @classmethod
    def _normalize_opts(cls, v: Any) -> dict[str, Any] | None:
        """Coerce ``options`` keys to ``str``; ``None`` passes through, non-mappings fail."""
        if v is None:
            return None
        if isinstance(v, Mapping):
            return {str(key): val for key, val in v.items()}
        # Pydantic v2 only converts ValueError/AssertionError into a
        # ValidationError; a TypeError would escape raw, without field location.
        raise ValueError("options must be a mapping if provided")

SourceGroupConfig

Bases: BaseModel

Schema for each entry under the top-level `sources:` key in sources.yml.

Source code in src/fastflowtransform/config/sources.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
class SourceGroupConfig(BaseModel):
    """
    Schema for each entry under the top-level ``sources:`` key in sources.yml.

    Holds group-level location defaults plus the list of ``tables``. Unknown
    keys are rejected.
    """

    model_config = ConfigDict(extra="forbid")

    name: str

    # group-level location defaults
    # The YAML key is "schema"; the attribute is `schema_` (presumably to avoid
    # shadowing `BaseModel.schema`).
    schema_: str | None = Field(default=None, alias="schema")
    database: str | None = None
    catalog: str | None = None
    project: str | None = None
    dataset: str | None = None
    location: str | None = None
    format: str | None = None
    options: dict[str, Any] | None = None

    overrides: dict[str, dict[str, Any]] | None = None
    description: str | None = None
    meta: dict[str, Any] | None = None
    freshness: SourceFreshnessConfig | None = None

    tables: list[SourceTableConfig]

    @field_validator("options", mode="before")
    @classmethod
    def _normalize_opts(cls, v: Any) -> dict[str, Any] | None:
        """Coerce ``options`` keys to ``str``; ``None`` passes through, non-mappings fail."""
        if v is None:
            return None
        if isinstance(v, Mapping):
            return {str(key): val for key, val in v.items()}
        # Pydantic v2 only converts ValueError/AssertionError into a
        # ValidationError; a TypeError would escape raw, without field location.
        raise ValueError("options must be a mapping if provided")

SourcesFileConfig

Bases: BaseModel

Strict representation of sources.yml (version 2).

Source code in src/fastflowtransform/config/sources.py
301
302
303
304
305
306
307
308
309
class SourcesFileConfig(BaseModel):
    """
    Top-level model for a version-2 ``sources.yml`` file.

    ``version`` must be exactly ``2``; ``sources`` defaults to an empty list
    when omitted. Any other top-level key is rejected.
    """

    version: Literal[2]
    sources: list[SourceGroupConfig] = Field(default_factory=list)

    # Strict: keys other than `version` / `sources` are validation errors.
    model_config = ConfigDict(extra="forbid")

load_sources_config

load_sources_config(project_dir)

Read sources.yml under project_dir, validate it with Pydantic, and return the normalized dict that Registry expects.

This function is the direct analogue of parse_project_yaml_config.

Source code in src/fastflowtransform/config/sources.py
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
def load_sources_config(project_dir: Path) -> dict[str, dict[str, dict[str, Any]]]:
    """
    Read `sources.yml` under `project_dir`, validate it with Pydantic, and
    return the normalized dict that Registry expects.

    This function is the direct analogue of `parse_project_yaml_config`.

    An empty file is treated as an empty mapping, which then fails validation
    because `version` is a required field.

    Raises:
        OSError: If `sources.yml` cannot be read (e.g. it does not exist).
        yaml.YAMLError: If the file is not valid YAML.
        pydantic.ValidationError: If the content does not match `SourcesFileConfig`.
    """
    cfg_path = project_dir / "sources.yml"
    raw = yaml.safe_load(cfg_path.read_text(encoding="utf-8")) or {}

    # Errors propagate unchanged; the caller is expected to wrap them into a
    # friendlier "Failed to parse sources.yml" message.
    parsed = SourcesFileConfig.model_validate(raw)
    return _normalize_sources(parsed)

resolve_source_entry

resolve_source_entry(entry, engine, *, default_identifier=None)

Apply engine overrides to a normalized entry ("base" + "overrides").

The resolution logic is unchanged from the previous implementation.

Source code in src/fastflowtransform/config/sources.py
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
def resolve_source_entry(
    entry: Mapping[str, Any],
    engine: str | None,
    *,
    default_identifier: str | None = None,
) -> dict[str, Any]:
    """
    Apply engine overrides to a normalized entry ("base" + "overrides").

    Resolution steps:

    1. Copy ``entry["base"]`` (treated as empty if missing or not a mapping),
       make sure every location key exists (``None`` when unset), and make
       ``options`` a fresh dict (an explicit ``None`` becomes ``{}``).
    2. Merge wildcard overrides, in the order ``"*"``, ``"default"``, ``"any"``.
    3. Merge the override stored under ``engine``, if ``engine`` is truthy and
       present in the overrides.

    Args:
        entry: Normalized source entry with optional ``"base"`` and
            ``"overrides"`` mappings.
        engine: Engine name selecting engine-specific overrides.
        default_identifier: Fallback identifier, used when the resolved config
            has neither an identifier nor a location.

    Returns:
        A new dict holding the resolved configuration.

    Raises:
        KeyError: If no identifier or location can be determined and no
            ``default_identifier`` is given.
    """
    base = entry.get("base") if isinstance(entry, Mapping) else None
    if not isinstance(base, Mapping):
        base = {}

    cfg = dict(base)
    for key in (
        "identifier",
        "schema",
        "database",
        "catalog",
        "project",
        "dataset",
        "location",
        "format",
    ):
        cfg.setdefault(key, None)

    # The intent is that `options` is always a dict, but `setdefault` alone would
    # keep an explicit `options: None`. Copy mappings so that mutating the result
    # never writes back into `entry`.
    options = cfg.get("options")
    if options is None:
        cfg["options"] = {}
    elif isinstance(options, Mapping):
        cfg["options"] = dict(options)

    overrides = entry.get("overrides") if isinstance(entry, Mapping) else None
    if isinstance(overrides, Mapping):
        # wildcard/default overrides
        for wildcard_key in ("*", "default", "any"):
            if wildcard_key in overrides:
                cfg = _merge_source_configs(cfg, overrides[wildcard_key])
        # engine-specific overrides are merged last
        if engine and engine in overrides:
            cfg = _merge_source_configs(cfg, overrides[engine])

    ident = cfg.get("identifier")
    if (ident is None or ident == "") and not cfg.get("location"):
        if not default_identifier:
            raise KeyError("Source configuration missing identifier or location")
        cfg["identifier"] = default_identifier

    return cfg