装饰器与注册表¶
装饰器 API 是 Astrum 最推荐的使用方式。它把函数注册为 DAG task,并在构建 scheduler 时转换成底层 DynamicScheduler 所需的任务函数和 TaskOrder。
模块级入口¶
astrum.decorators.task ¶
task(task_id: str | None = None, *, depends_on: list[str] | tuple[str, ...] | None = None, data: TaskData | None = None, namespace: str | None = None, retry: int = 0) -> Callable[[Callable[..., Awaitable[Any] | Any]], Callable[..., Awaitable[Any] | Any]]
模块级任务装饰器。
除了多出的 namespace 参数外,签名和行为与 :meth:SchedulerRegistry.task
完全一致——两者都委托到 :func:_register_function,未来改进底层注册行为
只需要修改那一个函数。
解析命名空间的优先级:显式 namespace= 参数 >
use_namespace 上下文栈顶 > :data:DEFAULT_NAMESPACE。注意命名空间
会在 @ 装饰发生时即时解析,因此把 task(...) 调用放在
with use_namespace(...) 块内即可获得上下文绑定的效果。
Module-level task decorator.
Except for the extra namespace parameter, its signature and behavior are
exactly the same as :meth:SchedulerRegistry.task: both delegate to
:func:_register_function, so future changes to the underlying registration
behavior only need to modify that one function.
Namespace resolution priority: explicit namespace= parameter >
top of the use_namespace context stack > :data:DEFAULT_NAMESPACE. Note
that the namespace is resolved immediately when the @ decoration happens,
so placing the task(...) call inside a with use_namespace(...) block is
enough to get the context-bound effect.
源代码位于: src/astrum/decorators.py
astrum.decorators.run
async
¶
run(target_tasks: list[str] | None = None, *, namespace: str | None = None, config: AstrumConfig | None = None, visualize: bool = False)
源代码位于: src/astrum/decorators.py
astrum.decorators.build_scheduler ¶
build_scheduler(target_tasks: list[str] | None = None, *, namespace: str | None = None, config: AstrumConfig | None = None, ignore_tail_task: list[str] | None = None, concurrency_context: Semaphore | None = None, silence: bool = True, visualize: bool = False) -> DynamicScheduler
源代码位于: src/astrum/decorators.py
astrum.decorators.build_task_orders ¶
build_task_orders(target_tasks: list[str] | None = None, *, namespace: str | None = None, config: AstrumConfig | None = None, visualize: bool = False) -> list[TaskOrder]
源代码位于: src/astrum/decorators.py
命名空间¶
astrum.decorators.use_namespace ¶
临时把激活命名空间切换为 namespace。
with use_namespace("analytics"): 块内的 @task 装饰器与
build_scheduler / run 等模块级函数将默认作用于该命名空间。
支持嵌套,退出 with 后会自动恢复上一层命名空间。
Temporarily switch the active namespace to namespace.
Inside a with use_namespace("analytics"): block, the @task decorator and
module-level functions such as build_scheduler / run default to that
namespace. Nesting is supported; after leaving the with block, the previous
namespace is restored automatically.
源代码位于: src/astrum/decorators.py
astrum.decorators.get_registry ¶
返回指定命名空间对应的 :class:SchedulerRegistry(不存在则按需创建)。
Return the :class:SchedulerRegistry for the specified namespace, creating it
on demand if it does not exist.
源代码位于: src/astrum/decorators.py
astrum.decorators.clear_registry ¶
清除全局 hub 中的注册表。namespace=None 时清空所有命名空间。
Clear registries in the global hub. When namespace=None, clear all namespaces.
astrum.decorators.active_namespace ¶
SchedulerRegistry¶
SchedulerRegistry 适合显式创建独立工作流对象,避免依赖全局命名空间。
astrum.decorators.SchedulerRegistry ¶
Decorator-based DAG builder.
This module is intentionally isolated from the core scheduler: it only converts decorated functions into the same inputs used by manual DAGs.
源代码位于: src/astrum/decorators.py
task ¶
task(task_id: str | None = None, *, depends_on: list[str] | tuple[str, ...] | None = None, data: TaskData | None = None, retry: int = 0) -> Callable[[Callable[..., Awaitable[Any] | Any]], Callable[..., Awaitable[Any] | Any]]
Register a function as a DAG task in this registry.
源代码位于: src/astrum/decorators.py
get_task ¶
get_all_tasks ¶
clear ¶
build_task_orders ¶
build_task_orders(target_tasks: list[str] | None = None, config: AstrumConfig | None = None) -> list[TaskOrder]
源代码位于: src/astrum/decorators.py
build_tasks ¶
build_tasks(target_tasks: list[str] | None = None) -> list[tuple[str, Callable[..., Awaitable[Any] | Any]]]
返回 (task_id, callable) 列表。
改造前这里会立刻 func() 把任务定格成无参协程;改造后保留函数引用,
由 :class:DynamicScheduler 在调度期决定何时调用以及如何注入参数。
Return a list of (task_id, callable) pairs.
Before the refactor, this would immediately call func() and freeze the
task as a no-argument coroutine; after the refactor, it keeps the function
reference and lets :class:DynamicScheduler decide when to call it and how
to inject parameters during scheduling.
源代码位于: src/astrum/decorators.py
build_scheduler ¶
build_scheduler(target_tasks: list[str] | None = None, *, config: AstrumConfig | None = None, ignore_tail_task: list[str] | None = None, concurrency_context: Semaphore | None = None, silence: bool = True, visualize: bool = False) -> DynamicScheduler
源代码位于: src/astrum/decorators.py
run
async
¶
astrum.decorators.RegisteredTask
dataclass
¶
RegisteredTask(task_id: str, function: Callable[..., Awaitable[Any] | Any], depends_on: tuple[str, ...], retry: int, data: TaskData | None = None)
Metadata captured by the decorator DAG builder.