Skip to content

fastflowtransform.cli.source_cmd

freshness

freshness(project='.', env_name='dev', engine=None, vars=None)

Check source-table freshness as configured in sources.yml.

Usage

fft source freshness .
fft source freshness . --env prod --engine duckdb

Source code in src/fastflowtransform/cli/source_cmd.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def freshness(
    project: ProjectArg = ".",
    env_name: EnvOpt = "dev",
    engine: EngineOpt = None,
    vars: VarsOpt = None,
) -> None:
    """
    Check source-table freshness as configured in sources.yml.

    Usage:
        fft source freshness .
        fft source freshness . --env prod --engine duckdb

    Args:
        project: Forwarded to ``_prepare_context`` to build the CLI context.
        env_name: Forwarded to ``_prepare_context``; also bound as ``env`` in
            the context and echoed as the profile name.
        engine: Forwarded to ``_prepare_context``.
        vars: Forwarded to ``_prepare_context``.

    Raises:
        typer.Exit: With exit code 1 when at least one result has status
            ``"error"``.
    """
    # Build CLIContext (loads project, registry, env, profile, etc.)
    ctx = _prepare_context(project, env_name, engine, vars)
    bind_context(engine=ctx.profile.engine, env=env_name)

    # Everything after bind_context runs under try/finally so the bound
    # context is cleared exactly once on every exit path: early return,
    # typer.Exit, or an unexpected exception from the executor / checks.
    try:
        echo(f"[FFT] Profile: {env_name} | Engine: {ctx.profile.engine}")

        # Get a live connection / executor from the context
        execu, _run_sql, _run_py = ctx.make_executor()
        con = _get_test_con(execu)

        # Run freshness checks over all sources with a configured freshness block
        results: list[SourceFreshnessResult] = run_source_freshness(
            execu,
            con=con,
            engine=ctx.profile.engine,
        )

        if not results:
            echo("No sources with freshness configuration found in sources.yml.")
            return

        # Sort for stable output
        results = sorted(results, key=lambda r: (r.source_name, r.table_name))

        echo("")
        echo("Source Freshness Summary")
        echo("───────────────────────")

        header = (
            f"{'source.table':<30}  {'status':<7}  {'delay_min':>9}  {'warn_min':>9}  {'error_min':>9}"
        )
        echo(header)

        # Loop-invariant lookup; unknown statuses render as "?".
        status_symbols = {"pass": "✓", "warn": "!", "error": "✖"}

        for r in results:
            key = f"{r.source_name}.{r.table_name}"
            sym = status_symbols.get(r.status, "?")
            delay = format_duration_minutes(r.delay_minutes)
            warn_after = "-" if r.warn_after_minutes is None else str(r.warn_after_minutes)
            err_after = "-" if r.error_after_minutes is None else str(r.error_after_minutes)

            echo(
                f"{key:<30}  {sym + ' ' + r.status:<7}  {delay:>9}  {warn_after:>9}  {err_after:>9}"
            )

            if r.error:
                # Indent the error line for readability
                echo(f"    ↳ {r.error}")

        if any(r.status == "error" for r in results):
            # Non-zero exit code if any source is in ERROR state
            raise typer.Exit(1)
    finally:
        clear_context()

register

register(app)

Attach 'source freshness' as a sub-command, analogous to how run/test are registered.

Source code in src/fastflowtransform/cli/source_cmd.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def register(app: typer.Typer) -> None:
    """
    Mount a ``source`` command group on *app* and expose ``freshness`` under it.

    A dedicated Typer sub-application named ``source`` is created, the
    module's ``freshness`` function is registered on it as the ``freshness``
    command, and the group is then added to *app* under the name ``source``.
    """
    # Sub-application that holds every `fft source ...` command.
    group = typer.Typer(
        add_completion=False,
        no_args_is_help=True,
        help="Source metadata utilities (freshness checks, etc.).",
        name="source",
    )

    # `command(...)` returns a decorator; apply it to the existing function
    # rather than using `@` syntax, since `freshness` is defined elsewhere.
    as_freshness_command = group.command(
        "freshness",
        help="Run freshness checks defined in sources.yml.",
    )
    as_freshness_command(freshness)

    app.add_typer(group, name="source")