Skip to content

fastflowtransform.executors.query_stats.runtime.bigquery

BigQueryQueryStatsRuntime

Bases: BaseQueryStatsRuntime[QueryStatsExecutor]

BigQuery-specific stats helpers.

Source code in src/fastflowtransform/executors/query_stats/runtime/bigquery.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class BigQueryQueryStatsRuntime(BaseQueryStatsRuntime[QueryStatsExecutor]):
    """Query-stats runtime with helpers tailored to BigQuery executors."""

    def wrap_job(self, job: Any) -> _TrackedQueryJob:
        """Wrap ``job`` in a ``_TrackedQueryJob`` with ``self.record_job`` as its ``on_complete`` callback."""
        tracked = _TrackedQueryJob(job, on_complete=self.record_job)
        return tracked

    def record_dataframe(self, df: pd.DataFrame, duration_ms: int) -> QueryStats:
        """
        Build stats from an in-memory DataFrame and record them on the executor.

        ``bytes_processed`` is an estimate taken from the frame's deep memory
        usage; both it and ``rows`` are reported as ``None`` rather than zero
        when the frame is empty.
        """
        row_count = len(df)

        # Only measure memory when there is at least one row.
        approx_bytes = 0
        if row_count > 0:
            approx_bytes = int(df.memory_usage(deep=True).sum())

        recorded = QueryStats(
            bytes_processed=None if approx_bytes <= 0 else approx_bytes,
            rows=row_count or None,
            duration_ms=duration_ms,
        )
        self.executor._record_query_stats(recorded)
        return recorded

record_result

record_result(result, *, timer=None, duration_ms=None, estimated_bytes=None, adapter=None, sql=None, rowcount_extractor=None, extra_stats=None, post_estimate_fn=None)

Collect stats from a result object and record them on the executor.

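Either pass a timer (from start_timer) or an explicit duration_ms. If no adapter is given and none of rowcount_extractor, extra_stats, or post_estimate_fn is supplied, a simple QueryStats with bytes/duration is recorded; otherwise a RowcountStatsAdapter is built from those callbacks.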

Source code in src/fastflowtransform/executors/query_stats/runtime/base.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def record_result(
    self,
    result: Any,
    *,
    timer: QueryTimer | None = None,
    duration_ms: int | None = None,
    estimated_bytes: int | None = None,
    adapter: QueryStatsAdapter | None = None,
    sql: str | None = None,
    rowcount_extractor: Callable[[Any], int | None] | None = None,
    extra_stats: Callable[[Any], QueryStats | None] | None = None,
    post_estimate_fn: Callable[[str, Any], int | None] | None = None,
) -> QueryStats:
    """
    Gather stats for ``result``, record them on the executor, and return them.

    Duration: an explicit ``duration_ms`` wins; otherwise it is derived from
    ``timer.started_at`` when a timer is supplied, and stays ``None`` if
    neither is given.

    Collection: an explicit ``adapter`` is used as-is. Without one, a
    ``RowcountStatsAdapter`` is built when any of ``rowcount_extractor``,
    ``extra_stats`` or ``post_estimate_fn`` is provided. If there is still no
    adapter, a plain ``QueryStats`` carrying only bytes/duration is recorded.
    """
    elapsed_ms = duration_ms
    if elapsed_ms is None and timer is not None:
        elapsed_seconds = perf_counter() - timer.started_at
        elapsed_ms = int(elapsed_seconds * 1000)

    collector = adapter
    if collector is None:
        # Fall back to a rowcount-based adapter only when a hook was supplied.
        if rowcount_extractor or extra_stats or post_estimate_fn:
            collector = RowcountStatsAdapter(
                rowcount_extractor=rowcount_extractor,
                extra_stats=extra_stats,
                post_estimate_fn=post_estimate_fn,
                sql=sql,
            )

    if collector is not None:
        collected = collector.collect(
            result,
            duration_ms=elapsed_ms,
            estimated_bytes=estimated_bytes,
        )
    else:
        collected = QueryStats(
            bytes_processed=estimated_bytes,
            rows=None,
            duration_ms=elapsed_ms,
        )

    self.executor._record_query_stats(collected)
    return collected