Skip to content

fastflowtransform.executors.query_stats.runtime

BaseQueryStatsRuntime

Base runtime for collecting per-query stats.

Executors compose this (like runtime contracts) and delegate stat recording so the run engine can aggregate per-node metrics.

Source code in src/fastflowtransform/executors/query_stats/runtime/base.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
class BaseQueryStatsRuntime[E: QueryStatsExecutor]:
    """
    Base runtime for collecting per-query stats.

    Executors compose this (like runtime contracts) and delegate stat recording
    so the run engine can aggregate per-node metrics.
    """

    executor: E

    def __init__(self, executor: E):
        self.executor = executor

    def start_timer(self) -> QueryTimer:
        return QueryTimer(started_at=perf_counter())

    def record_result(
        self,
        result: Any,
        *,
        timer: QueryTimer | None = None,
        duration_ms: int | None = None,
        estimated_bytes: int | None = None,
        adapter: QueryStatsAdapter | None = None,
        sql: str | None = None,
        rowcount_extractor: Callable[[Any], int | None] | None = None,
        extra_stats: Callable[[Any], QueryStats | None] | None = None,
        post_estimate_fn: Callable[[str, Any], int | None] | None = None,
    ) -> QueryStats:
        """
        Collect stats from a result object and record them on the executor.

        Either pass a timer (from start_timer) or an explicit duration_ms.
        If no adapter is given, a simple QueryStats with bytes/duration is recorded.
        """
        if duration_ms is None and timer is not None:
            duration_ms = int((perf_counter() - timer.started_at) * 1000)

        stats_adapter = adapter
        if stats_adapter is None and (rowcount_extractor or extra_stats or post_estimate_fn):
            stats_adapter = RowcountStatsAdapter(
                rowcount_extractor=rowcount_extractor,
                extra_stats=extra_stats,
                post_estimate_fn=post_estimate_fn,
                sql=sql,
            )

        if stats_adapter is None:
            stats = QueryStats(bytes_processed=estimated_bytes, rows=None, duration_ms=duration_ms)
        else:
            stats = stats_adapter.collect(
                result, duration_ms=duration_ms, estimated_bytes=estimated_bytes
            )

        self.executor._record_query_stats(stats)
        return stats

    def record_job(self, job: Any) -> QueryStats:
        adapter = JobStatsAdapter()
        stats = adapter.collect(job)
        self.executor._record_query_stats(stats)
        return stats

record_result

record_result(result, *, timer=None, duration_ms=None, estimated_bytes=None, adapter=None, sql=None, rowcount_extractor=None, extra_stats=None, post_estimate_fn=None)

Collect stats from a result object and record them on the executor.

Either pass a timer (from start_timer) or an explicit duration_ms. If no adapter is given, a simple QueryStats with bytes/duration is recorded.

Source code in src/fastflowtransform/executors/query_stats/runtime/base.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def record_result(
    self,
    result: Any,
    *,
    timer: QueryTimer | None = None,
    duration_ms: int | None = None,
    estimated_bytes: int | None = None,
    adapter: QueryStatsAdapter | None = None,
    sql: str | None = None,
    rowcount_extractor: Callable[[Any], int | None] | None = None,
    extra_stats: Callable[[Any], QueryStats | None] | None = None,
    post_estimate_fn: Callable[[str, Any], int | None] | None = None,
) -> QueryStats:
    """
    Collect stats from a result object and record them on the executor.

    Either pass a timer (from start_timer) or an explicit duration_ms.
    If no adapter is given, a simple QueryStats with bytes/duration is recorded.
    """
    if duration_ms is None and timer is not None:
        duration_ms = int((perf_counter() - timer.started_at) * 1000)

    stats_adapter = adapter
    if stats_adapter is None and (rowcount_extractor or extra_stats or post_estimate_fn):
        stats_adapter = RowcountStatsAdapter(
            rowcount_extractor=rowcount_extractor,
            extra_stats=extra_stats,
            post_estimate_fn=post_estimate_fn,
            sql=sql,
        )

    if stats_adapter is None:
        stats = QueryStats(bytes_processed=estimated_bytes, rows=None, duration_ms=duration_ms)
    else:
        stats = stats_adapter.collect(
            result, duration_ms=duration_ms, estimated_bytes=estimated_bytes
        )

    self.executor._record_query_stats(stats)
    return stats

