Skip to content

fastflowtransform.executors._shims

BigQueryConnShim

Lightweight shim so fastflowtransform.testing can call executor.con.execute(...) against BigQuery clients.

Source code in src/fastflowtransform/executors/_shims.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class BigQueryConnShim:
    """
    Lightweight shim so fastflowtransform.testing can call executor.con.execute(...)
    against BigQuery clients.
    """

    marker = "BQ_SHIM"

    def __init__(
        self,
        client: Client,
        location: str | None = None,
        project: str | None = None,
        dataset: str | None = None,
    ):
        self.client = client
        self.location = location
        self.project = project
        self.dataset = dataset

    def execute(self, sql_or_stmts: Any) -> Any:
        if isinstance(sql_or_stmts, str):
            return self.client.query(sql_or_stmts, location=self.location)
        if isinstance(sql_or_stmts, Sequence) and not isinstance(
            sql_or_stmts, (bytes, bytearray, str)
        ):
            job = None
            for stmt in sql_or_stmts:
                job = self.client.query(str(stmt), location=self.location)
                job.result()
            return job
        raise TypeError(f"Unsupported sql argument type for BigQuery shim: {type(sql_or_stmts)}")

SAConnShim

Compatibility layer so fastflowtransform.testing can call executor.con.execute(...) against SQLAlchemy engines (Postgres, etc.). Adds PG-safe DDL rewrites.

Source code in src/fastflowtransform/executors/_shims.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
class SAConnShim:
    """
    Compatibility layer so fastflowtransform.testing can call executor.con.execute(...)
    against SQLAlchemy engines (Postgres, etc.). Adds PG-safe DDL rewrites.
    """

    marker = "PG_SHIM"

    def __init__(self, engine: Engine, schema: str | None = None):
        self._engine = engine
        self._schema = schema

    def _exec_one(self, conn: Any, stmt: Any, params: dict | None = None) -> Any:
        # tuple (sql, params)
        statement_len = 2
        if (
            isinstance(stmt, tuple)
            and len(stmt) == statement_len
            and isinstance(stmt[0], str)
            and isinstance(stmt[1], dict)
        ):
            return self._exec_one(conn, stmt[0], stmt[1])

        # sqlalchemy expression
        if isinstance(stmt, ClauseElement):
            return conn.execute(stmt)

        # plain string (apply rewrite, then possibly split into multiple statements)
        if isinstance(stmt, str):
            rewritten = _rewrite_pg_create_or_replace_table(stmt)
            parts = [p.strip() for p in rewritten.split(";") if p.strip()]
            res = None
            for i, part in enumerate(parts):
                res = conn.execute(text(part), params if (i == len(parts) - 1) else None)
            return res

        # iterable of statements -> sequential execution
        if isinstance(stmt, Iterable) and not isinstance(stmt, (bytes, bytearray, str)):
            res = None
            for s in stmt:
                res = self._exec_one(conn, s)
            return res

        # fallback
        return self._exec_one(conn, str(stmt))

    def execute(self, sql: Any) -> Any:
        with self._engine.begin() as conn:
            if self._schema:
                conn.execute(text(f'SET LOCAL search_path = "{self._schema}"'))
            return self._exec_one(conn, sql)