Skip to content

fastflowtransform.table_formats.spark_default

DefaultSparkFormatHandler

Bases: SparkFormatHandler

Default Spark format handler for non-Delta managed tables (e.g. Parquet, ORC, generic catalog tables).

Responsibilities
  • save_df_as_table() using DataFrameWriter.saveAsTable.
  • incremental_insert() uses the base implementation (INSERT INTO ...).
  • incremental_merge() is intentionally NOT implemented and is expected to be handled by the executor via a generic fallback.
Source code in src/fastflowtransform/table_formats/spark_default.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class DefaultSparkFormatHandler(SparkFormatHandler):
    """
    Default Spark format handler for non-Delta managed tables
    (e.g. Parquet, ORC, generic catalog tables).

    Responsibilities:
      - save_df_as_table() using DataFrameWriter.saveAsTable.
      - incremental_insert() uses the base implementation (INSERT INTO ...).
      - incremental_merge() is intentionally NOT implemented and is expected
        to be handled by the executor via a generic fallback.

    Construction is inherited unchanged from SparkFormatHandler:
    ``(spark, *, table_format=None, table_options=None, sql_runner=None)``.
    """

    # NOTE: a previous explicit __init__ only forwarded its arguments
    # verbatim to SparkFormatHandler.__init__ with an identical signature,
    # so it was removed in favor of the inherited constructor.

    def save_df_as_table(self, table_name: str, df: SDF) -> None:
        """
        Save DataFrame as a managed table using Spark's built-in formats.

        - Overwrites the table content.
        - Uses self.table_format (if provided) as the writer format.
        - Applies self.table_options as writer options.

        Args:
            table_name: Target (possibly qualified) table identifier.
            df: Spark DataFrame whose contents replace the table.
        """
        writer = df.write.mode("overwrite")

        # Only pin an explicit format when one was configured; otherwise
        # the Spark session's default source applies.
        if self.table_format:
            writer = writer.format(self.table_format)

        if self.table_options:
            writer = writer.options(**self.table_options)

        writer.saveAsTable(table_name)

save_df_as_table

save_df_as_table(table_name, df)

Save DataFrame as a managed table using Spark's built-in formats.

  • Overwrites the table content.
  • Uses self.table_format (if provided) as the writer format.
  • Applies self.table_options as writer options.
Source code in src/fastflowtransform/table_formats/spark_default.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def save_df_as_table(self, table_name: str, df: SDF) -> None:
    """
    Persist *df* as a managed table through Spark's native writer.

    The target table content is fully overwritten. When configured on the
    handler, ``table_format`` selects the writer format and
    ``table_options`` are forwarded as writer options.
    """
    # Build the writer incrementally: overwrite mode first, then the
    # optional format and options layers.
    sink = df.write.mode("overwrite")

    if self.table_format:
        sink = sink.format(self.table_format)

    if self.table_options:
        sink = sink.options(**self.table_options)

    sink.saveAsTable(table_name)

run_sql

run_sql(sql)

Execute SQL via the injected runner (guardable in the executor).

Source code in src/fastflowtransform/table_formats/base.py
45
46
47
def run_sql(self, sql: str) -> Any:
    """Forward *sql* to the injected runner (guardable in the executor)."""
    runner = self._sql_runner
    return runner(sql)

qualify_identifier

qualify_identifier(table_name, *, database=None)

Return the physical table identifier for Spark APIs (unquoted).

Source code in src/fastflowtransform/table_formats/base.py
50
51
52
def qualify_identifier(self, table_name: str, *, database: str | None = None) -> str:
    """Return the physical table identifier for Spark APIs (unquoted)."""
    # Treat None/empty as an empty identifier; otherwise trim whitespace.
    if not table_name:
        return ""
    return table_name.strip()

format_identifier_for_sql

format_identifier_for_sql(table_name, *, database=None)

Return a SQL-safe identifier (per-part quoted) for the table.

Source code in src/fastflowtransform/table_formats/base.py
54
55
56
57
58
59
60
def format_identifier_for_sql(self, table_name: str, *, database: str | None = None) -> str:
    """Return a SQL-safe identifier (per-part quoted) for the table."""
    ident = self.qualify_identifier(table_name, database=database)
    pieces = [piece for piece in ident.split(".") if piece]
    if pieces:
        return ".".join(map(self._quote_part, pieces))
    # Degenerate identifier (empty or dots only): quote it as a whole.
    return self._quote_part(ident)

allows_unmanaged_paths

allows_unmanaged_paths()

Whether storage.path overrides should be honored for this format.

Source code in src/fastflowtransform/table_formats/base.py
69
70
71
def allows_unmanaged_paths(self) -> bool:
    """Whether storage.path overrides should be honored for this format."""
    # The default handler accepts external locations unconditionally.
    return True

incremental_insert

incremental_insert(table_name, select_body_sql)

Default incremental INSERT implementation, format-agnostic.

select_body_sql must be a SELECT-able body (no trailing semicolon), e.g. "SELECT ... FROM ...".

Source code in src/fastflowtransform/table_formats/base.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
def incremental_insert(self, table_name: str, select_body_sql: str) -> None:
    """
    Default incremental INSERT implementation, format-agnostic.

    `select_body_sql` must be a *SELECT-able* body (no trailing semicolon),
    e.g. "SELECT ... FROM ...".
    """
    # Normalize: trim outer whitespace, then drop trailing terminators.
    trimmed = select_body_sql.strip().rstrip(";\n\t ")
    if not trimmed.lower().startswith("select"):
        # This is a guard; DatabricksSparkExecutor uses _selectable_body already.
        raise ValueError(f"incremental_insert expects SELECT body, got: {trimmed[:40]!r}")
    self.run_sql(f"INSERT INTO {table_name} {trimmed}")

incremental_merge

incremental_merge(table_name, select_body_sql, unique_key)

Optional: incremental MERGE semantics (UPSERT-like). Subclasses may override this. Default: not supported.

Engines using this handler MUST be prepared to handle NotImplementedError and fall back to a more generic strategy.

Source code in src/fastflowtransform/table_formats/base.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def incremental_merge(
    self,
    table_name: str,
    select_body_sql: str,
    unique_key: list[str],
) -> None:
    """
    Optional: incremental MERGE semantics (UPSERT-like).
    Subclasses may override this. Default: not supported.

    Engines using this handler MUST be prepared to handle NotImplementedError
    and fall back to a more generic strategy.
    """
    # Report the configured format (or "default") so callers can see which
    # handler refused the merge.
    fmt_label = self.table_format or "default"
    raise NotImplementedError(
        f"incremental_merge is not implemented for format '{fmt_label}'"
    )