BigQueryQueryStatsRuntime

Bases: BaseQueryStatsRuntime[QueryStatsExecutor]

BigQuery-specific stats helpers.

Source code in src/fastflowtransform/executors/query_stats/runtime/bigquery.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class BigQueryQueryStatsRuntime(BaseQueryStatsRuntime[QueryStatsExecutor]):
    """BigQuery-specific stats helpers."""

    def wrap_job(self, job: Any) -> _TrackedQueryJob:
        return _TrackedQueryJob(job, on_complete=self.record_job)

    def record_dataframe(self, df: pd.DataFrame, duration_ms: int) -> QueryStats:
        rows = len(df)
        bytes_estimate = int(df.memory_usage(deep=True).sum()) if rows > 0 else 0
        bytes_val = bytes_estimate if bytes_estimate > 0 else None
        stats = QueryStats(
            bytes_processed=bytes_val,
            rows=rows if rows > 0 else None,
            duration_ms=duration_ms,
        )
        self.executor._record_query_stats(stats)
        return stats

record_result

record_result(result, *, timer=None, duration_ms=None, estimated_bytes=None, adapter=None, sql=None, rowcount_extractor=None, extra_stats=None, post_estimate_fn=None)

Collect stats from a result object and record them on the executor.

Either pass a timer (from start_timer) or an explicit duration_ms. If no adapter is given, a simple QueryStats with bytes/duration is recorded.

Source code in src/fastflowtransform/executors/query_stats/runtime/base.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def record_result(
    self,
    result: Any,
    *,
    timer: QueryTimer | None = None,
    duration_ms: int | None = None,
    estimated_bytes: int | None = None,
    adapter: QueryStatsAdapter | None = None,
    sql: str | None = None,
    rowcount_extractor: Callable[[Any], int | None] | None = None,
    extra_stats: Callable[[Any], QueryStats | None] | None = None,
    post_estimate_fn: Callable[[str, Any], int | None] | None = None,
) -> QueryStats:
    """
    Collect stats from a result object and record them on the executor.

    Either pass a timer (from start_timer) or an explicit duration_ms.
    If no adapter is given, a simple QueryStats with bytes/duration is recorded.
    """
    if duration_ms is None and timer is not None:
        duration_ms = int((perf_counter() - timer.started_at) * 1000)

    stats_adapter = adapter
    if stats_adapter is None and (rowcount_extractor or extra_stats or post_estimate_fn):
        stats_adapter = RowcountStatsAdapter(
            rowcount_extractor=rowcount_extractor,
            extra_stats=extra_stats,
            post_estimate_fn=post_estimate_fn,
            sql=sql,
        )

    if stats_adapter is None:
        stats = QueryStats(bytes_processed=estimated_bytes, rows=None, duration_ms=duration_ms)
    else:
        stats = stats_adapter.collect(
            result, duration_ms=duration_ms, estimated_bytes=estimated_bytes
        )

    self.executor._record_query_stats(stats)
    return stats

DatabricksSparkQueryStatsRuntime

Bases: BaseQueryStatsRuntime[QueryStatsExecutor]

Spark-specific stats helpers.

