Bases: BaseBudgetRuntime[DatabricksSparkBudgetExecutor]
Databricks/Spark budget runtime using logical-plan stats for estimation.
Source code in src/fastflowtransform/executors/budget/runtime/databricks_spark.py
class DatabricksSparkBudgetRuntime(BaseBudgetRuntime[DatabricksSparkBudgetExecutor]):
    """Databricks/Spark budget runtime using logical-plan stats for estimation."""

    DEFAULT_GUARD = BudgetGuard(
        env_var="FF_SPK_MAX_BYTES",
        estimator_attr="runtime_budget_estimate_query_bytes",
        engine_label="Databricks/Spark",
        what="query",
    )

    def __init__(self, executor: DatabricksSparkBudgetExecutor, guard: BudgetGuard | None = None):
        super().__init__(executor, guard)
        self._default_size: int | None = self.detect_default_size()

    def estimate_query_bytes(self, sql: str) -> int | None:
        return self._spark_plan_bytes(sql)

    def detect_default_size(self) -> int:
        """
        Detect Spark's defaultSizeInBytes sentinel.

        - Prefer spark.sql.defaultSizeInBytes if available.
        - Fall back to Long.MaxValue (2^63 - 1) otherwise.
        """
        try:
            conf_val = self.executor.spark.conf.get("spark.sql.defaultSizeInBytes")
            if conf_val is not None:
                return int(conf_val)
        except Exception:
            # config not set / older Spark / weird environment
            pass
        # Fallback: Spark uses Long.MaxValue by default
        return 2**63 - 1  # 9223372036854775807

    def spark_stats_adapter(self, sql: str) -> SparkDataFrameStatsAdapter:
        """
        Build a SparkDataFrameStatsAdapter tied to this runtime's estimation logic.
        """

        def _bytes(df: Any) -> int | None:
            estimate = self.dataframe_bytes(df)
            if estimate is not None:
                return estimate
            return self.estimate_query_bytes(sql)

        return SparkDataFrameStatsAdapter(_bytes)

    # ---- Shared helpers for Spark stats ----

    def dataframe_bytes(self, df: Any) -> int | None:
        try:
            jdf = getattr(df, "_jdf", None)
            if jdf is None:
                return None
            qe = jdf.queryExecution()
            jplan = qe.optimizedPlan()
            if self._jplan_uses_default_size(jplan):
                return None
            stats = jplan.stats()
            size_attr = getattr(stats, "sizeInBytes", None)
            size_val = size_attr() if callable(size_attr) else size_attr
            return self._parse_spark_stats_size(size_val)
        except Exception:
            return None

    def _spark_plan_bytes(self, sql: str) -> int | None:
        """
        Inspect the optimized logical plan via the JVM and return sizeInBytes
        as an integer, or None if not available. This does not execute the query.
        """
        try:
            normalized = self.executor._selectable_body(sql).rstrip(";\n\t ")
            if not normalized:
                normalized = sql
        except Exception:
            normalized = sql

        stmt = normalized.lstrip().lower()
        if not stmt.startswith(("select", "with")):
            # DDL/DML statements should not be executed twice.
            return None

        try:
            df = self.executor.spark.sql(normalized)
            jdf = getattr(df, "_jdf", None)
            if jdf is None:
                return None
            qe = jdf.queryExecution()
            jplan = qe.optimizedPlan()
            if self._jplan_uses_default_size(jplan):
                return None
            stats = jplan.stats()
            size_attr = getattr(stats, "sizeInBytes", None)
            size_val = size_attr() if callable(size_attr) else size_attr
            return self._parse_spark_stats_size(size_val)
        except Exception:
            return None

    def _jplan_uses_default_size(self, jplan: Any) -> bool:
        """
        Recursively walk a JVM LogicalPlan and return True if any node's
        stats.sizeInBytes equals spark.sql.defaultSizeInBytes.
        """
        spark_default_size = self._default_size
        if spark_default_size is None:
            return False

        try:
            stats = jplan.stats()
            size_val = stats.sizeInBytes()
            size_int = int(str(size_val))
            if size_int == spark_default_size:
                return True
        except Exception:
            # ignore stats errors and keep walking
            pass

        # children() is a Scala Seq[LogicalPlan]; iterate via .size() / .apply(i)
        try:
            children = jplan.children()
            n = children.size()
            for idx in range(n):
                child = children.apply(idx)
                if self._jplan_uses_default_size(child):
                    return True
        except Exception:
            # if we can't inspect children, stop here
            pass

        return False

    def _parse_spark_stats_size(self, size_val: Any) -> int | None:
        if size_val is None:
            return None
        try:
            size_int = int(str(size_val))
        except Exception:
            return None
        return size_int if size_int > 0 else None
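A minimal usage sketch, not taken from the project's own documentation: it assumes an already initialised DatabricksSparkBudgetExecutor (`executor`) with a live SparkSession, and the query text and budget value are illustrative only.

import os

# Budget ceiling published via the FF_SPK_MAX_BYTES env var named by DEFAULT_GUARD
# (exactly how the guard consumes it is assumed here, not shown in this excerpt).
os.environ["FF_SPK_MAX_BYTES"] = str(5 * 1024**3)

runtime = DatabricksSparkBudgetRuntime(executor)  # `executor` assumed to exist
estimate = runtime.estimate_query_bytes("select user_id, amount from sales")  # hypothetical table

if estimate is None:
    # Non-SELECT statement or no reliable plan statistics: nothing to enforce against.
    pass
elif estimate > int(os.environ["FF_SPK_MAX_BYTES"]):
    print(f"query would scan roughly {estimate} bytes, exceeding the configured budget")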
Detect Spark's defaultSizeInBytes sentinel.
- Prefer spark.sql.defaultSizeInBytes if available.
- Fall back to Long.MaxValue (2^63 - 1) otherwise.
Source code in src/fastflowtransform/executors/budget/runtime/databricks_spark.py
def detect_default_size(self) -> int:
    """
    Detect Spark's defaultSizeInBytes sentinel.

    - Prefer spark.sql.defaultSizeInBytes if available.
    - Fall back to Long.MaxValue (2^63 - 1) otherwise.
    """
    try:
        conf_val = self.executor.spark.conf.get("spark.sql.defaultSizeInBytes")
        if conf_val is not None:
            return int(conf_val)
    except Exception:
        # config not set / older Spark / weird environment
        pass
    # Fallback: Spark uses Long.MaxValue by default
    return 2**63 - 1  # 9223372036854775807
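Spark's own default for spark.sql.defaultSizeInBytes is Long.MaxValue, so relations without statistics are never mistaken for small tables; this runtime reuses that value as a "no usable stats" marker. A minimal sketch of that interaction, assuming an initialised `runtime` and an illustrative table name:

sentinel = runtime.detect_default_size()  # usually 2**63 - 1 unless spark.sql.defaultSizeInBytes is set
estimate = runtime.estimate_query_bytes("select * from raw_events")  # hypothetical table

if estimate is None:
    # Either the statement is not SELECT/WITH, or some node of the optimized plan
    # reported exactly `sentinel` as sizeInBytes, i.e. Spark had no real statistics.
    ...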
Build a SparkDataFrameStatsAdapter tied to this runtime's estimation logic.
Source code in src/fastflowtransform/executors/budget/runtime/databricks_spark.py
def spark_stats_adapter(self, sql: str) -> SparkDataFrameStatsAdapter:
    """
    Build a SparkDataFrameStatsAdapter tied to this runtime's estimation logic.
    """

    def _bytes(df: Any) -> int | None:
        estimate = self.dataframe_bytes(df)
        if estimate is not None:
            return estimate
        return self.estimate_query_bytes(sql)

    return SparkDataFrameStatsAdapter(_bytes)
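A minimal sketch of the fallback chain the adapter captures, assuming an initialised `runtime`; the query text is illustrative, and the step-by-step calls below mirror what the adapter's internal _bytes callable does:

sql = "select id, amount from sales"        # hypothetical query
adapter = runtime.spark_stats_adapter(sql)  # bundles the two-step lookup below

df = runtime.executor.spark.sql(sql)
size = runtime.dataframe_bytes(df)          # 1) optimized-plan stats of the DataFrame
if size is None:
    size = runtime.estimate_query_bytes(sql)  # 2) fall back to re-planning the SQL text

Keeping the original SQL string alongside the DataFrame allows a second estimation pass when the DataFrame-level statistics are unusable, for example when they carry the defaultSizeInBytes sentinel.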