
Models and Exceptions

This page documents the models users most often need when reading execution results, analyzing statistics, and handling errors.

DAG and Execution Results

astrum.models.TaskOrder dataclass

TaskOrder(task_name: str, dependencies: list[TaskOrder] = list())

A task node and the task nodes that must complete before it can run.

depend property writable

depend: TaskOrder | None

Backward-compatible single-dependency accessor.
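
To illustrate how a list of dependencies and a single-dependency accessor can coexist, here is a minimal sketch. The `TaskOrder` class below is a local stand-in that mirrors the documented signature; it is not the astrum source. The behavior of `depend` (returning the first dependency, and replacing the whole list on assignment) is an assumption made for this example.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class TaskOrder:
    """Local stand-in mirroring the documented signature; not the astrum source."""

    task_name: str
    dependencies: list[TaskOrder] = field(default_factory=list)

    @property
    def depend(self) -> TaskOrder | None:
        # Assumed behavior: expose the first dependency, or None when there is none.
        return self.dependencies[0] if self.dependencies else None

    @depend.setter
    def depend(self, value: TaskOrder | None) -> None:
        # Assumed behavior: assigning replaces the whole dependency list.
        self.dependencies = [] if value is None else [value]


# New style: a node that waits for several upstream tasks.
extract = TaskOrder("extract")
validate = TaskOrder("validate")
load = TaskOrder("load", dependencies=[extract, validate])

# Legacy style: a single dependency set through the accessor.
legacy = TaskOrder("report")
legacy.depend = load
```

Under these assumptions, code written against the older single-dependency shape keeps working while new code passes a full `dependencies` list.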

astrum.models.ExecutionReport dataclass

ExecutionReport(total_start_time: float, total_end_time: float, total_duration: float, planning_duration: float, execution_duration: float, total_tasks: int, total_stages: int, max_parallelism: int, successful_tasks: int, failed_tasks: int, stage_statistics: list[TaskStageStatistics], task_statistics: list[TaskExecutionStatistics], execution_state: str, error_summary: list[str] = list(), original_tasks: dict[str, list[str]] = dict(), tail_tasks: dict[str, Task[Any]] = dict(), task_return_set: dict[str, Any] = list())

astrum.models.TaskStageStatistics dataclass

TaskStageStatistics(stage_id: int, stage_name: str, start_time: float, end_time: float, duration: float, parallel_task_count: int, wait_task_count: int, parallel_tasks: list[str], wait_tasks: list[str])

astrum.models.TaskExecutionStatistics dataclass

TaskExecutionStatistics(task_name: str, stage_id: int, start_time: float, end_time: float, duration: float, status: str, error_message: str | None = None, retry: int = 0, attempt_count: int = 0, attempt_statistics: list[TaskAttemptStatistics] = list())

astrum.models.TaskAttemptStatistics dataclass

TaskAttemptStatistics(task_name: str, attempt_number: int, max_retries: int, start_time: float, end_time: float, duration: float, status: str, error_message: str | None = None, will_retry: bool = False)
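
The per-task statistics lend themselves to simple post-run analysis. The sketch below ranks tasks by duration and collects error messages. `TaskStat` is a hypothetical stand-in carrying only a subset of the `TaskExecutionStatistics` fields, and the status strings `"success"` and `"failed"` are placeholders, since the actual values are not listed on this page.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class TaskStat:
    """Hypothetical stand-in with a subset of TaskExecutionStatistics fields."""

    task_name: str
    duration: float
    status: str
    error_message: str | None = None


def slowest(stats: list[TaskStat], n: int = 3) -> list[str]:
    # Rank tasks by duration, longest first, and keep the top n names.
    ranked = sorted(stats, key=lambda s: s.duration, reverse=True)
    return [s.task_name for s in ranked[:n]]


def errors_by_task(stats: list[TaskStat]) -> dict[str, str]:
    # Assumes error_message is populated only for tasks that did not succeed.
    return {s.task_name: s.error_message for s in stats if s.error_message is not None}


sample = [
    TaskStat("extract", 1.2, "success"),
    TaskStat("transform", 4.5, "failed", "ValueError: bad row"),
    TaskStat("load", 0.3, "success"),
]
top_two = slowest(sample, n=2)
failures = errors_by_task(sample)
```

Because both helpers only read attributes, they should work equally well on the real `task_statistics` entries of an `ExecutionReport`, provided the field names match the signatures above.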

astrum.models.ExecutionState

Bases: str, Enum

Overall scheduler execution state.

astrum.models.StageStatus

Bases: str, Enum

Current execution stage state.
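
Both enums derive from `str` and `Enum`, which means their members can be compared directly with plain strings such as the `execution_state: str` field of `ExecutionReport`. The sketch below shows that standard-library behavior with a hypothetical `DemoState` enum; the member names and values are invented for illustration and are not the real `ExecutionState` or `StageStatus` members.

```python
from enum import Enum


class DemoState(str, Enum):
    """Hypothetical members; the real enum values are not listed on this page."""

    RUNNING = "running"
    DONE = "done"


# A str-mixin member compares equal to its plain string value.
matches_plain_string = DemoState.DONE == "done"

# Lookup by value returns the member itself.
parsed = DemoState("running")

# .value yields the raw string, which is convenient for logging or JSON output.
raw = DemoState.RUNNING.value
```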

User-Handleable Exceptions

astrum.models.SchedulerError

Bases: Exception

Base class for scheduler errors.

astrum.models.TaskOrderLoopError

TaskOrderLoopError(errors: list[str])

Bases: SchedulerError

Raised when the DAG contains a cycle.

Source code in src/astrum/models.py
def __init__(self, errors: list[str]) -> None:
    self.errors = errors
    super().__init__("\n".join(errors))
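
As a rough illustration of how a caller might consume this exception, the sketch below redefines the two classes locally (copying the constructor shown above) so it runs without astrum installed. The `plan` function and the cycle descriptions it reports are hypothetical.

```python
class SchedulerError(Exception):
    """Local copy of the documented base class."""


class TaskOrderLoopError(SchedulerError):
    """Local copy using the constructor shown above."""

    def __init__(self, errors: list[str]) -> None:
        self.errors = errors
        super().__init__("\n".join(errors))


def plan() -> None:
    # Hypothetical planner stub that always reports two cycles.
    raise TaskOrderLoopError(["a -> b -> a", "c -> c"])


try:
    plan()
except TaskOrderLoopError as exc:
    # The individual entries stay available as a list on the exception...
    cycle_count = len(exc.errors)
    # ...while str(exc) joins them with newlines for display.
    message = str(exc)
```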

astrum.models.TaskOrderNoExitError

Bases: SchedulerError

Raised when no task can be used as an entry point.

astrum.models.TaskDependencyError

Bases: SchedulerError

Raised when a task depends on an unknown task.

astrum.models.TaskNotFoundError

TaskNotFoundError(task_name: str)

Bases: SchedulerError

Raised when a planned task has no matching coroutine.

Source code in src/astrum/models.py
def __init__(self, task_name: str) -> None:
    self.task_name = task_name
    super().__init__(f"Task '{task_name}' not found in task list.")

astrum.models.TaskDuplicateExecutionError

TaskDuplicateExecutionError(task_name: str)

Bases: SchedulerError

Raised when a task would be scheduled more than once.

Source code in src/astrum/models.py
def __init__(self, task_name: str) -> None:
    self.task_name = task_name
    super().__init__(f"Task '{task_name}' is already scheduled for execution.")

astrum.models.TaskRegistrationError

Bases: SchedulerError

Raised when decorator-based registration is invalid.
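
Since every exception in this section derives from `SchedulerError`, a caller can handle specific cases first and fall back to the base class. The following sketch redefines a few of the classes locally so it is self-contained; the `describe` helper and the `run` callables are hypothetical and only demonstrate the ordering of `except` clauses.

```python
from typing import Callable


class SchedulerError(Exception):
    """Local copy of the documented base class."""


class TaskNotFoundError(SchedulerError):
    """Local copy using the documented constructor."""

    def __init__(self, task_name: str) -> None:
        self.task_name = task_name
        super().__init__(f"Task '{task_name}' not found in task list.")


class TaskRegistrationError(SchedulerError):
    """Local copy; raised when decorator-based registration is invalid."""


def describe(run: Callable[[], None]) -> str:
    # Hypothetical helper: run a callable and summarize any scheduler failure.
    try:
        run()
    except TaskNotFoundError as exc:
        # Most specific handler first, so task_name can be used.
        return f"missing task: {exc.task_name}"
    except SchedulerError as exc:
        # Fallback for every other scheduler-level error.
        return f"scheduler error: {exc}"
    return "ok"


def missing() -> None:
    raise TaskNotFoundError("load")


def bad_registration() -> None:
    raise TaskRegistrationError("duplicate name")


results = [describe(missing), describe(bad_registration), describe(lambda: None)]
```

Exceptions that are not scheduler errors propagate unchanged, which keeps unrelated bugs visible instead of masking them.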