Skip to content

fastflowtransform.table_formats.base

SparkFormatHandler

Bases: ABC

Abstract base for Spark table format handlers (Delta, Parquet, Iceberg, ...).

Responsibilities
  • Saving a DataFrame as a managed table.
  • Incremental INSERT semantics.
  • Optional incremental MERGE semantics (can raise NotImplementedError).

This is intentionally minimal so that engines (e.g. DatabricksSparkExecutor) can delegate managed table handling to the handler, while still implementing engine-level fallbacks for merge semantics.

Source code in src/fastflowtransform/table_formats/base.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
class SparkFormatHandler(ABC):
    """
    Abstract base for Spark table format handlers (Delta, Parquet, Iceberg, ...).

    Responsibilities:
      - Saving a DataFrame as a managed table.
      - Incremental INSERT semantics.
      - Optional incremental MERGE semantics (can raise NotImplementedError).

    This is intentionally minimal so that engines (DatabricksSparkExecutor)
    can:
      - Delegate managed table handling to the handler.
      - Still implement engine-level fallbacks for merge semantics.
    """

    def __init__(
        self,
        spark: SparkSession,
        *,
        table_format: str | None = None,
        table_options: dict[str, Any] | None = None,
        sql_runner: Callable[[str], Any] | None = None,
    ) -> None:
        self.spark = spark
        self.table_format: str | None = (table_format or "").lower() or None
        # Normalize options to strings for Spark
        self.table_options: dict[str, str] = {
            str(k): str(v) for k, v in (table_options or {}).items()
        }

        # central hook for executing SQL (can be engine-guarded)
        self._sql_runner: Callable[[str], Any] = sql_runner or spark.sql

    # ---- SQL helper ----
    def run_sql(self, sql: str) -> Any:
        """Execute SQL via the injected runner (guardable in the executor)."""
        return self._sql_runner(sql)

    # ---- Identifier helpers ----
    def qualify_identifier(self, table_name: str, *, database: str | None = None) -> str:
        """Return the physical table identifier for Spark APIs (unquoted)."""
        return (table_name or "").strip()

    def format_identifier_for_sql(self, table_name: str, *, database: str | None = None) -> str:
        """Return a SQL-safe identifier (per-part quoted) for the table."""
        ident = self.qualify_identifier(table_name, database=database)
        parts = [p for p in ident.split(".") if p]
        if not parts:
            return self._quote_part(ident)
        return ".".join(self._quote_part(part) for part in parts)

    def format_test_table(
        self, table_name: str | None, *, database: str | None = None
    ) -> str | None:
        if table_name is None:
            return None
        return self.format_identifier_for_sql(table_name, database=database)

    def allows_unmanaged_paths(self) -> bool:
        """Whether storage.path overrides should be honored for this format."""
        return True

    def relation_exists(self, table_name: str, *, database: str | None = None) -> bool:
        ident = self.qualify_identifier(table_name, database=database)
        try:
            return bool(self.spark.catalog.tableExists(ident))
        except Exception:
            return False

    @staticmethod
    def _quote_part(value: str) -> str:
        inner = (value or "").replace("`", "``")
        return f"`{inner}`"

    # ---- Required API ----
    @abstractmethod
    def save_df_as_table(self, table_name: str, df: SDF) -> None:
        """
        Save the given DataFrame as a (managed) table.

        The input name is the *fully-qualified* identifier Spark should use,
        e.g. "db.table" or just "table".
        """
        raise NotImplementedError

    # ---- Optional / defaulted API ----
    def incremental_insert(self, table_name: str, select_body_sql: str) -> None:
        """
        Default incremental INSERT implementation, format-agnostic.

        `select_body_sql` must be a *SELECT-able* body (no trailing semicolon),
        e.g. "SELECT ... FROM ...".
        """
        body = select_body_sql.strip().rstrip(";\n\t ")
        if not body.lower().startswith("select"):
            # This is a guard; DatabricksSparkExecutor uses _selectable_body already.
            raise ValueError(f"incremental_insert expects SELECT body, got: {body[:40]!r}")
        self.run_sql(f"INSERT INTO {table_name} {body}")

    def incremental_merge(
        self,
        table_name: str,
        select_body_sql: str,
        unique_key: list[str],
    ) -> None:
        """
        Optional: incremental MERGE semantics (UPSERT-like).
        Subclasses may override this. Default: not supported.

        Engines using this handler MUST be prepared to handle NotImplementedError
        and fall back to a more generic strategy.
        """
        raise NotImplementedError(
            f"incremental_merge is not implemented for format '{self.table_format or 'default'}'"
        )

