Skip to content

Planning Internals

ExecutionPlanner can be useful for tests or DAG topology debugging, but most application code does not need to create it directly.

astrum.planner.ExecutionPlanner

ExecutionPlanner(task_order: list[TaskOrder])

Build execution stages from a predeclared DAG.

源代码位于: src/astrum/planner.py
def __init__(self, task_order: list[TaskOrder]) -> None:
    self.task_order = task_order

detect_cycle

detect_cycle() -> None

Detect dependency cycles by task name.

源代码位于: src/astrum/planner.py
def detect_cycle(self) -> None:
    """Detect dependency cycles by task name."""

    task_map = {task.task_name: task for task in self.task_order}
    visited: set[str] = set()
    rec_stack: set[str] = set()

    def dfs(task: TaskOrder, path: list[str]) -> None:
        task_name = task.task_name
        if task_name in rec_stack:
            loop_start_idx = path.index(task_name) if task_name in path else 0
            loop_chain = path[loop_start_idx:] + [task_name]
            raise TaskOrderLoopError([f"Cycle detected: {' -> '.join(loop_chain)}"])

        if task_name in visited:
            return

        visited.add(task_name)
        rec_stack.add(task_name)

        for dependency in task.dependencies:
            known_dependency = task_map.get(dependency.task_name)
            if known_dependency is not None:
                dfs(known_dependency, path + [task_name])

        rec_stack.remove(task_name)

    for task in self.task_order:
        if task.task_name not in visited:
            dfs(task, [])

get_execute_timeline

get_execute_timeline() -> ExecutionPlan

Validate the DAG and return a staged execution plan.

源代码位于: src/astrum/planner.py
def get_execute_timeline(self) -> ExecutionPlan:
    """Validate the DAG and return a staged execution plan."""

    self.validate()

    if self.task_order and all(task.dependencies for task in self.task_order):
        raise TaskOrderNoExitError("Task order has no entry point; every task depends on another task.")

    return self._build_execution_stages()

astrum.models.ExecutionStage dataclass

ExecutionStage(stage_id: int, parallel_tasks: list[str], wait_for_tasks: list[str], start_tasks: list[str])

One planned scheduler stage.

astrum.models.ExecutionPlan dataclass

ExecutionPlan(stages: list[ExecutionStage], total_tasks: int, max_parallelism: int, original_tasks: list[TaskOrder] = list())

A topologically planned execution timeline.