Skip to content

fastflowtransform.logging

bind_context

bind_context(*, run_id=None, engine=None, env=None, node=None)

Bind fields that get injected into every record.

Source code in src/fastflowtransform/logging.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def bind_context(
    *,
    run_id: str | None = None,
    engine: str | None = None,
    env: str | None = None,
    node: str | None = None,
) -> None:
    """
    Bind the logging context fields that are attached to emitted records.

    Every argument that is not None is stored through the ``.set()`` of its
    matching module-level holder; arguments left as None keep the current
    binding untouched.
    """
    assignments = (
        (_run_id, run_id),
        (_engine, engine),
        (_env, env),
        (_node, node),
    )
    for holder, value in assignments:
        if value is not None:
            holder.set(value)

clear_context

clear_context()

Clear all bound fields.

Source code in src/fastflowtransform/logging.py
93
94
95
96
97
98
def clear_context() -> None:
    """Reset run_id, engine, env and node to None."""
    for holder in (_run_id, _engine, _env, _node):
        holder.set(None)

bound_context

bound_context(*, run_id=None, engine=None, env=None, node=None)

Temporarily bind (or override) selected fields. On exit, all four fields (run_id, engine, env, node) are restored to the values they had when the block was entered.

Source code in src/fastflowtransform/logging.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
@contextmanager
def bound_context(
    *,
    run_id: str | None = None,
    engine: str | None = None,
    env: str | None = None,
    node: str | None = None,
) -> Generator[None, None, None]:
    """
    Temporarily bind (or override) selected fields for the body of a ``with`` block.

    Fields passed as None are left untouched on entry. On exit - including
    when the body raises - all four fields (run_id, engine, env, node) are
    set back to the values they had when the block was entered, so any
    binding made inside the block is discarded as well.
    """
    holders = (_run_id, _engine, _env, _node)
    # Snapshot before binding so the finally clause can undo every change.
    saved = tuple(holder.get() for holder in holders)
    try:
        bind_context(run_id=run_id, engine=engine, env=env, node=node)
        yield
    finally:
        # Restore unconditionally: fields that were not overridden simply get
        # their entry-time value written back.
        for holder, previous in zip(holders, saved):
            holder.set(previous)

set_flags

set_flags(*, debug=None, sql_debug=None)

Set runtime flags (thread/task-local).

Source code in src/fastflowtransform/logging.py
125
126
127
128
129
130
def set_flags(*, debug: bool | None = None, sql_debug: bool | None = None) -> None:
    """
    Update the thread/task-local runtime flags.

    A flag passed as None is left as it is; any other value is coerced to
    bool before being stored.
    """
    for flag_holder, requested in ((_dbg, debug), (_sqldbg, sql_debug)):
        if requested is not None:
            flag_holder.set(bool(requested))

setup

setup(*, level=_logging.INFO, json=False, to_stderr=False, propagate_sql=False)

Configure the 'fastflowtransform' logging tree. Idempotent: previous console handlers installed by this function are removed.

Source code in src/fastflowtransform/logging.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
def setup(
    *,
    level: int = _logging.INFO,
    json: bool = False,
    to_stderr: bool = False,
    propagate_sql: bool = False,
) -> None:
    """
    Configure the 'fastflowtransform' logging tree.

    Idempotent: the console handler installed by a previous call is removed
    before a new one is attached, and the fallback NullHandler is added at
    most once, so repeated calls do not accumulate handlers.

    Args:
        level: Level applied to the 'fastflowtransform' logger.
        json: Use the JSON formatter instead of the console formatter.
        to_stderr: Write to sys.stderr instead of sys.stdout.
        propagate_sql: When true, the 'fastflowtransform.sql' logger is set
            to DEBUG regardless of ``level``; otherwise it uses ``level``.
    """
    root = get_logger()  # 'fastflowtransform'
    root.setLevel(level)

    # Remove console handlers installed by earlier calls. Iterate over a copy
    # because removeHandler() mutates root.handlers.
    for h in list(root.handlers):
        if getattr(h, "name", None) == "ff_console":
            root.removeHandler(h)

    handler = _logging.StreamHandler(sys.stderr if to_stderr else sys.stdout)
    handler.set_name("ff_console")
    handler.setFormatter(_JsonFormatter() if json else _ConsoleFormatter())
    handler.addFilter(_CtxFilter())
    root.addHandler(handler)

    # SQL channel inherits formatter/sink via propagation
    sql = get_logger("sql")
    sql.setLevel(_logging.DEBUG if propagate_sql else level)
    sql.propagate = True

    # Keep a single NullHandler on the package logger. Adding one
    # unconditionally would grow the handler list on every call.
    if not any(isinstance(h, _logging.NullHandler) for h in root.handlers):
        root.addHandler(_logging.NullHandler())

