fastflowtransform.seeding

SeedTarget

Bases: NamedTuple

Resolved seed target (schema, table).

Source code in src/fastflowtransform/seeding.py
class SeedTarget(NamedTuple):
    """Resolved seed target (schema, table)."""

    schema: str | None
    table: str

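A minimal usage sketch (the values are illustrative): as a NamedTuple, a SeedTarget unpacks like a plain (schema, table) pair.

from fastflowtransform.seeding import SeedTarget

target = SeedTarget(schema="raw", table="users")        # schema may also be None
schema, table = target                                  # unpacks like a tuple
full_name = f"{schema}.{table}" if schema else table    # "raw.users"
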
materialize_seed

materialize_seed(table, df, executor, schema=None)

Materialize a DataFrame as a database table across engines.

Source code in src/fastflowtransform/seeding.py
def materialize_seed(
    table: str, df: pd.DataFrame, executor: Any, schema: str | None = None
) -> None:
    """
    Materialize a DataFrame as a database table across engines.
    """
    t0 = perf_counter()
    result, full_name, created_schema = executor.load_seed(table, df, schema)
    dt_ms = int((perf_counter() - t0) * 1000)

    _echo_seed_line(
        full_name=full_name,
        rows=len(df),
        cols=df.shape[1],
        engine=executor.engine_name,
        ms=dt_ms,
        created_schema=created_schema,
        action="replaced",
    )
    return result
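
The function relies on a small executor contract: a load_seed(table, df, schema) method that returns (result, full_name, created_schema), plus an engine_name attribute used for logging. A hedged sketch with a hypothetical in-memory stub (DummyExecutor is not part of fastflowtransform):

import pandas as pd

from fastflowtransform.seeding import materialize_seed

class DummyExecutor:
    """Hypothetical stub; real executors write the frame into a database."""

    engine_name = "duckdb"
    schema = None

    def load_seed(self, table, df, schema):
        full_name = f"{schema}.{table}" if schema else table
        # (engine-specific result, fully qualified name, whether a schema was created)
        return None, full_name, False

df = pd.DataFrame({"id": [1, 2], "email": ["a@x.io", "b@x.io"]})
materialize_seed("users", df, DummyExecutor(), schema="raw")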

seed_project

seed_project(project_dir, executor, default_schema=None)

Load every seed file under <project>/seeds recursively and materialize it.

Supports configuration in seeds/schema.yml (validated via Pydantic):

targets:
  <seed-id>:                 # e.g., "raw/users" (path-based, recommended)
    schema: <schema-name>    # global target schema
    table: <table-name>      # optional rename
    schema_by_engine:        # optional engine overrides (EngineType keys)
      postgres: raw
      duckdb: main

dtypes:
  <table-key>:
    column_a: string
    column_b: int64

Resolution priority for (schema, table):

1) targets[<seed-id>] (e.g., "raw/users")
2) targets[<seed-id with dots>] (e.g., "raw.users")
3) targets[<stem>] (only if the stem is unique)
4) executor.schema or default_schema
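
For example, a project with a seed file at seeds/raw/users.csv exposes the seed ID "raw/users". A hypothetical seeds/schema.yml for it might look like this (all values illustrative; dtypes is keyed by the resolved table name, per <table-key> above):

targets:
  raw/users:
    schema: raw              # used unless an engine override matches
    table: users
    schema_by_engine:
      postgres: raw
      duckdb: main

dtypes:
  users:
    id: int64
    email: string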

Returns:

    int: Number of successfully materialized seed tables.

Raises:

    ValueError: If schema.yml uses a plain stem key while multiple files share that stem.

Source code in src/fastflowtransform/seeding.py
def seed_project(project_dir: Path, executor: Any, default_schema: str | None = None) -> int:
    """
    Load every seed file under <project>/seeds recursively and materialize it.

    Supports configuration in seeds/schema.yml (validated via Pydantic):

      targets:
        <seed-id>:                 # e.g., "raw/users" (path-based, recommended)
          schema: <schema-name>    # global target schema
          table: <table-name>      # optional rename
          schema_by_engine:        # optional engine overrides (EngineType keys)
            postgres: raw
            duckdb: main

      dtypes:
        <table-key>:
          column_a: string
          column_b: int64

    Resolution priority for (schema, table):
      1) targets[<seed-id>]  (e.g., "raw/users")
      2) targets[<seed-id with dots>] (e.g., "raw.users")
      3) targets[<stem>] (*only* if stem is unique)
      4) executor.schema or default_schema

    Returns:
      Number of successfully materialized seed tables.

    Raises:
      ValueError: if schema.yml uses a plain stem key while multiple files share that stem.
    """
    seeds_dir = _resolve_seeds_dir(project_dir)
    if not seeds_dir.exists():
        return 0

    # Pydantic-validated seeds/schema.yml (or None if not present)
    schema_cfg = load_seeds_schema(project_dir, seeds_dir=seeds_dir)

    # Collect seed files recursively to allow folder-based schema conventions.
    paths: list[Path] = [
        p
        for p in sorted(seeds_dir.rglob("*"))
        if p.is_file() and p.suffix.lower() in (".csv", ".parquet", ".pq")
    ]
    if not paths:
        return 0

    # Check for ambiguous stems (same filename in different folders).
    stem_counts: dict[str, int] = {}
    for p in paths:
        stem_counts[p.stem] = stem_counts.get(p.stem, 0) + 1

    count = 0
    for path in paths:
        seedid = _seed_id(seeds_dir, path)
        stem = path.stem

        # Default schema may come from executor or caller.
        base_schema = getattr(executor, "schema", None) or default_schema
        schema, table = _resolve_schema_and_table_by_cfg(
            seedid, stem, schema_cfg, executor, base_schema
        )

        # If schema.yml uses a bare stem while that stem exists multiple times,
        # force disambiguation.
        if schema_cfg and stem in schema_cfg.targets and stem_counts.get(stem, 0) > 1:
            raise ValueError(
                f'Seed stem "{stem}" appears multiple times. '
                f"Please configure using the path-based seed ID "
                f'(e.g., "{seedid}") in seeds/schema.yml.'
            )

        df = _read_seed_file(path)
        # Use the resolved *table* key for schema enforcement (allows rename-aware mapping).
        df = _apply_schema(df, table, schema_cfg, seedid)
        df = _apply_column_schema(df, table, schema_cfg, executor, seedid)
        df = _inject_seed_metadata(df, seedid, path)

        materialize_seed(table, df, executor, schema=schema)
        count += 1

    return count
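
A hedged end-to-end sketch, assuming a project root that contains seeds/raw/users.csv and no seeds/schema.yml (so schema resolution falls through to rule 4, executor.schema or default_schema). DummyExecutor is the same hypothetical stub as in the materialize_seed sketch above:

from pathlib import Path

import pandas as pd

from fastflowtransform.seeding import seed_project

class DummyExecutor:
    """Hypothetical stub implementing the minimal executor contract."""

    engine_name = "duckdb"
    schema = None

    def load_seed(self, table, df, schema):
        full_name = f"{schema}.{table}" if schema else table
        return None, full_name, False

project_dir = Path("example_project")
(project_dir / "seeds" / "raw").mkdir(parents=True, exist_ok=True)
pd.DataFrame({"id": [1, 2], "email": ["a@x.io", "b@x.io"]}).to_csv(
    project_dir / "seeds" / "raw" / "users.csv", index=False
)

count = seed_project(project_dir, DummyExecutor(), default_schema="raw")
print(count)  # 1 seed table materialized ("users" under the "raw" schema)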