Data Quality Demo Project¶
The Data Quality Demo shows how to combine:
- Built-in FFT data quality tests
- Tests generated from data contracts (`*.contracts.yml` + `contracts.yml`)
- Custom DQ tests (Python & SQL)
- Multiple engines (DuckDB, Postgres, Databricks Spark, BigQuery, Snowflake Snowpark)
on a small, understandable model:
- Column checks (from contracts + `project.yml`): `column_physical_type`, `not_null`, `unique`, `accepted_values`, `between`, `regex_match`, `greater_equal`, `non_negative_sum`, `row_count_between`, `freshness`, `relationships`
- Cross-table reconciliations: `reconcile_equal`, `reconcile_ratio_within`, `reconcile_diff_within`, `reconcile_coverage`
- Custom tests (demo): `min_positive_share` (Python-based), `no_future_orders` (SQL-based)
It uses a simple customers / orders / mart setup so you can see exactly what each test does and how it fails when something goes wrong.
What this example demonstrates¶
- Basic column checks on staging tables
  - Enforced via contracts (`*.contracts.yml` + `contracts.yml`) and project tests: IDs are present / non-null, status values are constrained, numeric ranges are respected, physical types match the warehouse.
- Freshness on a timestamp column
  - Table-level `freshness` test on `orders.order_ts`.
  - Source-level freshness via `sources.yml` for `crm.customers` / `crm.orders`.
- Row count sanity checks
  - Guard against empty tables and unexpectedly large row counts.
- Cross-table reconciliations between staging and mart
  - Verify that sums and counts match between `orders` and the aggregated `mart_orders_agg`, and that every order has a matching customer.
- Tagged tests and selective execution
  - All tests are tagged (e.g. `example:dq_demo`, `reconcile`, `fk`, `contract`) so you can run exactly the subset you care about.
- Contracts-driven tests
  - Per-table contracts plus project-wide defaults generate DQ tests automatically (including `column_physical_type`).
Project layout¶
```
examples/dq_demo/
    .env.dev_bigquery_bigframes
    .env.dev_bigquery_pandas
    .env.dev_databricks
    .env.dev_duckdb
    .env.dev_postgres
    .env.dev_snowflake
    Makefile
    README.md
    contracts.yml
    profiles.yml
    project.yml
    sources.yml
    models/
        README.md
        marts/
            mart_orders_agg.contracts.yml
            mart_orders_agg.ff.sql
        staging/
            customers.contracts.yml
            customers.ff.sql
            orders.contracts.yml
            orders.ff.sql
    seeds/
        README.md
        schema.yml
        seed_customers.csv
        seed_orders.csv
    tests/
        dq/
            min_positive_share.ff.py
            no_future_orders.ff.sql
        unit/
            README.md
```
High level:
* **`.env.dev_*`** — engine-specific environment examples
* **`Makefile`** — convenience wrapper for seeding, running models, DAG HTML and tests
* **`profiles.yml`** — connection profiles for all engines
* **`project.yml`** — central place for **tests** (including reconciliations & custom DQ tests)
* **`contracts.yml`** — project-level **contract defaults**
* **`models/**.contracts.yml`** — per-table contracts
* **`sources.yml`** — source definitions + freshness on raw seeds
* **`seeds/`** — demo CSVs and seed schema
* **`tests/dq/`** — custom DQ tests (Python + SQL)
---
## Seeds
### `seeds/seed_customers.csv`
Simple customer dimension with a creation timestamp:
```csv
customer_id,name,status,created_at
1,Alice,active,2025-01-01T10:00:00
2,Bob,active,2025-01-02T11:00:00
3,Carol,inactive,2025-01-03T12:00:00
```
Columns:
- `customer_id` – integer
- `name` – string
- `status` – string (active/inactive)
- `created_at` – ISO-8601 timestamp
seeds/seed_orders.csv¶
Order fact data with per-order timestamps:
```csv
order_id,customer_id,amount,order_ts
100,1,50.00,2025-01-10T09:00:00
101,1,20.00,2025-01-11T09:00:00
102,2,30.00,2025-01-11T10:00:00
103,3,0.00,2025-01-12T10:00:00
```
Columns:
- `order_id` – integer
- `customer_id` – integer
- `amount` – double
- `order_ts` – ISO-8601 timestamp
One order has amount = 0.00 so the custom
min_positive_share test has something to complain about.
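If you want to see the arithmetic behind that, here is a quick sketch (assuming `pandas` is installed and the script runs from `examples/dq_demo/`); it only mirrors the share computation, it is not how FFT evaluates the test:

```python
# Back-of-the-envelope check of the positive-amount share in the seed data.
# Assumes pandas and the demo's seed CSV; illustrative only, not FFT's test logic.
import pandas as pd

orders = pd.read_csv("seeds/seed_orders.csv")

total = len(orders)                       # 4 rows
positives = (orders["amount"] > 0).sum()  # 3 rows (order 103 has amount = 0.00)
print(f"positive share over all rows: {positives / total:.2f}")  # 0.75

# With the demo's filter `where: "amount <> 0"`, the zero-amount row is also
# excluded from the denominator:
filtered = orders[orders["amount"] != 0]
print(f"positive share with filter:  {(filtered['amount'] > 0).sum() / len(filtered):.2f}")
```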
Seed schema and sources¶
seeds/schema.yml defines target placement and types:
```yaml
targets:
  seed_customers:
    schema: dq_demo
  seed_orders:
    schema: dq_demo

columns:
  seed_customers:
    customer_id: integer
    name: string
    status: string
    created_at:
      type: timestamp
  seed_orders:
    order_id: integer
    customer_id: integer
    amount: double
    order_ts:
      type: timestamp
```
sources.yml exposes them as crm.customers and crm.orders with source
freshness:
```yaml
version: 1

sources:
  - name: crm
    schema: dq_demo
    tables:
      - name: customers
        identifier: seed_customers
        description: "Seeded customers table"
        freshness:
          loaded_at_field: _ff_loaded_at
          warn_after:
            count: 60
            period: minute
          error_after:
            count: 240
            period: minute

      - name: orders
        identifier: seed_orders
        description: "Seeded orders table"
        freshness:
          loaded_at_field: _ff_loaded_at
          warn_after:
            count: 60
            period: minute
          error_after:
            count: 240
            period: minute
```
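The warn/error windows boil down to a "minutes since last load" comparison. A minimal sketch of that logic in plain Python, assuming the `max(_ff_loaded_at)` value has already been fetched from the warehouse (the actual `fft source freshness` implementation may differ):

```python
# Minimal sketch of how warn_after / error_after thresholds can be evaluated.
# Assumes max(_ff_loaded_at) was already queried; not FFT's actual code.
from datetime import datetime, timedelta, timezone


def freshness_status(loaded_at: datetime, warn_minutes: int, error_minutes: int) -> str:
    age = datetime.now(timezone.utc) - loaded_at
    if age > timedelta(minutes=error_minutes):
        return "error"
    if age > timedelta(minutes=warn_minutes):
        return "warn"
    return "pass"


# Example: seeds loaded 90 minutes ago -> past warn_after (60) but within error_after (240)
loaded = datetime.now(timezone.utc) - timedelta(minutes=90)
print(freshness_status(loaded, warn_minutes=60, error_minutes=240))  # "warn"
```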
Models¶
1. Staging: models/staging/customers.ff.sql¶
Materialized as a table; casts IDs and timestamps into proper types and prepares the customer dimension:
```sql
{{ config(
    materialized='table',
    tags=[
        'example:dq_demo',
        'scope:staging',
        'engine:duckdb',
        'engine:postgres',
        'engine:databricks_spark',
        'engine:bigquery',
        'engine:snowflake_snowpark'
    ],
) }}

-- Staging table for customers
select
    cast(customer_id as int) as customer_id,
    name,
    status,
    cast(created_at as timestamp) as created_at
from {{ source('crm', 'customers') }};
```
2. Staging: models/staging/orders.ff.sql¶
Materialized as a table; ensures types are suitable for numeric and freshness checks:
```sql
{{ config(
    materialized='table',
    tags=[
        'example:dq_demo',
        'scope:staging',
        'engine:duckdb',
        'engine:postgres',
        'engine:databricks_spark',
        'engine:bigquery',
        'engine:snowflake_snowpark'
    ],
) }}

-- Staging table for orders with proper types for DQ checks
select
    cast(order_id as int) as order_id,
    cast(customer_id as int) as customer_id,
    cast(amount as numeric) as amount,
    cast(order_ts as timestamp) as order_ts
from {{ source('crm', 'orders') }};
```
This is important for:
- Numeric checks (`greater_equal`, `non_negative_sum`)
- Timestamp-based `freshness` checks on `order_ts`
- Relationships on `customer_id`
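To make the "proper types" point concrete, here is roughly the kind of violation query such numeric checks reduce to, run against an in-memory DuckDB stand-in for the staged table (illustrative only; the built-in tests generate their own SQL):

```python
# Illustrative only: the kind of SQL that greater_equal / non_negative_sum
# style checks boil down to, using an in-memory DuckDB copy of the staged orders.
import duckdb

con = duckdb.connect()
con.execute("""
    create table orders as
    select * from (values
        (100, 1, 50.00, timestamp '2025-01-10 09:00:00'),
        (101, 1, 20.00, timestamp '2025-01-11 09:00:00'),
        (102, 2, 30.00, timestamp '2025-01-11 10:00:00'),
        (103, 3,  0.00, timestamp '2025-01-12 10:00:00')
    ) as t(order_id, customer_id, amount, order_ts)
""")

# greater_equal: count rows violating amount >= 0
violations = con.execute("select count(*) from orders where amount < 0").fetchone()[0]
print("greater_equal violations:", violations)  # 0

# non_negative_sum: the aggregate itself must not be negative
total = con.execute("select sum(amount) from orders").fetchone()[0]
print("non_negative_sum ok:", total >= 0)
```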
3. Mart: models/marts/mart_orders_agg.ff.sql¶
Aggregates orders per customer and prepares data for reconciliation + freshness:
```sql
{{ config(
    materialized='table',
    tags=[
        'example:dq_demo',
        'scope:mart',
        'engine:duckdb',
        'engine:postgres',
        'engine:databricks_spark',
        'engine:bigquery',
        'engine:snowflake_snowpark'
    ],
) }}

-- Aggregate orders per customer for reconciliation & freshness tests
with base as (
    select
        o.order_id,
        o.customer_id,
        -- Ensure numeric and timestamp types for downstream DQ checks
        cast(o.amount as numeric) as amount,
        cast(o.order_ts as timestamp) as order_ts,
        c.name as customer_name,
        c.status as customer_status
    from {{ ref('orders.ff') }} o
    join {{ ref('customers.ff') }} c
      on o.customer_id = c.customer_id
)
select
    customer_id,
    customer_name,
    customer_status as status,
    count(*) as order_count,
    sum(amount) as total_amount,
    min(order_ts) as first_order_ts,
    max(order_ts) as last_order_ts
from base
group by customer_id, customer_name, customer_status;
```
Key columns:
- `status` → used by contracts (enum)
- `order_count` and `total_amount` → used by reconciliation tests
- `first_order_ts` / `last_order_ts` → available for freshness & diagnostics
Contracts in the demo¶
The demo uses contracts in two places:
- Per-table contracts in `models/**.contracts.yml`
- Project-wide defaults in `contracts.yml`
See docs/Contracts.md for the full specification; below is how the demo uses
it.
Project-level defaults: contracts.yml¶
```yaml
version: 1

defaults:
  columns:
    - match:
        name: ".*_id$"
      type: integer
      nullable: false

    - match:
        name: "created_at"
      type: timestamp
      nullable: false

    - match:
        name: ".*_ts$"
      type: timestamp
      nullable: true
      description: "Timestamp-like but allowed to be null in some pipelines"
```
These rules say:
- Any column ending with `_id` is an integer and not nullable.
- Any `created_at` is a non-null timestamp.
- Any `*_ts` column is a (possibly nullable) timestamp with a description.
Defaults are merged into per-table contracts, but never override explicit settings.
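A minimal sketch of what that regex-based matching and "defaults never win" merge could look like; this is a hypothetical helper for illustration, not FFT's actual implementation:

```python
# Hypothetical sketch: merging project-level column defaults into a table contract.
# Explicit per-table settings always win; defaults only fill gaps.
import re

DEFAULTS = [
    {"match": {"name": r".*_id$"}, "type": "integer", "nullable": False},
    {"match": {"name": r"created_at"}, "type": "timestamp", "nullable": False},
    {"match": {"name": r".*_ts$"}, "type": "timestamp", "nullable": True},
]


def apply_defaults(column_name: str, explicit: dict) -> dict:
    merged: dict = {}
    for rule in DEFAULTS:
        if re.fullmatch(rule["match"]["name"], column_name):
            merged.update({k: v for k, v in rule.items() if k != "match"})
    merged.update(explicit)  # explicit contract values override defaults
    return merged


print(apply_defaults("customer_id", {}))                # picks up integer / not-null
print(apply_defaults("order_ts", {"nullable": False}))  # explicit nullable wins
```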
Example: models/staging/customers.contracts.yml¶
```yaml
version: 1

table: customers

columns:
  customer_id:
    type: integer
    nullable: false
    physical:
      duckdb: integer
      postgres: integer
      bigquery: INT64
      snowflake_snowpark: NUMBER
      databricks_spark: INT

  name:
    type: string
    nullable: false
    physical:
      duckdb: VARCHAR
      postgres: text
      bigquery: STRING
      snowflake_snowpark: TEXT
      databricks_spark: STRING

  status:
    type: string
    nullable: false
    enum:
      - active
      - inactive
    physical:
      duckdb: VARCHAR
      postgres: text
      bigquery: STRING
      snowflake_snowpark: TEXT
      databricks_spark: STRING

  created_at:
    type: timestamp
    nullable: false
    physical:
      duckdb: TIMESTAMP
      postgres: timestamp without time zone
      bigquery: TIMESTAMP
      snowflake_snowpark: TIMESTAMP_NTZ
      databricks_spark: TIMESTAMP
```
At runtime, these contracts get turned into tests:
- `column_physical_type` for each column with `physical`
- `not_null` for columns with `nullable: false`
- `accepted_values` for `status` via `enum`
- Plus any inherited defaults from `contracts.yml`
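Conceptually, that expansion is just a mapping from contract fields to test configs. A hypothetical sketch of what the generated entries might look like (the exact shape FFT produces may differ):

```python
# Hypothetical sketch: expanding one contract column into generated test configs.
# The real generator in FFT may emit a different internal structure.
def tests_from_contract(table: str, column: str, spec: dict) -> list[dict]:
    tests: list[dict] = []
    if "physical" in spec:
        tests.append({"type": "column_physical_type", "table": table, "column": column,
                      "expected": spec["physical"], "tags": ["contract"]})
    if spec.get("nullable") is False:
        tests.append({"type": "not_null", "table": table, "column": column,
                      "tags": ["contract"]})
    if "enum" in spec:
        tests.append({"type": "accepted_values", "table": table, "column": column,
                      "values": spec["enum"], "tags": ["contract"]})
    return tests


status_spec = {"type": "string", "nullable": False, "enum": ["active", "inactive"],
               "physical": {"duckdb": "VARCHAR"}}
for t in tests_from_contract("customers", "status", status_spec):
    print(t)
```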
Similarly, the mart has a contract at
models/marts/mart_orders_agg.contracts.yml specifying types, nullability,
and enums.
Data quality configuration (project.yml)¶
All explicit tests live under project.yml → tests:.
Contracts produce additional tests with the tag contract.
The demo uses the tag example:dq_demo for easy selection.
Single-table & relationships tests¶
```yaml
tests:
  # --- Single-table checks ----------------------------------------------------
  - type: row_count_between
    table: customers
    min_rows: 1
    max_rows: 100
    tags: [example:dq_demo, batch]

  - type: greater_equal
    table: orders
    column: amount
    threshold: 0
    tags: [example:dq_demo, batch]

  - type: non_negative_sum
    table: orders
    column: amount
    tags: [example:dq_demo, batch]

  - type: relationships
    table: orders
    column: customer_id
    to: "ref('customers.ff')"
    to_field: customer_id
    tags: [example:dq_demo, fk]

  # Large max_delay_minutes so the example typically passes;
  # adjust down in real projects to enforce freshness SLAs.
  - type: freshness
    table: orders
    column: order_ts
    max_delay_minutes: 100000000
    tags: [example:dq_demo, batch]
```
What these do:
- `row_count_between` — ensure `customers` is not empty and not unexpectedly large.
- `greater_equal` / `non_negative_sum` — protect against negative `amount` and weird aggregates.
- `relationships` — enforces referential integrity: every `orders.customer_id` must exist in `customers.customer_id`.
- `freshness` — checks that the latest `order_ts` is recent enough.
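As a rough illustration of the two checks that are hardest to eyeball, here is what a `relationships` violation query and a `freshness` delay computation boil down to, using an in-memory DuckDB stand-in (not FFT's generated SQL):

```python
# Rough illustration of relationships (FK) and freshness checks; not FFT's generated SQL.
from datetime import datetime

import duckdb

con = duckdb.connect()
con.execute("create table customers as select * from (values (1), (2), (3)) as t(customer_id)")
con.execute("""
    create table orders as select * from (values
        (100, 1, timestamp '2025-01-10 09:00:00'),
        (101, 4, timestamp '2025-01-11 09:00:00')   -- customer 4 does not exist
    ) as t(order_id, customer_id, order_ts)
""")

# relationships: orders.customer_id values with no matching customers.customer_id
orphans = con.execute("""
    select count(*) from orders o
    left join customers c on o.customer_id = c.customer_id
    where c.customer_id is null
""").fetchone()[0]
print("relationship violations:", orphans)  # 1

# freshness: minutes since the latest order_ts, compared against max_delay_minutes
latest = con.execute("select max(order_ts) from orders").fetchone()[0]
delay_minutes = (datetime.now() - latest).total_seconds() / 60
print("fresh enough:", delay_minutes <= 100_000_000)
```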
Reconciliation tests¶
```yaml
  # --- Reconciliation checks --------------------------------------------------
  - type: reconcile_equal
    name: orders_total_matches_mart
    tags: [example:dq_demo, reconcile]
    left:
      table: orders
      expr: "sum(amount)"
    right:
      table: mart_orders_agg
      expr: "sum(total_amount)"
    abs_tolerance: 0.0

  - type: reconcile_ratio_within
    name: order_counts_match
    tags: [example:dq_demo, reconcile]
    left:
      table: mart_orders_agg
      expr: "sum(order_count)"
    right:
      table: orders
      expr: "count(*)"
    min_ratio: 0.999
    max_ratio: 1.001

  - type: reconcile_diff_within
    name: customers_vs_orders_volume
    tags: [example:dq_demo, reconcile]
    left:
      table: orders
      expr: "count(*)"
    right:
      table: customers
      expr: "count(*)"
    max_abs_diff: 10

  - type: reconcile_coverage
    name: all_orders_have_customers
    tags: [example:dq_demo, reconcile]
    source:
      table: orders
      key: "customer_id"
    target:
      table: customers
      key: "customer_id"
```
These checks ensure:
- Sums match between raw `orders.amount` and `mart_orders_agg.total_amount`.
- The number of rows in `orders` matches the sum of `order_count` in the mart.
- Overall orders vs. customers volume stays within a reasonable bound.
- All orders reference an existing customer (coverage).
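The ratio and diff variants are just scalar comparisons of two aggregate queries. A minimal sketch of the `reconcile_ratio_within` idea (illustrative only; FFT's executor drives the real queries):

```python
# Illustrative sketch of reconcile_ratio_within: compare two scalar aggregates
# and require their ratio to stay within [min_ratio, max_ratio].
import duckdb

con = duckdb.connect()
con.execute("create table orders as select * from (values (1), (2), (3), (4)) as t(order_id)")
con.execute("create table mart_orders_agg as select * from (values (2), (2)) as t(order_count)")


def reconcile_ratio_within(left_sql: str, right_sql: str, min_ratio: float, max_ratio: float) -> bool:
    left = con.execute(left_sql).fetchone()[0]
    right = con.execute(right_sql).fetchone()[0]
    ratio = float(left) / float(right)
    return min_ratio <= ratio <= max_ratio


ok = reconcile_ratio_within(
    "select sum(order_count) from mart_orders_agg",
    "select count(*) from orders",
    min_ratio=0.999,
    max_ratio=1.001,
)
print("order_counts_match:", ok)  # True: 4 / 4 = 1.0
```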
Custom tests¶
```yaml
  # --- Custom tests -----------------------------------------------------------
  - type: no_future_orders
    table: orders
    column: order_ts
    where: "order_ts is not null"
    tags: [example:dq_demo, batch]

  - type: min_positive_share
    table: orders
    column: amount
    params:
      min_share: 0.75
    where: "amount <> 0"
    tags: [example:dq_demo, batch]
```
- `no_future_orders` — SQL-based test that fails if any order has a timestamp in the future.
- `min_positive_share` — Python-based test that requires a minimum share of positive values in `amount`.
Custom DQ tests (Python & SQL)¶
The demo shows how to define custom data quality tests that integrate with:
- `project.yml` → `tests:`
- `fft test`
- The same summary output as built-in tests
Python-based test: min_positive_share¶
File: examples/dq_demo/tests/dq/min_positive_share.ff.py
```python
# examples/dq_demo/tests/dq/min_positive_share.ff.py
from __future__ import annotations

from typing import Any

from pydantic import BaseModel, ConfigDict

from fastflowtransform.decorators import dq_test
from fastflowtransform.testing import base as testing


class MinPositiveShareParams(BaseModel):
    """
    Params for the min_positive_share test.

    - min_share: required minimum share of positive values in [0, 1]
    - where: optional WHERE predicate to filter rows
    """

    model_config = ConfigDict(extra="forbid")

    min_share: float = 0.5
    where: str | None = None


@dq_test("min_positive_share", params_model=MinPositiveShareParams)
def min_positive_share(
    executor: Any,
    table: str,
    column: str | None,
    params: dict[str, Any],
) -> tuple[bool, str | None, str | None]:
    """
    Require that at least `min_share` of rows have column > 0.
    """
    if column is None:
        example = f"select count(*) from {table} where <column> > 0"
        return False, "min_positive_share requires a 'column' parameter", example

    min_share: float = params["min_share"]
    where: str | None = params.get("where")

    where_clause = f" where {where}" if where else ""
    total_sql = f"select count(*) from {table}{where_clause}"
    if where:
        pos_sql = f"{total_sql} and {column} > 0"
    else:
        pos_sql = f"select count(*) from {table} where {column} > 0"

    total = testing._scalar(executor, total_sql)
    positives = testing._scalar(executor, pos_sql)

    example_sql = f"{pos_sql}; -- positives\n{total_sql}; -- total"

    if not total:
        return False, f"min_positive_share: table {table} is empty", example_sql

    share = float(positives or 0) / float(total)
    if share < min_share:
        msg = (
            f"min_positive_share failed: positive share {share:.4f} "
            f"< required {min_share:.4f} "
            f"({positives} of {total} rows have {column} > 0"
            + (f" where {where}" if where else "")
            + ")"
        )
        return False, msg, example_sql

    return True, None, example_sql
```
Wiring in project.yml:
```yaml
- type: min_positive_share
  table: orders
  column: amount
  params:
    min_share: 0.75
  where: "amount <> 0"
  tags: [example:dq_demo, batch]
```
SQL-based test: no_future_orders¶
File: examples/dq_demo/tests/dq/no_future_orders.ff.sql
```sql
{{ config(
    type="no_future_orders",
    params=["where"]
) }}

-- Custom DQ test: fail if any row has a timestamp in the future.
--
-- Context variables injected by the runner:
--   {{ table }}  : table name (e.g. "orders")
--   {{ column }} : timestamp column (e.g. "order_ts")
--   {{ where }}  : optional filter (string), from params["where"]
--   {{ params }} : full params dict (validated), if you ever need it

select count(*) as failures
from {{ table }}
where {{ column }} > current_timestamp
{%- if where %} and ({{ where }}){%- endif %}
```
And the corresponding entry in project.yml:
```yaml
- type: no_future_orders
  table: orders
  column: order_ts
  where: "order_ts is not null"
  tags: [example:dq_demo, batch]
```
At runtime:
- FFT discovers `*.ff.sql` test files under `tests/dq/`.
- `{{ config(...) }}` declares the test `type` and valid `params`.
- `fft test` validates and injects params, then executes the query as a “violation count” (`0` = pass, `> 0` = fail).
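To see the "violation count" contract in isolation, here is a hedged sketch of rendering a template like this with Jinja2 and evaluating it in DuckDB; the real runner additionally handles config parsing, param validation, and engine dispatch:

```python
# Hedged sketch: render a no_future_orders-style template with Jinja2 and treat the
# resulting count as a pass/fail signal. Not the FFT runner, just the core idea.
import duckdb
from jinja2 import Template

TEMPLATE = """
select count(*) as failures
from {{ table }}
where {{ column }} > current_timestamp
{%- if where %} and ({{ where }}){%- endif %}
"""

sql = Template(TEMPLATE).render(
    table="orders",
    column="order_ts",
    where="order_ts is not null",
)

con = duckdb.connect()
con.execute("""
    create table orders as select * from (values
        (100, timestamp '2025-01-10 09:00:00'),
        (101, timestamp '2099-01-01 00:00:00')   -- deliberately in the future
    ) as t(order_id, order_ts)
""")

failures = con.execute(sql).fetchone()[0]
print("pass" if failures == 0 else f"fail ({failures} future rows)")
```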
Running the demo¶
From examples/dq_demo/, you can either:
- Use the Makefile (recommended), or
- Run `fft` commands manually.
Using the Makefile¶
Pick an engine:
```bash
# DuckDB
make demo ENGINE=duckdb

# Postgres
make demo ENGINE=postgres

# Databricks Spark
make demo ENGINE=databricks_spark

# BigQuery (pandas or BigFrames)
make demo ENGINE=bigquery BQ_FRAME=pandas
make demo ENGINE=bigquery BQ_FRAME=bigframes

# Snowflake Snowpark
make demo ENGINE=snowflake_snowpark
```
The demo target runs:
- `fft seed` (load seeds)
- `fft source freshness`
- `fft run` (build models)
- `fft dag` (generate DAG HTML)
- `fft test` (run DQ tests)
- Prints locations of artifacts (manifest, run_results, catalog, DAG HTML)
Running manually (DuckDB example)¶
From the repo root:
```bash
# 1) Seed
fft seed examples/dq_demo --env dev_duckdb

# 2) Build models
fft run examples/dq_demo --env dev_duckdb

# 3) Run all DQ tests
fft test examples/dq_demo --env dev_duckdb --select tag:example:dq_demo
```
You’ll see a summary of:
- Tests derived from contracts (tag: `contract`)
- Explicit tests from `project.yml` (tags: `batch`, `reconcile`, `fk`, …)
You can also run just reconciliations, just FK tests, etc.:
```bash
# Only reconciliation tests
fft test examples/dq_demo --env dev_duckdb --select tag:reconcile

# Only FK-style relationship tests
fft test examples/dq_demo --env dev_duckdb --select tag:fk
```
Things to experiment with¶
To understand the tests better, intentionally break the data and re-run `fft test`:
- Set one `customers.customer_id` to `NULL` → `not_null` (from contracts) fails.
- Duplicate a `customer_id` → `unique` (from contracts) fails.
- Put a negative `amount` in `seed_orders.csv` → `greater_equal` and `non_negative_sum` fail.
- Change `status` to a value not in the enum → `accepted_values` fails.
- Drop a customer from `customers` or change an ID → `relationships` and reconciliation tests complain.
- Change an amount in the mart only → reconciliation tests fail.
- Push an order timestamp into the future → `no_future_orders` fails.
- Change a physical column type in the warehouse to disagree with the contract → `column_physical_type` fails.
This makes it very clear what each test guards against.
Summary¶
The Data Quality Demo is designed to be:
- Small and readable – customers, orders, and a single mart.
- Complete – exercises:
  - Built-in FFT DQ tests,
  - Tests generated from contracts,
  - Custom Python & SQL tests.
- Practical – real-world patterns like:
  - Typing in staging models,
  - Testing freshness on staging tables and sources,
  - Reconciling sums and row counts across tables,
  - Enforcing physical types per engine.
Once you’re comfortable with this example, you can copy the patterns into your real projects:
- Start with contracts and simple column tests on staging.
- Add freshness on key timestamps and sources.
- Layer in reconciliations across marts and fact tables.
- Add custom tests when built-ins aren’t enough.
Tip – Source vs. table freshness
The demo uses:
- `freshness` tests on tables (`orders.order_ts`), and
- `freshness` in `sources.yml` (via `_ff_loaded_at`).

Run source freshness with:

```bash
fft source freshness examples/dq_demo --env dev_duckdb
```

This complements table-level DQ tests by checking whether your inputs are recent enough before you even build marts.
fft source freshness examples/dq_demo --env dev_duckdbThis complements table-level DQ tests by checking whether your inputs are recent enough before you even build marts.