fastflowtransform.executors._test_utils

make_fetchable

make_fetchable(result)

Ensure a result supports .fetchone()/.fetchall().

  • If already fetchable, return as-is.
  • If it has .result() (e.g., BigQuery QueryJob), use that first.
  • If it's an iterable (not string/bytes), wrap in a simple fetch wrapper.
  • Otherwise return unchanged (may still fail if caller expects fetch methods).
Source code in src/fastflowtransform/executors/_test_utils.py
def make_fetchable(result: Any) -> Any:
    """
    Ensure a result supports .fetchone()/.fetchall().

    - If already fetchable, return as-is.
    - If it has .result() (e.g., BigQuery QueryJob), use that first.
    - If it's an iterable (not string/bytes), wrap in a simple fetch wrapper.
    - Otherwise return unchanged (may still fail if caller expects fetch methods).
    """
    if hasattr(result, "fetchone") and hasattr(result, "fetchall"):
        return result
    if hasattr(result, "result") and callable(result.result):
        try:
            return make_fetchable(result.result())
        except Exception:
            raise
    if isinstance(result, Iterable) and not isinstance(result, (str, bytes, bytearray)):
        return _ListFetchWrapper(result)
    return result
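
The dispatch order above can be exercised without a database. The sketch below is self-contained: it restates `make_fetchable` and pairs it with a hypothetical stand-in for `_ListFetchWrapper`, whose real implementation is not shown on this page. The wrapper's behavior here (cursor-style consumption, `None` once exhausted) is therefore an assumption. `FakeJob` is also an invented name that mimics an object exposing `.result()`.

```python
from collections.abc import Iterable
from typing import Any


class ListFetchWrapper:
    """Hypothetical stand-in for _ListFetchWrapper (real implementation not shown)."""

    def __init__(self, rows: Iterable[Any]) -> None:
        self._rows = list(rows)  # materialize once so generators work too
        self._pos = 0

    def fetchone(self) -> Any:
        # Assumed cursor-like behavior: return None once all rows are consumed.
        if self._pos >= len(self._rows):
            return None
        row = self._rows[self._pos]
        self._pos += 1
        return row

    def fetchall(self) -> list[Any]:
        # Return the rows not yet consumed and mark the wrapper as exhausted.
        rest = self._rows[self._pos:]
        self._pos = len(self._rows)
        return rest


def make_fetchable(result: Any) -> Any:
    # Same branch order as the listing above, using the stand-in wrapper.
    if hasattr(result, "fetchone") and hasattr(result, "fetchall"):
        return result
    if hasattr(result, "result") and callable(result.result):
        return make_fetchable(result.result())
    if isinstance(result, Iterable) and not isinstance(result, (str, bytes, bytearray)):
        return ListFetchWrapper(result)
    return result


class FakeJob:
    """Hypothetical job-like object that only exposes .result()."""

    def result(self) -> list[tuple[int, str]]:
        return [(1, "a"), (2, "b")]


wrapped = make_fetchable(FakeJob())     # .result() is called, then the list is wrapped
first = wrapped.fetchone()              # first row
rest = wrapped.fetchall()               # remaining rows
unchanged = make_fetchable("SELECT 1")  # strings are iterable but deliberately not wrapped
```

The string case falls through to the final branch, which is why a caller that passes a non-fetchable scalar still needs to handle the missing fetch methods itself.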

rows_to_tuples

rows_to_tuples(rows)

Normalize various row shapes to simple tuples.

Supported shapes:
  • tuple -> returned as-is
  • mapping-like via .asDict() -> tuple(values)
  • general Sequence (excluding str/bytes) -> tuple(row)
  • fallback: (row,)
Source code in src/fastflowtransform/executors/_test_utils.py
def rows_to_tuples(rows: Iterable[Any] | None) -> list[tuple[Any, ...]]:
    """
    Normalize various row shapes to simple tuples.

    Supported shapes:
      - tuple -> returned as-is
      - mapping-like via .asDict() -> tuple(values)
      - general Sequence (excluding str/bytes) -> tuple(row)
      - fallback: (row,)
    """

    def _one(row: Any) -> tuple[Any, ...]:
        if isinstance(row, tuple):
            return row
        if hasattr(row, "asDict"):
            try:
                d = row.asDict()
                if isinstance(d, dict):
                    return tuple(d.values())
            except Exception:
                pass
        if isinstance(row, Sequence) and not isinstance(row, (str, bytes, bytearray)):
            try:
                return tuple(row)
            except Exception:
                pass
        return (row,)

    return [_one(r) for r in (rows or [])]
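
To see each supported shape in one place, the sketch below restates `rows_to_tuples` so it runs standalone and feeds it one row per branch. `FakeRow` is a hypothetical class introduced only to trigger the `.asDict()` path; it is not part of the library.

```python
from collections.abc import Iterable, Sequence
from typing import Any, Optional


def rows_to_tuples(rows: Optional[Iterable[Any]]) -> list[tuple[Any, ...]]:
    # Restated from the listing above so the example is self-contained.
    def _one(row: Any) -> tuple[Any, ...]:
        if isinstance(row, tuple):
            return row
        if hasattr(row, "asDict"):
            try:
                d = row.asDict()
                if isinstance(d, dict):
                    return tuple(d.values())
            except Exception:
                pass
        if isinstance(row, Sequence) and not isinstance(row, (str, bytes, bytearray)):
            try:
                return tuple(row)
            except Exception:
                pass
        return (row,)

    return [_one(r) for r in (rows or [])]


class FakeRow:
    """Hypothetical mapping-like row exposing .asDict()."""

    def __init__(self, **fields: Any) -> None:
        self._fields = fields

    def asDict(self) -> dict[str, Any]:
        return dict(self._fields)


mixed = [
    (1, "a"),                  # tuple: returned as-is
    FakeRow(id=2, name="b"),   # .asDict(): values in insertion order
    [3, "c"],                  # general Sequence: converted with tuple(row)
    "scalar",                  # str is excluded from the Sequence branch
    42,                        # fallback: wrapped as a 1-tuple
]
normalized = rows_to_tuples(mixed)
# normalized == [(1, "a"), (2, "b"), (3, "c"), ("scalar",), (42,)]
empty = rows_to_tuples(None)   # None is treated as no rows, giving []
```

Note that the `.asDict()` branch relies on the dictionary's value order, so the resulting tuple follows whatever field order the row object reports.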