def schedule(
    levels: list[list[str]],
    jobs: int,
    fail_policy: FailPolicy,
    run_node: Callable[[str], None],
    before: Callable[..., None] | None = None,
    on_error: Callable[[str, BaseException], None] | None = None,
    logger: LogQueue | None = None,
    engine_abbr: str = "",
    name_width: int = 28,
) -> ScheduleResult:
    """Execute the given levels one after another and collect timing/failure data.

    Each level is handed to `_run_level` together with `jobs`; per the original
    contract, up to `jobs` nodes of a level run in parallel. Levels are numbered
    starting at 1. Processing stops after the first level for which
    `_run_level` reports an error; later levels are not started.

    Args:
        levels: Node names grouped into levels, processed in list order.
        jobs: Forwarded to `_run_level` (parallelism within a level).
        fail_policy: Forwarded unchanged to `_run_level`.
        run_node: Callback that executes a single node by name.
        before: Optional callback, forwarded to `_run_level` as `before_cb`.
        on_error: Accepted for interface compatibility; this function never
            invokes it (the original note says failures are already reported
            per node on a best-effort basis).
        logger: Optional log queue, forwarded to `_run_level`.
        engine_abbr: Forwarded to `_run_level`.
        name_width: Forwarded to `_run_level`.

    Returns:
        A `ScheduleResult` holding the per-node duration mapping, the elapsed
        time across all processed levels, and the mapping of failed nodes to
        their exceptions. Both mappings are the ones shared with `_run_level`.
    """
    durations: dict[str, float] = {}
    failures: dict[str, BaseException] = {}
    durations_lock = threading.Lock()

    def level_had_error(level_no: int, level_names: list[str]) -> bool:
        # Only the first element of the 4-tuple result matters to the scheduler.
        errored, _, _, _ = _run_level(
            lvl_idx=level_no,
            names=level_names,
            jobs=jobs,
            fail_policy=fail_policy,
            before_cb=before,
            run_node=run_node,
            per_node=durations,
            per_node_lock=durations_lock,
            failed=failures,
            logger=logger,
            engine_abbr=engine_abbr,
            name_width=name_width,
        )
        return errored

    started = perf_counter()
    for level_no, level_names in enumerate(levels, start=1):
        if level_had_error(level_no, level_names):
            # Already reported per node (best effort); no aggregate on_error
            # notification is sent from here.
            break

    return ScheduleResult(
        per_node_s=durations,
        total_s=perf_counter() - started,
        failed=failures,
    )