Decorators and Registry¶
The decorator API is the recommended way to use Astrum. It registers functions as DAG tasks, then converts them into the task callables and TaskOrder objects required by DynamicScheduler.
Module-level entry points¶
astrum.decorators.task ¶
task(task_id: str | None = None, *, depends_on: list[str] | tuple[str, ...] | None = None, data: TaskData | None = None, namespace: str | None = None, retry: int = 0) -> Callable[[Callable[..., Awaitable[Any] | Any]], Callable[..., Awaitable[Any] | Any]]
Module-level task decorator.
Apart from the extra namespace parameter, its signature and behavior match SchedulerRegistry.task exactly. Both delegate to _register_function, so a future change to the underlying registration behavior only has to touch that one function.
Namespace resolution priority: explicit namespace= argument > top of the use_namespace context stack > DEFAULT_NAMESPACE. The namespace is resolved immediately, at the moment the @ decoration runs, so placing the task(...) call inside a with use_namespace(...) block is enough to bind the task to that namespace.
Source code in src/astrum/decorators.py
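The resolution order described above can be modelled in a few lines. This is a minimal stand-in sketch, not Astrum's implementation: `resolve_namespace`, `_namespace_stack`, `registered`, and the value `"default"` for `DEFAULT_NAMESPACE` are assumptions made for illustration, and the toy `task` only records which namespace was chosen.

```python
from typing import Callable, Dict, List, Optional

# Hypothetical stand-ins; none of these names are taken from Astrum's source.
DEFAULT_NAMESPACE = "default"          # assumed value
_namespace_stack: List[str] = []       # models the use_namespace context stack
registered: Dict[str, str] = {}        # task_id -> namespace chosen at decoration


def resolve_namespace(explicit: Optional[str] = None) -> str:
    """Explicit argument > top of the context stack > default."""
    if explicit is not None:
        return explicit
    if _namespace_stack:
        return _namespace_stack[-1]
    return DEFAULT_NAMESPACE


def task(task_id: str, *, namespace: Optional[str] = None) -> Callable:
    # Resolved on the @ line itself, not later when the task runs.
    resolved = resolve_namespace(namespace)

    def decorator(func: Callable) -> Callable:
        registered[task_id] = resolved
        return func

    return decorator


# Roughly what entering `with use_namespace("analytics"):` would do.
_namespace_stack.append("analytics")
try:
    @task("load")                      # picks up the context namespace
    def load() -> None: ...

    @task("audit", namespace="ops")    # explicit argument wins
    def audit() -> None: ...
finally:
    _namespace_stack.pop()


@task("report")                        # nothing active: falls back to default
def report() -> None: ...
```

Because the namespace is fixed at decoration time in this model, calling `load()` later from outside the block does not change where it was registered.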
astrum.decorators.run (async) ¶
run(target_tasks: list[str] | None = None, *, namespace: str | None = None, config: AstrumConfig | None = None, visualize: bool = False)
Source code in src/astrum/decorators.py
astrum.decorators.build_scheduler ¶
build_scheduler(target_tasks: list[str] | None = None, *, namespace: str | None = None, config: AstrumConfig | None = None, ignore_tail_task: list[str] | None = None, concurrency_context: Semaphore | None = None, silence: bool = True, visualize: bool = False) -> DynamicScheduler
Source code in src/astrum/decorators.py
astrum.decorators.build_task_orders ¶
build_task_orders(target_tasks: list[str] | None = None, *, namespace: str | None = None, config: AstrumConfig | None = None, visualize: bool = False) -> list[TaskOrder]
Source code in src/astrum/decorators.py
Namespaces¶
astrum.decorators.use_namespace ¶
Temporarily switch the active namespace to namespace.
Inside a with use_namespace("analytics"): block, the @task decorator and module-level functions such as build_scheduler and run default to that namespace. Nesting is supported: leaving a with block automatically restores the previously active namespace.
Source code in src/astrum/decorators.py
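The nesting and restore behavior can be illustrated with the standard library's `contextvars` and `contextlib`. The sketch below is a stand-in written for this page, not Astrum's code: the bodies of `use_namespace` and `active_namespace`, the `_active` variable, and the `"default"` fallback are all assumptions.

```python
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Iterator, List

# Hypothetical model of the active-namespace state; the default is assumed.
_active: ContextVar[str] = ContextVar("active_namespace", default="default")


@contextmanager
def use_namespace(namespace: str) -> Iterator[str]:
    token = _active.set(namespace)
    try:
        yield namespace
    finally:
        # Restore whatever was active before this block was entered.
        _active.reset(token)


def active_namespace() -> str:
    return _active.get()


seen: List[str] = [active_namespace()]
with use_namespace("analytics"):
    seen.append(active_namespace())
    with use_namespace("billing"):      # nested block shadows the outer one
        seen.append(active_namespace())
    seen.append(active_namespace())     # back to "analytics"
seen.append(active_namespace())         # back to the default
```

Resetting through the token in a `finally` clause means the previous namespace comes back even if the block raises.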
astrum.decorators.get_registry ¶
Return the SchedulerRegistry for the given namespace, creating it on demand if it does not exist.
Source code in src/astrum/decorators.py
astrum.decorators.clear_registry ¶
Clear registries in the global hub. When namespace=None, all namespaces are cleared.
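As a rough illustration of the create-on-demand and clear semantics of get_registry and clear_registry, the sketch below models the global hub as a plain dictionary. The `Registry` class and `_hub` are hypothetical stand-ins, and the choice to drop a cleared namespace from the hub (rather than empty it in place) is an assumption; the source does not say which Astrum does.

```python
from typing import Dict, Optional


class Registry:
    """Hypothetical stand-in for SchedulerRegistry."""

    def __init__(self, namespace: str) -> None:
        self.namespace = namespace
        self.tasks: Dict[str, object] = {}


_hub: Dict[str, Registry] = {}  # assumed shape of the global hub


def get_registry(namespace: str = "default") -> Registry:
    # Create on first access, then keep returning the same object.
    if namespace not in _hub:
        _hub[namespace] = Registry(namespace)
    return _hub[namespace]


def clear_registry(namespace: Optional[str] = None) -> None:
    if namespace is None:
        _hub.clear()                # no argument: wipe every namespace
    else:
        _hub.pop(namespace, None)   # otherwise only the named one


first = get_registry("analytics")
second = get_registry("analytics")
same_object = first is second       # repeated lookups share one registry

get_registry("billing")
clear_registry("analytics")
remaining = sorted(_hub)            # only "billing" is left

clear_registry()
hub_is_empty = not _hub
```

Clearing between test cases is a typical use: it keeps tasks registered by one test from leaking into the next.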
astrum.decorators.active_namespace ¶
SchedulerRegistry¶
SchedulerRegistry is useful when you want an explicit workflow object instead of relying on global namespaces.
astrum.decorators.SchedulerRegistry ¶
Decorator-based DAG builder.
This module is intentionally isolated from the core scheduler: it only converts decorated functions into the same inputs used by manual DAGs.
Source code in src/astrum/decorators.py
task ¶
task(task_id: str | None = None, *, depends_on: list[str] | tuple[str, ...] | None = None, data: TaskData | None = None, retry: int = 0) -> Callable[[Callable[..., Awaitable[Any] | Any]], Callable[..., Awaitable[Any] | Any]]
Register a function as a DAG task in this registry.
Source code in src/astrum/decorators.py
get_task ¶
get_all_tasks ¶
clear ¶
build_task_orders ¶
build_task_orders(target_tasks: list[str] | None = None, config: AstrumConfig | None = None) -> list[TaskOrder]
Source code in src/astrum/decorators.py
build_tasks ¶
build_tasks(target_tasks: list[str] | None = None) -> list[tuple[str, Callable[..., Awaitable[Any] | Any]]]
Return a list of (task_id, callable) pairs.
Before the refactor, this method called func() immediately, freezing each task as a no-argument coroutine. It now keeps the function reference and lets DynamicScheduler decide, at scheduling time, when to call it and how to inject parameters.
Source code in src/astrum/decorators.py
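To show why keeping a function reference matters, the sketch below runs a list of (task_id, callable) pairs with a toy runner that decides the arguments at call time. It does not use DynamicScheduler; `run_in_order` is hypothetical, and the convention of injecting an upstream result into a parameter named after the upstream task id is an assumption made for this example only.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict, List, Tuple


def fetch() -> int:
    return 21


async def double(fetch: int) -> int:
    # The parameter name matches the upstream task id (assumed convention).
    return fetch * 2


# Same shape as the documented return value: uncalled function references.
pairs: List[Tuple[str, Callable[..., Any]]] = [("fetch", fetch), ("double", double)]


async def run_in_order(tasks: List[Tuple[str, Callable[..., Any]]]) -> Dict[str, Any]:
    results: Dict[str, Any] = {}
    for task_id, func in tasks:
        # Arguments are chosen here, at call time, from results so far.
        wanted = inspect.signature(func).parameters
        kwargs = {name: results[name] for name in wanted if name in results}
        value = func(**kwargs)
        if inspect.isawaitable(value):   # support sync and async callables
            value = await value
        results[task_id] = value
    return results


results = asyncio.run(run_in_order(pairs))
```

Had `double()` been called at build time, as in the pre-refactor behavior described above, there would have been no `fetch` result to pass in.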
build_scheduler ¶
build_scheduler(target_tasks: list[str] | None = None, *, config: AstrumConfig | None = None, ignore_tail_task: list[str] | None = None, concurrency_context: Semaphore | None = None, silence: bool = True, visualize: bool = False) -> DynamicScheduler
Source code in src/astrum/decorators.py
run (async) ¶
astrum.decorators.RegisteredTask (dataclass) ¶
RegisteredTask(task_id: str, function: Callable[..., Awaitable[Any] | Any], depends_on: tuple[str, ...], retry: int, data: TaskData | None = None)
Metadata captured by the decorator DAG builder.
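The sketch below shows how metadata of this shape could be turned into an execution order. The dataclass mirrors the documented fields, with `Any` standing in for TaskData; the `execution_order` helper and the use of the standard library's `graphlib` are assumptions for illustration, not a description of how Astrum orders tasks.

```python
from dataclasses import dataclass
from graphlib import TopologicalSorter
from typing import Any, Callable, List, Optional, Tuple


@dataclass
class RegisteredTask:
    # Field names follow the documented signature; Any replaces TaskData here.
    task_id: str
    function: Callable[..., Any]
    depends_on: Tuple[str, ...]
    retry: int
    data: Optional[Any] = None


def execution_order(tasks: List[RegisteredTask]) -> List[str]:
    """Hypothetical helper: order task ids so dependencies come first."""
    graph = {t.task_id: set(t.depends_on) for t in tasks}
    return list(TopologicalSorter(graph).static_order())


def noop() -> None:
    return None


# Registered out of order on purpose; depends_on carries the structure.
records = [
    RegisteredTask("report", noop, ("clean",), retry=0),
    RegisteredTask("clean", noop, ("extract",), retry=2),
    RegisteredTask("extract", noop, (), retry=0),
]

order = execution_order(records)
```

`TopologicalSorter` raises `graphlib.CycleError` if the `depends_on` declarations form a cycle, which is one way a builder could reject an invalid DAG early.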