Skip to content

Incremental, Delta & Iceberg Demo

This example project shows how to use incremental models and Delta-/Iceberg-style merges in FastFlowTransform across DuckDB, Postgres, Databricks Spark (Parquet, Delta & Iceberg), BigQuery (pandas or BigFrames), and Snowflake Snowpark.

It is intentionally small and self-contained so you can copy/paste patterns into your own project.


Location & Layout

The example lives under:

examples/incremental_demo/
````

Directory structure:

```text
incremental_demo/
  .env
  .env.dev_duckdb
  .env.dev_postgres
  .env.dev_databricks_parquet
  .env.dev_databricks_delta
  .env.dev_databricks_iceberg
  .env.dev_bigquery_pandas
  .env.dev_bigquery_bigframes
  .env.dev_snowflake
  Makefile
  profiles.yml
  project.yml
  sources.yml

  seeds/
    seed_events.csv

  models/
    common/
      events_base.ff.sql
      fct_events_sql_inline.ff.sql
      fct_events_sql_yaml.ff.sql
    engines/
      duckdb/
        fct_events_py_incremental.ff.py
      postgres/
        fct_events_py_incremental.ff.py
      databricks_spark/
        fct_events_py_incremental.ff.py
      bigquery/
        pandas/
          fct_events_py_incremental.ff.py
        bigframes/
          fct_events_py_incremental.ff.py
      snowflake_snowpark/
        fct_events_py_incremental.ff.py

Your actual filenames may differ slightly; the concepts are the same.


What the demo shows

The demo revolves around a tiny events dataset and three different ways to build an incremental fact table:

  1. SQL incremental model with inline delta SQL

  2. models/common/fct_events_sql_inline.ff.sql

  3. All incremental logic (how to find “new/changed” rows) is defined directly in the model’s config(meta=...) block.

  4. SQL incremental model with YAML config in project.yml

  5. models/common/fct_events_sql_yaml.ff.sql

  6. The base SELECT lives in the model, but all incremental hints (incremental.enabled, unique_key, updated_at_column, …) are configured in project.yml → models.incremental.

  7. Python incremental model

  8. models/engines/*/fct_events_py_incremental.ff.py

  9. A Python model that returns a DataFrame; the executor applies incremental behaviour based on model meta (unique key + updated-at timestamp) and the target engine:

    • DuckDB / Postgres: incremental insert/merge in SQL
    • Databricks Spark: MERGE INTO for Delta or Iceberg where available (Spark 4), with a fallback full-refresh strategy for other formats
    • BigQuery: pandas- or BigFrames-backed DataFrame models with incremental merge logic handled by the BigQuery executor
    • Snowflake Snowpark: Snowpark DataFrame operations with merges handled by the Snowflake executor
  10. Iceberg profile for Spark 4

  11. Optional Databricks/Spark profile that uses the built-in Iceberg catalog.

  12. Seeds and models are materialized as Iceberg tables in a local warehouse directory.
  13. ref() and source() automatically point to the Iceberg catalog when the databricks_spark.table_format is set to iceberg.

Seed data

The demo uses a simple seed file:

examples/incremental_demo/seeds/seed_events.csv

Example contents (conceptually):

event_id,updated_at,value
1,2024-01-01T10:00:00,10
2,2024-01-01T10:05:00,20
3,2024-01-01T10:10:00,30

Running:

fft seed examples/incremental_demo --env dev_duckdb

(or with your engine/env of choice) will materialize this seed into the warehouse (e.g. a DuckDB table or Postgres table).


Base model: events_base

The base staging model simply exposes the events from the seed:

models/common/events_base.ff.sql

Conceptually:

{{ config(
    materialized='table',
    tags=[
        'example:incremental_demo',
        'scope:common',
        'engine:duckdb',
        'engine:postgres',
        'engine:databricks_spark',
        'engine:bigquery',
        'engine:snowflake_snowpark'
    ],
) }}

select
  event_id,
  updated_at,
  value
from {{ source('raw', 'events') }};

All incremental models build on top of this base table.


Incremental configuration (high-level)

All three incremental models share the same core idea:

  • Mark the model as incremental
  • Provide a unique key (e.g. event_id)
  • Provide an updated-at / timestamp column (e.g. updated_at)
  • Optionally specify a delta strategy:

  • Inline SQL (in the model)

  • External YAML (referenced from the model)
  • Python (engine-specific model that returns the delta dataset)

There are two ways to express this in the demo:

  1. Inline on the model (used by fct_events_sql_inline.ff.sql), via config(...):
{{ config(
    materialized='incremental',
    unique_key='event_id',
    incremental={'updated_at_column': 'updated_at'},
    tags=['example:incremental_demo'],
) }}
  1. As an overlay in project.yml (used by fct_events_sql_yaml.ff.sql and the Python model):
models:
  incremental:
    fct_events_sql_yaml.ff:
      unique_key: "event_id"
      incremental:
        enabled: true
        updated_at_column: "updated_at"

    fct_events_py_incremental.ff:
      unique_key: "event_id"
      incremental:
        enabled: true
        updated_at_column: "updated_at"

The incremental engine then uses these meta fields to decide whether to:

  • create the table (create_table_as) for the first run
  • perform an incremental insert or merge for subsequent runs

1) SQL incremental with inline delta SQL

File:

models/common/fct_events_sql_inline.ff.sql

In this variant, both incremental configuration and the delta filter live directly in the model:

{{ config(
    materialized='incremental',
    unique_key='event_id',
    incremental={'updated_at_column': 'updated_at'},
    tags=[
        'example:incremental_demo',
        'scope:common',
        'kind:incremental',
        'inc:type:inline-sql',
        'engine:duckdb',
        'engine:postgres',
        'engine:databricks_spark',
    ],
) }}

with base as (
  select *
  from {{ ref('events_base.ff') }}
)
select
  event_id,
  updated_at,
  value
from base
{% if is_incremental() %}
where updated_at > (
  select coalesce(max(updated_at), timestamp '1970-01-01 00:00:00')
  from {{ this }}
)
{% endif %};

On the first run, the engine sees no existing relation, so it materializes the full select ... from events_base.

On subsequent runs, the engine evaluates the delta.sql snippet and:

  • DuckDB / Postgres: inserts or merges the resulting rows into the target table
  • Databricks Spark: tries a MERGE INTO (Delta) and falls back to a full-refresh if necessary
  • BigQuery: applies incremental insert/merge logic in SQL via the BigQuery executor

2) SQL incremental with YAML delta config

File:

models/common/fct_events_sql_yaml.ff.sql

Here the model body only defines the canonical SELECT and does not contain any incremental hints:

{{ config(
    materialized='incremental',
    tags=[
        'example:incremental_demo',
        'scope:common',
        'kind:incremental',
        'inc:type:yaml-config',
        'engine:duckdb',
        'engine:postgres',
        'engine:databricks_spark',
        'engine:bigquery',
        'engine:snowflake_snowpark',
    ],
) }}

with base as (
  select *
  from {{ ref('events_base.ff') }}
)
select
  event_id,
  updated_at,
  value
from base;

All incremental behaviour for this model is driven by project.yml:

models:
  incremental:
    fct_events_sql_yaml.ff:
      unique_key: "event_id"
      incremental:
        enabled: true
        updated_at_column: "updated_at"

The registry merges this overlay into the model at load time, so the incremental runtime sees effectively the same config as for the inline model (unique_key + updated_at_column) – only the source of truth is different.


Inline vs YAML config at a glance

Model Where is incremental configured? What lives in the SQL file?
fct_events_sql_inline.ff Inline in config(...) on the model Full SELECT + is_incremental() filter
fct_events_sql_yaml.ff project.yml → models.incremental Full SELECT only (no incremental hints)

Both end up with the same runtime meta, only the location of config differs.

3) Python incremental model

Files:

models/engines/duckdb/fct_events_py_incremental.ff.py
models/engines/postgres/fct_events_py_incremental.ff.py
models/engines/databricks_spark/fct_events_py_incremental.ff.py
models/engines/bigquery/pandas/fct_events_py_incremental.ff.py
models/engines/bigquery/bigframes/fct_events_py_incremental.ff.py

Each engine variant uses the same logical signature:

from fastflowtransform import engine_model
import pandas as pd  # or pyspark.sql.DataFrame for Databricks Spark


@engine_model(
    only="duckdb",  # or "postgres" / "databricks_spark"
    name="fct_events_py_incremental",
    deps=["events_base.ff"],
    tags=[
        "example:incremental_demo",
        "scope:engine",
        "engine:duckdb",  # or engine-specific
    ],
    meta={
        "incremental": True,
        "unique_key": ["event_id"],
        "updated_at": "updated_at",
    },
)
def build(df_events):
    # 'df_events' is either a pandas.DataFrame or Spark DataFrame
    # depending on the engine.
    # The function returns either:
    #   - a full canonical result, or
    #   - only the delta rows, depending on your design.
    #
    # In the simplest version, you just return the full dataset and let the
    # executor handle incremental logic based on meta.
    return df_events[["event_id", "updated_at", "value"]]

The executor uses the meta.incremental / meta.unique_key / meta.updated_at hints to run:

  • A full-refresh on the first run
  • A delta merge on subsequent runs:

  • For DuckDB / Postgres: insert/merge SQL

  • For Databricks Spark:

    • MERGE INTO for Delta tables, or
    • a full-refresh fallback strategy that rewrites the table based on the union of existing + delta rows

Delta & Iceberg variants (Databricks / Spark)

The Databricks/Spark executor can materialize the same incremental models as managed parquet, Delta Lake, or Iceberg tables. Instead of wiring storage paths in project.yml, each table format gets its own Databricks profile with a dedicated database and warehouse.

Managed storage per format

profiles.yml ships with three out-of-the-box profiles:

dev_databricks_parquet:
  engine: databricks_spark
  databricks_spark:
    database: incremental_demo_parquet
    warehouse_dir: "{{ project_dir() }}/.local/spark_warehouse_parquet"

dev_databricks_delta:
  engine: databricks_spark
  databricks_spark:
    database: incremental_demo_delta
    warehouse_dir: "{{ project_dir() }}/.local/spark_warehouse_delta"

dev_databricks_iceberg:
  engine: databricks_spark
  databricks_spark:
    database: incremental_demo_iceberg
    warehouse_dir: "{{ project_dir() }}/.local/spark_warehouse_iceberg"
    table_format: iceberg
    extra_conf:
      spark.sql.catalog.iceberg: org.apache.iceberg.spark.SparkCatalog
      spark.sql.catalog.iceberg.type: hadoop
      spark.sql.catalog.iceberg.warehouse: "file://{{ project_dir() }}/.local/iceberg_warehouse"

Because every format has its own schema/warehouse, you can flip FF_DBR_TABLE_FORMAT or swap profiles without worrying about leftover Parquet metadata interfering with a Delta run (and vice versa).

Running the Spark formats

Pick the .env.dev_databricks_* file that matches your target format, export it, and run the demo:

# From the repo root
cd examples/incremental_demo

# Parquet tables (default)
set -a; source .env.dev_databricks_parquet; set +a
FF_DBR_TABLE_FORMAT=parquet FFT_ACTIVE_ENV=dev_databricks_parquet fft run . \
  --select tag:example:incremental_demo --select tag:engine:databricks_spark

# Delta Lake tables
set -a; source .env.dev_databricks_delta; set +a
FFT_ACTIVE_ENV=dev_databricks_delta FF_DBR_TABLE_FORMAT=delta fft run . \
  --select tag:example:incremental_demo --select tag:engine:databricks_spark

# Iceberg tables
set -a; source .env.dev_databricks_iceberg; set +a
FFT_ACTIVE_ENV=dev_databricks_iceberg FF_DBR_TABLE_FORMAT=iceberg fft run . \
  --select tag:example:incremental_demo --select tag:engine:databricks_spark

FF_DBR_TABLE_FORMAT is optional when the profile already sets databricks_spark.table_format, but passing it explicitly makes the intent clear in logs and CI.


Running the demo

From the project root:

cd examples/incremental_demo

DuckDB

# Seed
FFT_ACTIVE_ENV=dev_duckdb fft seed . 

# Initial full run
FFT_ACTIVE_ENV=dev_duckdb fft run . \
  --select tag:example:incremental_demo --select tag:engine:duckdb

# Incremental run (after modifying seed_events.csv to add later events)
FFT_ACTIVE_ENV=dev_duckdb fft run . \
  --select tag:example:incremental_demo --select tag:engine:duckdb \
  --cache rw

# Data-quality tests (if configured in project.yml / schema YAML)
FFT_ACTIVE_ENV=dev_duckdb fft test . \
  --select tag:example:incremental_demo

Postgres

FFT_ACTIVE_ENV=dev_postgres fft seed .
FFT_ACTIVE_ENV=dev_postgres fft run . \
  --select tag:example:incremental_demo --select tag:engine:postgres
FFT_ACTIVE_ENV=dev_postgres fft test . \
  --select tag:example:incremental_demo

BigQuery

# pandas
FF_ENGINE=bigquery FF_ENGINE_VARIANT=pandas FFT_ACTIVE_ENV=dev_bigquery_pandas fft seed .
FF_ENGINE=bigquery FF_ENGINE_VARIANT=pandas FFT_ACTIVE_ENV=dev_bigquery_pandas fft run . \
  --select tag:example:incremental_demo --select tag:engine:bigquery --cache rw
FF_ENGINE=bigquery FF_ENGINE_VARIANT=pandas FFT_ACTIVE_ENV=dev_bigquery_pandas fft test . \
  --select tag:example:incremental_demo

# BigFrames
FF_ENGINE=bigquery FF_ENGINE_VARIANT=bigframes FFT_ACTIVE_ENV=dev_bigquery_bigframes fft seed .
FF_ENGINE=bigquery FF_ENGINE_VARIANT=bigframes FFT_ACTIVE_ENV=dev_bigquery_bigframes fft run . \
  --select tag:example:incremental_demo --select tag:engine:bigquery --cache rw
FF_ENGINE=bigquery FF_ENGINE_VARIANT=bigframes FFT_ACTIVE_ENV=dev_bigquery_bigframes fft test . \
  --select tag:example:incremental_demo

Ensure the service account credentials pointed to by GOOGLE_APPLICATION_CREDENTIALS can create/drop tables in the target dataset.

Snowflake Snowpark

# Seed / run / test (Snowflake profile)
FFT_ACTIVE_ENV=dev_snowflake FF_ENGINE=snowflake_snowpark fft seed .
FFT_ACTIVE_ENV=dev_snowflake FF_ENGINE=snowflake_snowpark fft run . \
  --select tag:example:incremental_demo --select tag:engine:snowflake_snowpark --cache rw
FFT_ACTIVE_ENV=dev_snowflake FF_ENGINE=snowflake_snowpark fft test . \
  --select tag:example:incremental_demo

Make sure .env.dev_snowflake sets the required FF_SF_* variables and install fastflowtransform[snowflake] so the Snowpark executor and client libraries are available.

Databricks Spark

FF_DBR_TABLE_FORMAT=parquet FFT_ACTIVE_ENV=dev_databricks_parquet fft seed .
FF_DBR_TABLE_FORMAT=parquet FFT_ACTIVE_ENV=dev_databricks_parquet fft run . \
  --select tag:example:incremental_demo --select tag:engine:databricks_spark
FF_DBR_TABLE_FORMAT=parquet FFT_ACTIVE_ENV=dev_databricks_parquet fft test . \
  --select tag:example:incremental_demo
````

### Databricks Spark (parquet vs Delta)

You can run the incremental demo on Databricks/Spark against either **parquet** or **Delta** tables.

Each format has its own profile/database (`dev_databricks_parquet`  `incremental_demo_parquet`,
`dev_databricks_delta`  `incremental_demo_delta`), so cleanup and reruns never reuse stale metadata.
`FF_DBR_TABLE_FORMAT` still overrides `databricks_spark.table_format` if you want to switch formats
without changing profiles.

Run with **parquet** tables (default):

```bash
FF_DBR_TABLE_FORMAT=parquet FFT_ACTIVE_ENV=dev_databricks_parquet fft seed .
FF_DBR_TABLE_FORMAT=parquet FFT_ACTIVE_ENV=dev_databricks_parquet fft run . \
  --select tag:example:incremental_demo --select tag:engine:databricks_spark