setup_from_cli_flags

setup_from_cli_flags(*, verbose=0, quiet=0, json=False, sql_debug=False, to_stderr=False)
Convenience for CLI

quiet >= 1 → ERROR; verbose = 0 → WARNING; verbose = 1 → INFO; verbose >= 2 → DEBUG

Source code in src/fastflowtransform/logging.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
def setup_from_cli_flags(
    *,
    verbose: int = 0,
    quiet: int = 0,
    json: bool = False,
    sql_debug: bool = False,
    to_stderr: bool = False,
) -> None:
    """
    Translate CLI verbosity switches into logging configuration.

    Level selection (quiet takes precedence over verbose):
      quiet >= 1   → ERROR
      verbose >= 2 → DEBUG
      verbose == 1 → INFO
      otherwise    → WARNING

    SQL debugging is switched on by the ``sql_debug`` argument, by the
    FFT_SQL_DEBUG environment variable being one of "1", "true", "yes", "on"
    (case-insensitive), or by verbose >= 2.
    """
    debug_threshold = 2
    wants_debug = verbose >= debug_threshold

    if quiet >= 1:
        level = _logging.ERROR
    elif wants_debug:
        level = _logging.DEBUG
    elif verbose == 1:
        level = _logging.INFO
    else:
        level = _logging.WARNING

    env_sql = os.getenv("FFT_SQL_DEBUG", "").lower() in ("1", "true", "yes", "on")
    # Computed once and shared by both the logger setup and the runtime flags.
    sql_on = sql_debug or env_sql or wants_debug

    setup(level=level, json=json, to_stderr=to_stderr, propagate_sql=sql_on)
    set_flags(debug=wants_debug, sql_debug=sql_on)

get_logger

get_logger(name=None)

Return a namespaced logger under 'fastflowtransform'. Examples: get_logger() → 'fastflowtransform'; get_logger("sql") → 'fastflowtransform.sql'; get_logger("cli.run") → 'fastflowtransform.cli.run'.

Source code in src/fastflowtransform/logging.py
268
269
270
271
272
273
274
275
276
def get_logger(name: str | None = None) -> _logging.Logger:
    """
    Return a namespaced logger under 'fastflowtransform'.
    get_logger()          → 'fastflowtransform'
    get_logger("sql")     → 'fastflowtransform.sql'
    get_logger("cli.run") → 'fastflowtransform.cli.run'
    """
    base = "fastflowtransform"
    return _logging.getLogger(base if not name else f"{base}.{name}")

echo

echo(message='', *, prefix=True, **kwargs)

Thin wrapper around typer.echo(...) that prepends the global log prefix.

Usage

echo("hello")
echo("to stderr", err=True)
echo("no newline", nl=False)
echo("colored", color=True)
echo("raw message", prefix=False)  # skip prefix if needed

