run_sql_with_budget(executor, sql, *, guard, exec_fn, rowcount_extractor=None, extra_stats=None, estimate_fn=None, post_estimate_fn=None, record_stats=True, stats_adapter=None)
Shared helper for guarded SQL execution with timing + stats recording.
executor object exposing _apply_budget_guard, _is_budget_guard_active, _record_query_stats
sql statement (used for guard + optional estimator)
exec_fn callable that executes the statement and returns a result/job handle
rowcount_extractor(result) -> int|None best-effort row count (non-negative only)
extra_stats(result) -> QueryStats|None allows engines to override/extend stats post-exec
estimate_fn(sql) -> int|None optional best-effort bytes estimate, used only when the guard returned no estimate and is inactive
post_estimate_fn(sql, result) -> int|None optional post-exec fallback when bytes are still None
record_stats set False to skip immediate stats (e.g., when a job handle records on .result())
Source code in src/fastflowtransform/executors/_budget_runner.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def run_sql_with_budget(
    executor: Any,
    sql: str,
    *,
    guard: BudgetGuard,
    exec_fn: Callable[[], Any],
    rowcount_extractor: Callable[[Any], int | None] | None = None,
    extra_stats: Callable[[Any], QueryStats | None] | None = None,
    estimate_fn: Callable[[str], int | None] | None = None,
    post_estimate_fn: Callable[[str, Any], int | None] | None = None,
    record_stats: bool = True,
    stats_adapter: QueryStatsAdapter | None = None,
) -> Any:
    """
    Run a SQL statement behind the budget guard, time it, and record query stats.

    Args:
        executor: Object exposing ``_apply_budget_guard``,
            ``_is_budget_guard_active`` and ``_record_query_stats``.
        sql: Statement text; passed to the guard and to the optional estimators.
        guard: Budget guard handed to ``executor._apply_budget_guard``.
        exec_fn: Zero-argument callable that executes the statement and
            returns a result or job handle.
        rowcount_extractor: ``(result) -> int | None`` best-effort row count
            (non-negative only).
        extra_stats: ``(result) -> QueryStats | None`` hook letting an engine
            override or extend the stats after execution.
        estimate_fn: ``(sql) -> int | None`` best-effort bytes estimate. Only
            consulted when the guard returned no estimate and is inactive;
            any exception it raises is swallowed.
        post_estimate_fn: ``(sql, result) -> int | None`` post-execution
            fallback for when bytes are still unknown.
        record_stats: Pass ``False`` to skip immediate stats recording
            (e.g. when a job handle records on ``.result()``).
        stats_adapter: Ready-made adapter whose ``collect`` builds the stats.
            When given, it is used instead of building one from the
            callback arguments.

    Returns:
        Whatever ``exec_fn`` returns.

    Exceptions raised by ``exec_fn`` propagate to the caller; no stats are
    recorded in that case.
    """
    byte_estimate = executor._apply_budget_guard(guard, sql)

    # The guard is only asked whether it is active when it produced no
    # estimate, and the estimator is only tried when the guard is off.
    guard_gave_nothing = byte_estimate is None and not executor._is_budget_guard_active()
    if guard_gave_nothing and estimate_fn is not None:
        # Best effort: a failing estimator must never block the query.
        with suppress(Exception):
            byte_estimate = estimate_fn(sql)

    # Deferred stats (BigQuery job handles): execute and hand back untouched.
    if not record_stats:
        return exec_fn()

    t0 = perf_counter()
    result = exec_fn()
    elapsed_ms = int((perf_counter() - t0) * 1000)

    # An explicit adapter wins; otherwise build one only if a callback was given.
    collector = stats_adapter
    if collector is None and any((rowcount_extractor, post_estimate_fn, extra_stats)):
        collector = RowcountStatsAdapter(
            rowcount_extractor=rowcount_extractor,
            post_estimate_fn=post_estimate_fn,
            extra_stats=extra_stats,
            sql=sql,
        )

    stats = (
        QueryStats(bytes_processed=byte_estimate, rows=None, duration_ms=elapsed_ms)
        if collector is None
        else collector.collect(result, duration_ms=elapsed_ms, estimated_bytes=byte_estimate)
    )
    executor._record_query_stats(stats)
    return result
|