API Calls in Python Models¶
Status: Experimental but stable for demos and smaller workflows.
Goal: Query HTTP APIs from Python models, return responses as DataFrames, cache and instrument them cleanly, and support reproducible offline runs.
- Motivation
 - Quickstart
 - Programming API
 get_jsonget_df- Pagination
 - Context & Telemetry
 - CLI Flags & Environment Variables
 - Example Model
 - Artifacts
 - Tests & Offline Demos
 - Best Practices
 - Troubleshooting
 - Security & Compliance
 - FAQ
 
Motivation¶
Many pipelines need small, reliable API fetchers: configuration tables, miniature dimensions, feature flags, SaaS exports. This feature provides:
- Simple HTTP calls inside Python models
 - File-backed cache (reproducible builds, works offline)
 - Per-node telemetry (requests, hits, bytes, hashes)
 - CLI switches 
--offlineand--http-cachefor reproducible runs 
Quickstart¶
- Optionally enable flags (recommended):
 
# No network - cache hits only
fft run . --env dev --offline
# Cache mode
fft run . --env dev --http-cache rw   # rw|ro|off
- Write a Python model:
 
# models/users_from_api.ff.py
import pandas as pd
from fastflowtransform.core import model
from fastflowtransform.api.http import get_df
@model(name="users_from_api", deps=["users.ff"])
def fetch(_: pd.DataFrame) -> pd.DataFrame:
    df = get_df(
        url="https://api.example.com/users",
        params={"page": 1},
        record_path=["data"],        # JSON -> list -> DataFrame
    )
    return df
- Run it:
 
fft run . --env dev --select users_from_api
Programming API¶
Module:
fastflowtransform.api.http
get_json¶
from fastflowtransform.api.http import get_json
data = get_json(
    url="https://api.example.com/objects",
    params={"page": 1},        # optional
    headers={"Authorization": "Bearer ..."},  # optional
    timeout=20,                # optional (seconds)
)
# -> Python dict / list
Behavior
- Reads from the local cache (when present and valid).
 - Writes to the cache (
rwmode), including the response body. - Respects offline mode (no network traffic).
 
get_df¶
from fastflowtransform.api.http import get_df
df = get_df(
    url="https://api.example.com/users",
    params={"page": 1},
    record_path=["data"],     # path to the JSON list
    normalize=True,           # optional: flatten nested objects
    paginator=None,           # optional: pagination strategy (see below)
    output="pandas",          # pandas|spark (default=pandas)
)
# -> pandas.DataFrame
Conversion
- Default: 
record_pathpoints to the array payload (for example["data"]). normalize=Truedelegates tojson_normalizefor deeper structures.output='spark'(plus an optionalsession=SparkSession) converts the normalized result into apyspark.sql.DataFrame. Additional backends will reuse the same parameter.
Pagination¶
For paged APIs you can describe the next request declaratively:
def paginator(url: str, params: dict | None, json_obj: dict):
    next_url = json_obj.get("next")          # e.g. absolute URL
    if next_url:
        return {"next_request": {"url": next_url}}
    return None
df = get_df(
    "https://api.example.com/users?page=1",
    paginator=paginator,
    record_path=["data"],
)
The paginator may return the following fields:
{"next_request": {"url": "...", "params": {...}, "headers": {...}}}(any missing field keeps its previous value)
Context & Telemetry¶
During a model run the executor collects telemetry per node and writes it into run_results.json:
requests(count)cache_hitsbytes(sum of response bodies)used_offline(bool)keys(cache keys)entries(optional compact array with URL, status, content hash)
You will find these metrics under the http block of each node (see Artifacts).
CLI Flags & Environment Variables¶
CLI
--offline
SetsFF_HTTP_OFFLINE=1; network requests are blocked, cache hits only.- 
--http-cache {off|ro|rw}
SetsFF_HTTP_CACHE_MODE: - 
off: neither read nor write. ro: read-only (hits), no writes.rw: read and write (default).
Environment (optional to set directly)
| Variable | Default | Effect | 
|---|---|---|
FF_HTTP_OFFLINE | 
0 | 
1/true/on -> offline mode | 
FF_HTTP_CACHE_MODE | 
rw | 
off / ro / rw | 
FF_HTTP_CACHE_DIR | 
.fastflowtransform/http_cache | 
Cache directory | 
FF_HTTP_TTL | 
0 | 
Seconds; 0 = never expires | 
FF_HTTP_TIMEOUT | 
20 | 
Request timeout (seconds) | 
FF_HTTP_MAX_RETRIES | 
3 | 
Basic retry count | 
FF_HTTP_RATE_LIMIT_RPS | 
0 | 
Requests per second (0 = unlimited) | 
Example Model¶
# models/dim_countries_from_api.ff.py
import pandas as pd
from fastflowtransform.core import model
from fastflowtransform.api.http import get_df
@model(name="dim_countries_from_api", deps=["users.ff"])
def countries(_: pd.DataFrame) -> pd.DataFrame:
    def pager(u, p, js):
        nxt = js.get("paging", {}).get("next")
        return {"next_request": {"url": nxt}} if nxt else None
    df = get_df(
        url="https://api.example.com/countries?page=1",
        paginator=pager,
        record_path=["data"],
        normalize=True,
    )
    # lightweight post-processing
    if "code" in df.columns:
        df["code"] = df["code"].str.upper()
    return df
