Skip to content

fastflowtransform.executors.budget.runtime.base

BudgetExecutor

Bases: Protocol

Minimal executor surface used by budget runtimes.

Source code in src/fastflowtransform/executors/budget/runtime/base.py
13
14
15
16
17
class BudgetExecutor(Protocol):
    """Structural interface an executor must expose to a budget runtime.

    Only the two private hooks below are required; ``BaseBudgetRuntime``
    calls them and nothing else on the executor.
    """

    def _apply_budget_guard(
        self,
        guard: BudgetGuard | None,
        sql: str,
    ) -> int | None:
        """Apply ``guard`` (which may be ``None``) to ``sql``.

        ``BaseBudgetRuntime.run_sql`` treats a non-``None`` return value as
        the query's estimated byte count.
        """

    def _is_budget_guard_active(self) -> bool:
        """Report whether a budget guard is currently active.

        ``BaseBudgetRuntime.run_sql`` skips its fallback byte estimate when
        this returns ``True``.
        """

BaseBudgetRuntime

Base runtime for per-query budget enforcement.

Executors compose this (like runtime contracts) and delegate guarded execution through it.

Source code in src/fastflowtransform/executors/budget/runtime/base.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class BaseBudgetRuntime[E: BudgetExecutor]:
    """
    Base runtime for per-query budget enforcement.

    Executors compose this (like runtime contracts) and delegate guarded
    execution through it.
    """

    executor: E
    guard: BudgetGuard | None

    def __init__(self, executor: E, guard: BudgetGuard | None = None):
        self.executor = executor
        self.guard = guard or getattr(type(self), "DEFAULT_GUARD", None)

    def apply_guard(self, sql: str) -> int | None:
        return self.executor._apply_budget_guard(self.guard, sql)

    def run_sql(
        self,
        sql: str,
        *,
        exec_fn: Callable[[], Any],
        stats_runtime: BaseQueryStatsRuntime,
        rowcount_extractor: Callable[[Any], int | None] | None = None,
        extra_stats: Callable[[Any], Any] | None = None,
        estimate_fn: Callable[[str], int | None] | None = None,
        post_estimate_fn: Callable[[str, Any], int | None] | None = None,
        record_stats: bool = True,
        stats_adapter: QueryStatsAdapter | None = None,
    ) -> Any:
        estimated_bytes = self.apply_guard(sql)
        estimator = estimate_fn or getattr(self, "estimate_query_bytes", None)
        if (
            estimated_bytes is None
            and not self.executor._is_budget_guard_active()
            and callable(estimator)
        ):
            with suppress(Exception):
                estimated_bytes = estimator(sql)

        if not record_stats:
            return exec_fn()

        started = perf_counter()
        result = exec_fn()
        duration_ms = int((perf_counter() - started) * 1000)

        adapter = stats_adapter
        if adapter is None and (rowcount_extractor or post_estimate_fn or extra_stats):
            adapter = RowcountStatsAdapter(
                rowcount_extractor=rowcount_extractor,
                post_estimate_fn=post_estimate_fn,
                extra_stats=extra_stats,
                sql=sql,
            )

        stats_runtime.record_result(
            result,
            duration_ms=duration_ms,
            estimated_bytes=estimated_bytes,
            adapter=adapter,
            sql=sql,
        )

        return result