Source code in src/fastflowtransform/executors/query_stats/runtime/databricks_spark.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class DatabricksSparkQueryStatsRuntime(BaseQueryStatsRuntime[QueryStatsExecutor]):
    """Spark-specific stats helpers."""

    def rowcount_from_result(self, result: Any) -> int | None:
        # Avoid triggering extra Spark actions; rely on estimates instead.
        rc = getattr(result, "count", None)
        if isinstance(rc, int) and rc >= 0:
            return rc
        return None

    def record_dataframe(self, df: Any, duration_ms: int) -> QueryStats:
        budget_runtime = getattr(self.executor, "runtime_budget", None)
        adapter = (
            budget_runtime.spark_stats_adapter("<dataframe>")
            if budget_runtime is not None
            else SparkDataFrameStatsAdapter(lambda _: None)
        )
        stats = adapter.collect(df, duration_ms=duration_ms, estimated_bytes=None)
        self.executor._record_query_stats(stats)
        return stats

record_result

record_result(result, *, timer=None, duration_ms=None, estimated_bytes=None, adapter=None, sql=None, rowcount_extractor=None, extra_stats=None, post_estimate_fn=None)

Collect stats from a result object and record them on the executor.

Either pass a timer (from start_timer) or an explicit duration_ms. If no adapter is given, a simple QueryStats with bytes/duration is recorded.

Source code in src/fastflowtransform/executors/query_stats/runtime/base.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def record_result(
    self,
    result: Any,
    *,
    timer: QueryTimer | None = None,
    duration_ms: int | None = None,
    estimated_bytes: int | None = None,
    adapter: QueryStatsAdapter | None = None,
    sql: str | None = None,
    rowcount_extractor: Callable[[Any], int | None] | None = None,
    extra_stats: Callable[[Any], QueryStats | None] | None = None,
    post_estimate_fn: Callable[[str, Any], int | None] | None = None,
) -> QueryStats:
    """
    Collect stats from a result object and record them on the executor.

    Either pass a timer (from start_timer) or an explicit duration_ms.
    If no adapter is given, a simple QueryStats with bytes/duration is recorded.
    """
    if duration_ms is None and timer is not None:
        duration_ms = int((perf_counter() - timer.started_at) * 1000)

    stats_adapter = adapter
    if stats_adapter is None and (rowcount_extractor or extra_stats or post_estimate_fn):
        stats_adapter = RowcountStatsAdapter(
            rowcount_extractor=rowcount_extractor,
            extra_stats=extra_stats,
            post_estimate_fn=post_estimate_fn,
            sql=sql,
        )

    if stats_adapter is None:
        stats = QueryStats(bytes_processed=estimated_bytes, rows=None, duration_ms=duration_ms)
    else:
        stats = stats_adapter.collect(
            result, duration_ms=duration_ms, estimated_bytes=estimated_bytes
        )

    self.executor._record_query_stats(stats)
    return stats

DuckQueryStatsRuntime

Bases: BaseQueryStatsRuntime[QueryStatsExecutor]

DuckDB-specific runtime logic for stats extraction.

Source code in src/fastflowtransform/executors/query_stats/runtime/duckdb.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class DuckQueryStatsRuntime(BaseQueryStatsRuntime[QueryStatsExecutor]):
    """DuckDB-specific runtime logic for stats extraction."""

    def rowcount_from_result(self, result: Any) -> int | None:
        rc = getattr(result, "rowcount", None)
        if isinstance(rc, int) and rc >= 0:
            return rc
        return None

    def record_dataframe(self, df: pd.DataFrame, duration_ms: int) -> QueryStats:
        rows = len(df)
        bytes_estimate = int(df.memory_usage(deep=True).sum()) if rows > 0 else 0
        bytes_val = bytes_estimate if bytes_estimate > 0 else None
        stats = QueryStats(
            bytes_processed=bytes_val,
            rows=rows if rows > 0 else None,
            duration_ms=duration_ms,
        )
        self.executor._record_query_stats(stats)
        return stats

record_result

record_result(result, *, timer=None, duration_ms=None, estimated_bytes=None, adapter=None, sql=None, rowcount_extractor=None, extra_stats=None, post_estimate_fn=None)

Collect stats from a result object and record them on the executor.

