跳转至

调度器内部

这些方法帮助底层调度器规划、查找任务、记录统计和执行带重试的单个任务。普通用户通常只需要外部 API 中的 execute()

astrum.scheduler.DynamicScheduler.detect_cycle

detect_cycle() -> None
源代码位于: src/astrum/scheduler.py
def detect_cycle(self) -> None:
    ExecutionPlanner(self.task_order).detect_cycle()

astrum.scheduler.DynamicScheduler.get_execute_timeline

get_execute_timeline()
源代码位于: src/astrum/scheduler.py
def get_execute_timeline(self):
    return ExecutionPlanner(self.task_order).get_execute_timeline()

astrum.scheduler.DynamicScheduler.find_task_by_name

find_task_by_name(task_name: str) -> TaskCallable | None
源代码位于: src/astrum/scheduler.py
def find_task_by_name(self, task_name: str) -> TaskCallable | None:
    for name, task in self.tasks:
        if name == task_name:
            return task
    return None