Source code in src/fastflowtransform/logging.py
279
280
281
282
283
284
285
286
287
288
289
290
291
def echo(message: Any = "", *, prefix: bool = True, **kwargs: Any) -> None:
    """
    Print ``message`` through typer.echo(...), optionally with the global log prefix.

    When ``prefix`` is true the message is first passed through
    ``_apply_prefix``; otherwise it is forwarded unchanged. All remaining
    keyword arguments go straight to typer.echo.

    Usage:
        echo("hello")
        echo("to stderr", err=True)
        echo("no newline", nl=False)
        echo("colored", color=True)
        echo("raw message", prefix=False)  # skip prefix if needed
    """
    if prefix:
        message = _apply_prefix(message)
    typer.echo(message, **kwargs)

echo_debug

echo_debug(message='', *, prefix=True, **kwargs)

Like echo(...), but only emits when fastflowtransform logger is in DEBUG.

Usage

echo_debug(f"SQL preview: {sql_text}")
echo_debug("to stderr only in debug", err=True)

Source code in src/fastflowtransform/logging.py
294
295
296
297
298
299
300
301
302
303
304
def echo_debug(message: Any = "", *, prefix: bool = True, **kwargs: Any) -> None:
    """
    Like echo(...), but only emits when the `fastflowtransform` logger is
    enabled for DEBUG.

    Only ``message`` is positional; build the full text before calling.
    ``prefix`` and any extra keyword arguments are forwarded to echo(...).

    Usage:
        echo_debug(f"SQL preview: {sql_text}")
        echo_debug("to stderr only in debug", err=True)
    """
    if get_logger().isEnabledFor(_logging.DEBUG):
        echo(message, prefix=prefix, **kwargs)

debug

debug(msg, *args, **kwargs)

General debug; gated by runtime flag OR logger level.

Source code in src/fastflowtransform/logging.py
319
320
321
322
def debug(msg: str, *args: Any, **kwargs: Any) -> None:
    """
    Log a general debug message on the 'app' logger.

    Nothing is emitted unless is_debug_enabled() returns true (described by
    the module as: runtime flag OR logger level).
    """
    if not is_debug_enabled():
        return
    get_logger("app").debug(msg, *args, **kwargs)

dprint

dprint(*parts)

Lightweight stdout debugging; useful for quick probes during dev. Only prints when general debug is enabled.

Source code in src/fastflowtransform/logging.py
325
326
327
328
329
330
331
332
333
def dprint(*parts: Any) -> None:
    """
    Quick stdout probe for development.

    The parts are stringified and joined with single spaces, tagged with
    "[DBG]", run through ``_prefix_text_line`` and printed to stdout.
    Does nothing unless is_debug_enabled() returns true.
    """
    if not is_debug_enabled():
        return
    text = " ".join(map(str, parts))
    tagged = f"[DBG] {text}" if text else "[DBG]"
    print(_prefix_text_line(tagged), file=sys.stdout)

sql_debug

sql_debug(msg, *args, **kwargs)

Single-line SQL debug; appears only when SQL debug is enabled.

Source code in src/fastflowtransform/logging.py
336
337
338
339
def sql_debug(msg: str, *args: Any, **kwargs: Any) -> None:
    """Log a single-line message on the 'sql' logger at DEBUG, but only while SQL debug is enabled."""
    if not is_sql_debug_enabled():
        return
    get_logger("sql").debug(msg, *args, **kwargs)

sql_block

sql_block(title, lines)

Pretty multi-line SQL preview gated by SQL debug.

Source code in src/fastflowtransform/logging.py
342
343
344
345
346
347
348
349
350
def sql_block(title: str, lines: Iterable[str] | str) -> None:
    """
    Log a multi-line SQL preview on the 'sql' logger; no-op unless SQL debug is enabled.

    ``lines`` may be a ready-made string or an iterable whose items are
    stringified and joined with newlines. Trailing whitespace is stripped.
    A non-empty ``title`` is logged on its own line above the body.
    """
    if not is_sql_debug_enabled():
        return

    if isinstance(lines, str):
        text = lines
    else:
        text = "\n".join(map(str, lines))
    text = text.rstrip()

    sql_logger = get_logger("sql")
    if not title:
        sql_logger.debug("%s", text)
        return
    sql_logger.debug("%s\n%s", title, text)