59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# Extracts the observed delay from the failure message raised by the freshness test,
# whose format is "freshness of table.col too old: {delay} min > {max_delay} min".
_FRESHNESS_DELAY_RE = re.compile(r"too old: ([0-9.eE+-]+) min >")


def _parse_freshness_delay(message: str) -> float | None:
    """Return the delay in minutes parsed from a freshness failure message, or None."""
    match = _FRESHNESS_DELAY_RE.search(message)
    if match is None:
        return None
    try:
        return float(match.group(1))
    except ValueError:
        # The character class can also match non-numbers such as "1.2.3" or "e".
        return None


def _classify_source_freshness(
    connection: Any,
    relation: Any,
    loaded_at: Any,
    warn_after: Any,
    err_after: Any,
) -> tuple[str, float | None, str | None]:
    """Run the freshness test for one table and classify the outcome.

    Thresholds are probed from most to least severe (error_after, then warn_after),
    so the status is decided by the freshness test itself and never depends on the
    failure message being parseable. When both thresholds are configured and the
    table is fresh, this costs one test run per threshold.

    Returns:
        (status, delay_minutes, error_message). status is "error", "warn" or "pass".
        delay_minutes is parsed from the failure message when possible.
        delay_minutes and error_message are None on "pass".
    """
    for status, threshold in (("error", err_after), ("warn", warn_after)):
        if threshold is None:
            continue
        try:
            _freshness_test(
                connection, relation, loaded_at, max_delay_minutes=int(threshold)
            )
        except TestFailure as exc:
            message = str(exc)
            return status, _parse_freshness_delay(message), message
    return "pass", None, None


def run_source_freshness(
    executor: Any,
    *,
    con: Any | None = None,
    engine: str | None = None,
) -> list[SourceFreshnessResult]:
    """
    Execute freshness checks for all sources that have a configured freshness block.

    A table is checked only when its ``freshness`` config provides a
    ``loaded_at_field`` and at least one of ``warn_after.count_in_minutes`` /
    ``error_after.count_in_minutes``. Its status is:

    - "error" if it fails the freshness test at ``error_after``;
    - otherwise "warn" if it fails at ``warn_after``;
    - otherwise "pass".

    Args:
        executor: Used to look up optional ``engine_name`` and ``con`` attributes.
            It is used as the connection itself when it has no ``con``.
        con: Explicit connection; takes precedence over the executor's.
        engine: Explicit engine name; takes precedence over ``executor.engine_name``.

    Returns:
        List of per-table SourceFreshnessResult.

    Raises:
        Exceptions other than TestFailure propagate and abort the run. This covers
        both relation resolution and the freshness test.
    """
    engine_label = engine or getattr(executor, "engine_name", None) or ""
    engine_norm = engine_label.lower()
    # Compare against None so an explicitly passed connection is always honoured.
    connection = con if con is not None else getattr(executor, "con", executor)
    results: list[SourceFreshnessResult] = []
    sources = getattr(REGISTRY, "sources", {}) or {}
    for src_name, tables in sources.items():
        for tbl_name, entry in (tables or {}).items():
            freshness_cfg = (entry or {}).get("freshness") or {}
            loaded_at = freshness_cfg.get("loaded_at_field")
            warn_after = (freshness_cfg.get("warn_after") or {}).get("count_in_minutes")
            err_after = (freshness_cfg.get("error_after") or {}).get("count_in_minutes")
            if not loaded_at or (warn_after is None and err_after is None):
                # No usable freshness config -> skip this table.
                continue
            relation = _relation_for_source(
                entry, src_name, tbl_name, executor, engine_norm or None
            )
            status, delay, err_msg = _classify_source_freshness(
                connection, relation, loaded_at, warn_after, err_after
            )
            results.append(
                SourceFreshnessResult(
                    source_name=src_name,
                    table_name=tbl_name,
                    relation=relation,
                    loaded_at_field=loaded_at,
                    delay_minutes=delay,
                    warn_after_minutes=warn_after,
                    error_after_minutes=err_after,
                    status=status,
                    error=err_msg,
                )
            )
    return results
|