Planning Internals
ExecutionPlanner can be useful for tests or DAG topology debugging, but most application code does not need to create it directly.
astrum.planner.ExecutionPlanner
ExecutionPlanner(task_order: list[TaskOrder])
Build execution stages from a predeclared DAG.
源代码位于: src/astrum/planner.py
| def __init__(self, task_order: list[TaskOrder]) -> None:
self.task_order = task_order
|
detect_cycle
Detect dependency cycles by task name.
源代码位于: src/astrum/planner.py
| def detect_cycle(self) -> None:
"""Detect dependency cycles by task name."""
task_map = {task.task_name: task for task in self.task_order}
visited: set[str] = set()
rec_stack: set[str] = set()
def dfs(task: TaskOrder, path: list[str]) -> None:
task_name = task.task_name
if task_name in rec_stack:
loop_start_idx = path.index(task_name) if task_name in path else 0
loop_chain = path[loop_start_idx:] + [task_name]
raise TaskOrderLoopError([f"Cycle detected: {' -> '.join(loop_chain)}"])
if task_name in visited:
return
visited.add(task_name)
rec_stack.add(task_name)
for dependency in task.dependencies:
known_dependency = task_map.get(dependency.task_name)
if known_dependency is not None:
dfs(known_dependency, path + [task_name])
rec_stack.remove(task_name)
for task in self.task_order:
if task.task_name not in visited:
dfs(task, [])
|
get_execute_timeline
get_execute_timeline() -> ExecutionPlan
Validate the DAG and return a staged execution plan.
源代码位于: src/astrum/planner.py
| def get_execute_timeline(self) -> ExecutionPlan:
"""Validate the DAG and return a staged execution plan."""
self.validate()
if self.task_order and all(task.dependencies for task in self.task_order):
raise TaskOrderNoExitError("Task order has no entry point; every task depends on another task.")
return self._build_execution_stages()
|
astrum.models.ExecutionStage
dataclass
ExecutionStage(stage_id: int, parallel_tasks: list[str], wait_for_tasks: list[str], start_tasks: list[str])
One planned scheduler stage.
astrum.models.ExecutionPlan
dataclass
ExecutionPlan(stages: list[ExecutionStage], total_tasks: int, max_parallelism: int, original_tasks: list[TaskOrder] = list())
A topologically planned execution timeline.