Skip to content

fastflowtransform.executors._query_stats_adapter

QueryStatsAdapter

Bases: Protocol

Adapter interface to normalize stats extraction across engines.

Source code in src/fastflowtransform/executors/_query_stats_adapter.py
10
11
12
13
14
15
class QueryStatsAdapter(Protocol):
    """Adapter interface to normalize stats extraction across engines.

    Implementations turn an engine-specific query result/handle into a
    ``QueryStats`` record.

    Args (of ``collect``):
        result: Engine-specific result object (cursor, job handle, DataFrame, ...).
        duration_ms: Caller-measured wall-clock duration, if available.
        estimated_bytes: Pre-execution bytes estimate, if available.
    """

    def collect(
        self, result: Any, *, duration_ms: int | None, estimated_bytes: int | None
    ) -> QueryStats: ...

RowcountStatsAdapter

Default stats adapter for DB-API style executors that expose rowcount. Preserves existing post_estimate/extra_stats hook behavior.

Source code in src/fastflowtransform/executors/_query_stats_adapter.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class RowcountStatsAdapter:
    """
    Default stats adapter for executors built on DB-API style connections,
    where the result object exposes a rowcount.

    Keeps the historical ``post_estimate``/``extra_stats`` hook semantics:
    every hook is best-effort, and any exception a hook raises is swallowed
    rather than failing the query.
    """

    def __init__(
        self,
        *,
        rowcount_extractor: Callable[[Any], int | None] | None = None,
        post_estimate_fn: Callable[[str, Any], int | None] | None = None,
        extra_stats: Callable[[Any], QueryStats | None] | None = None,
        sql: str | None = None,
    ) -> None:
        # All hooks are optional; a missing hook simply leaves its field None.
        self.rowcount_extractor = rowcount_extractor
        self.post_estimate_fn = post_estimate_fn
        self.extra_stats = extra_stats
        self.sql = sql

    def collect(
        self, result: Any, *, duration_ms: int | None, estimated_bytes: int | None
    ) -> QueryStats:
        # Step 1: best-effort rowcount; extractor failures degrade to "unknown".
        row_total: int | None = None
        extractor = self.rowcount_extractor
        if extractor is not None:
            try:
                row_total = extractor(result)
            except Exception:
                row_total = None

        stats = QueryStats(
            bytes_processed=estimated_bytes, rows=row_total, duration_ms=duration_ms
        )

        # Step 2: if no byte count was supplied, fall back to a post-execution
        # estimate derived from the SQL text and the result object.
        if self.post_estimate_fn is not None and stats.bytes_processed is None:
            try:
                fallback_bytes = self.post_estimate_fn(self.sql or "", result)
            except Exception:
                fallback_bytes = None
            if fallback_bytes is not None:
                stats = QueryStats(
                    bytes_processed=fallback_bytes,
                    rows=stats.rows,
                    duration_ms=stats.duration_ms,
                )

        # Step 3: engine-specific extras win field-by-field over what we have.
        if self.extra_stats is not None:
            try:
                extra = self.extra_stats(result)
            except Exception:
                extra = None
            if extra:
                merged_bytes = stats.bytes_processed
                if extra.bytes_processed is not None:
                    merged_bytes = extra.bytes_processed
                merged_rows = stats.rows if extra.rows is None else extra.rows
                merged_duration = (
                    stats.duration_ms if extra.duration_ms is None else extra.duration_ms
                )
                stats = QueryStats(
                    bytes_processed=merged_bytes,
                    rows=merged_rows,
                    duration_ms=merged_duration,
                )

        return stats

JobStatsAdapter

Generic job-handle stats extractor (BigQuery/Snowflake/Spark-like objects). Mirrors the previous _record_query_job_stats heuristics.

Source code in src/fastflowtransform/executors/_query_stats_adapter.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
class JobStatsAdapter:
    """
    Generic job-handle stats extractor (BigQuery/Snowflake/Spark-like objects).
    Mirrors the previous `_record_query_job_stats` heuristics: probe a list of
    well-known attribute names and coerce whatever is found, best-effort.
    """

    def collect(self, job: Any) -> QueryStats:
        def _as_int(raw: Any) -> int | None:
            # Coercion is best-effort; anything non-numeric maps to None.
            if raw is None:
                return None
            try:
                return int(raw)
            except Exception:
                return None

        # NOTE(review): the `or` chains treat a value of 0 as "missing" and
        # fall through to the next attribute — presumably intentional, since
        # this mirrors the old heuristics; confirm before tightening.
        bytes_processed = _as_int(
            getattr(job, "total_bytes_processed", None) or getattr(job, "bytes_processed", None)
        )

        rows = _as_int(
            getattr(job, "num_dml_affected_rows", None)
            or getattr(job, "total_rows", None)
            or getattr(job, "rowcount", None)
        )

        # Duration from start/end timestamps when the handle exposes both;
        # getattr stays inside the try so a raising property degrades to None.
        duration_ms: int | None = None
        try:
            begun = getattr(job, "started", None)
            finished = getattr(job, "ended", None)
            if begun is not None and finished is not None:
                duration_ms = int((finished - begun).total_seconds() * 1000)
        except Exception:
            duration_ms = None

        return QueryStats(bytes_processed=bytes_processed, rows=rows, duration_ms=duration_ms)

SparkDataFrameStatsAdapter

Adapter for Spark DataFrames that mirrors existing Databricks behaviour: - bytes via provided bytes_fn (plan-based best effort) - rows left as None - duration passed through

Source code in src/fastflowtransform/executors/_query_stats_adapter.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
class SparkDataFrameStatsAdapter:
    """
    Adapter for Spark DataFrames, matching the existing Databricks behaviour:
    bytes come from the injected ``bytes_fn`` (a plan-based best effort),
    rows are always reported as unknown, and the duration is passed through.
    """

    def __init__(self, bytes_fn: Callable[[Any], int | None]) -> None:
        self.bytes_fn = bytes_fn

    def collect(
        self, df: Any, *, duration_ms: int | None, estimated_bytes: int | None = None
    ) -> QueryStats:
        # Prefer the caller-supplied estimate; otherwise fall back to the
        # injected plan-based estimator, swallowing any failure.
        size = estimated_bytes
        if size is None:
            try:
                size = self.bytes_fn(df)
            except Exception:
                size = None

        # Non-positive estimates are treated as "unknown".
        reported = size if (size is not None and size > 0) else None
        return QueryStats(bytes_processed=reported, rows=None, duration_ms=duration_ms)