Either pass a timer (from start_timer) or an explicit duration_ms. If no adapter is given, a simple QueryStats with bytes/duration is recorded.

Source code in src/fastflowtransform/executors/query_stats/runtime/base.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def record_result(
    self,
    result: Any,
    *,
    timer: QueryTimer | None = None,
    duration_ms: int | None = None,
    estimated_bytes: int | None = None,
    adapter: QueryStatsAdapter | None = None,
    sql: str | None = None,
    rowcount_extractor: Callable[[Any], int | None] | None = None,
    extra_stats: Callable[[Any], QueryStats | None] | None = None,
    post_estimate_fn: Callable[[str, Any], int | None] | None = None,
) -> QueryStats:
    """
    Collect stats from a result object and record them on the executor.

    Either pass a timer (from start_timer) or an explicit duration_ms.
    If no adapter is given, a simple QueryStats with bytes/duration is recorded.
    """
    if duration_ms is None and timer is not None:
        duration_ms = int((perf_counter() - timer.started_at) * 1000)

    stats_adapter = adapter
    if stats_adapter is None and (rowcount_extractor or extra_stats or post_estimate_fn):
        stats_adapter = RowcountStatsAdapter(
            rowcount_extractor=rowcount_extractor,
            extra_stats=extra_stats,
            post_estimate_fn=post_estimate_fn,
            sql=sql,
        )

    if stats_adapter is None:
        stats = QueryStats(bytes_processed=estimated_bytes, rows=None, duration_ms=duration_ms)
    else:
        stats = stats_adapter.collect(
            result, duration_ms=duration_ms, estimated_bytes=estimated_bytes
        )

    self.executor._record_query_stats(stats)
    return stats

PostgresQueryStatsRuntime

Bases: BaseQueryStatsRuntime[QueryStatsExecutor]

Postgres-specific stats helpers.

Source code in src/fastflowtransform/executors/query_stats/runtime/postgres.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class PostgresQueryStatsRuntime(BaseQueryStatsRuntime[QueryStatsExecutor]):
    """Postgres-specific stats helpers."""

    def rowcount_from_result(self, result: Any) -> int | None:
        rc = getattr(result, "rowcount", None)
        if isinstance(rc, int) and rc >= 0:
            return rc
        return None

    def record_dataframe(self, df: pd.DataFrame, duration_ms: int) -> QueryStats:
        rows = len(df)
        bytes_estimate = int(df.memory_usage(deep=True).sum()) if rows > 0 else 0
        bytes_val = bytes_estimate if bytes_estimate > 0 else None
        stats = QueryStats(
            bytes_processed=bytes_val,
            rows=rows if rows > 0 else None,
            duration_ms=duration_ms,
        )
        self.executor._record_query_stats(stats)
        return stats

record_result

record_result(result, *, timer=None, duration_ms=None, estimated_bytes=None, adapter=None, sql=None, rowcount_extractor=None, extra_stats=None, post_estimate_fn=None)

Collect stats from a result object and record them on the executor.

Either pass a timer (from start_timer) or an explicit duration_ms. If no adapter is given, a simple QueryStats with bytes/duration is recorded.

Source code in src/fastflowtransform/executors/query_stats/runtime/base.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def record_result(
    self,
    result: Any,
    *,
    timer: QueryTimer | None = None,
    duration_ms: int | None = None,
    estimated_bytes: int | None = None,
    adapter: QueryStatsAdapter | None = None,
    sql: str | None = None,
    rowcount_extractor: Callable[[Any], int | None] | None = None,
    extra_stats: Callable[[Any], QueryStats | None] | None = None,
    post_estimate_fn: Callable[[str, Any], int | None] | None = None,
) -> QueryStats:
    """
    Collect stats from a result object and record them on the executor.

    Either pass a timer (from start_timer) or an explicit duration_ms.
    If no adapter is given, a simple QueryStats with bytes/duration is recorded.
    """
    if duration_ms is None and timer is not None:
        duration_ms = int((perf_counter() - timer.started_at) * 1000)

    stats_adapter = adapter
    if stats_adapter is None and (rowcount_extractor or extra_stats or post_estimate_fn):
        stats_adapter = RowcountStatsAdapter(
            rowcount_extractor=rowcount_extractor,
            extra_stats=extra_stats,
            post_estimate_fn=post_estimate_fn,
            sql=sql,
        )

    if stats_adapter is None:
        stats = QueryStats(bytes_processed=estimated_bytes, rows=None, duration_ms=duration_ms)
    else:
        stats = stats_adapter.collect(
            result, duration_ms=duration_ms, estimated_bytes=estimated_bytes
        )

    self.executor._record_query_stats(stats)
    return stats

