Skip to content

fastflowtransform.executors.query_stats.runtime.databricks_spark

DatabricksSparkQueryStatsRuntime

Bases: BaseQueryStatsRuntime[QueryStatsExecutor]

Spark-specific stats helpers.

Source code in src/fastflowtransform/executors/query_stats/runtime/databricks_spark.py (lines 13–32):
class DatabricksSparkQueryStatsRuntime(BaseQueryStatsRuntime[QueryStatsExecutor]):
    """Spark-specific stats helpers.

    Adds a cheap row-count extractor and a DataFrame-level recorder on top
    of ``BaseQueryStatsRuntime``.
    """

    def rowcount_from_result(self, result: Any) -> int | None:
        """Return ``result.count`` if it is already a non-negative int, else ``None``.

        The ``count`` attribute is only read, never called. This avoids
        triggering an extra Spark action just to learn the row count;
        estimates are relied on instead.
        """
        count = getattr(result, "count", None)
        usable = isinstance(count, int) and count >= 0
        return count if usable else None

    def record_dataframe(self, df: Any, duration_ms: int) -> QueryStats:
        """Collect stats for *df*, record them on the executor, and return them.

        If the executor exposes a ``runtime_budget``, its
        ``spark_stats_adapter("<dataframe>")`` provides the adapter.
        Otherwise a ``SparkDataFrameStatsAdapter`` is built around a no-op
        callable (presumably a bytes-estimate hook; confirm against the
        adapter's constructor). No byte estimate is passed to ``collect``.
        """
        budget_runtime = getattr(self.executor, "runtime_budget", None)
        if budget_runtime is None:
            stats_adapter = SparkDataFrameStatsAdapter(lambda _: None)
        else:
            stats_adapter = budget_runtime.spark_stats_adapter("<dataframe>")

        collected = stats_adapter.collect(df, duration_ms=duration_ms, estimated_bytes=None)
        self.executor._record_query_stats(collected)
        return collected

record_result

record_result(result, *, timer=None, duration_ms=None, estimated_bytes=None, adapter=None, sql=None, rowcount_extractor=None, extra_stats=None, post_estimate_fn=None)

Collect stats from a result object and record them on the executor.

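Either pass a timer (from start_timer) or an explicit duration_ms; if both are given, duration_ms takes precedence. If no adapter is given but any of rowcount_extractor, extra_stats, or post_estimate_fn is supplied, a RowcountStatsAdapter is built from them. If none of these is supplied, a simple QueryStats with bytes/duration (and no row count) is recorded.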

Source code in src/fastflowtransform/executors/query_stats/runtime/base.py (lines 46–85):
def record_result(
    self,
    result: Any,
    *,
    timer: QueryTimer | None = None,
    duration_ms: int | None = None,
    estimated_bytes: int | None = None,
    adapter: QueryStatsAdapter | None = None,
    sql: str | None = None,
    rowcount_extractor: Callable[[Any], int | None] | None = None,
    extra_stats: Callable[[Any], QueryStats | None] | None = None,
    post_estimate_fn: Callable[[str, Any], int | None] | None = None,
) -> QueryStats:
    """
    Collect stats from a result object and record them on the executor.

    Duration: an explicit ``duration_ms`` always wins. Only when it is
    ``None`` and a ``timer`` (from ``start_timer``) is supplied is the
    elapsed wall time since ``timer.started_at`` used, in whole
    milliseconds. With neither, the duration stays ``None``.

    Adapter selection:
      * ``adapter`` given -> it is used as-is (``sql`` and the callables
        are ignored).
      * otherwise, if any of ``rowcount_extractor`` / ``extra_stats`` /
        ``post_estimate_fn`` is truthy -> a ``RowcountStatsAdapter`` is
        built from them together with ``sql``.
      * otherwise -> a plain ``QueryStats`` carrying only bytes and
        duration (``rows=None``) is recorded.

    The resulting stats are passed to ``self.executor._record_query_stats``
    and returned.
    """
    elapsed_ms = duration_ms
    if elapsed_ms is None and timer is not None:
        elapsed_ms = int((perf_counter() - timer.started_at) * 1000)

    chosen = adapter
    # Test the optional callables only when no adapter was supplied.
    if chosen is None and any((rowcount_extractor, extra_stats, post_estimate_fn)):
        chosen = RowcountStatsAdapter(
            rowcount_extractor=rowcount_extractor,
            extra_stats=extra_stats,
            post_estimate_fn=post_estimate_fn,
            sql=sql,
        )

    if chosen is not None:
        stats = chosen.collect(result, duration_ms=elapsed_ms, estimated_bytes=estimated_bytes)
    else:
        stats = QueryStats(bytes_processed=estimated_bytes, rows=None, duration_ms=elapsed_ms)

    self.executor._record_query_stats(stats)
    return stats