
Decorators and Registry

The decorator API is the recommended way to use Astrum. It registers functions as DAG tasks, then converts them into the task callables and TaskOrder objects required by DynamicScheduler.
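
The registration idea can be illustrated without Astrum itself. The following is a minimal sketch, not Astrum's implementation: `_TASKS`, the toy `task` decorator, and `run_all` are hypothetical stand-ins, the default task id is assumed to be the function name, and tasks run one at a time in topological order rather than through DynamicScheduler.

```python
import asyncio
from graphlib import TopologicalSorter

# Hypothetical stand-in for a registry: task_id -> (function, dependencies).
_TASKS = {}


def task(task_id=None, *, depends_on=()):
    """Register the decorated function and return it unchanged."""
    def decorator(func):
        # Assumption: an omitted task_id falls back to the function name.
        _TASKS[task_id or func.__name__] = (func, tuple(depends_on))
        return func
    return decorator


@task()
def extract():
    return [1, 2, 3]


@task(depends_on=["extract"])
async def transform():
    return "transformed"


@task("load", depends_on=["transform"])
def load_results():
    return "loaded"


async def run_all():
    """Run every registered task in dependency order, one at a time."""
    graph = {tid: set(deps) for tid, (_, deps) in _TASKS.items()}
    results = {}
    for tid in TopologicalSorter(graph).static_order():
        func, _ = _TASKS[tid]
        value = func()
        if asyncio.iscoroutine(value):
            value = await value  # support both sync and async task functions
        results[tid] = value
    return results


results = asyncio.run(run_all())
print(list(results))
```

The decorator only records metadata and hands the function back untouched, which is why the same function can still be called directly in tests.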

Module-level entry points

astrum.decorators.task

task(task_id: str | None = None, *, depends_on: list[str] | tuple[str, ...] | None = None, data: TaskData | None = None, namespace: str | None = None, retry: int = 0) -> Callable[[Callable[..., Awaitable[Any] | Any]], Callable[..., Awaitable[Any] | Any]]

Module-level task decorator.

Apart from the extra namespace parameter, its signature and behavior match SchedulerRegistry.task: both delegate to _register_function, so a future change to the underlying registration behavior only needs to touch that one function.

The namespace is resolved in this order: an explicit namespace= argument, then the top of the use_namespace context stack, then DEFAULT_NAMESPACE. Resolution happens immediately when the @ decoration is applied, so placing the task(...) call inside a with use_namespace(...) block is enough to bind the task to that namespace.
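
The priority order can be modelled with a small context-variable stack. This is a sketch under stated assumptions rather than Astrum's code: `_stack`, `resolve`, `registered`, and the toy `task` and `use_namespace` below are hypothetical, and the value `"default"` for `DEFAULT_NAMESPACE` is assumed for illustration.

```python
from contextlib import contextmanager
from contextvars import ContextVar

DEFAULT_NAMESPACE = "default"  # assumed value, for illustration only
_stack: ContextVar[tuple] = ContextVar("ns_stack", default=())


def resolve(namespace=None):
    """Explicit argument > top of the context stack > default."""
    if namespace is not None:
        return namespace
    stack = _stack.get()
    return stack[-1] if stack else DEFAULT_NAMESPACE


@contextmanager
def use_namespace(name):
    token = _stack.set(_stack.get() + (name,))
    try:
        yield
    finally:
        _stack.reset(token)


registered = {}  # function name -> namespace chosen at decoration time


def task(*, namespace=None):
    def decorator(func):
        # Resolved here, when the decorator is applied, not when func runs.
        registered[func.__name__] = resolve(namespace)
        return func
    return decorator


@task()
def a(): ...


with use_namespace("analytics"):
    @task()
    def b(): ...

    @task(namespace="billing")
    def c(): ...

print(registered)
```

Because resolution happens at decoration time, calling `b` later outside the `with` block does not change the namespace it was registered under.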

Source code in src/astrum/decorators.py
def task(
    task_id: str | None = None, *, depends_on: list[str] | tuple[str, ...] | None = None, data: TaskData | None = None, namespace: str | None = None, retry: int = 0
) -> Callable[[Callable[..., Awaitable[Any] | Any]], Callable[..., Awaitable[Any] | Any]]:
    """Module-level task decorator.

    Except for the extra ``namespace`` parameter, its signature and behavior are
    exactly the same as :meth:`SchedulerRegistry.task`: both delegate to
    :func:`_register_function`, so future changes to the underlying registration
    behavior only need to modify that one function.

    Namespace resolution priority: explicit ``namespace=`` parameter >
    top of the ``use_namespace`` context stack > :data:`DEFAULT_NAMESPACE`. Note
    that the namespace is resolved immediately when the ``@`` decoration happens,
    so placing the ``task(...)`` call inside a ``with use_namespace(...)`` block is
    enough to get the context-bound effect.
    """

    dependencies = tuple(depends_on or ())

    def decorator(func: Callable[..., Awaitable[Any] | Any]) -> Callable[..., Awaitable[Any] | Any]:
        registry = _HUB.get(namespace)
        return _register_function(registry, func, task_id=task_id, depends_on=dependencies, data=data, retry=retry)

    return decorator

astrum.decorators.run async

run(target_tasks: list[str] | None = None, *, namespace: str | None = None, config: AstrumConfig | None = None, visualize: bool = False)
Source code in src/astrum/decorators.py
async def run(
    target_tasks: list[str] | None = None,
    *,
    namespace: str | None = None,
    config: AstrumConfig | None = None,
    visualize: bool = False,
):
    if config is None:
        config = AstrumConfig(visualize=visualize)
    return await _HUB.get(namespace).run(target_tasks, config=config)

astrum.decorators.build_scheduler

build_scheduler(target_tasks: list[str] | None = None, *, namespace: str | None = None, config: AstrumConfig | None = None, ignore_tail_task: list[str] | None = None, concurrency_context: Semaphore | None = None, silence: bool = True, visualize: bool = False) -> DynamicScheduler
Source code in src/astrum/decorators.py
def build_scheduler(
    target_tasks: list[str] | None = None,
    *,
    namespace: str | None = None,
    config: AstrumConfig | None = None,
    # Legacy parameters kept for backward compatibility
    ignore_tail_task: list[str] | None = None,
    concurrency_context: asyncio.Semaphore | None = None,
    silence: bool = True,
    visualize: bool = False,
) -> DynamicScheduler:
    if config is None:
        config = AstrumConfig(
            visualize=visualize,
            silence=silence,
            ignore_tail_task=ignore_tail_task or [],
        )
    return _HUB.get(namespace).build_scheduler(
        target_tasks,
        config=config,
        concurrency_context=concurrency_context,
    )
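
One consequence of the `if config is None` branch above is that the legacy `silence` and `visualize` keywords are only read when no config object is passed. The sketch below mirrors that branch with a hypothetical `Config` dataclass standing in for AstrumConfig (its real fields and defaults are not shown on this page) and a hypothetical helper `effective_config`.

```python
from dataclasses import dataclass, field


@dataclass
class Config:  # hypothetical stand-in for AstrumConfig
    visualize: bool = False
    silence: bool = True
    ignore_tail_task: list = field(default_factory=list)


def effective_config(config=None, *, ignore_tail_task=None, silence=True, visualize=False):
    """Mirror the `if config is None` branch of build_scheduler."""
    if config is None:
        # Legacy keywords are consulted only on this path.
        config = Config(
            visualize=visualize,
            silence=silence,
            ignore_tail_task=ignore_tail_task or [],
        )
    return config


legacy = effective_config(silence=False, visualize=True)
mixed = effective_config(Config(), silence=False, visualize=True)
print(legacy.silence, legacy.visualize)
print(mixed.silence, mixed.visualize)
```

In the listing, `concurrency_context` is forwarded separately from the config, so it is not affected by this precedence.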

astrum.decorators.build_task_orders

build_task_orders(target_tasks: list[str] | None = None, *, namespace: str | None = None, config: AstrumConfig | None = None, visualize: bool = False) -> list[TaskOrder]
Source code in src/astrum/decorators.py
def build_task_orders(
    target_tasks: list[str] | None = None,
    *,
    namespace: str | None = None,
    config: AstrumConfig | None = None,
    visualize: bool = False,
) -> list[TaskOrder]:
    if config is None:
        config = AstrumConfig(visualize=visualize)
    return _HUB.get(namespace).build_task_orders(target_tasks, config=config)

Namespaces

astrum.decorators.use_namespace

use_namespace(namespace: str) -> Iterator[SchedulerRegistry]

Temporarily switch the active namespace to namespace.

Inside a with use_namespace("analytics"): block, the @task decorator and module-level functions such as build_scheduler and run default to that namespace. Blocks can be nested; leaving a with block automatically restores the previously active namespace.
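
The nesting and restore behavior follows the usual token pattern of `contextvars`. The following sketch assumes that pattern and is not Astrum's hub: `_active` is a hypothetical variable, the toy `use_namespace` and `active_namespace` only imitate the documented behavior, and `"default"` is an assumed initial namespace.

```python
from contextlib import contextmanager
from contextvars import ContextVar

_active: ContextVar[str] = ContextVar("active_namespace", default="default")


@contextmanager
def use_namespace(namespace):
    token = _active.set(namespace)  # the token remembers the previous value
    try:
        yield namespace
    finally:
        _active.reset(token)  # restore even if the body raised


def active_namespace():
    return _active.get()


seen = [active_namespace()]
with use_namespace("analytics"):
    seen.append(active_namespace())
    with use_namespace("reporting"):
        seen.append(active_namespace())
    seen.append(active_namespace())
seen.append(active_namespace())
print(seen)
```

Resetting in a `finally` clause keeps the active namespace consistent when an exception propagates out of the block.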

Source code in src/astrum/decorators.py
@contextmanager
def use_namespace(namespace: str) -> Iterator[SchedulerRegistry]:
    """Temporarily switch the active namespace to ``namespace``.

    Inside a ``with use_namespace("analytics"):`` block, the ``@task`` decorator and
    module-level functions such as ``build_scheduler`` / ``run`` default to that
    namespace. Nesting is supported; after leaving the ``with`` block, the previous
    namespace is restored automatically.
    """

    token = _HUB.push(namespace)
    try:
        yield _HUB.get(namespace)
    finally:
        _HUB.reset(token)

astrum.decorators.get_registry

get_registry(namespace: str | None = None) -> SchedulerRegistry

Return the SchedulerRegistry for the specified namespace, creating it on demand if it does not exist.

Source code in src/astrum/decorators.py
def get_registry(namespace: str | None = None) -> SchedulerRegistry:
    """Return the :class:`SchedulerRegistry` for the specified namespace, creating it
    on demand if it does not exist.
    """

    return _HUB.get(namespace)

astrum.decorators.clear_registry

clear_registry(namespace: str | None = None) -> None

Clear registries in the global hub. When namespace=None, all namespaces are cleared.

Source code in src/astrum/decorators.py
def clear_registry(namespace: str | None = None) -> None:
    """Clear registries in the global hub. When ``namespace=None``, clear all namespaces.
    """

    _HUB.clear(namespace)

astrum.decorators.active_namespace

active_namespace() -> str

Return the name of the currently active namespace.

Source code in src/astrum/decorators.py
def active_namespace() -> str:
    """Return the name of the currently active namespace.
    """

    return _HUB.active()

SchedulerRegistry

SchedulerRegistry is useful when you want an explicit workflow object instead of relying on global namespaces.

astrum.decorators.SchedulerRegistry

SchedulerRegistry(name: str = 'default')

Decorator-based DAG builder.

This module is intentionally isolated from the core scheduler: it only converts decorated functions into the same inputs used by manual DAGs.

Source code in src/astrum/decorators.py
def __init__(self, name: str = "default") -> None:
    self.name = name
    self._tasks: dict[str, RegisteredTask] = {}

task

task(task_id: str | None = None, *, depends_on: list[str] | tuple[str, ...] | None = None, data: TaskData | None = None, retry: int = 0) -> Callable[[Callable[..., Awaitable[Any] | Any]], Callable[..., Awaitable[Any] | Any]]

Register a function as a DAG task in this registry.

Source code in src/astrum/decorators.py
def task(
    self, task_id: str | None = None, *, depends_on: list[str] | tuple[str, ...] | None = None, data: TaskData | None = None, retry: int = 0
) -> Callable[[Callable[..., Awaitable[Any] | Any]], Callable[..., Awaitable[Any] | Any]]:
    """Register a function as a DAG task in this registry."""

    return _make_task_decorator(self, task_id=task_id, depends_on=depends_on, data=data, retry=retry)

get_task

get_task(task_id: str) -> RegisteredTask
Source code in src/astrum/decorators.py
def get_task(self, task_id: str) -> RegisteredTask:
    try:
        return self._tasks[task_id]
    except KeyError as exc:
        raise TaskRegistrationError(f"Task '{task_id}' is not registered in '{self.name}'.") from exc

get_all_tasks

get_all_tasks() -> dict[str, RegisteredTask]
Source code in src/astrum/decorators.py
def get_all_tasks(self) -> dict[str, RegisteredTask]:
    return self._tasks.copy()

clear

clear() -> None
Source code in src/astrum/decorators.py
def clear(self) -> None:
    self._tasks.clear()

build_task_orders

build_task_orders(target_tasks: list[str] | None = None, config: AstrumConfig | None = None) -> list[TaskOrder]
Source code in src/astrum/decorators.py
def build_task_orders(self, target_tasks: list[str] | None = None, config: AstrumConfig | None = None) -> list[TaskOrder]:
    cfg = config or AstrumConfig()

    all_task_orders = [TaskOrder(task_name=task_id) for task_id in self._tasks]
    all_task_func_map = {task_id: registered.function for task_id, registered in self._tasks.items()}
    generated_transports = auto_generate_data_transports(all_task_orders, all_task_func_map)
    for generated in generated_transports:
        registered_task = self._tasks[generated.task_id]
        merged_data = _merge_task_data(registered_task.data, generated)
        self._tasks[generated.task_id] = RegisteredTask(
            task_id=registered_task.task_id,
            function=registered_task.function,
            depends_on=registered_task.depends_on,
            retry=registered_task.retry,
            data=merged_data,
        )
        setattr(registered_task.function, "_scheduler_task_data", merged_data)

    selected_task_ids = self._collect_task_ids(target_tasks)
    order_map = {task_id: TaskOrder(task_name=task_id) for task_id in selected_task_ids}

    for task_id in selected_task_ids:
        registered_task = self._tasks[task_id]
        order_map[task_id].dependencies = [order_map[dependency] for dependency in registered_task.depends_on if dependency in order_map]

    task_orders = [order_map[task_id] for task_id in selected_task_ids]
    task_func_map = {tid: self._tasks[tid].function for tid in selected_task_ids}

    # Validate and complete the data transport relationships
    task_transports: list[TaskData] = [self._tasks[tid].data for tid in selected_task_ids if self._tasks[tid].data is not None]
    resolve_task_data(
        task_orders,
        task_transports,
        allow_no_dir_definition=cfg.allow_no_dir_definition,
        infer_via_ast=cfg.infer_via_ast,
        silence_warnings=cfg.silence_warnings,
    )
    autocast_data_transports_path(task_transports, task_orders)

    # Sync the from_tasks inferred from data transports back into the task graph's dependencies
    if cfg.auto_sync_dependencies:
        for task_id, task_order in order_map.items():
            dt = next((d for d in task_transports if d.task_id == task_id), None)
            if dt and dt.from_tasks:
                for from_id in dt.from_tasks:
                    if from_id in order_map and order_map[from_id] not in task_order.dependencies:
                        task_order.dependencies.append(order_map[from_id])

    # Type-safety validation (final_check)
    if not cfg.skip_type_check:
        errors = final_check(
            task_transports,
            task_orders,
            task_func_map,
            skip_type_check=cfg.skip_type_check,
            infer_via_ast=cfg.infer_via_ast,
            strict_topology=cfg.strict_topology,
        )
        if errors:
            raise TaskRegistrationError(f"Data transport validation failed with the following errors:\n- " + "\n- ".join(errors))

    ExecutionPlanner(task_orders).validate()

    if cfg.visualize:
        from .data_transport import visualize_data_transport

        visualize_data_transport(task_transports, task_orders)

    return task_orders
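
The `auto_sync_dependencies` step in the listing above can be isolated as a small function. This is a sketch of that one loop only: `Order` and `Transport` are hypothetical stand-ins for TaskOrder and TaskData that carry just the fields the loop reads, and `sync_dependencies` is an illustrative name, not part of Astrum.

```python
from dataclasses import dataclass, field


@dataclass
class Order:  # hypothetical stand-in for TaskOrder
    task_name: str
    dependencies: list = field(default_factory=list)


@dataclass
class Transport:  # hypothetical stand-in for TaskData
    task_id: str
    from_tasks: list = field(default_factory=list)


def sync_dependencies(order_map, transports):
    """Add an edge for every from_tasks producer that is part of the selection."""
    by_id = {}
    for transport in transports:
        by_id.setdefault(transport.task_id, transport)  # first match wins
    for task_id, order in order_map.items():
        transport = by_id.get(task_id)
        if transport is None:
            continue
        for from_id in transport.from_tasks:
            upstream = order_map.get(from_id)
            # Skip unselected producers and edges that already exist.
            if upstream is not None and upstream not in order.dependencies:
                order.dependencies.append(upstream)


order_map = {name: Order(name) for name in ("load", "clean", "report")}
order_map["report"].dependencies.append(order_map["clean"])  # declared via depends_on
transports = [
    Transport("clean", ["load"]),
    Transport("report", ["clean", "load", "archive"]),  # "archive" is not selected
]
sync_dependencies(order_map, transports)
edges = {name: [dep.task_name for dep in order.dependencies] for name, order in order_map.items()}
print(edges)
```

Edges declared through `depends_on` are kept as they are; the sync step only appends the ones that data transport implies and that are missing.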

build_tasks

build_tasks(target_tasks: list[str] | None = None) -> list[tuple[str, Callable[..., Awaitable[Any] | Any]]]

Return a list of (task_id, callable) pairs.

Before the refactor, this method called func() immediately and froze each task as a no-argument coroutine. After the refactor, it keeps the function reference and lets DynamicScheduler decide, at scheduling time, when to call it and which parameters to inject.
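
The difference between the two styles can be seen in a few lines of plain asyncio. The sketch below is illustrative only: `fetch`, `run_deferred`, and the `injected` mapping are hypothetical, and the way DynamicScheduler actually injects parameters is not shown on this page.

```python
import asyncio
import inspect


async def fetch(user_id: int = 0) -> str:
    return f"user-{user_id}"


# Eager style: calling now freezes the arguments into a coroutine object.
frozen = fetch()

# Deferred style: keep (task_id, callable) and let the runner choose arguments.
tasks = [("fetch", fetch)]


async def run_deferred(pairs, injected):
    """Call each function at run time with whatever arguments were injected."""
    results = {}
    for task_id, func in pairs:
        value = func(**injected.get(task_id, {}))
        if inspect.isawaitable(value):
            value = await value
        results[task_id] = value
    return results


async def main():
    eager = await frozen  # arguments can no longer be changed
    deferred = await run_deferred(tasks, {"fetch": {"user_id": 7}})
    return eager, deferred["fetch"]


eager_result, deferred_result = asyncio.run(main())
print(eager_result, deferred_result)
```

Keeping the callable also means a task can be invoked more than once, which a single coroutine object does not allow.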

Source code in src/astrum/decorators.py
def build_tasks(self, target_tasks: list[str] | None = None) -> list[tuple[str, Callable[..., Awaitable[Any] | Any]]]:
    """Return a list of (task_id, callable) pairs.

    Before the refactor, this would immediately call ``func()`` and freeze the
    task as a no-argument coroutine; after the refactor, it keeps the function
    reference and lets :class:`DynamicScheduler` decide when to call it and how
    to inject parameters during scheduling.
    """

    selected_task_ids = self._collect_task_ids(target_tasks)
    return [(task_id, self._tasks[task_id].function) for task_id in selected_task_ids]

build_scheduler

build_scheduler(target_tasks: list[str] | None = None, *, config: AstrumConfig | None = None, ignore_tail_task: list[str] | None = None, concurrency_context: Semaphore | None = None, silence: bool = True, visualize: bool = False) -> DynamicScheduler
Source code in src/astrum/decorators.py
def build_scheduler(
    self,
    target_tasks: list[str] | None = None,
    *,
    config: AstrumConfig | None = None,
    # Legacy parameters kept for backward compatibility; prefer config
    ignore_tail_task: list[str] | None = None,
    concurrency_context: asyncio.Semaphore | None = None,
    silence: bool = True,
    visualize: bool = False,
) -> DynamicScheduler:
    # Use config if the caller passed one; otherwise build it from the individual parameters
    if config is None:
        config = AstrumConfig(
            visualize=visualize,
            silence=silence,
            ignore_tail_task=ignore_tail_task or [],
        )

    task_orders = self.build_task_orders(target_tasks, config=config)
    tasks = self.build_tasks(target_tasks)

    selected_task_ids = self._collect_task_ids(target_tasks)
    task_data_refs: dict[str, TaskData] = {task_id: self._tasks[task_id].data for task_id in selected_task_ids if self._tasks[task_id].data is not None}
    task_retries = {task_id: self._tasks[task_id].retry for task_id in selected_task_ids if self._tasks[task_id].retry > 0}

    sem = concurrency_context or config.build_semaphore()

    return DynamicScheduler(
        tasks=tasks,
        task_order=task_orders,
        task_data_refs=task_data_refs or None,
        has_data_path=bool(task_data_refs),
        ignore_tail_task=config.ignore_tail_task or ignore_tail_task,
        concurrency_context=sem,
        task_retries=task_retries or None,
        silence=config.silence,
    )

run async

run(target_tasks: list[str] | None = None, config: AstrumConfig | None = None)
Source code in src/astrum/decorators.py
async def run(self, target_tasks: list[str] | None = None, config: AstrumConfig | None = None):
    scheduler = self.build_scheduler(target_tasks, config=config)
    return await scheduler.execute()

astrum.decorators.RegisteredTask dataclass

RegisteredTask(task_id: str, function: Callable[..., Awaitable[Any] | Any], depends_on: tuple[str, ...], retry: int, data: TaskData | None = None)

Metadata captured by the decorator DAG builder.