fastflowtransform.logging

bind_context

bind_context(*, run_id=None, engine=None, env=None, node=None, invocation_id=None)

Bind fields that get injected into every record.

Source code in src/fastflowtransform/logging.py
def bind_context(
    *,
    run_id: str | None = None,
    engine: str | None = None,
    env: str | None = None,
    node: str | None = None,
    invocation_id: str | None = None,
) -> None:
    """Bind fields that get injected into every record."""
    if invocation_id is not None:
        run_id = invocation_id
    if run_id is not None:
        _run_id.set(run_id)
    if engine is not None:
        _engine.set(engine)
    if env is not None:
        _env.set(env)
    if node is not None:
        _node.set(node)
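
A minimal usage sketch; the run id, engine, and environment values below are illustrative, not taken from the source:

from fastflowtransform.logging import bind_context, get_logger, setup

setup()  # install the console handler that injects bound fields into records
bind_context(run_id="run-123", engine="duckdb", env="dev")
get_logger("cli.run").info("starting run")  # record now carries run_id/engine/env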

clear_context

clear_context()

Clear all bound fields.

Source code in src/fastflowtransform/logging.py
def clear_context() -> None:
    """Clear all bound fields."""
    _run_id.set(None)
    _engine.set(None)
    _env.set(None)
    _node.set(None)

bound_context

bound_context(*, run_id=None, engine=None, env=None, node=None, invocation_id=None)

Temporarily bind (or override) selected fields. All fields are restored to their previous values when the context exits.

Source code in src/fastflowtransform/logging.py
@contextmanager
def bound_context(
    *,
    run_id: str | None = None,
    engine: str | None = None,
    env: str | None = None,
    node: str | None = None,
    invocation_id: str | None = None,
) -> Generator[None, None, None]:
    """
    Temporarily bind (or override) selected fields.
    All fields are restored to their previous values when the context exits.
    """
    prev = (_run_id.get(), _engine.get(), _env.get(), _node.get())
    try:
        bind_context(
            run_id=run_id,
            engine=engine,
            env=env,
            node=node,
            invocation_id=invocation_id,
        )
        yield
    finally:
        # restore all values captured before entering the context
        _run_id.set(prev[0])
        _engine.set(prev[1])
        _env.set(prev[2])
        _node.set(prev[3])
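
A short sketch of scoping per-node context with the context manager; the node name is made up for illustration:

from fastflowtransform.logging import bound_context, get_logger

with bound_context(node="stg_orders"):
    get_logger("run").info("building node")  # carries node="stg_orders"
# on exit, the previously bound values are restored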

set_flags

set_flags(*, debug=None, sql_debug=None)

Set runtime flags (thread/task-local).

Source code in src/fastflowtransform/logging.py
def set_flags(*, debug: bool | None = None, sql_debug: bool | None = None) -> None:
    """Set runtime flags (thread/task-local)."""
    if debug is not None:
        _dbg.set(bool(debug))
    if sql_debug is not None:
        _sqldbg.set(bool(sql_debug))
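
A small sketch: turning on the general debug flag makes dprint (documented below) emit its probes:

from fastflowtransform.logging import dprint, set_flags

set_flags(debug=True)
dprint("rows scanned:", 1234)  # prints a '[DBG] rows scanned: 1234' line (with the global prefix) to stdout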

setup

setup(*, level=_logging.INFO, json=False, to_stderr=False, propagate_sql=False)

Configure the 'fastflowtransform' logging tree. Idempotent: previous console handlers installed by this function are removed.

Source code in src/fastflowtransform/logging.py
def setup(
    *,
    level: int = _logging.INFO,
    json: bool = False,
    to_stderr: bool = False,
    propagate_sql: bool = False,
) -> None:
    """
    Configure the 'fastflowtransform' logging tree.
    Idempotent: previous console handlers installed by this function are removed.
    """
    root = get_logger()  # 'fastflowtransform'
    root.setLevel(level)

    # Remove previous console handlers we installed
    for h in list(root.handlers):
        try:
            if h.get_name() == "ff_console":
                root.removeHandler(h)
        except Exception:
            if getattr(h, "name", None) == "ff_console":
                root.removeHandler(h)

    handler = _logging.StreamHandler(sys.stderr if to_stderr else sys.stdout)
    handler.set_name("ff_console")
    handler.setFormatter(_JsonFormatter() if json else _ConsoleFormatter())
    handler.addFilter(_CtxFilter())
    root.addHandler(handler)

    # SQL channel inherits formatter/sink via propagation
    sql = get_logger("sql")
    sql.setLevel(_logging.DEBUG if propagate_sql else level)
    sql.propagate = True

    # Also ensure a NullHandler at import-time on the root package
    _logging.getLogger("fastflowtransform").addHandler(_logging.NullHandler())
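
A sketch of a typical call; the level and sink choices here are just examples:

import logging

from fastflowtransform.logging import get_logger, setup

setup(level=logging.DEBUG, json=True, to_stderr=True)
get_logger("cli").debug("emitted as a JSON line on stderr")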

setup_from_cli_flags

setup_from_cli_flags(*, verbose=0, quiet=0, json=False, sql_debug=False, to_stderr=False)

Convenience for CLI:

quiet >= 1   → ERROR
verbose = 0  → WARNING
verbose = 1  → INFO
verbose >= 2 → DEBUG

Source code in src/fastflowtransform/logging.py
def setup_from_cli_flags(
    *,
    verbose: int = 0,
    quiet: int = 0,
    json: bool = False,
    sql_debug: bool = False,
    to_stderr: bool = False,
) -> None:
    """
    Convenience for CLI:
      quiet >=1 → ERROR
      verbose=0 → WARNING
      verbose=1 → INFO
      verbose>=2 → DEBUG
    """
    verbose_debug_level = 2
    level = _logging.WARNING
    if quiet >= 1:
        level = _logging.ERROR
    elif verbose == 1:
        level = _logging.INFO
    elif verbose >= verbose_debug_level:
        level = _logging.DEBUG

    env_sql = os.getenv("FFT_SQL_DEBUG", "").lower() in ("1", "true", "yes", "on")
    setup(
        level=level,
        json=json,
        to_stderr=to_stderr,
        propagate_sql=(sql_debug or env_sql or verbose >= verbose_debug_level),
    )
    set_flags(
        debug=(verbose >= verbose_debug_level),
        sql_debug=(sql_debug or env_sql or verbose >= verbose_debug_level),
    )
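
For example, a CLI invocation along the lines of `-vv --json` (flag names are illustrative) might translate into:

from fastflowtransform.logging import setup_from_cli_flags

# verbose >= 2 selects DEBUG and also enables SQL debug
setup_from_cli_flags(verbose=2, json=True)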

get_logger

get_logger(name=None)

Return a namespaced logger under 'fastflowtransform'.

get_logger()          → 'fastflowtransform'
get_logger("sql")     → 'fastflowtransform.sql'
get_logger("cli.run") → 'fastflowtransform.cli.run'

Source code in src/fastflowtransform/logging.py
def get_logger(name: str | None = None) -> _logging.Logger:
    """
    Return a namespaced logger under 'fastflowtransform'.
    get_logger()          → 'fastflowtransform'
    get_logger("sql")     → 'fastflowtransform.sql'
    get_logger("cli.run") → 'fastflowtransform.cli.run'
    """
    base = "fastflowtransform"
    return _logging.getLogger(base if not name else f"{base}.{name}")
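
A small sketch; child loggers need no handlers of their own, since they propagate to the package root configured by setup():

from fastflowtransform.logging import get_logger, setup

setup()
get_logger("cli.run").warning("namespaced as 'fastflowtransform.cli.run'")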

echo

echo(message='', *, prefix=True, **kwargs)

Thin wrapper around typer.echo(...) that prepends the global log prefix.

Usage:

echo("hello")
echo("to stderr", err=True)
echo("no newline", nl=False)
echo("colored", color=True)
echo("raw message", prefix=False)  # skip prefix if needed

Source code in src/fastflowtransform/logging.py
def echo(message: Any = "", *, prefix: bool = True, **kwargs: Any) -> None:
    """
    Thin wrapper around typer.echo(...) that prepends the global log prefix.

    Usage:
        echo("hello")
        echo("to stderr", err=True)
        echo("no newline", nl=False)
        echo("colored", color=True)
        echo("raw message", prefix=False)  # skip prefix if needed
    """
    msg = _apply_prefix(message) if prefix else message
    typer.echo(msg, **kwargs)

echo_debug

echo_debug(message='', *, prefix=True, **kwargs)

Like echo(...), but only emits when fastflowtransform logger is in DEBUG.

Usage:

echo_debug(f"SQL preview: {sql_text}")
echo_debug("to stderr only in debug", err=True)

Source code in src/fastflowtransform/logging.py
def echo_debug(message: Any = "", *, prefix: bool = True, **kwargs: Any) -> None:
    """
    Like echo(...), but only emits when `fastflowtransform` logger is in DEBUG.

    Usage:
        echo_debug("SQL preview:", sql_text)
        echo_debug("to stderr only in debug", err=True)
    """
    logger = get_logger()
    if logger.isEnabledFor(_logging.DEBUG):
        echo(message, prefix=prefix, **kwargs)

debug

debug(msg, *args, **kwargs)

General debug; gated by runtime flag OR logger level.

Source code in src/fastflowtransform/logging.py
def debug(msg: str, *args: Any, **kwargs: Any) -> None:
    """General debug; gated by runtime flag OR logger level."""
    if is_debug_enabled():
        get_logger("app").debug(msg, *args, **kwargs)

dprint

dprint(*parts)

Lightweight stdout debugging; useful for quick probes during dev. Only prints when general debug is enabled.

Source code in src/fastflowtransform/logging.py
def dprint(*parts: Any) -> None:
    """
    Lightweight stdout debugging; useful for quick probes during dev.
    Only prints when general debug is enabled.
    """
    if is_debug_enabled():
        body = " ".join(str(p) for p in parts) if parts else ""
        msg = "[DBG]" if not body else f"[DBG] {body}"
        print(_prefix_text_line(msg), file=sys.stdout)

sql_debug

sql_debug(msg, *args, **kwargs)

Single-line SQL debug; appears only when SQL debug is enabled.

Source code in src/fastflowtransform/logging.py
def sql_debug(msg: str, *args: Any, **kwargs: Any) -> None:
    """Single-line SQL debug; appears only when SQL debug is enabled."""
    if is_sql_debug_enabled():
        get_logger("sql").debug(msg, *args, **kwargs)

sql_block

sql_block(title, lines)

Pretty multi-line SQL preview gated by SQL debug.

Source code in src/fastflowtransform/logging.py
def sql_block(title: str, lines: Iterable[str] | str) -> None:
    """Pretty multi-line SQL preview gated by SQL debug."""
    if not is_sql_debug_enabled():
        return
    body = lines.rstrip() if isinstance(lines, str) else "\n".join(str(x) for x in lines).rstrip()
    if title:
        get_logger("sql").debug("%s\n%s", title, body)
    else:
        get_logger("sql").debug("%s", body)