fastflowtransform.stdlib

sql_safe_cast

sql_safe_cast(expr, target_type, *, default=None, engine=None)

Engine-aware “safe cast” builder.

Semantics by engine

  • DuckDB: TRY_CAST(expr AS type)
  • BigQuery: SAFE_CAST(expr AS type)
  • Spark (3.x+): TRY_CAST(expr AS type)
  • Snowflake: CAST(expr AS type) (TRY_CAST(FLOAT -> NUMBER) is not supported)
  • Postgres / Redshift / Generic: CAST(expr AS type)

If default is provided, it is treated as a raw SQL snippet and wrapped via COALESCE(…, default).
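
A minimal usage sketch, assuming sql_safe_cast is importable from fastflowtransform.stdlib (the column name "amount" and the default "0" are illustrative):

    from fastflowtransform.stdlib import sql_safe_cast

    # BigQuery: SAFE_CAST plus COALESCE for the fallback value
    sql_safe_cast("amount", "numeric", default="0", engine="bigquery")
    # -> "COALESCE(SAFE_CAST(amount AS NUMERIC), 0)"

    # DuckDB: try_cast, no default
    sql_safe_cast("amount", "VARCHAR", engine="duckdb")
    # -> "try_cast(amount AS VARCHAR)"

    # Snowflake: plain CAST, logical "numeric" maps to NUMBER(38,10)
    sql_safe_cast("amount", "numeric", engine="snowflake")
    # -> "CAST(amount AS NUMBER(38,10))"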

Source code in src/fastflowtransform/stdlib/casts.py
def sql_safe_cast(
    expr: str,
    target_type: str,
    *,
    default: str | None = None,
    engine: str | None = None,
) -> str:
    """
    Engine-aware “safe cast” builder.

    Semantics by engine
    -------------------
    DuckDB:
        TRY_CAST(expr AS type)
    BigQuery:
        SAFE_CAST(expr AS type)
    Spark (3.x+):
        TRY_CAST(expr AS type)
    Snowflake:
        CAST(expr AS type)      # TRY_CAST(FLOAT -> NUMBER) is not supported
    Postgres / Redshift / Generic:
        CAST(expr AS type)

    If `default` is provided, it is treated as a raw SQL snippet and
    wrapped via COALESCE(…, default).
    """
    eng = normalize_engine(engine)
    expr_sql = expr.strip()
    raw_type = target_type.strip()
    if not expr_sql:
        raise ValueError("expr must be a non-empty SQL expression")
    if not raw_type:
        raise ValueError("target_type must be a non-empty SQL type")

    # Normalize logical numeric/decimal types per engine
    norm = raw_type.lower()
    if norm in {"numeric", "number", "decimal"}:
        if eng == "bigquery":
            # BigQuery fixed-precision decimal
            type_sql = "NUMERIC"
        elif eng in {"duckdb", "postgres", "redshift"}:
            type_sql = "NUMERIC"
        elif eng == "snowflake":
            # Use a concrete NUMBER with scale, but via plain CAST (no TRY_CAST)
            type_sql = "NUMBER(38,10)"
        else:
            type_sql = "NUMERIC"
    else:
        type_sql = raw_type

    # Engine-specific safe cast core
    if eng == "bigquery":
        inner = f"SAFE_CAST({expr_sql} AS {type_sql})"
    elif eng == "duckdb":
        inner = f"try_cast({expr_sql} AS {type_sql})"
    elif eng == "spark":
        inner = f"TRY_CAST({expr_sql} AS {type_sql})"
    elif eng == "snowflake":
        # TRY_CAST(FLOAT -> NUMBER(...)) is not allowed, so we use plain CAST
        inner = f"CAST({expr_sql} AS {type_sql})"
    else:
        # Fallback: no TRY_/SAFE_ variant
        inner = f"CAST({expr_sql} AS {type_sql})"

    if default is not None:
        default_sql = default.strip()
        if not default_sql:
            return inner
        return f"COALESCE({inner}, {default_sql})"

    return inner

sql_date_add

sql_date_add(expr, part, amount, *, engine=None)

Build an engine-aware date / timestamp addition expression.

Parameters

  • expr: SQL expression / column reference to add to.
  • part: "day", "month", "year", ... (engine-specific support may vary).
  • amount: Integer offset (positive or negative).
  • engine: Engine key/hint ("duckdb", "postgres", "bigquery", "snowflake", "spark", ...).

Examples (golden SQL)

DuckDB / Postgres / Redshift / Generic:
    sql_date_add("order_date", "day", 3, engine="duckdb")
    -> "CAST(order_date AS TIMESTAMP) + INTERVAL '3 day'"

Snowflake (the input is coerced via TO_TIMESTAMP to avoid VARCHAR + INTERVAL arithmetic):
    sql_date_add("created_at", "month", 1, engine="snowflake")
    -> "DATEADD(MONTH, 1, TO_TIMESTAMP(created_at))"

BigQuery (the input is coerced to TIMESTAMP unless it is already wrapped in CAST/SAFE_CAST):
    sql_date_add("order_date", "day", -7, engine="bigquery")
    -> "DATE_ADD(CAST(order_date AS TIMESTAMP), INTERVAL -7 DAY)"

Source code in src/fastflowtransform/stdlib/dates.py
def sql_date_add(
    expr: str,
    part: str,
    amount: int,
    *,
    engine: str | None = None,
) -> str:
    """
    Build an engine-aware date / timestamp addition expression.

    Parameters
    ----------
    expr:
        SQL expression / column reference to add to.
    part:
        "day", "month", "year", ... (engine-specific support may vary).
    amount:
        Integer offset (positive or negative).
    engine:
        Engine key/hint ("duckdb", "postgres", "bigquery", "snowflake", "spark", ...).

    Examples (golden SQL)
    ---------------------
    DuckDB / Postgres / Redshift / Generic:
        sql_date_add("order_date", "day", 3, engine="duckdb")
        -> "CAST(order_date AS TIMESTAMP) + INTERVAL '3 day'"

    Snowflake:
        sql_date_add("created_at", "month", 1, engine="snowflake")
        -> "DATEADD(MONTH, 1, created_at)"

    BigQuery:
        sql_date_add("order_date", "day", -7, engine="bigquery")
        -> "DATE_ADD(order_date, INTERVAL -7 DAY)"
    """
    eng = normalize_engine(engine)
    expr_sql = _clean_expr(expr)
    part_norm = part.strip().lower()
    if not part_norm:
        raise ValueError("date part must be a non-empty string")
    amt = int(amount)

    if eng in {"duckdb", "postgres", "redshift", "generic"}:
        # For these engines we usually want TIMESTAMP + INTERVAL.
        # Heuristic: if the expression already contains a cast, don't wrap it again.
        lower_expr = expr_sql.lower()
        already_cast = (
            "cast(" in lower_expr
            or "::timestamp" in lower_expr
            or "::timestamptz" in lower_expr
            or "::date" in lower_expr
        )
        base_expr = expr_sql if already_cast else f"CAST({expr_sql} AS TIMESTAMP)"
        return f"{base_expr} + INTERVAL '{amt} {part_norm}'"

    if eng == "spark":
        # Spark has DATE_ADD(date, days) for day-precision, but not all parts.
        if part_norm == "day":
            return f"date_add({expr_sql}, {amt})"
        # fall back to ANSI-ish INTERVAL for other parts
        return f"{expr_sql} + INTERVAL {amt} {part_norm.upper()}"

    if eng == "snowflake":
        part_upper = part_norm.upper()
        # Make sure we're not doing VARCHAR + INTERVAL
        expr_ts = f"TO_TIMESTAMP({expr_sql})"
        return f"DATEADD({part_upper}, {amt}, {expr_ts})"

    if eng == "bigquery":
        part_upper = part_norm.upper()

        # If the user already passed a CAST(...) or SAFE_CAST(...), don't double-wrap.
        lower_expr = expr_sql.lower().replace(" ", "")
        already_casted = lower_expr.startswith("cast(") or lower_expr.startswith("safe_cast(")

        expr_for_bq = expr_sql
        if not already_casted:
            # Be permissive: coerce to TIMESTAMP so strings like '2025-10-01T12:00:00'
            # work out of the box.
            expr_for_bq = f"CAST({expr_sql} AS TIMESTAMP)"

        # For dates/timestamps BigQuery supports this signature:
        #   DATE_ADD(timestamp_expr, INTERVAL amt PART)
        return f"DATE_ADD({expr_for_bq}, INTERVAL {amt} {part_upper})"

    # Fallback: ANSI-ish
    return f"{expr_sql} + INTERVAL '{amt} {part_norm}'"

sql_date_trunc

sql_date_trunc(expr, part='day', *, engine=None)

Build an engine-aware DATE_TRUNC expression.

Parameters

  • expr: SQL expression / column reference, e.g. "order_date" or "CAST(ts AS TIMESTAMP)".
  • part: Date part like "day", "month", "year", "week", ...
  • engine: Engine key/hint (e.g. "duckdb", "postgres", "bigquery"). If omitted, "generic" semantics are used.

Examples (golden SQL)

DuckDB / Postgres / Redshift / Snowflake / Spark (the input is coerced to TIMESTAMP):
    sql_date_trunc("order_date", "day", engine="duckdb")
    -> "date_trunc('day', CAST(order_date AS TIMESTAMP))"

BigQuery:
    sql_date_trunc("order_date", "day", engine="bigquery")
    -> "DATE_TRUNC(CAST(order_date AS TIMESTAMP), DAY)"

Generic:
    sql_date_trunc("created_at", "month")
    -> "date_trunc('month', CAST(created_at AS TIMESTAMP))"

Source code in src/fastflowtransform/stdlib/dates.py
def sql_date_trunc(expr: str, part: str = "day", *, engine: str | None = None) -> str:
    """
    Build an engine-aware DATE_TRUNC expression.

    Parameters
    ----------
    expr:
        SQL expression / column reference, e.g. "order_date" or "CAST(ts AS TIMESTAMP)".
    part:
        Date part like "day", "month", "year", "week", ...
    engine:
        Engine key/hint (e.g. "duckdb", "postgres", "bigquery").
        If omitted, "generic" semantics are used.

    Examples (golden SQL)
    ---------------------
    DuckDB / Postgres / Redshift / Snowflake / Spark:
        sql_date_trunc("order_date", "day", engine="duckdb")
        -> "date_trunc('day', order_date)"

    BigQuery:
        sql_date_trunc("order_date", "day", engine="bigquery")
        -> "DATE_TRUNC(order_date, DAY)"

    Generic:
        sql_date_trunc("created_at", "month")
        -> "date_trunc('month', created_at)"
    """
    eng = normalize_engine(engine)
    expr_sql = _clean_expr(expr)
    part_norm = part.strip().lower()
    if not part_norm:
        raise ValueError("date part must be a non-empty string")

    # Engines like DuckDB want date_trunc('<part>', <TIMESTAMP>)
    if eng in {"duckdb", "postgres", "redshift", "snowflake", "spark", "generic"}:
        return f"date_trunc('{part_norm}', CAST({expr_sql} AS TIMESTAMP))"

    if eng == "bigquery":
        # DATE_TRUNC(timestamp_expression, date_part)
        part_upper = part_norm.upper()
        return f"DATE_TRUNC(CAST({expr_sql} AS TIMESTAMP), {part_upper})"

    # Fallback: ANSI-ish
    return f"date_trunc('{part_norm}', CAST({expr_sql} AS TIMESTAMP))"

engine_family

engine_family(engine)

Return a broad engine family key.

For now this is identical to normalize_engine(), but having a separate function makes it easy to distinguish “exact engine” vs “family” later.
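
A short illustration (alias handling comes from normalize_engine(), documented below):

    >>> engine_family("postgresql")
    'postgres'
    >>> engine_family(None)
    'generic'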

Source code in src/fastflowtransform/stdlib/engine.py
def engine_family(engine: str | None) -> str:
    """
    Return a broad engine *family* key.

    For now this is identical to normalize_engine(), but having a separate
    function makes it easy to distinguish “exact engine” vs “family” later.
    """
    return normalize_engine(engine)

is_engine

is_engine(engine, *candidates)

Convenience helper: check if engine matches any of the given candidates.

Examples

is_engine("duckdb", "duckdb", "postgres") True is_engine("bigquery", "duckdb", "postgres") False

Source code in src/fastflowtransform/stdlib/engine.py
def is_engine(engine: str | None, *candidates: str) -> bool:
    """
    Convenience helper: check if `engine` matches any of the given candidates.

    Examples
    --------
    >>> is_engine("duckdb", "duckdb", "postgres")
    True
    >>> is_engine("bigquery", "duckdb", "postgres")
    False
    """
    norm = normalize_engine(engine)
    cand_norm = {normalize_engine(c) for c in candidates}
    return norm in cand_norm

normalize_engine

normalize_engine(engine)

Normalize an engine string into a canonical key.

  • None / empty → "generic"
  • Known aliases → canonical (e.g. "postgresql" → "postgres")
  • Other values → lower-case as-is

Examples

normalize_engine("Postgres") 'postgres' normalize_engine("databricks_spark") 'spark' normalize_engine(None) 'generic'

Source code in src/fastflowtransform/stdlib/engine.py
def normalize_engine(engine: str | None) -> str:
    """
    Normalize an engine string into a canonical key.

    - None / empty → "generic"
    - Known aliases → canonical (e.g. "postgresql" → "postgres")
    - Other values  → lower-case as-is

    Examples
    --------
    >>> normalize_engine("Postgres")
    'postgres'
    >>> normalize_engine("databricks_spark")
    'spark'
    >>> normalize_engine(None)
    'generic'
    """
    if not engine:
        return "generic"
    key = engine.strip().lower()
    if not key:
        return "generic"
    return _ENGINE_ALIASES.get(key, key)

sql_partition_filter

sql_partition_filter(column, start=None, end=None, *, engine=None)

Build a WHERE predicate for a range of partition values.

Semantics
  • start only → col >= <start_literal>
  • end only → col <= <end_literal>
  • both → col BETWEEN <start_literal> AND <end_literal>
  • neither → "1=1" (no-op filter)

start and end are Python values and will be converted with sql_literal(), so you can safely pass datetime.date, datetime.datetime, strings, ints, etc.

Parameters

  • column: Partition column name / expression, e.g. "ds" or "DATE(event_time)".
  • start, end: Python values interpreted as partition bounds.
  • engine: Currently unused but accepted so callers can pass it consistently.

Examples (golden SQL)

Daily date partition:
    sql_partition_filter("ds", date(2024, 1, 1), date(2024, 1, 31))
    -> "ds BETWEEN '2024-01-01' AND '2024-01-31'"

Open interval:
    sql_partition_filter("ds", start=date(2024, 1, 1), end=None)
    -> "ds >= '2024-01-01'"

Source code in src/fastflowtransform/stdlib/partitions.py
def sql_partition_filter(
    column: str,
    start: Any | None = None,
    end: Any | None = None,
    *,
    engine: str | None = None,  # reserved for future engine-specific tweaks
) -> str:
    """
    Build a WHERE predicate for a *range* of partition values.

    Semantics:
      - start only → col >= <start_literal>
      - end only   → col <= <end_literal>
      - both       → col BETWEEN <start_literal> AND <end_literal>
      - neither    → "1=1" (no-op filter)

    `start` and `end` are Python values and will be converted with sql_literal(),
    so you can safely pass `datetime.date`, `datetime.datetime`, strings, ints, etc.

    Parameters
    ----------
    column:
        Partition column name / expression, e.g. "ds" or "DATE(event_time)".
    start, end:
        Python values interpreted as partition bounds.
    engine:
        Currently unused but accepted so callers can pass it consistently.

    Examples (golden SQL)
    ---------------------
    Daily date partition:
        sql_partition_filter("ds", date(2024, 1, 1), date(2024, 1, 31))
        -> "ds BETWEEN '2024-01-01' AND '2024-01-31'"

    Open interval:
        sql_partition_filter("ds", start=date(2024, 1, 1), end=None)
        -> "ds >= '2024-01-01'"
    """
    _ = normalize_engine(engine)  # placeholder for future branching

    col = column.strip()
    if not col:
        raise ValueError("column must be a non-empty SQL expression")

    if start is None and end is None:
        return "1=1"

    conds: list[str] = []
    if start is not None and end is not None:
        conds.append(f"{col} BETWEEN {_lit(start)} AND {_lit(end)}")
    else:
        if start is not None:
            conds.append(f"{col} >= {_lit(start)}")
        if end is not None:
            conds.append(f"{col} <= {_lit(end)}")

    return " AND ".join(conds)

sql_partition_in

sql_partition_in(column, values, *, engine=None)

Build an IN() predicate for a set of partition values.

  • Empty values → "1=0" (guaranteed false, useful for guard rails).
  • Non-empty → col IN (<literal1>, <literal2>, ...)

Examples (golden SQL)

Daily partitions:
    sql_partition_in("ds", [date(2024, 1, 1), date(2024, 1, 2)])
    -> "ds IN ('2024-01-01', '2024-01-02')"

String partitions:
    sql_partition_in("region", ["EU", "US"])
    -> "region IN ('EU', 'US')"

Source code in src/fastflowtransform/stdlib/partitions.py
def sql_partition_in(
    column: str,
    values: Iterable[Any],
    *,
    engine: str | None = None,  # reserved for future engine-specific tweaks
) -> str:
    """
    Build an IN() predicate for a set of partition values.

    - Empty values → "1=0" (guaranteed false, useful for guard rails).
    - Non-empty   → col IN (<literal1>, <literal2>, ...)

    Examples (golden SQL)
    ---------------------
    Daily partitions:
        sql_partition_in("ds", [date(2024, 1, 1), date(2024, 1, 2)])
        -> "ds IN ('2024-01-01', '2024-01-02')"

    String partitions:
        sql_partition_in("region", ["EU", "US"])
        -> "region IN ('EU', 'US')"
    """
    _ = normalize_engine(engine)  # placeholder for future branching

    col = column.strip()
    if not col:
        raise ValueError("column must be a non-empty SQL expression")

    vals = list(values or [])
    if not vals:
        return "1=0"

    literals = ", ".join(_lit(v) for v in vals)
    return f"{col} IN ({literals})"

sql_literal

sql_literal(value)

Convert a Python value into a SQL literal string.

  • None -> "NULL"
  • bool -> "TRUE"/"FALSE"
  • int/float -> "123" (no quotes)
  • str -> quoted with single quotes and escaped
  • other -> JSON-dumped and treated as a string literal
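
For example (derived from the implementation below; dicts are JSON-encoded with compact separators and sorted keys):
    sql_literal(None)       -> NULL
    sql_literal(True)       -> TRUE
    sql_literal(42)         -> 42
    sql_literal("O'Brien")  -> 'O''Brien'
    sql_literal({"a": 1})   -> '{"a":1}'
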
Source code in src/fastflowtransform/stdlib/sql.py
def sql_literal(value: Any) -> str:
    """
    Convert a Python value into a SQL literal string.

    - None      -> "NULL"
    - bool      -> "TRUE"/"FALSE"
    - int/float -> "123" (no quotes)
    - str       -> quoted with single quotes and escaped
    - other     -> JSON-dumped and treated as a string literal
    """
    if value is None or isinstance(value, JinjaUndefined):
        return "NULL"

    if isinstance(value, bool):
        return "TRUE" if value else "FALSE"

    if isinstance(value, (int, float)):
        return str(value)

    if isinstance(value, str):
        # Simple quote-escape for single quotes
        escaped = value.replace("'", "''")
        return f"'{escaped}'"

    # Fallback: JSON (or str) and quote it
    try:
        json_text = json.dumps(value, separators=(",", ":"), sort_keys=True)
    except TypeError:
        json_text = str(value)
    return "'" + json_text.replace("'", "''") + "'"

register_jinja

register_jinja(env, *, engine_resolver=None, engine=None)

Register all stdlib helpers into a Jinja Environment.

Either pass
  • engine_resolver: a callable returning the current engine key, OR
  • engine: a fixed engine key string.
Example from core:

    from fastflowtransform import stdlib as ff_stdlib
    ff_stdlib.register_jinja(env, engine_resolver=self._current_engine)
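
A minimal end-to-end sketch with a fixed engine, using only the template helpers that the excerpt below registers explicitly (ff_engine, ff_is_engine); the engine-bound helper names come from _STD_FUNCS_ENGINE_DEFAULT, which is not listed here:

    from jinja2 import Environment
    from fastflowtransform import stdlib as ff_stdlib

    env = Environment()
    ff_stdlib.register_jinja(env, engine="bigquery")

    tpl = env.from_string(
        "-- engine: {{ ff_engine() }}\n"
        "{% if ff_is_engine('bigquery') %}SELECT 1{% else %}SELECT 2{% endif %}"
    )
    print(tpl.render())
    # -- engine: bigquery
    # SELECT 1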

Source code in src/fastflowtransform/stdlib/__init__.py
def register_jinja(
    env: Environment,
    *,
    engine_resolver: Callable[[], str | None] | None = None,
    engine: str | None = None,
) -> None:
    """
    Register all stdlib helpers into a Jinja `Environment`.

    Either pass:
      - engine_resolver: a callable returning the current engine key, OR
      - engine: a fixed engine key string.

    Example from core:
        from fastflowtransform import stdlib as ff_stdlib
        ff_stdlib.register_jinja(env, engine_resolver=self._current_engine)
    """
    if engine is None and engine_resolver is not None:
        try:
            engine = engine_resolver()
        except Exception:
            engine = None

    # normalized current engine (e.g. "duckdb", "bigquery", "postgres", "generic")
    engine_key = normalize_engine(engine)

    def _bind_engine(fn: Callable) -> Callable:
        """
        Wrap a stdlib function so that Jinja templates automatically get
        the current engine injected as a default kwarg:

            {{ ff_date_trunc('day', 'col') }}

        becomes effectively:

            sql_date_trunc('day', 'col', engine=engine_key)
        """

        def wrapper(*args, **kwargs):
            kwargs.setdefault("engine", engine_key)
            return fn(*args, **kwargs)

        return wrapper

    # Register the engine-bound helpers
    for name, fn in _STD_FUNCS_ENGINE_DEFAULT.items():
        env.globals[name] = _bind_engine(fn)

    # Register low-level engine helpers (no auto-engine injection)
    for name, fn in _STD_FUNCS_RAW.items():
        env.globals[name] = fn

    # Template-friendly helpers that know the *current* engine:
    #   {{ ff_engine() }}
    #   {% if ff_is_engine('bigquery') %} ... {% endif %}
    def _ff_engine(default: str | None = None) -> str:
        # Prefer the active engine; fall back to a normalized default or "generic"
        if engine_key != "generic":
            return engine_key
        if default is not None:
            return normalize_engine(default)
        return engine_key

    def _ff_is_engine(*candidates: str) -> bool:
        return is_engine(engine_key, *candidates)

    env.globals["ff_engine"] = _ff_engine
    env.globals["ff_is_engine"] = _ff_is_engine