run_sql

run_sql(sql)

Execute SQL via the injected runner (guardable in the executor).

Source code in src/fastflowtransform/table_formats/base.py
45
46
47
def run_sql(self, sql: str) -> Any:
    """Execute *sql* through the injected runner (guardable in the executor)."""
    runner = self._sql_runner
    return runner(sql)

qualify_identifier

qualify_identifier(table_name, *, database=None)

Return the physical table identifier for Spark APIs (unquoted).

Source code in src/fastflowtransform/table_formats/base.py
50
51
52
def qualify_identifier(self, table_name: str, *, database: str | None = None) -> str:
    """Return the physical table identifier for Spark APIs (unquoted)."""
    return (table_name or "").strip()

format_identifier_for_sql

format_identifier_for_sql(table_name, *, database=None)

Return a SQL-safe identifier (per-part quoted) for the table.

Source code in src/fastflowtransform/table_formats/base.py
54
55
56
57
58
59
60
def format_identifier_for_sql(self, table_name: str, *, database: str | None = None) -> str:
    """Return a SQL-safe identifier (per-part quoted) for the table."""
    ident = self.qualify_identifier(table_name, database=database)
    parts = [p for p in ident.split(".") if p]
    if not parts:
        return self._quote_part(ident)
    return ".".join(self._quote_part(part) for part in parts)

allows_unmanaged_paths

allows_unmanaged_paths()

Whether storage.path overrides should be honored for this format.

Source code in src/fastflowtransform/table_formats/base.py
69
70
71
def allows_unmanaged_paths(self) -> bool:
    """Tell whether storage.path overrides should be honored for this format."""
    return True

save_df_as_table abstractmethod

save_df_as_table(table_name, df)

Save the given DataFrame as a (managed) table.

The input name is the fully-qualified identifier Spark should use, e.g. "db.table" or just "table".

Source code in src/fastflowtransform/table_formats/base.py
86
87
88
89
90
91
92
93
94
@abstractmethod
def save_df_as_table(self, table_name: str, df: SDF) -> None:
    """
    Save the given DataFrame as a (managed) table.

    The input name is the *fully-qualified* identifier Spark should use,
    e.g. "db.table" or just "table".

    Abstract: subclasses must override; the base body only raises
    NotImplementedError.
    """
    raise NotImplementedError

incremental_insert

incremental_insert(table_name, select_body_sql)

Default incremental INSERT implementation, format-agnostic.

select_body_sql must be a SELECT-able body (no trailing semicolon), e.g. "SELECT ... FROM ...".

Source code in src/fastflowtransform/table_formats/base.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
def incremental_insert(self, table_name: str, select_body_sql: str) -> None:
    """
    Format-agnostic default implementation of incremental INSERT.

    `select_body_sql` must be a *SELECT-able* body without a trailing
    semicolon, e.g. "SELECT ... FROM ...".
    """
    body = select_body_sql.strip().rstrip(";\n\t ")
    # Guard only; DatabricksSparkExecutor uses _selectable_body already.
    prefix_ok = body.lower().startswith("select")
    if not prefix_ok:
        raise ValueError(f"incremental_insert expects SELECT body, got: {body[:40]!r}")
    statement = f"INSERT INTO {table_name} {body}"
    self.run_sql(statement)

incremental_merge

incremental_merge(table_name, select_body_sql, unique_key)

Optional: incremental MERGE semantics (UPSERT-like). Subclasses may override this. Default: not supported.

Engines using this handler MUST be prepared to handle NotImplementedError and fall back to a more generic strategy.

Source code in src/fastflowtransform/table_formats/base.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def incremental_merge(self, table_name: str, select_body_sql: str, unique_key: list[str]) -> None:
    """
    Optional incremental MERGE (UPSERT-like) semantics.

    The base class does not support merging; subclasses may override.
    Engines using this handler MUST be prepared to catch NotImplementedError
    and fall back to a more generic strategy.
    """
    raise NotImplementedError(
        f"incremental_merge is not implemented for format '{self.table_format or 'default'}'"
    )