Skip to content

fastflowtransform.executors.budget

BudgetGuard

Shared implementation for per-query budget enforcement.

Source code in src/fastflowtransform/executors/budget.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
class BudgetGuard:
    """
    Shared implementation for per-query budget enforcement.
    """

    def __init__(
        self,
        *,
        env_var: str | None,
        estimator_attr: str,
        engine_label: str,
        what: str = "query",
    ):
        self.env_var = env_var
        self.estimator_attr = estimator_attr
        self.engine_label = engine_label
        self.what = what
        self._env_limit: int | None = None
        self._env_limit_populated = False

    def _env_limit_value(self) -> int | None:
        if not self.env_var:
            return None
        if not self._env_limit_populated:
            self._env_limit = parse_max_bytes_env(self.env_var)
            self._env_limit_populated = True
        return self._env_limit

    def resolve_limit(self, override_limit: int | None = None) -> tuple[int | None, str | None]:
        env_limit = self._env_limit_value()
        if env_limit is not None and env_limit > 0:
            source = f"{self.env_var}" if self.env_var else "environment"
            return env_limit, source
        if override_limit is None:
            return None, None
        try:
            limit = int(override_limit)
        except Exception:
            return None, None
        if limit <= 0:
            return None, None
        return limit, "budgets.yml (query_limits)"

    def enforce(self, sql: str, executor: Any, *, limit: int, source: str | None) -> int | None:
        estimator_obj = getattr(executor, self.estimator_attr, None)

        if not callable(estimator_obj):
            echo(
                f"{self.engine_label} cost guard misconfigured: "
                f"missing estimator '{self.estimator_attr}'. Guard ignored."
            )
            return None

        estimator = cast(EstimatorFn, estimator_obj)

        try:
            estimated = estimator(sql)
        except Exception as exc:
            echo(
                f"{self.engine_label} cost estimation failed "
                f"(limit ignored for this {self.what}): {exc}"
            )
            return None

        if estimated is None:
            return None

        value = estimated

        if value <= 0:
            return None

        if value > limit:
            label = source or "configured limit"
            msg = (
                f"Aborting {self.engine_label} {self.what}: estimated scanned bytes "
                f"{value} ({format_bytes(value)}) exceed "
                f"{label}={limit} ({format_bytes(limit)}).\n"
                f"Adjust {label} to allow this {self.what}."
            )
            raise RuntimeError(msg)

        return value

parse_max_bytes_env

parse_max_bytes_env(env_var)

Parse an env var like FF_BQ_MAX_BYTES into an integer byte count.

Source code in src/fastflowtransform/executors/budget.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def parse_max_bytes_env(env_var: str) -> int | None:
    """
    Parse an env var like FF_BQ_MAX_BYTES into an integer byte count.
    """
    raw = os.getenv(env_var)
    if not raw:
        return None

    text = raw.strip().lower().replace("_", "").replace(",", "")
    if not text:
        return None

    multiplier = 1
    suffixes = [
        ("tb", 1024**4),
        ("t", 1024**4),
        ("gb", 1024**3),
        ("g", 1024**3),
        ("mb", 1024**2),
        ("m", 1024**2),
        ("kb", 1024),
        ("k", 1024),
    ]
    for suf, factor in suffixes:
        if text.endswith(suf):
            text = text[: -len(suf)].strip()
            multiplier = factor
            break

    try:
        value = float(text)
    except ValueError:
        echo(
            f"Warning: invalid {env_var}={raw!r}; expected integer bytes or "
            "a number with unit suffix like '10GB'. Ignoring limit."
        )
        return None

    bytes_val = int(value * multiplier)
    if bytes_val <= 0:
        return None
    return bytes_val

format_bytes

format_bytes(num)

Human-readable byte formatting for error messages.

Source code in src/fastflowtransform/executors/budget.py
54
55
56
57
58
59
60
61
62
63
64
def format_bytes(num: int) -> str:
    """
    Render a byte count as a short human-readable string.

    The value is divided by 1024 until it drops below 1024 or the largest
    unit (PB) is reached. Values of 10 or more are shown with one decimal,
    smaller values with two significant digits.
    """
    labels = ("B", "KB", "MB", "GB", "TB", "PB")
    last = len(labels) - 1
    amount = float(num)
    idx = 0
    # Step up one unit at a time; stop at the first unit where the value
    # fits below 1024, or at the final unit regardless of magnitude.
    while idx < last and not (amount < 1024.0):
        amount /= 1024.0
        idx += 1
    if amount >= 10:
        return f"{amount:.1f} {labels[idx]}"
    return f"{amount:.2g} {labels[idx]}"