fastflowtransform.executors.budget.runtime.databricks_spark

DatabricksSparkBudgetRuntime

Bases: BaseBudgetRuntime[DatabricksSparkBudgetExecutor]

Databricks/Spark budget runtime using logical-plan stats for estimation.
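A minimal usage sketch (hedged: the executor below is a placeholder, and constructing a real DatabricksSparkBudgetExecutor depends on your project configuration):

runtime = DatabricksSparkBudgetRuntime(executor)  # executor: an already-configured DatabricksSparkBudgetExecutor
estimate = runtime.estimate_query_bytes("SELECT * FROM sales")  # "sales" is a placeholder; returns plan bytes or None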

Source code in src/fastflowtransform/executors/budget/runtime/databricks_spark.py
class DatabricksSparkBudgetRuntime(BaseBudgetRuntime[DatabricksSparkBudgetExecutor]):
    """Databricks/Spark budget runtime using logical-plan stats for estimation."""

    DEFAULT_GUARD = BudgetGuard(
        env_var="FF_SPK_MAX_BYTES",
        estimator_attr="runtime_budget_estimate_query_bytes",
        engine_label="Databricks/Spark",
        what="query",
    )

    def __init__(self, executor: DatabricksSparkBudgetExecutor, guard: BudgetGuard | None = None):
        super().__init__(executor, guard)
        self._default_size: int | None = self.detect_default_size()

    def estimate_query_bytes(self, sql: str) -> int | None:
        return self._spark_plan_bytes(sql)

    def detect_default_size(self) -> int:
        """
        Detect Spark's defaultSizeInBytes sentinel.

        - Prefer spark.sql.defaultSizeInBytes if available.
        - Fall back to Long.MaxValue (2^63 - 1) otherwise.
        """
        try:
            conf_val = self.executor.spark.conf.get("spark.sql.defaultSizeInBytes")
            if conf_val is not None:
                return int(conf_val)
        except Exception:
            # config not set / older Spark / weird environment
            pass

        # Fallback: Spark uses Long.MaxValue by default
        return 2**63 - 1  # 9223372036854775807

    def spark_stats_adapter(self, sql: str) -> SparkDataFrameStatsAdapter:
        """
        Build a SparkDataFrameStatsAdapter tied to this runtime's estimation logic.
        """

        def _bytes(df: Any) -> int | None:
            estimate = self.dataframe_bytes(df)
            if estimate is not None:
                return estimate
            return self.estimate_query_bytes(sql)

        return SparkDataFrameStatsAdapter(_bytes)

    # ---- Shared helpers for Spark stats ----
    def dataframe_bytes(self, df: Any) -> int | None:
        try:
            jdf = getattr(df, "_jdf", None)
            if jdf is None:
                return None

            qe = jdf.queryExecution()
            jplan = qe.optimizedPlan()

            if self._jplan_uses_default_size(jplan):
                return None

            stats = jplan.stats()
            size_attr = getattr(stats, "sizeInBytes", None)
            size_val = size_attr() if callable(size_attr) else size_attr
            return self._parse_spark_stats_size(size_val)
        except Exception:
            return None

    def _spark_plan_bytes(self, sql: str) -> int | None:
        """
        Inspect the optimized logical plan via the JVM and return sizeInBytes
        as an integer, or None if not available. This does not execute the query.
        """
        try:
            normalized = self.executor._selectable_body(sql).rstrip(";\n\t ")
            if not normalized:
                normalized = sql
        except Exception:
            normalized = sql

        stmt = normalized.lstrip().lower()
        if not stmt.startswith(("select", "with")):
            # DDL/DML statements should not be executed twice.
            return None

        try:
            df = self.executor.spark.sql(normalized)

            jdf = getattr(df, "_jdf", None)
            if jdf is None:
                return None

            qe = jdf.queryExecution()
            jplan = qe.optimizedPlan()

            if self._jplan_uses_default_size(jplan):
                return None

            stats = jplan.stats()
            size_attr = getattr(stats, "sizeInBytes", None)
            size_val = size_attr() if callable(size_attr) else size_attr

            return self._parse_spark_stats_size(size_val)
        except Exception:
            return None

    def _jplan_uses_default_size(self, jplan: Any) -> bool:
        """
        Recursively walk a JVM LogicalPlan and return True if any node's
        stats.sizeInBytes equals spark.sql.defaultSizeInBytes.
        """
        spark_default_size = self._default_size
        if spark_default_size is None:
            return False

        try:
            stats = jplan.stats()
            size_val = stats.sizeInBytes()
            size_int = int(str(size_val))
            if size_int == spark_default_size:
                return True
        except Exception:
            # ignore stats errors and keep walking
            pass

        # children() is a Scala Seq[LogicalPlan]; iterate via .size() / .apply(i)
        try:
            children = jplan.children()
            n = children.size()
            for idx in range(n):
                child = children.apply(idx)
                if self._jplan_uses_default_size(child):
                    return True
        except Exception:
            # if we can't inspect children, stop here
            pass

        return False

    def _parse_spark_stats_size(self, size_val: Any) -> int | None:
        if size_val is None:
            return None
        try:
            size_int = int(str(size_val))
        except Exception:
            return None
        return size_int if size_int > 0 else None
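The DEFAULT_GUARD reads its byte limit from the FF_SPK_MAX_BYTES environment variable. A hedged configuration sketch (the limit value is illustrative; the enforcement logic lives in BudgetGuard and is not shown on this page):

import os

# Assumed convention: FF_SPK_MAX_BYTES holds the maximum allowed estimated query size in bytes.
os.environ["FF_SPK_MAX_BYTES"] = str(50 * 1024**3)  # e.g. cap estimates at 50 GiB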

detect_default_size

detect_default_size()

Detect Spark's defaultSizeInBytes sentinel.

  • Prefer spark.sql.defaultSizeInBytes if available.
  • Fall back to Long.MaxValue (2^63 - 1) otherwise.
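For comparison, the same sentinel can be read directly from an active SparkSession (a sketch assuming a session bound to the name spark; the second argument to conf.get is the fallback returned when the config is unset):

sentinel = int(spark.conf.get("spark.sql.defaultSizeInBytes", str(2**63 - 1)))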
Source code in src/fastflowtransform/executors/budget/runtime/databricks_spark.py
def detect_default_size(self) -> int:
    """
    Detect Spark's defaultSizeInBytes sentinel.

    - Prefer spark.sql.defaultSizeInBytes if available.
    - Fall back to Long.MaxValue (2^63 - 1) otherwise.
    """
    try:
        conf_val = self.executor.spark.conf.get("spark.sql.defaultSizeInBytes")
        if conf_val is not None:
            return int(conf_val)
    except Exception:
        # config not set / older Spark / weird environment
        pass

    # Fallback: Spark uses Long.MaxValue by default
    return 2**63 - 1  # 9223372036854775807

spark_stats_adapter

spark_stats_adapter(sql)

Build a SparkDataFrameStatsAdapter tied to this runtime's estimation logic.
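A hedged usage sketch (the SQL text is a placeholder; the adapter is assumed to invoke the wrapped callable when a size estimate is requested):

sql = "SELECT id, amount FROM orders"       # placeholder query
adapter = runtime.spark_stats_adapter(sql)  # runtime: a DatabricksSparkBudgetRuntime instance
# The wrapped callable first tries plan statistics on the DataFrame itself,
# then falls back to estimating the original SQL text.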

Source code in src/fastflowtransform/executors/budget/runtime/databricks_spark.py
def spark_stats_adapter(self, sql: str) -> SparkDataFrameStatsAdapter:
    """
    Build a SparkDataFrameStatsAdapter tied to this runtime's estimation logic.
    """

    def _bytes(df: Any) -> int | None:
        estimate = self.dataframe_bytes(df)
        if estimate is not None:
            return estimate
        return self.estimate_query_bytes(sql)

    return SparkDataFrameStatsAdapter(_bytes)