Skip to content

fastflowtransform.executors.budget.runtime.postgres

PostgresBudgetRuntime

Bases: BaseBudgetRuntime[PostgresBudgetExecutor]

Postgres-specific budget runtime with EXPLAIN-based estimation.

Source code in src/fastflowtransform/executors/budget/runtime/postgres.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
class PostgresBudgetRuntime(BaseBudgetRuntime[PostgresBudgetExecutor]):
    """Postgres-specific budget runtime with EXPLAIN-based estimation.

    The estimate is derived from the planner's row-count and row-width figures
    reported by ``EXPLAIN (FORMAT JSON)``. Estimation is best-effort: every
    failure mode yields ``None`` rather than an exception.
    """

    # NOTE(review): presumably picked up by the base class when no explicit
    # guard is passed to __init__ -- confirm against BaseBudgetRuntime.
    DEFAULT_GUARD = BudgetGuard(
        env_var="FF_PG_MAX_BYTES",
        estimator_attr="_estimate_query_bytes",
        engine_label="Postgres",
        what="query",
    )

    # Fallback per-row width used when the plan reports a row count but no width.
    _DEFAULT_PG_ROW_WIDTH = 128

    def __init__(self, executor: PostgresBudgetExecutor, guard: BudgetGuard | None = None):
        """Bind the runtime to *executor*, optionally with a custom *guard*."""
        super().__init__(executor, guard)

    def estimate_query_bytes(self, sql: str) -> int | None:
        """Estimate the size of the result of *sql* from the planner's figures.

        Args:
            sql: Statement to estimate. Only the SELECT-like body extracted by
                the executor is explained.

        Returns:
            The estimated size as a positive integer, or ``None`` when the
            statement is not a ``SELECT``/``WITH`` query, the EXPLAIN call
            fails or returns nothing, or the result has no usable plan node.
        """
        body = self.executor._extract_select_like(sql)
        if not body.lstrip().lower().startswith(("select", "with")):
            return None

        explain_sql = f"EXPLAIN (FORMAT JSON) {body}"

        try:
            raw = self.executor._execute_sql_maintenance(explain_sql, set_search_path=False)
        except Exception:
            # Deliberately broad: the estimate is advisory, so a failing EXPLAIN
            # must never break the caller.
            return None

        if raw is None:
            return None

        # The executor may hand back JSON text or an already-decoded structure;
        # anything json.loads cannot handle is used as-is.
        try:
            data = json.loads(raw)
        except (TypeError, ValueError):
            data = raw

        # Postgres JSON format: list with a single object.
        root = data[0] if isinstance(data, list) and data else data

        # Check the type before calling .get(): an undecoded payload can carry
        # a row tuple, a string or a nested list here, which previously raised
        # AttributeError instead of returning None.
        if not isinstance(root, dict):
            return None

        plan = root.get("Plan")
        if not isinstance(plan, dict):
            # Accept a bare plan node that is not wrapped in {"Plan": ...}.
            if "Node Type" not in root:
                return None
            plan = root

        return self._estimate_bytes_from_plan(plan)

    @staticmethod
    def _first_int(node: dict[str, Any], keys: tuple[str, ...]) -> int | None:
        """Return the first value among *keys* in *node* that converts to int."""
        for key in keys:
            val = node.get(key)
            if val is None:
                continue
            try:
                return int(val)
            except (TypeError, ValueError, OverflowError):
                # Unparseable or non-finite value: try the next spelling.
                continue
        return None

    def _estimate_bytes_from_plan(self, plan: dict[str, Any]) -> int | None:
        """Turn a plan node's row count and row width into a size estimate.

        Args:
            plan: A plan node as decoded from the EXPLAIN JSON output.

        Returns:
            ``rows * width`` when both are present; ``rows`` times the default
            row width when only the row count is present; the width alone when
            only the width is present. ``None`` when neither is available or
            the result is not positive.
        """
        rows = self._first_int(plan, ("Plan Rows", "Plan_Rows", "Rows"))
        width = self._first_int(plan, ("Plan Width", "Plan_Width", "Width"))

        if rows is None and width is None:
            return None

        if rows is None:
            estimate = width
        elif width is None:
            estimate = rows * self._DEFAULT_PG_ROW_WIDTH
        else:
            estimate = rows * width

        # A zero or negative product carries no usable information.
        return estimate if estimate > 0 else None