# Macros Demo
Goal: showcase

- SQL Jinja macros (`models/macros/*.sql`)
- Python render-time macros (`models/macros_py/*.py`)
- engine-aware stdlib helpers (`fastflowtransform.stdlib.*`, wired in as `ff_*` Jinja globals)

working together across engines:

- DuckDB
- Postgres
- Databricks Spark
- BigQuery (pandas & BigFrames)
- Snowflake Snowpark
You’ll see reusable SQL helpers, engine-aware SQL generation, and Python functions exposed as Jinja globals/filters.
## Directory structure
```
examples/macros_demo/
  .env.dev_bigquery_bigframes
  .env.dev_bigquery_pandas
  .env.dev_databricks
  .env.dev_duckdb
  .env.dev_postgres
  .env.dev_snowflake
  Makefile
  profiles.yml
  project.yml
  sources.yml
  seeds/
    seed_users.csv
    seed_orders.csv
  models/
    macros/
      utils.sql
      star.sql
    macros_py/
      helpers.py
    common/
      stg_users.ff.sql
      stg_orders.ff.sql
      dim_users.ff.sql
      fct_user_sales.ff.sql
      fct_user_sales_by_country.ff.sql
      fct_user_sales_partitioned.ff.sql
    engines/
      duckdb/
        py_example.ff.py
      postgres/
        py_example.ff.py
      databricks_spark/
        py_example.ff.py
      bigquery/
        bigframes/
          py_example.ff.py
        pandas/
          py_example.ff.py
      snowflake_snowpark/
        py_example.ff.py
  tests/
    unit/
      ...
```
(Exact engine folders may vary slightly depending on your checkout, but conceptually this is the layout.)
## What this demo shows
### 1. SQL Jinja macros (models/macros/*.sql)
#### utils.sql
High-level helpers used by `stg_users` / `stg_orders`:

- `email_domain(expr)`: engine-aware extraction of the domain part of an email address.
  - BigQuery: uses `split(...)[SAFE_OFFSET(1)]`
  - Other engines: use `split_part(..., '@', 2)`
- `safe_cast_amount(expr)`: a convenience wrapper around the stdlib `ff_safe_cast` that casts arbitrary expressions to a numeric type safely and consistently across engines.
- `coalesce_any(expr, default)`: a tiny convenience macro wrapping `coalesce(...)`.
- `default_country()`: reads a default country from `project.yml → vars.default_country`.
#### star.sql
- `star_except(relation, exclude_cols)`: selects all columns of `relation` except a list of exclusions.
  - Uses `adapter_columns(relation)` if the executor can describe the table.
  - Falls back to `*` if the columns are unknown.
### 2. Python macros (models/macros_py/helpers.py)
Render-time Python helpers exposed as Jinja globals/filters:
- `slugify(value: str) -> str`: a simple URL-friendly slug: lower-case, replace non-alphanumerics with `-`, squash duplicates.
- `mask_email(email: str) -> str`: redacts the local part of the email, e.g. `a@example.com` → `a***@example.com`.
- `csv_values(rows: list[dict[str, Any]], cols: list[str]) -> str`: renders a `VALUES (...)` list for small inline lookup tables in SQL.
These run at render time (inside the Jinja phase), not as SQL UDFs.
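A minimal sketch of what these three helpers could look like (the actual `helpers.py` in your checkout may differ in details such as quoting and type handling):

```python
import re
from typing import Any


def slugify(value: str) -> str:
    """Lower-case, replace runs of non-alphanumerics with '-', trim the edges."""
    return re.sub(r"[^a-z0-9]+", "-", str(value).lower()).strip("-")


def mask_email(email: str) -> str:
    """Redact the local part: a@example.com -> a***@example.com."""
    local, _, domain = email.partition("@")
    return f"{local[:1]}***@{domain}" if domain else email


def csv_values(rows: list[dict[str, Any]], cols: list[str]) -> str:
    """Render rows as a SQL tuple list: ('a', 1), ('b', 2), ... for use after VALUES."""
    def lit(value: Any) -> str:
        if value is None:
            return "null"
        if isinstance(value, (int, float)):
            return str(value)
        return "'" + str(value).replace("'", "''") + "'"

    return ", ".join(
        "(" + ", ".join(lit(row.get(col)) for col in cols) + ")" for row in rows
    )
```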
### 3. Stdlib helpers (fastflowtransform.stdlib → ff_* Jinja globals)
The demo also showcases the built-in, engine-aware SQL stdlib. These helpers are wired into Jinja as globals like `ff_safe_cast(...)`.
Key helpers used in this demo:
- `ff_engine()` / `ff_engine_family()`: return the normalized engine name / family (`"duckdb"`, `"postgres"`, `"bigquery"`, `"snowflake"`, `"spark"`, `"generic"`).
- `ff_is_engine(*candidates)`: convenience predicate to branch on engine in Jinja:

  ```sql
  {% if ff_is_engine('bigquery') %}
  ...
  {% endif %}
  ```

- `ff_safe_cast(expr, target_type, default=None)`: engine-aware safe casting, used in this demo to safely cast `amount` to a numeric type:
  - DuckDB: `try_cast(expr AS NUMERIC)`
  - BigQuery: `SAFE_CAST(expr AS NUMERIC)`
  - Spark: `TRY_CAST(expr AS NUMERIC)`
  - Snowflake / Postgres / others: `CAST(expr AS NUMERIC)`
  - If `default` is given, the result is wrapped as `COALESCE(..., default)`.
- `ff_date_trunc(expr, part="day")`: engine-aware `DATE_TRUNC`, used to derive `order_day` from `order_ts`:
  - DuckDB / Postgres / Snowflake / Spark: `date_trunc('<part>', CAST(expr AS TIMESTAMP))`
  - BigQuery: `DATE_TRUNC(CAST(expr AS TIMESTAMP), PART)`
- `ff_date_add(expr, part, amount)`: engine-aware date/timestamp arithmetic, used to compute `order_ts_plus_1d`:
  - DuckDB / Postgres: `CAST(expr AS TIMESTAMP) + INTERVAL 'amount part'`
  - Snowflake: `DATEADD(PART, amount, TO_TIMESTAMP(expr))`
  - BigQuery: `DATE_ADD(CAST(expr AS TIMESTAMP), INTERVAL amount PART)`
  - Spark: `date_add(expr, amount)` when `part == "day"`, otherwise falls back to `INTERVAL` arithmetic.
- `ff_partition_filter(column, start, end)`: builds a range predicate for partitions; the demo uses it in `fct_user_sales_partitioned`:
  - `start` & `end`: `column BETWEEN <lit(start)> AND <lit(end)>`
  - Only `start`: `column >= <lit(start)>`
  - Only `end`: `column <= <lit(end)>`
  - Both `None`: `1=1` (no-op)
- `ff_partition_in(column, values)`: builds an `IN (...)` predicate from Python values (`date`, `datetime`, strings, ints, etc.) via the stdlib's `sql_literal`:
  - Empty values: `1=0` (a safe "match nothing" guard)
  - Non-empty: `column IN (<lit1>, <lit2>, ...)`
Even if only some of these are used in the demo models, they are all available to your own models and macros.
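For instance, given the rules above, a model line like the following would render roughly as shown in the trailing comment on DuckDB (a sketch; the exact SQL emitted by your stdlib version may differ):

```sql
select
    {{ ff_safe_cast("amount", "numeric", default="0") }} as amount_safe,
    {{ ff_date_trunc("order_ts", "day") }}               as order_day
from {{ source('sales', 'orders') }}

-- On DuckDB, roughly:
--   coalesce(try_cast(amount as numeric), 0)       as amount_safe,
--   date_trunc('day', cast(order_ts as timestamp)) as order_day
```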
## Prerequisites
- A working FastFlowTransform installation (CLI `fft` available).
- For Postgres / Databricks / BigQuery / Snowflake: drivers and credentials configured via the `.env.dev_*` files.
- The FFT core already wires Jinja with:
  - `var(name, default)`
  - `env(name, default)`
  - `engine(default)` (legacy in macros; new code uses `ff_engine` / `ff_is_engine` from the stdlib).
## Seeds
Two tiny CSVs are materialized via `fft seed`:

- `seeds/seed_users.csv` — `id,email,country`
- `seeds/seed_orders.csv` — `order_id,customer_id,amount,order_ts`

`profiles.yml` and `project.yml` provide minimal connection config; the `.env.dev_*` files bind environment variables like `FF_DUCKDB_PATH`, `FF_PG_DSN`, `FF_BQ_PROJECT`, `FF_SF_*`, etc.
## How to run
From the repo root:
```bash
cd examples/macros_demo

# Choose engine: duckdb (default) | postgres | databricks_spark | bigquery | snowflake_snowpark
make ENGINE=duckdb demo
# or
make ENGINE=postgres demo
# or
make ENGINE=databricks_spark demo
# or
make ENGINE=bigquery BQ_FRAME=pandas demo   # or BQ_FRAME=bigframes
# or
make ENGINE=snowflake_snowpark demo
```
The demo target:
- `fft seed` — loads the CSV seeds.
- `fft run` — builds all tagged models using macros & stdlib.
- `fft dag --html` — renders a DAG HTML to `site/dag/index.html`.
- `fft test` — executes example tests from `project.yml`.
- Prints artifact paths and (if possible) opens the DAG in your browser.
For Snowflake, copy `.env.dev_snowflake` to `.env` or export the `FF_SF_*` variables yourself, and install `fastflowtransform[snowflake]` so the Snowpark executor is available.
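For example:

```bash
cd examples/macros_demo
cp .env.dev_snowflake .env            # or export the FF_SF_* variables yourself
pip install "fastflowtransform[snowflake]"
make ENGINE=snowflake_snowpark demo
```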
## Key files (highlights)
### SQL macros – models/macros/utils.sql
Conceptually:
```sql
{# Engine-aware email domain extraction using stdlib helpers #}
{%- macro email_domain(expr) -%}
    {%- if ff_is_engine('bigquery') -%}
        lower(split({{ expr }}, '@')[SAFE_OFFSET(1)])
    {%- else -%}
        lower(split_part({{ expr }}, '@', 2))
    {%- endif -%}
{%- endmacro %}

{# Convenience wrapper on top of ff_safe_cast #}
{%- macro safe_cast_amount(expr) -%}
    {{ ff_safe_cast(expr, "numeric", default="0") }}
{%- endmacro %}

{%- macro coalesce_any(expr, default) -%}
    coalesce({{ expr }}, {{ default }})
{%- endmacro %}

{%- macro default_country() -%}
    '{{ var("default_country", "DE") }}'
{%- endmacro %}
```
Exact implementation may differ slightly in your tree, but the idea is:
- Use the stdlib (`ff_safe_cast`, `ff_is_engine`) for the heavy lifting.
- Keep project macros thin and readable.
### SQL macros – models/macros/star.sql
```sql
{# Select * except some columns. Works across engines. #}
{%- macro star_except(relation, exclude_cols) -%}
    {%- set excl = exclude_cols | map('lower') | list -%}
    {%- set cols = adapter_columns(relation) -%}
    {# adapter_columns is provided by FFT executors' catalog/describe (if available).
       To keep the demo simple, fall back to a literal star if unknown. #}
    {%- if cols and cols|length > 0 -%}
        {{- (cols | reject('in', excl) | map('string') | join(', ')) -}}
    {%- else -%}
        *
    {%- endif -%}
{%- endmacro %}
```
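A call site could look like this (a sketch; it assumes the executor can describe the referenced relation so that `adapter_columns` returns a column list, otherwise the macro renders a plain `*`):

```sql
select
    {{ star_except(ref('stg_users.ff'), ['email']) }}
from {{ ref('stg_users.ff') }}
```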
## Models using macros & stdlib
### stg_users.ff.sql — SQL + Python macros
- Uses `coalesce_any(...)` + `default_country()` to fill missing countries.
- Uses `email_domain(...)` for engine-aware domain extraction.
- Injects a Python macro result via `slugify(var("site_name", "My Site"))`.
Conceptually:
```sql
{{ config(
    materialized='view',
    tags=[
        'example:macros_demo',
        'scope:common',
        'engine:duckdb',
        'engine:postgres',
        'engine:databricks_spark',
        'engine:bigquery',
        'engine:snowflake_snowpark'
    ]
) }}

with src as (
    select
        cast(id as int) as user_id,
        lower(email)    as email,
        {{ coalesce_any("country", default_country()) }} as country
    from {{ source('crm', 'users') }}
)
select
    user_id,
    email,
    {{ email_domain("email") }} as email_domain,
    country,
    -- Render-time Python macro usage (literal in SQL)
    '{{ slugify(var("site_name", "My Site")) }}' as site_slug
from src;
```
### stg_orders.ff.sql — stdlib date & cast helpers
This model shows:
- `safe_cast_amount(...)` and `ff_safe_cast(...)` for robust numeric casting.
- `ff_date_trunc(...)` for engine-aware `DATE_TRUNC`.
- `ff_date_add(...)` for portable "+ 1 day" logic.
Conceptually:
```sql
{{ config(
    materialized='view',
    tags=[
        'example:macros_demo',
        'scope:common',
        'engine:duckdb',
        'engine:postgres',
        'engine:databricks_spark',
        'engine:bigquery',
        'engine:snowflake_snowpark'
    ]
) }}

select
    cast(order_id as int)    as order_id,
    cast(customer_id as int) as user_id,
    {{ safe_cast_amount("amount") }} as amount,
    {{ ff_safe_cast("amount", "numeric", default="0") }} as amount_safe,
    {{ ff_date_trunc("order_ts", "day") }} as order_day,
    {{ ff_date_add("order_ts", "day", 1) }} as order_ts_plus_1d,
    cast(order_ts as timestamp) as order_ts
from {{ source('sales', 'orders') }};
```
### dim_users.ff.sql — inline lookup (can use Python or stdlib)
The demo joins staged users with a tiny lookup table that maps domains to labels (e.g. "example.com" → "internal"). In your current project this may be expressed either:
- via a Python macro (`csv_values(...)` → `VALUES (...)`), or
- via an explicit SQL snippet using literal `VALUES`.
Either way, it shows how to generate small lookups at render time and join them in the model.
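A hedged sketch of the Python-macro variant (the `domain_labels` CTE, the `'external'` fallback, and the assumption that `csv_values` emits the bare tuple list after `values` are illustrative, not taken from the demo source):

```sql
with domain_labels (email_domain, user_segment) as (
    values {{ csv_values([
        {'email_domain': 'example.com', 'user_segment': 'internal'}
    ], ['email_domain', 'user_segment']) }}
)
select
    u.user_id,
    u.email,
    u.email_domain,
    u.country,
    coalesce(l.user_segment, 'external') as user_segment
from {{ ref('stg_users.ff') }} as u
left join domain_labels as l
       on l.email_domain = u.email_domain
```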
### fct_user_sales.ff.sql — base fact table
Joins `stg_orders` with `dim_users` and aggregates:

- `order_count`
- `total_amount`
- `first_order_ts`
- `last_order_ts`
This is the “plain” fact model without partitioning.
### fct_user_sales_by_country.ff.sql — grouping example
Demonstrates using the staged and dim models to build a simple grouped fact table, usually aggregating at (country, user_segment) or similar.
### fct_user_sales_partitioned.ff.sql — partition range filter with stdlib
This model demonstrates `ff_partition_filter` in action. It filters a date/timestamp column with a portable predicate built from variables:
```sql
{{ config(
    materialized='table',
    tags=[
        'example:macros_demo',
        'scope:common',
        'engine:duckdb',
        'engine:postgres',
        'engine:databricks_spark',
        'engine:bigquery',
        'engine:snowflake_snowpark'
    ]
) }}

with sales as (
    select
        u.user_id,
        u.user_segment,
        o.order_ts,
        o.amount
    from {{ ref('stg_orders.ff') }} as o
    join {{ ref('dim_users.ff') }} as u
      on u.user_id = o.user_id
    where
        -- demo: engine-aware partition predicate using stdlib
        {{ ff_partition_filter(
            'o.order_ts',
            var('from_date', '2025-10-01'),
            var('to_date', '2025-10-31')
        ) }}
)
select
    user_id,
    user_segment,
    count(*)    as order_count,
    sum(amount) as total_amount
from sales
group by user_id, user_segment;
```
You can use the same pattern with `ff_partition_in` if you prefer a discrete partition list driven by `var("partitions", [...])`.
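For example (a sketch; the rendered literals depend on the stdlib's `sql_literal`):

```sql
where {{ ff_partition_in('o.order_day', var('partitions', ['2025-10-01', '2025-10-02'])) }}
-- renders roughly: o.order_day IN ('2025-10-01', '2025-10-02')
-- and renders 1=0 when the resolved value list is empty
```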
## Tests (examples)
In `project.yml` you'll see example tests like:
```yaml
tests:
  - type: not_null
    table: dim_users
    column: user_id
    tags: [batch]
  - type: row_count_between
    table: fct_user_sales
    min_rows: 1
    tags: [batch]
  - type: not_null
    table: fct_user_sales_by_country
    column: user_id
    tags: [batch]
  - type: row_count_between
    table: fct_user_sales_by_country
    min_rows: 1
    tags: [batch]
  - type: not_null
    table: fct_user_sales_partitioned
    column: user_id
    tags: [batch]
  - type: row_count_between
    table: fct_user_sales_partitioned
    min_rows: 1
    tags: [batch]
  - type: not_null
    table: stg_orders
    column: user_id
    tags: [batch]
  - type: row_count_between
    table: stg_orders
    min_rows: 1
    tags: [batch]
  - type: not_null
    table: stg_users
    column: user_id
    tags: [batch]
  - type: row_count_between
    table: stg_users
    min_rows: 1
    tags: [batch]
```
Run them using the appropriate profile, e.g.:
```bash
fft test examples/macros_demo --env dev_duckdb --select tag:example:macros_demo
```
## Troubleshooting
- `jinja2.exceptions.UndefinedError: 'var'/'env'/'engine' is undefined`
  Ensure your FFT core registers these Jinja globals before rendering models
  (`env.globals.update(var=..., env=..., engine=...)`) and that
  `fastflowtransform.stdlib.register_jinja(...)` is called to inject the `ff_*` helpers.
- Engine differences (types & functions)
  Use `ff_engine()` / `ff_is_engine(...)` or your own macros to branch on engine where
  syntax differs (e.g. `split_part` vs `split()[SAFE_OFFSET(...)]`, `SAFE_CAST` vs `TRY_CAST`).
- `adapter_columns(...)` returns `None`
  In that case `star_except` falls back to `*`. If you need strict column lists for some
  engines, replace that macro with explicit column sets or configure your executor to
  provide catalog metadata.
## Extending this demo
- Add more helpers to `helpers.py` (e.g. JSON formatting, list formatting).
- Create more engine-aware macros for date handling or SCDs, potentially layered on top of the stdlib (`ff_date_trunc`, `ff_date_add`).
- Add new models that use `ff_partition_in` or more elaborate `ff_safe_cast` combinations.
- Use `var(...)` to parameterize from/to dates, partition lists, or feature flags per environment.
Happy macro-ing 🚀