Skip to content

fastflowtransform.executors._budget_runner

run_sql_with_budget

run_sql_with_budget(executor, sql, *, guard, exec_fn, rowcount_extractor=None, extra_stats=None, estimate_fn=None, post_estimate_fn=None, record_stats=True, stats_adapter=None)

Shared helper for guarded SQL execution with timing + stats recording.

Parameters:

- executor: object exposing _apply_budget_guard, _is_budget_guard_active, and _record_query_stats.
- sql: the statement (used for the guard and the optional estimator).
- exec_fn: callable that executes the statement and returns a result/job handle.
- rowcount_extractor(result) -> int|None: best-effort row count (non-negative only).
- extra_stats(result) -> QueryStats|None: allows engines to override/extend stats post-exec.
- estimate_fn(sql) -> int|None: optional best-effort bytes estimate, used when the guard is inactive.
- post_estimate_fn(sql, result) -> int|None: optional post-exec fallback when bytes are still None.
- record_stats: set to False to skip immediate stats (e.g., when a job handle records on .result()).
- stats_adapter: optional adapter used to collect the stats; when omitted, a RowcountStatsAdapter is built from the callbacks above if any of them is provided.

Source code in src/fastflowtransform/executors/_budget_runner.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def run_sql_with_budget(
    executor: Any,
    sql: str,
    *,
    guard: BudgetGuard,
    exec_fn: Callable[[], Any],
    rowcount_extractor: Callable[[Any], int | None] | None = None,
    extra_stats: Callable[[Any], QueryStats | None] | None = None,
    estimate_fn: Callable[[str], int | None] | None = None,
    post_estimate_fn: Callable[[str, Any], int | None] | None = None,
    record_stats: bool = True,
    stats_adapter: QueryStatsAdapter | None = None,
) -> Any:
    """
    Run a statement behind the budget guard, time it, and record query stats.

    executor      object providing _apply_budget_guard, _is_budget_guard_active and
                  _record_query_stats
    sql           statement handed to the guard and to estimate_fn
    guard         budget guard forwarded to executor._apply_budget_guard
    exec_fn       zero-argument callable that runs the statement; its return value is
                  returned unchanged
    rowcount_extractor(result) -> int|None    forwarded to RowcountStatsAdapter
    extra_stats(result) -> QueryStats|None    forwarded to RowcountStatsAdapter
    estimate_fn(sql) -> int|None              consulted only when the guard returned None
                                              and the guard is inactive; any exception it
                                              raises is suppressed
    post_estimate_fn(sql, result) -> int|None forwarded to RowcountStatsAdapter
    record_stats  when False, exec_fn runs untimed and nothing is recorded here
    stats_adapter explicit stats collector; when None, a RowcountStatsAdapter is built if
                  any of rowcount_extractor / post_estimate_fn / extra_stats is truthy

    Returns whatever exec_fn returns.
    """
    bytes_estimate = executor._apply_budget_guard(guard, sql)

    # The guard produced no estimate: fall back to the engine-supplied estimator,
    # but only while the guard itself is switched off.
    if bytes_estimate is None:
        if not executor._is_budget_guard_active() and estimate_fn is not None:
            # Best effort only: a failing estimator must never block execution.
            with suppress(Exception):
                bytes_estimate = estimate_fn(sql)

    # Deferred stats (e.g. a job handle that records later): execute and hand back.
    if not record_stats:
        return exec_fn()

    t0 = perf_counter()
    outcome = exec_fn()
    elapsed_ms = int((perf_counter() - t0) * 1000)

    collector = stats_adapter
    if collector is None and any((rowcount_extractor, post_estimate_fn, extra_stats)):
        # No explicit adapter, but at least one callback was given: wrap them.
        collector = RowcountStatsAdapter(
            rowcount_extractor=rowcount_extractor,
            post_estimate_fn=post_estimate_fn,
            extra_stats=extra_stats,
            sql=sql,
        )

    if collector is not None:
        stats = collector.collect(
            outcome,
            duration_ms=elapsed_ms,
            estimated_bytes=bytes_estimate,
        )
    else:
        # Nothing to collect with: record just the estimate and the wall time.
        stats = QueryStats(bytes_processed=bytes_estimate, rows=None, duration_ms=elapsed_ms)

    executor._record_query_stats(stats)
    return outcome