Reads project_dir/profiles.yml and looks for:
.artifacts.mode
.artifacts.postgres.dsn
.artifacts.postgres.db_schema
Also supports an optional top-level 'profiles:' wrapper:
profiles:
dev: {...}
Source code in src/fastflowtransform/artifacts/config.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80 | def resolve_artifacts_db(project_dir: Path, env_name: str) -> ResolvedArtifactsDb | None:
"""
Reads project_dir/profiles.yml and looks for:
<env_name>.artifacts.mode
<env_name>.artifacts.postgres.dsn
<env_name>.artifacts.postgres.db_schema
Also supports an optional top-level 'profiles:' wrapper:
profiles:
dev: {...}
"""
raw = _read_profiles_yaml(project_dir)
if not raw:
return None
profiles = raw.get("profiles")
root = profiles if isinstance(profiles, dict) else raw
env = root.get(env_name)
if not isinstance(env, dict):
return None
artifacts = env.get("artifacts")
if not isinstance(artifacts, dict):
return None
mode = str(artifacts.get("mode") or "files").lower().strip()
if mode not in ("files", "db", "both"):
warn(f"[artifacts] invalid artifacts.mode={mode!r}; falling back to 'files'")
mode = "files"
engine = str(artifacts.get("engine") or "postgres").lower().strip()
if engine != "postgres":
warn(f"[artifacts] artifacts.engine={engine!r} not supported yet; ignoring")
return None
pg = artifacts.get("postgres")
if not isinstance(pg, dict):
return None
dsn = pg.get("dsn")
schema = pg.get("db_schema") or pg.get("schema")
if not isinstance(dsn, str) or not dsn.strip():
return None
if not isinstance(schema, str) or not schema.strip():
return None
return ResolvedArtifactsDb(mode=mode, dsn=dsn.strip(), db_schema=schema.strip())
|