SnowflakeSnowparkQueryStatsRuntime

Bases: BaseQueryStatsRuntime[QueryStatsExecutor]

Snowflake Snowpark stats helpers.

Source code in src/fastflowtransform/executors/query_stats/runtime/snowflake_snowpark.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class SnowflakeSnowparkQueryStatsRuntime(BaseQueryStatsRuntime[QueryStatsExecutor]):
    """Snowflake Snowpark stats helpers."""

    def rowcount_from_result(self, result: Any) -> int | None:
        rc = getattr(result, "rowcount", None)
        if isinstance(rc, int) and rc >= 0:
            return rc
        return None

    def record_dataframe(self, df: Any, duration_ms: int) -> QueryStats:
        budget_runtime = getattr(self.executor, "runtime_budget", None)

        bytes_estimate: int | None = None
        if budget_runtime is not None:
            try:
                bytes_estimate = budget_runtime.dataframe_bytes(df)
            except Exception:
                bytes_estimate = None

        if bytes_estimate is not None and bytes_estimate <= 0:
            bytes_estimate = None

        stats = QueryStats(
            bytes_processed=bytes_estimate,
            rows=None,
            duration_ms=duration_ms,
        )
        self.executor._record_query_stats(stats)
        return stats

record_result

record_result(result, *, timer=None, duration_ms=None, estimated_bytes=None, adapter=None, sql=None, rowcount_extractor=None, extra_stats=None, post_estimate_fn=None)

Collect stats from a result object and record them on the executor.

Either pass a timer (from start_timer) or an explicit duration_ms. If no adapter is given, a simple QueryStats with bytes/duration is recorded.

Source code in src/fastflowtransform/executors/query_stats/runtime/base.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def record_result(
    self,
    result: Any,
    *,
    timer: QueryTimer | None = None,
    duration_ms: int | None = None,
    estimated_bytes: int | None = None,
    adapter: QueryStatsAdapter | None = None,
    sql: str | None = None,
    rowcount_extractor: Callable[[Any], int | None] | None = None,
    extra_stats: Callable[[Any], QueryStats | None] | None = None,
    post_estimate_fn: Callable[[str, Any], int | None] | None = None,
) -> QueryStats:
    """
    Collect stats from a result object and record them on the executor.

    Either pass a timer (from start_timer) or an explicit duration_ms.
    If no adapter is given, a simple QueryStats with bytes/duration is recorded.
    """
    if duration_ms is None and timer is not None:
        duration_ms = int((perf_counter() - timer.started_at) * 1000)

    stats_adapter = adapter
    if stats_adapter is None and (rowcount_extractor or extra_stats or post_estimate_fn):
        stats_adapter = RowcountStatsAdapter(
            rowcount_extractor=rowcount_extractor,
            extra_stats=extra_stats,
            post_estimate_fn=post_estimate_fn,
            sql=sql,
        )

    if stats_adapter is None:
        stats = QueryStats(bytes_processed=estimated_bytes, rows=None, duration_ms=duration_ms)
    else:
        stats = stats_adapter.collect(
            result, duration_ms=duration_ms, estimated_bytes=estimated_bytes
        )

    self.executor._record_query_stats(stats)
    return stats