Skip to content

fastflowtransform.cli.bootstrap

CLIContext dataclass

Source code in src/fastflowtransform/cli/bootstrap.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@dataclass
class CLIContext:
    """
    Context object carrying the project path, environment name, Jinja
    environment, environment settings and profile, plus optional budget and
    artifacts settings. Executors and artifact stores are built from it.
    """

    project: Path
    env_name: str
    jinja_env: Environment
    env_settings: EnvSettings
    profile: Profile
    budgets_cfg: BudgetsConfig | None = None
    # Only "db" and "both" (case-insensitive) let make_artifacts_store() build a store.
    artifacts_mode: str = "files"
    artifacts_pg_dsn: str | None = None
    artifacts_pg_schema: str | None = None

    def make_executor(self) -> tuple[BaseExecutor, Callable, Callable]:
        """
        Build the executor and its two runner callables from the profile and
        Jinja environment, apply the query budget limit to the executor, and
        return all three.
        """
        engine, sql_runner, py_runner = _make_executor(self.profile, self.jinja_env)
        self._configure_budget_limit(engine)
        return engine, sql_runner, py_runner

    def make_artifacts_store(self) -> PostgresArtifactsStore | None:
        """
        Return a PostgresArtifactsStore when artifacts.mode is 'db' or 'both'
        and both a DSN and a schema are set; return None in every other case.
        Warn+continue behavior is applied at call sites (or via store.safe_call).
        """
        normalized = (self.artifacts_mode or "files").lower().strip()
        # Short-circuiting keeps the original check order: mode, then DSN, then schema.
        if normalized in ("db", "both") and self.artifacts_pg_dsn and self.artifacts_pg_schema:
            return PostgresArtifactsStore(
                dsn=self.artifacts_pg_dsn,
                schema=self.artifacts_pg_schema,
            )
        return None

    def _configure_budget_limit(self, executor: Any) -> None:
        """
        Pass the per-engine ``max_bytes`` limit from the budgets config to the
        executor, or None when no limit applies. Does nothing if the executor
        is None or lacks ``configure_query_budget_limit``.
        """
        if executor is None or not hasattr(executor, "configure_query_budget_limit"):
            return
        # Limits are looked up by the lower-cased engine name from the profile.
        engine_key = (self.profile.engine or "").lower()
        budgets = self.budgets_cfg
        entry = budgets.query_limits.get(engine_key) if (budgets and engine_key) else None
        max_bytes = entry.max_bytes if entry else None
        executor.configure_query_budget_limit(max_bytes)

make_artifacts_store

make_artifacts_store()

Create a PostgresArtifactsStore if artifacts.mode is 'db' or 'both'. Warn+continue behavior is applied at call sites (or via store.safe_call).

Source code in src/fastflowtransform/cli/bootstrap.py
51
52
53
54
55
56
57
58
59
60
61
def make_artifacts_store(self) -> PostgresArtifactsStore | None:
    """
    Return a PostgresArtifactsStore when artifacts.mode is 'db' or 'both'
    and both a DSN and a schema are set; return None in every other case.
    Warn+continue behavior is applied at call sites (or via store.safe_call).
    """
    normalized = (self.artifacts_mode or "files").lower().strip()
    # Short-circuiting keeps the original check order: mode, then DSN, then schema.
    if normalized in ("db", "both") and self.artifacts_pg_dsn and self.artifacts_pg_schema:
        return PostgresArtifactsStore(
            dsn=self.artifacts_pg_dsn,
            schema=self.artifacts_pg_schema,
        )
    return None

configure_executor_contracts

configure_executor_contracts(project_dir, executor)

Load contracts from project_dir and attach them to the executor (if supported).

Mirrors the behaviour in `fft run`: parse per-table contracts and the project-level `contracts.yml`; on parse errors, log a warning and continue without contracts.

Source code in src/fastflowtransform/cli/bootstrap.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def configure_executor_contracts(project_dir: Path, executor: BaseExecutor | None) -> None:
    """
    Load contracts from project_dir and attach them to the executor (if supported).

    Mirrors the behaviour in `fft run`: parse per-table contracts and the
    project-level contracts.yml; on parse errors, log a warning and continue
    without contracts.

    Attaching is best-effort as well: if the executor's ``configure_contracts``
    raises, the error is reported as a warning and not propagated.

    Args:
        project_dir: Project directory passed to the contract loaders.
        executor: Executor to configure. Nothing happens when it is None or
            has no ``configure_contracts`` attribute.
    """
    if executor is None or not hasattr(executor, "configure_contracts"):
        return

    try:
        contracts_by_table = load_contracts(project_dir)
        project_contracts = _load_project_contracts(project_dir)
    except Exception as exc:
        warn(f"[contracts] Failed to load contracts from {project_dir}: {exc}")
        # Either loader failing discards both results, so the executor is
        # configured with an empty contract set.
        contracts_by_table = {}
        project_contracts = None

    try:
        executor.configure_contracts(contracts_by_table, project_contracts)
    except Exception as exc:
        # Keep the run going, but surface the failure instead of hiding it.
        warn(f"[contracts] Failed to configure contracts on executor: {exc}")