FF_DBR_TABLE_FORMAT=parquet FFT_ACTIVE_ENV=dev_databricks_parquet fft test . \
  --select tag:example:incremental_demo

Run with Delta tables:

FF_DBR_TABLE_FORMAT=delta FFT_ACTIVE_ENV=dev_databricks_delta fft seed .
FF_DBR_TABLE_FORMAT=delta FFT_ACTIVE_ENV=dev_databricks_delta fft run . \
  --select tag:example:incremental_demo --select tag:engine:databricks_spark
FF_DBR_TABLE_FORMAT=delta FFT_ACTIVE_ENV=dev_databricks_delta fft test . \
  --select tag:example:incremental_demo

delta-spark >= 4.0 must be installed locally so the executor can wire Delta extensions into SparkSession automatically (the profile/CLI already sets the required Spark configs).

Databricks Spark (Iceberg / Spark 4+)

If you are on Spark 4 / Databricks with Iceberg support, you can also run the incremental demo purely against Iceberg tables using dev_databricks_iceberg. That profile:

dev_databricks_iceberg:
  engine: databricks_spark
  databricks_spark:
    database: incremental_demo_iceberg
    warehouse_dir: "{{ project_dir() }}/.local/spark_warehouse_iceberg"
    table_format: iceberg
    extra_conf:
      spark.jars.packages: org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.0
      spark.sql.catalog.iceberg: org.apache.iceberg.spark.SparkCatalog
      spark.sql.catalog.iceberg.type: hadoop
      spark.sql.catalog.iceberg.warehouse: "file://{{ project_dir() }}/.local/iceberg_warehouse"

From the repo root:

cd examples/incremental_demo

Run seeds and models against Iceberg:

FF_DBR_TABLE_FORMAT=iceberg FFT_ACTIVE_ENV=dev_databricks_iceberg fft seed .

FF_DBR_TABLE_FORMAT=iceberg FFT_ACTIVE_ENV=dev_databricks_iceberg fft run . \ --select tag:example:incremental_demo --select tag:engine:databricks_spark

FF_DBR_TABLE_FORMAT=iceberg FFT_ACTIVE_ENV=dev_databricks_iceberg fft test . \ --select tag:example:incremental_demo

Under this profile, all ref() / source() calls in Spark SQL and Python models are resolved against the Iceberg catalog, so seeds and incremental models operate purely on Iceberg tables.