Run:
fft run . --env dev --select dim_countries_from_api --http-cache ro
Artifacts¶
<project>/.fastflowtransform/target/run_results.json (excerpt):
{
  "results": [
    {
      "name": "dim_countries_from_api",
      "status": "success",
      "duration_ms": 153,
      "http": {
        "requests": 2,
        "cache_hits": 2,
        "bytes": 1842,
        "used_offline": true,
        "keys": ["GET:https://api.example.com/countries?page=1|{}|{}", "..."],
        "entries": [
          {"url": "https://api.example.com/countries?page=1", "status": 200, "content_hash": "sha256:..."},
          {"url": "https://api.example.com/countries?page=2", "status": 200, "content_hash": "sha256:..."}
        ]
      }
    }
  ]
}
Note: When a node is skipped (fingerprint cache hit), no new
httpblock is emitted - the model did not run.
Tests & Offline Demos¶
- Place unit tests under 
tests/api/...and seed the cache directly (no real HTTP calls). - 
Suggested scenarios:
 - 
Offline hit: set
FF_HTTP_OFFLINE=1, seed the cache,get_json/get_dfmust succeed. - Cache mode 
off: even with cache entries, no reads; expect a failure in offline mode. ro: allow read hits; no cache writes after a real or mocked request.- Pagination: stitch several pages from offline fixtures; telemetry should count requests/hits.
 
Best Practices¶
- Stable URLs and parameter order produce identical cache keys and reproducible builds.
 - Keep 
record_pathshallow; usenormalize=Trueonly when necessary (performance). - Never cache secrets: provide tokens via headers; the response body and metadata are cached.
 - Use 
--offlinein CI for deterministic tests with a pre-seeded cache. - Set TTL intentionally when APIs change frequently.
 - Scope engine-specific variants with 
engine_model(only=...)so each execution backend registers only the models it can run (pair with SQLconfig(engines=[...])when duplicating logical names). 
Troubleshooting¶
- “offline + cache miss”
Seed the cache (see tests) or disable offline mode. - “Schema mismatch”
Harmonize columns afterget_df(types, missing keys). - “Too many requests”
ConfigureFF_HTTP_RATE_LIMIT_RPS; make pagination more efficient (largerpage_size). - “No http block”
Was the node skipped (fingerprint cache)? Or did the model avoid HTTP calls altogether? 
Security & Compliance¶
- Do not commit secrets - use environment variables or a secret manager.
 - PII/GDPR: verify whether the API returns personal data; minimise retention.
 - Cache directory: keep it in 
.gitignore; encrypt or isolate it if necessary. 
FAQ¶
Q: Can I call other libraries (for example requests, httpx) directly?
A: Yes, but you lose telemetry and caching. The recommended entrypoint is fastflowtransform.api.http.
Q: How do I add custom headers (for example OAuth)?
A: Pass headers={...}. Store sensitive values in env vars and inject them into your models.
Q: Does this work for POST requests?
A: Release R1 focuses on GET. Please open an issue for POST/PUT support; the design can be extended.
See also:
- Technical guide: Developer Guide – Architecture & Internals
 - Unit tests: 
tests/api/test_http_*.py - Runtime & cache: Parallelism & Cache (v0.3)