Skip to content

fastflowtransform.table_formats.spark_hudi

HudiFormatHandler

Bases: SparkFormatHandler

Hudi format handler using Spark's Hudi integration.

Responsibilities
  • save_df_as_table() via df.write.format("hudi").saveAsTable(...)
  • incremental_insert(): INSERT INTO ... SELECT ...
  • incremental_merge(): MERGE INTO ... USING (...) WHEN MATCHED/NOT MATCHED ... (Hudi's Spark MERGE support must be enabled in the cluster).
Source code in src/fastflowtransform/table_formats/spark_hudi.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
class HudiFormatHandler(SparkFormatHandler):
    """
    Hudi format handler using Spark's Hudi integration.

    Responsibilities:
      - save_df_as_table() via df.write.format("hudi").saveAsTable(...)
      - incremental_insert(): INSERT INTO ... SELECT ...
      - incremental_merge(): MERGE INTO ... USING (...) WHEN MATCHED/NOT MATCHED ...
        (Hudi's Spark MERGE support must be enabled in the cluster).
    """

    def __init__(
        self,
        spark: SparkSession,
        *,
        default_database: str | None = None,
        table_options: dict[str, Any] | None = None,
        sql_runner: Callable[[str], Any] | None = None,
    ) -> None:
        """
        Args:
            spark: Active SparkSession used for catalog lookups and SQL.
            default_database: Database used to qualify bare table names.
                Falls back to the session's current database when omitted.
            table_options: Writer options applied in save_df_as_table()
                (e.g. hoodie.datasource.write.recordkey.field).
            sql_runner: Optional SQL executor; lets the caller intercept
                or guard SQL execution.
        """
        # table_format="hudi" so the base class knows what we're dealing with
        super().__init__(
            spark,
            table_format="hudi",
            table_options=table_options or {},
            sql_runner=sql_runner,
        )
        self.default_database = default_database or spark.catalog.currentDatabase()

    # ---------- Core helpers ----------
    def _qualify_table_name(self, table_name: str, database: str | None = None) -> str:
        """
        Normalize input like "seed_events" or "db.seed_events" to "db.seed_events".

        For Hudi we normally rely on the current Spark catalog / Hive metastore,
        so there is no extra "catalog." prefix like in Iceberg.

        Raises:
            ValueError: If *table_name* is empty or whitespace-only.
        """
        raw = (table_name or "").strip()
        if not raw:
            raise ValueError("Empty table name for HudiFormatHandler")

        parts = [p for p in raw.split(".") if p]
        if len(parts) == 1:
            # Bare table name - qualify with the requested/default database.
            db = database or self.default_database
            return ".".join([db, parts[0]])
        # Already db.table or catalog.db.table - pass through unchanged.
        return ".".join(parts)

    # ---------- Identifier overrides ----------
    def qualify_identifier(self, table_name: str, *, database: str | None = None) -> str:
        # For Spark SQL we just use db.table, no extra quoting here - the caller
        # can quote if needed.
        return self._qualify_table_name(table_name, database=database)

    def allows_unmanaged_paths(self) -> bool:
        # Hudi can work as a path-based table as well, so we allow that.
        # (Your higher-level executor can still decide whether to use paths.)
        return True

    def relation_exists(self, table_name: str, *, database: str | None = None) -> bool:
        """Return True if the (qualified) table exists in the catalog.

        Any catalog error is treated as "does not exist" so callers can use
        this as a cheap existence probe.
        """
        ident = self.qualify_identifier(table_name, database=database)
        try:
            return self.spark.catalog.tableExists(ident)
        except Exception:
            return False

    # ---------- Required API ----------
    def save_df_as_table(self, table_name: str, df: SDF) -> None:
        """
        Save DataFrame as a Hudi table registered in the current catalog.

        Typical Hudi options you might pass via table_options include:
          - hoodie.datasource.write.recordkey.field
          - hoodie.datasource.write.precombine.field
          - hoodie.table.name (optional when using saveAsTable)
        """
        full_name = self._qualify_table_name(table_name)

        writer = df.write.format("hudi")
        for k, v in self.table_options.items():
            writer = writer.option(str(k), str(v))

        # Full refresh semantics: overwrite the Hudi table
        writer.mode("overwrite").saveAsTable(full_name)

    # ---------- Incremental API ----------
    def incremental_insert(self, table_name: str, select_body_sql: str) -> None:
        """
        Append-only incremental load.

        Uses Spark SQL INSERT INTO; the Hudi connector will handle this as an
        insert/upsert depending on table configuration.

        Raises:
            ValueError: If *select_body_sql* does not start with SELECT.
        """
        body = select_body_sql.strip().rstrip(";\n\t ")
        if not body.lower().startswith("select"):
            raise ValueError(f"incremental_insert expects SELECT body, got: {body[:40]!r}")

        full_name = self._qualify_table_name(table_name)
        self.run_sql(f"INSERT INTO {full_name} {body}")

    def incremental_merge(
        self,
        table_name: str,
        select_body_sql: str,
        unique_key: list[str],
    ) -> None:
        """
        Hudi MERGE implementation.

            MERGE INTO db.table AS t
            USING (<select_body_sql>) AS s
            ON <AND-joined equality predicate over unique_key>
            WHEN MATCHED THEN UPDATE SET *
            WHEN NOT MATCHED THEN INSERT *

        This requires Hudi's MERGE support to be enabled on your Spark cluster.
        """
        body = select_body_sql.strip().rstrip(";\n\t ")
        if not unique_key:
            # No key - fall back to simple insert
            self.incremental_insert(table_name, body)
            return

        full_name = self._qualify_table_name(table_name)
        pred = " AND ".join([f"t.`{k}` = s.`{k}`" for k in unique_key])

        self.run_sql(
            f"""
            MERGE INTO {full_name} AS t
            USING ({body}) AS s
            ON {pred}
            WHEN MATCHED THEN UPDATE SET *
            WHEN NOT MATCHED THEN INSERT *
            """
        )

save_df_as_table

save_df_as_table(table_name, df)

Save DataFrame as a Hudi table registered in the current catalog.

Typical Hudi options you might pass via table_options include:
  • hoodie.datasource.write.recordkey.field
  • hoodie.datasource.write.precombine.field
  • hoodie.table.name (optional when using saveAsTable)
Source code in src/fastflowtransform/table_formats/spark_hudi.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def save_df_as_table(self, table_name: str, df: SDF) -> None:
    """
    Persist *df* as a Hudi table in the current catalog (full refresh).

    Every entry in ``table_options`` is forwarded to the writer as a
    string-valued option. Commonly useful Hudi options:
      - hoodie.datasource.write.recordkey.field
      - hoodie.datasource.write.precombine.field
      - hoodie.table.name (optional when using saveAsTable)
    """
    target = self._qualify_table_name(table_name)

    hudi_writer = df.write.format("hudi")
    for opt_key, opt_val in self.table_options.items():
        hudi_writer = hudi_writer.option(str(opt_key), str(opt_val))

    # Overwrite mode implements the full-refresh semantics for this table.
    hudi_writer.mode("overwrite").saveAsTable(target)

incremental_insert

incremental_insert(table_name, select_body_sql)

Append-only incremental load.

Uses Spark SQL INSERT INTO; the Hudi connector will handle this as an insert/upsert depending on table configuration.

Source code in src/fastflowtransform/table_formats/spark_hudi.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def incremental_insert(self, table_name: str, select_body_sql: str) -> None:
    """
    Append-only incremental load.

    Uses Spark SQL INSERT INTO; the Hudi connector will handle this as an
    insert/upsert depending on table configuration.

    Raises:
        ValueError: If the trimmed body does not start with SELECT.
    """
    trimmed = select_body_sql.strip().rstrip(";\n\t ")
    if not trimmed.lower().startswith("select"):
        raise ValueError(f"incremental_insert expects SELECT body, got: {trimmed[:40]!r}")

    target = self._qualify_table_name(table_name)
    self.run_sql(f"INSERT INTO {target} {trimmed}")

incremental_merge

incremental_merge(table_name, select_body_sql, unique_key)

Hudi MERGE implementation.

MERGE INTO db.table AS t
USING (<select_body_sql>) AS s
ON <AND-joined equality predicate over unique_key>
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

This requires Hudi's MERGE support to be enabled on your Spark cluster.

Source code in src/fastflowtransform/table_formats/spark_hudi.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def incremental_merge(
    self,
    table_name: str,
    select_body_sql: str,
    unique_key: list[str],
) -> None:
    """
    Hudi MERGE implementation.

        MERGE INTO db.table AS t
        USING (<select_body_sql>) AS s
        ON <AND-joined equality predicate over unique_key>
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *

    This requires Hudi's MERGE support to be enabled on your Spark cluster.
    """
    source_sql = select_body_sql.strip().rstrip(";\n\t ")

    # Without a unique key there is nothing to match on - degrade to append.
    if not unique_key:
        self.incremental_insert(table_name, source_sql)
        return

    target = self._qualify_table_name(table_name)
    on_clause = " AND ".join(f"t.`{key}` = s.`{key}`" for key in unique_key)

    self.run_sql(
        f"""
        MERGE INTO {target} AS t
        USING ({source_sql}) AS s
        ON {on_clause}
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
        """
    )

run_sql

run_sql(sql)

Execute SQL via the injected runner (guardable in the executor).

Source code in src/fastflowtransform/table_formats/base.py
45
46
47
def run_sql(self, sql: str) -> Any:
    """Execute *sql* through the injected runner so the executor can guard it."""
    runner = self._sql_runner
    return runner(sql)

format_identifier_for_sql

format_identifier_for_sql(table_name, *, database=None)

Return a SQL-safe identifier (per-part quoted) for the table.

Source code in src/fastflowtransform/table_formats/base.py
54
55
56
57
58
59
60
def format_identifier_for_sql(self, table_name: str, *, database: str | None = None) -> str:
    """Return a SQL-safe identifier (per-part quoted) for the table."""
    ident = self.qualify_identifier(table_name, database=database)
    parts = [p for p in ident.split(".") if p]
    if not parts:
        return self._quote_part(ident)
    return ".".join(self._quote_part(part) for part in parts)