调度器内部
这些方法帮助底层调度器规划、查找任务、记录统计和执行带重试的单个任务。普通用户通常只需要外部 API 中的 execute()。
astrum.scheduler.DynamicScheduler.detect_cycle
源代码位于: src/astrum/scheduler.py
| def detect_cycle(self) -> None:
ExecutionPlanner(self.task_order).detect_cycle()
|
astrum.scheduler.DynamicScheduler.get_execute_timeline
源代码位于: src/astrum/scheduler.py
| def get_execute_timeline(self):
return ExecutionPlanner(self.task_order).get_execute_timeline()
|
astrum.scheduler.DynamicScheduler.find_task_by_name
find_task_by_name(task_name: str) -> TaskCallable | None
源代码位于: src/astrum/scheduler.py
| def find_task_by_name(self, task_name: str) -> TaskCallable | None:
for name, task in self.tasks:
if name == task_name:
return task
return None
|