Data Quality Demo Project¶
The Data Quality Demo shows how to use all built-in FFT data quality tests plus custom DQ tests (Python & SQL) on a small, understandable model:
- Column checks: `not_null`, `unique`, `accepted_values`, `greater_equal`, `non_negative_sum`, `row_count_between`, `freshness`
- Cross-table reconciliations: `reconcile_equal`, `reconcile_ratio_within`, `reconcile_diff_within`, `reconcile_coverage`
- Custom tests (demo): `min_positive_share` (Python-based), `no_future_orders` (SQL-based)
It uses a simple customers / orders / mart setup so you can see exactly what each test does and how it fails when something goes wrong.
What this example demonstrates¶
- Basic column checks on staging tables – ensure IDs are present and unique, amounts are non-negative, and status values are valid.
- Freshness on a timestamp column – check that the most recent order in your mart is not “too old”, using `last_order_ts`.
- Row count sanity checks – guard against empty tables and unexpectedly large row counts.
- Cross-table reconciliations between staging and mart – verify that sums and counts match between `orders` and the aggregated `mart_orders_agg`, and that every customer has a corresponding mart row.
- Tagged tests and selective execution – all tests are tagged (e.g. `example:dq_demo`, `reconcile`) so you can run exactly the subset you care about.
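The `--select tag:<name>` mechanism can be pictured as a simple filter over test definitions. Here is a minimal sketch of that idea; `select_by_tag` is a hypothetical helper for illustration, not FFT's actual selector:

```python
# Hypothetical tag-based selection, assuming tests are plain dicts
# with a "tags" list (a sketch, not FFT's internals).
def select_by_tag(tests, selector):
    """Return the tests whose tags contain the name in 'tag:<name>'."""
    assert selector.startswith("tag:")
    wanted = selector[len("tag:"):]
    return [t for t in tests if wanted in t.get("tags", [])]

tests = [
    {"type": "not_null", "tags": ["example:dq_demo", "batch"]},
    {"type": "reconcile_equal", "tags": ["example:dq_demo", "reconcile"]},
    {"type": "freshness", "tags": ["other_project"]},
]

print([t["type"] for t in select_by_tag(tests, "tag:reconcile")])
# → ['reconcile_equal']
```

Because every demo test carries `example:dq_demo`, one selector runs the whole suite, while narrower tags like `reconcile` carve out subsets.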
Project layout (example)¶
examples/dq_demo/
.env
.env.dev_duckdb
.env.dev_postgres
.env.dev_databricks
.env.dev_bigquery_pandas
.env.dev_bigquery_bigframes
.env.dev_snowflake
Makefile # optional, convenience wrapper around fft commands
profiles.yml
project.yml
sources.yml
seeds/
customers.csv
orders.csv
models/
staging/
customers.ff.sql
orders.ff.sql
marts/
mart_orders_agg.ff.sql
tests/
dq/
min_positive_share.ff.py
no_future_orders.ff.sql
Seeds¶
- `seeds/customers.csv` – simple customer dimension with a creation timestamp: `customer_id,name,status,created_at` (ISO-8601, e.g. `2025-01-01T10:00:00`). The demo ships with three rows (Alice, Bob, Carol) so it’s easy to reason about failures.
- `seeds/orders.csv` – order fact data with per-order timestamps: `order_id,customer_id,amount,order_ts` (ISO-8601, e.g. `2025-01-10T09:00:00`). One order has `amount = 0.00` so the custom `min_positive_share` test has something to complain about.
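If you want to rebuild the seed files by hand, something like the following works; the row values below are illustrative stand-ins that match the documented schemas, not the exact shipped data:

```python
import csv
from pathlib import Path

# Illustrative seed rows (assumed values): three customers and one
# zero-amount order, mirroring what the demo's text describes.
customers = [
    ("1", "Alice", "active", "2025-01-01T10:00:00"),
    ("2", "Bob", "active", "2025-01-02T10:00:00"),
    ("3", "Carol", "inactive", "2025-01-03T10:00:00"),
]
orders = [
    ("1", "1", "120.50", "2025-01-10T09:00:00"),
    ("2", "2", "0.00", "2025-01-11T09:00:00"),  # zero amount for min_positive_share
    ("3", "3", "75.25", "2025-01-12T09:00:00"),
]

seed_dir = Path("seeds")
seed_dir.mkdir(exist_ok=True)

with open(seed_dir / "customers.csv", "w", newline="") as f:
    w = csv.writer(f)
    w.writerow(["customer_id", "name", "status", "created_at"])
    w.writerows(customers)

with open(seed_dir / "orders.csv", "w", newline="") as f:
    w = csv.writer(f)
    w.writerow(["order_id", "customer_id", "amount", "order_ts"])
    w.writerows(orders)
```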
Models¶
1. Staging: customers.ff.sql
- Materialized as a table.
- Casts IDs and other fields into proper types.
- Used as the “clean” customer dimension for downstream checks.
{{ config(
materialized='table',
tags=[
'example:dq_demo',
'scope:staging',
'engine:duckdb',
'engine:postgres',
'engine:databricks_spark',
'engine:bigquery',
'engine:snowflake_snowpark'
],
) }}
select
cast(customer_id as int) as customer_id,
name,
status,
cast(created_at as timestamp) as created_at
from {{ source('crm', 'customers') }};
2. Staging: orders.ff.sql
- Materialized as a table.
- Casts fields to proper types so DQ tests work reliably:
{{ config(
materialized='table',
tags=[
'example:dq_demo',
'scope:staging',
'engine:duckdb',
'engine:postgres',
'engine:databricks_spark',
'engine:bigquery',
'engine:snowflake_snowpark'
],
) }}
select
cast(order_id as int) as order_id,
cast(customer_id as int) as customer_id,
cast(amount as double) as amount,
cast(order_ts as timestamp) as order_ts
from {{ source('crm', 'orders') }};
This is important for:
- numeric checks (`greater_equal`, `non_negative_sum`)
- timestamp-based `freshness` checks
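A one-line illustration of why these casts matter: without them, a string-typed `amount` would be compared lexicographically rather than numerically, so threshold checks can pass or fail for the wrong reason:

```python
# Lexicographic vs numeric comparison: the reason staging models cast
# columns before DQ checks run.
print("9.50" >= "10.00")                # True as strings ("9" > "1")
print(float("9.50") >= float("10.00"))  # False as numbers
```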
3. Mart: mart_orders_agg.ff.sql
Aggregates orders per customer and prepares data for reconciliation + freshness:
{{ config(
materialized='table',
tags=[
'example:dq_demo',
'scope:mart',
'engine:duckdb',
'engine:postgres',
'engine:databricks_spark',
'engine:bigquery',
'engine:snowflake_snowpark'
],
) }}
-- Aggregate orders per customer for DQ & reconciliation tests
with base as (
select
o.order_id,
o.customer_id,
-- Ensure numeric and timestamp types for downstream DQ checks
cast(o.amount as double) as amount,
cast(o.order_ts as timestamp) as order_ts,
c.name as customer_name,
c.status as customer_status
from {{ ref('orders.ff') }} o
join {{ ref('customers.ff') }} c
on o.customer_id = c.customer_id
)
select
customer_id,
customer_name,
customer_status as status,
count(*) as order_count,
sum(amount) as total_amount,
min(order_ts) as first_order_ts,
max(order_ts) as last_order_ts
from base
group by customer_id, customer_name, customer_status;
The important columns for DQ tests are:
- `status` → used for `accepted_values`
- `order_count` and `total_amount` → used for numeric and reconciliation tests
- `last_order_ts` → used for `freshness`
Data quality configuration (project.yml)¶
All tests live under project.yml → tests:.
This example uses the tag example:dq_demo for easy selection.
Column-level checks¶
tests:
# 1) IDs must be present and unique
- type: not_null
table: customers
column: customer_id
tags: [example:dq_demo, batch]
- type: unique
table: customers
column: customer_id
tags: [example:dq_demo, batch]
# 2) Order amounts must be >= 0
- type: greater_equal
table: orders
column: amount
threshold: 0
tags: [example:dq_demo, batch]
# 3) Total sum of amounts must not be negative
- type: non_negative_sum
table: orders
column: amount
tags: [example:dq_demo, batch]
# 4) Customer status values must be within a known set
- type: accepted_values
table: mart_orders_agg
column: status
values: ["active", "inactive", "prospect"]
severity: warn # show as warning, not hard failure
tags: [example:dq_demo, batch]
# 5) Row count sanity check on mart
- type: row_count_between
table: mart_orders_agg
min_rows: 1
max_rows: 100000
tags: [example:dq_demo, batch]
# 6) Freshness: last order in the mart must not be "too old"
- type: freshness
table: mart_orders_agg
column: last_order_ts
max_delay_minutes: 100000000
tags: [example:dq_demo, batch]
# 7) Custom Python test: ensure at least a given share of positive amounts
- type: min_positive_share
table: orders
column: amount
params:
min_share: 0.75
where: "amount <> 0"
tags: [example:dq_demo, batch]
# 8) Custom SQL test: no future orders allowed
- type: no_future_orders
table: orders
column: order_ts
params:
where: "amount <> 0"
tags: [example:dq_demo, batch]
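Conceptually, each column check above boils down to a small "violation count" (or aggregate) query against your engine. A minimal sketch of that idea, using sqlite3 as a stand-in and hand-written SQL, not FFT's actual compiled queries:

```python
import sqlite3

# Stand-in table in sqlite3 (the demo runs these checks on your engine).
con = sqlite3.connect(":memory:")
con.executescript("""
    create table orders (order_id int, customer_id int, amount real);
    insert into orders values (1, 1, 120.50), (2, 2, 0.00), (3, 3, -5.00);
""")

def scalar(sql):
    return con.execute(sql).fetchone()[0]

# not_null: violating rows are those where the column is null
nulls = scalar("select count(*) from orders where order_id is null")

# greater_equal with threshold 0: violating rows fall below the threshold
below = scalar("select count(*) from orders where amount < 0")

# non_negative_sum: the aggregate itself must be >= 0
total = scalar("select sum(amount) from orders")

print(nulls, below, total >= 0)  # → 0 1 True
```

With the deliberately negative third row, `greater_equal` would flag one violation while `non_negative_sum` still passes, since the overall sum stays positive.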
Cross-table reconciliations¶
  # 9) Reconcile total revenue between orders and mart
- type: reconcile_equal
name: total_amount_orders_vs_mart
tags: [example:dq_demo, reconcile]
left:
table: orders
expr: "sum(amount)"
right:
table: mart_orders_agg
expr: "sum(total_amount)"
abs_tolerance: 0.01
  # 10) Ratio of sums should be ~1 (within tight bounds)
- type: reconcile_ratio_within
name: total_amount_ratio
tags: [example:dq_demo, reconcile]
left:
table: orders
expr: "sum(amount)"
right:
table: mart_orders_agg
expr: "sum(total_amount)"
min_ratio: 0.999
max_ratio: 1.001
  # 11) Row count diff between orders and mart should be bounded
- type: reconcile_diff_within
name: order_count_diff
tags: [example:dq_demo, reconcile]
left:
table: orders
expr: "count(*)"
right:
table: mart_orders_agg
expr: "sum(order_count)"
max_abs_diff: 0
  # 12) Coverage: every customer should appear in the mart
- type: reconcile_coverage
name: customers_covered_in_mart
tags: [example:dq_demo, reconcile]
source:
table: customers
key: "customer_id"
target:
table: mart_orders_agg
key: "customer_id"
This set of tests touches all available test types and ties directly back to the simple data model.
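The reconciliation semantics can be sketched the same way: evaluate an expression on each side, then compare within a tolerance. The following is a simplified sketch on sqlite3, not FFT's code:

```python
import sqlite3

# Stand-in data: three orders rolled up into two mart rows.
con = sqlite3.connect(":memory:")
con.executescript("""
    create table orders (order_id int, customer_id int, amount real);
    create table mart_orders_agg (customer_id int, order_count int, total_amount real);
    insert into orders values (1, 1, 10.0), (2, 1, 5.0), (3, 2, 7.5);
    insert into mart_orders_agg values (1, 2, 15.0), (2, 1, 7.5);
""")

def scalar(table, expr):
    # Evaluate one side of a reconciliation, e.g. "sum(amount)".
    return con.execute(f"select {expr} from {table}").fetchone()[0]

left = scalar("orders", "sum(amount)")                   # 22.5
right = scalar("mart_orders_agg", "sum(total_amount)")   # 22.5

equal_ok = abs(left - right) <= 0.01            # reconcile_equal
ratio_ok = 0.999 <= left / right <= 1.001       # reconcile_ratio_within
diff_ok = abs(scalar("orders", "count(*)")
              - scalar("mart_orders_agg", "sum(order_count)")) <= 0  # reconcile_diff_within

# reconcile_coverage: source keys with no matching target row
missing = con.execute(
    "select distinct customer_id from orders "
    "except select customer_id from mart_orders_agg"
).fetchall()

print(equal_ok, ratio_ok, diff_ok, missing)  # → True True True []
```

Dropping a mart row or changing a mart amount in this sketch flips the corresponding flag, which is exactly the failure mode the real tests guard against.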
Custom DQ tests (Python & SQL)¶
The demo also shows how to define custom data quality tests that integrate with:
- the `project.yml → tests:` block,
- the `fft test` CLI,
- and the same summary output as built-in tests.
Python-based test: min_positive_share¶
File: examples/dq_demo/tests/dq/min_positive_share.ff.py
from __future__ import annotations
from typing import Any
from fastflowtransform.decorators import dq_test
from fastflowtransform.testing import base as testing
@dq_test("min_positive_share")
def min_positive_share(
con: Any,
table: str,
column: str | None,
params: dict[str, Any],
) -> tuple[bool, str | None, str | None]:
"""
Custom DQ test: require that at least `min_share` of rows have column > 0.
Parameters (from project.yml → tests → params):
- min_share: float in [0,1], e.g. 0.75
- where: optional filter (string) to restrict the population
"""
if column is None:
example = f"select count(*) from {table} where <column> > 0"
return False, "min_positive_share requires a 'column' parameter", example
# Params come from project.yml under `params:`
cfg: dict[str, Any] = params.get("params") or params # project.yml wrapper
min_share: float = cfg["min_share"]
where: str | None = cfg.get("where")
where_clause = f" where {where}" if where else ""
total_sql = f"select count(*) from {table}{where_clause}"
if where:
pos_sql = f"select count(*) from {table}{where_clause} and {column} > 0"
else:
pos_sql = f"select count(*) from {table} where {column} > 0"
total = testing._scalar(con, total_sql)
positives = testing._scalar(con, pos_sql)
example_sql = f"{pos_sql}; -- positives\n{total_sql}; -- total"
if not total:
return False, f"min_positive_share: table {table} is empty", example_sql
share = float(positives or 0) / float(total)
if share < min_share:
msg = (
f"min_positive_share failed: positive share {share:.4f} "
f"< required {min_share:.4f} "
f"({positives} of {total} rows have {column} > 0)"
)
return False, msg, example_sql
return True, None, example_sql
This test is wired up from `project.yml` like this:
- type: min_positive_share
table: orders
column: amount
params:
min_share: 0.75
where: "amount <> 0"
tags: [example:dq_demo, batch]
SQL-based test: no_future_orders¶
File: examples/dq_demo/tests/dq/no_future_orders.ff.sql
{{ config(
type="no_future_orders",
params=["where"]
) }}
-- Custom DQ test: fail if any row has a timestamp in the future.
--
-- Conventions:
-- - {{ table }} : table name (e.g. "orders")
-- - {{ column }} : timestamp column (e.g. "order_ts")
-- - {{ where }} : optional filter, passed via params["where"]
select count(*) as failures
from {{ table }}
where {{ column }} > current_timestamp
{%- if where %} and ({{ where }}){%- endif %}
And the corresponding project.yml test:
- type: no_future_orders
table: orders
column: order_ts
params:
where: "amount <> 0"
tags: [example:dq_demo, batch]
At runtime:
- The SQL file is discovered under `tests/**/*.ff.sql`.
- `{{ config(...) }}` tells FFT the logical `type` and the allowed `params`.
- `fft test` validates your `params:` from `project.yml` against this schema and then executes the rendered SQL as a “violation count” query (`0` = pass, `>0` = fail).
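End to end, the render-then-count flow can be sketched with plain string substitution and sqlite3. This is a simplification: FFT renders the `.ff.sql` body with Jinja against your configured engine, and `current_timestamp` here is replaced by a fixed stand-in value:

```python
import sqlite3
from string import Template

# Hypothetical render step: substitute table/column/where into the body.
sql = Template(
    "select count(*) as failures from $table "
    "where $column > $now and ($where)"
).substitute(
    table="orders",
    column="order_ts",
    now="'2025-06-01T00:00:00'",  # stand-in for current_timestamp
    where="amount <> 0",
)

con = sqlite3.connect(":memory:")
con.executescript("""
    create table orders (order_id int, amount real, order_ts text);
    insert into orders values (1, 120.50, '2025-01-10T09:00:00'),
                              (2, 0.00,  '2099-01-01T00:00:00');
""")

failures = con.execute(sql).fetchone()[0]
print("pass" if failures == 0 else "fail")  # → pass
```

Note how the `where: "amount <> 0"` filter excludes the zero-amount row, so its far-future timestamp does not count as a violation.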
Running the demo¶
Assuming you are in the repo root and using DuckDB as a starting point:
1. Seed the data¶
fft seed examples/dq_demo --env dev_duckdb
This reads seeds/customers.csv and seeds/orders.csv and materializes them as tables referenced by sources.yml.
2. Run the models¶
fft run examples/dq_demo --env dev_duckdb
This builds:
- `customers` (staging)
- `orders` (staging)
- `mart_orders_agg` (mart)
3. Run all DQ tests¶
fft test examples/dq_demo --env dev_duckdb --select tag:example:dq_demo
You should see a summary like:
Data Quality Summary
────────────────────
✅ not_null customers.customer_id
✅ unique customers.customer_id
✅ greater_equal orders.amount
✅ non_negative_sum orders.amount
❕ accepted_values mart_orders_agg.status
✅ row_count_between mart_orders_agg
✅ freshness mart_orders_agg.last_order_ts
✅ reconcile_equal total_amount_orders_vs_mart
✅ reconcile_ratio_within total_amount_ratio
✅ reconcile_diff_within order_count_diff
✅ reconcile_coverage customers_covered_in_mart
Totals
──────
✓ passed: 10
! warnings: 1
(Exact output will differ, but you’ll see passed/failed/warned checks listed.)
4. Run only reconciliation tests¶
fft test examples/dq_demo --env dev_duckdb --select tag:reconcile
This executes just the cross-table checks, which is handy when you’re iterating on a mart.
BigQuery variant (pandas or BigFrames)¶
To run the same demo on BigQuery:
- Copy `.env.dev_bigquery_pandas` or `.env.dev_bigquery_bigframes` to `.env` and fill in:

  FF_BQ_PROJECT=<your-project-id>
  FF_BQ_DATASET=dq_demo
  FF_BQ_LOCATION=<region>  # e.g., EU or US
  GOOGLE_APPLICATION_CREDENTIALS=../secrets/<service-account>.json  # or rely on gcloud / WIF

- Run via the Makefile from `examples/dq_demo`:

  make demo ENGINE=bigquery BQ_FRAME=pandas   # or bigframes

Both profiles accept `allow_create_dataset` in `profiles.yml` if you want the example to create the dataset automatically.
Snowflake Snowpark variant¶
To run on Snowflake:
- Copy `.env.dev_snowflake` to `.env` and populate:

  FF_SF_ACCOUNT=<account>
  FF_SF_USER=<user>
  FF_SF_PASSWORD=<password>
  FF_SF_WAREHOUSE=COMPUTE_WH
  FF_SF_DATABASE=DQ_DEMO
  FF_SF_SCHEMA=DQ_DEMO
  FF_SF_ROLE=<optional-role>

- Install the Snowflake extra if needed:

  pip install "fastflowtransform[snowflake]"

- Run via the Makefile:

  make demo ENGINE=snowflake_snowpark

The Snowflake profile enables `allow_create_schema`, so the schema is created automatically on first run when permitted.
Things to experiment with¶
To understand the tests better, intentionally break the data and re-run fft test:
- Set one `customers.customer_id` to `NULL` → watch `not_null` fail.
- Duplicate a `customer_id` → watch `unique` fail.
- Put a negative `amount` in `orders.csv` → `greater_equal` and `non_negative_sum` fail.
- Add a new `status` value (e.g. `"paused"`) → `accepted_values` warns.
- Drop a customer from `mart_orders_agg` manually (or filter it out in SQL) → `reconcile_coverage` fails.
- Change an amount in the mart only → the reconciliation tests fail.
This makes it very clear what each test guards against.
Summary¶
The Data Quality Demo is designed to be:
- Small and readable – customers, orders, and a single mart.
- Complete – exercises every built-in FFT DQ test type.
- Practical – real-world patterns like:
  - typing in staging models,
  - testing freshness on a mart timestamp,
  - reconciling sums and row counts across tables.
Once you’re comfortable with this example, you can copy the patterns into your real project: start with staging-level checks, then layer in reconciliations and freshness on your most important marts.
Tip – Source vs. table freshness
The demo uses the `freshness` test type on the mart (`mart_orders_agg.last_order_ts`). For source-level freshness (e.g. “when was `crm.orders` last loaded?”), define freshness rules on your sources and run:

fft source freshness examples/dq_demo --env dev_duckdb

This complements table-level DQ tests by checking whether your inputs are recent enough before you even build marts.