
Decorators and Registry

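The decorator API is the recommended way to use Astrum. It registers functions as DAG tasks and, when a scheduler is built, converts them into the task functions and TaskOrder objects that the underlying DynamicScheduler requires.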

Module-level entry points

astrum.decorators.task

task(task_id: str | None = None, *, depends_on: list[str] | tuple[str, ...] | None = None, data: TaskData | None = None, namespace: str | None = None, retry: int = 0) -> Callable[[Callable[..., Awaitable[Any] | Any]], Callable[..., Awaitable[Any] | Any]]

Module-level task decorator.

Except for the extra namespace parameter, its signature and behavior are exactly the same as SchedulerRegistry.task: both delegate to _register_function, so future changes to the underlying registration behavior only need to be made in that one function.

Namespace resolution priority: explicit namespace= parameter > top of the use_namespace context stack > DEFAULT_NAMESPACE. The namespace is resolved as soon as the @ decoration is applied, so placing the @task(...) decoration inside a with use_namespace(...) block is enough to bind the task to that namespace.
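A minimal sketch of the resolution order described above. It assumes the active namespaces are kept as a stack in a contextvars.ContextVar. This is not Astrum code: resolve_namespace, task, _stack and REGISTRIES are hypothetical stand-ins. The sketch shows that the namespace is looked up when @ is applied, so a function decorated inside the block stays in that namespace after the block exits.

```python
from __future__ import annotations

from contextvars import ContextVar
from typing import Any, Callable

# Hypothetical stand-ins: Astrum's real hub (_HUB) is not shown in this section.
DEFAULT_NAMESPACE = "default"
_stack: ContextVar[tuple[str, ...]] = ContextVar("namespace_stack", default=())
REGISTRIES: dict[str, list[str]] = {}


def resolve_namespace(explicit: str | None = None) -> str:
    # Priority: explicit argument > top of the context stack > default.
    if explicit is not None:
        return explicit
    stack = _stack.get()
    return stack[-1] if stack else DEFAULT_NAMESPACE


def task(*, namespace: str | None = None) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        # Resolved when "@" is applied, not when the function later runs.
        ns = resolve_namespace(namespace)
        REGISTRIES.setdefault(ns, []).append(func.__name__)
        return func

    return decorator


token = _stack.set(_stack.get() + ("analytics",))  # simulate entering use_namespace("analytics")
try:
    @task()
    def load() -> None: ...

    @task(namespace="billing")  # the explicit argument wins over the stack
    def charge() -> None: ...
finally:
    _stack.reset(token)  # simulate leaving the with block


@task()  # the stack is empty again, so the default namespace applies
def report() -> None: ...


print(REGISTRIES)
```

Because resolution happens at decoration time, load remains in "analytics" even though the stack was reset afterwards.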

Source code in: src/astrum/decorators.py
def task(
    task_id: str | None = None, *, depends_on: list[str] | tuple[str, ...] | None = None, data: TaskData | None = None, namespace: str | None = None, retry: int = 0
) -> Callable[[Callable[..., Awaitable[Any] | Any]], Callable[..., Awaitable[Any] | Any]]:
    """Module-level task decorator.

    Except for the extra ``namespace`` parameter, its signature and behavior are
    exactly the same as :meth:`SchedulerRegistry.task`: both delegate to
    :func:`_register_function`, so future changes to the underlying registration
    behavior only need to modify that one function.

    Namespace resolution priority: explicit ``namespace=`` parameter >
    top of the ``use_namespace`` context stack > :data:`DEFAULT_NAMESPACE`. Note
    that the namespace is resolved immediately when the ``@`` decoration happens,
    so placing the ``task(...)`` call inside a ``with use_namespace(...)`` block is
    enough to get the context-bound effect.
    """

    dependencies = tuple(depends_on or ())

    def decorator(func: Callable[..., Awaitable[Any] | Any]) -> Callable[..., Awaitable[Any] | Any]:
        registry = _HUB.get(namespace)
        return _register_function(registry, func, task_id=task_id, depends_on=dependencies, data=data, retry=retry)

    return decorator

astrum.decorators.run async

run(target_tasks: list[str] | None = None, *, namespace: str | None = None, config: AstrumConfig | None = None, visualize: bool = False)
Source code in: src/astrum/decorators.py
async def run(
    target_tasks: list[str] | None = None,
    *,
    namespace: str | None = None,
    config: AstrumConfig | None = None,
    visualize: bool = False,
):
    if config is None:
        config = AstrumConfig(visualize=visualize)
    return await _HUB.get(namespace).run(target_tasks, config=config)

astrum.decorators.build_scheduler

build_scheduler(target_tasks: list[str] | None = None, *, namespace: str | None = None, config: AstrumConfig | None = None, ignore_tail_task: list[str] | None = None, concurrency_context: Semaphore | None = None, silence: bool = True, visualize: bool = False) -> DynamicScheduler
Source code in: src/astrum/decorators.py
def build_scheduler(
    target_tasks: list[str] | None = None,
    *,
    namespace: str | None = None,
    config: AstrumConfig | None = None,
    # The following are legacy parameters kept for backward compatibility
    ignore_tail_task: list[str] | None = None,
    concurrency_context: asyncio.Semaphore | None = None,
    silence: bool = True,
    visualize: bool = False,
) -> DynamicScheduler:
    if config is None:
        config = AstrumConfig(
            visualize=visualize,
            silence=silence,
            ignore_tail_task=ignore_tail_task or [],
        )
    return _HUB.get(namespace).build_scheduler(
        target_tasks,
        config=config,
        concurrency_context=concurrency_context,
    )
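The legacy keyword arguments only matter when config is omitted. The sketch below imitates that fallback with a hypothetical three-field Config dataclass standing in for AstrumConfig. It assumes nothing about AstrumConfig beyond the visualize, silence and ignore_tail_task fields used above.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Config:
    # Hypothetical, trimmed-down stand-in for AstrumConfig (three fields only).
    visualize: bool = False
    silence: bool = True
    ignore_tail_task: list[str] = field(default_factory=list)


def resolve_config(
    config: Config | None = None,
    *,
    ignore_tail_task: list[str] | None = None,
    silence: bool = True,
    visualize: bool = False,
) -> Config:
    # Mirrors the fallback above: loose legacy arguments are consulted
    # only when no config object is passed.
    if config is None:
        config = Config(visualize=visualize, silence=silence, ignore_tail_task=ignore_tail_task or [])
    return config


print(resolve_config(silence=False))                        # built from the legacy argument
print(resolve_config(Config(silence=True), silence=False))  # legacy silence=False is ignored
```

In the function above, concurrency_context is the one legacy argument that is still forwarded when a config is passed. In that case visualize, silence and ignore_tail_task are dropped.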

astrum.decorators.build_task_orders

build_task_orders(target_tasks: list[str] | None = None, *, namespace: str | None = None, config: AstrumConfig | None = None, visualize: bool = False) -> list[TaskOrder]
Source code in: src/astrum/decorators.py
def build_task_orders(
    target_tasks: list[str] | None = None,
    *,
    namespace: str | None = None,
    config: AstrumConfig | None = None,
    visualize: bool = False,
) -> list[TaskOrder]:
    if config is None:
        config = AstrumConfig(visualize=visualize)
    return _HUB.get(namespace).build_task_orders(target_tasks, config=config)

Namespaces

astrum.decorators.use_namespace

use_namespace(namespace: str) -> Iterator[SchedulerRegistry]

Temporarily switch the active namespace to namespace.

Inside a with use_namespace("analytics"): block, the @task decorator and module-level functions such as build_scheduler / run default to that namespace. Nesting is supported; after leaving the with block, the previous namespace is restored automatically.

Source code in: src/astrum/decorators.py
@contextmanager
def use_namespace(namespace: str) -> Iterator[SchedulerRegistry]:
    """Temporarily switch the active namespace to ``namespace``.

    Inside a ``with use_namespace("analytics"):`` block, the ``@task`` decorator and
    module-level functions such as ``build_scheduler`` / ``run`` default to that
    namespace. Nesting is supported; after leaving the ``with`` block, the previous
    namespace is restored automatically.
    """

    token = _HUB.push(namespace)
    try:
        yield _HUB.get(namespace)
    finally:
        _HUB.reset(token)
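use_namespace pairs each push with a reset in a finally clause, so nesting unwinds correctly even when the body raises. The sketch below reproduces that pattern using contextvars token reset. The ContextVar-based stack and the simplified active_namespace are assumptions of this sketch, not Astrum's _HUB.

```python
from __future__ import annotations

from collections.abc import Iterator
from contextlib import contextmanager
from contextvars import ContextVar

# Hypothetical stand-in for the hub: only the active-namespace stack is modelled.
_active: ContextVar[tuple[str, ...]] = ContextVar("active_namespaces", default=())


def active_namespace() -> str:
    stack = _active.get()
    return stack[-1] if stack else "default"


@contextmanager
def use_namespace(namespace: str) -> Iterator[str]:
    # Push: the token remembers the previous stack exactly.
    token = _active.set(_active.get() + (namespace,))
    try:
        yield namespace
    finally:
        # Runs even if the body raises, so nested blocks always unwind.
        _active.reset(token)


seen = []
with use_namespace("analytics"):
    seen.append(active_namespace())
    with use_namespace("reports"):
        seen.append(active_namespace())
    seen.append(active_namespace())
seen.append(active_namespace())
print(seen)  # ['analytics', 'reports', 'analytics', 'default']
```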

astrum.decorators.get_registry

get_registry(namespace: str | None = None) -> SchedulerRegistry

Return the SchedulerRegistry for the specified namespace, creating it on demand if it does not exist.

Source code in: src/astrum/decorators.py
def get_registry(namespace: str | None = None) -> SchedulerRegistry:
    """Return the :class:`SchedulerRegistry` for the specified namespace, creating it
    on demand if it does not exist.
    """

    return _HUB.get(namespace)

astrum.decorators.clear_registry

clear_registry(namespace: str | None = None) -> None

Clear registries in the global hub. When namespace=None, clear all namespaces.

Source code in: src/astrum/decorators.py
def clear_registry(namespace: str | None = None) -> None:
    """Clear registries in the global hub. When ``namespace=None``, clear all namespaces.
    """

    _HUB.clear(namespace)
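get_registry creates a registry on first use, and clear_registry(None) empties the whole hub. A hypothetical dict-backed Hub class illustrates both behaviors. This sketch makes two simplifying assumptions: namespace=None falls back to a fixed default instead of the active namespace, and clearing removes registry objects rather than emptying them in place. The real hub may behave differently on either point.

```python
from __future__ import annotations


class Registry:
    """Hypothetical stand-in for SchedulerRegistry: just a name and a task table."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.tasks: dict[str, object] = {}


class Hub:
    """Hypothetical dict-backed hub; Astrum's real _HUB is not shown on this page."""

    def __init__(self, default: str = "default") -> None:
        self.default = default
        self._registries: dict[str, Registry] = {}

    def get(self, namespace: str | None = None) -> Registry:
        # Simplification: None maps to a fixed default, not the active namespace.
        name = namespace if namespace is not None else self.default
        if name not in self._registries:
            self._registries[name] = Registry(name)  # created on first use
        return self._registries[name]

    def clear(self, namespace: str | None = None) -> None:
        if namespace is None:
            self._registries.clear()  # None: drop every namespace
        else:
            self._registries.pop(namespace, None)  # otherwise drop only that one

    def names(self) -> list[str]:
        return sorted(self._registries)


hub = Hub()
first = hub.get("analytics")
print(hub.get("analytics") is first)  # True: later lookups return the same object
hub.get()
print(hub.names())  # ['analytics', 'default']
hub.clear()
print(hub.names())  # []
```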

astrum.decorators.active_namespace

active_namespace() -> str

Return the name of the currently active namespace.

Source code in: src/astrum/decorators.py
def active_namespace() -> str:
    """Return the name of the currently active namespace.
    """

    return _HUB.active()

SchedulerRegistry

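SchedulerRegistry is the right choice when you want to create independent workflow objects explicitly, without relying on the global namespaces.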

astrum.decorators.SchedulerRegistry

SchedulerRegistry(name: str = 'default')

Decorator-based DAG builder.

This module is intentionally isolated from the core scheduler: it only converts decorated functions into the same inputs used by manual DAGs.

Source code in: src/astrum/decorators.py
def __init__(self, name: str = "default") -> None:
    self.name = name
    self._tasks: dict[str, RegisteredTask] = {}

task

task(task_id: str | None = None, *, depends_on: list[str] | tuple[str, ...] | None = None, data: TaskData | None = None, retry: int = 0) -> Callable[[Callable[..., Awaitable[Any] | Any]], Callable[..., Awaitable[Any] | Any]]

Register a function as a DAG task in this registry.

Source code in: src/astrum/decorators.py
def task(
    self, task_id: str | None = None, *, depends_on: list[str] | tuple[str, ...] | None = None, data: TaskData | None = None, retry: int = 0
) -> Callable[[Callable[..., Awaitable[Any] | Any]], Callable[..., Awaitable[Any] | Any]]:
    """Register a function as a DAG task in this registry."""

    return _make_task_decorator(self, task_id=task_id, depends_on=depends_on, data=data, retry=retry)
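A decorator of this shape returns a closure that records the function under a task id. The sketch below is a hypothetical, minimal MiniRegistry that stores records with the RegisteredTask field layout documented at the end of this page. Several details are assumptions, not confirmed behavior of _register_function: falling back to func.__name__ when task_id is omitted, rejecting duplicate ids, using frozen=True, and returning the function unchanged.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)  # frozen=True is an assumption of this sketch
class RegisteredTask:
    task_id: str
    function: Callable[..., Any]
    depends_on: tuple[str, ...]
    retry: int
    data: object | None = None


class MiniRegistry:
    """Hypothetical, minimal registry; not Astrum's implementation."""

    def __init__(self, name: str = "default") -> None:
        self.name = name
        self._tasks: dict[str, RegisteredTask] = {}

    def task(
        self,
        task_id: str | None = None,
        *,
        depends_on: list[str] | tuple[str, ...] | None = None,
        retry: int = 0,
    ) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
            # Assumption: an omitted task_id falls back to the function name.
            tid = task_id or func.__name__
            if tid in self._tasks:  # assumption: duplicates are rejected
                raise ValueError(f"Task '{tid}' is already registered in '{self.name}'.")
            self._tasks[tid] = RegisteredTask(tid, func, tuple(depends_on or ()), retry)
            return func  # assumption: the original function is returned unchanged

        return decorator


reg = MiniRegistry("demo")


@reg.task()
def extract() -> list[int]:
    return [1, 2, 3]


@reg.task("transform", depends_on=["extract"], retry=2)
def double(rows: list[int]) -> list[int]:
    return [r * 2 for r in rows]
```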

get_task

get_task(task_id: str) -> RegisteredTask
Source code in: src/astrum/decorators.py
def get_task(self, task_id: str) -> RegisteredTask:
    try:
        return self._tasks[task_id]
    except KeyError as exc:
        raise TaskRegistrationError(f"Task '{task_id}' is not registered in '{self.name}'.") from exc
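get_task converts a missing-key lookup into a TaskRegistrationError and chains it with from exc, so the original KeyError stays available as __cause__ for debugging. The following standalone sketch uses a local TaskRegistrationError class as a stand-in for Astrum's exception.

```python
class TaskRegistrationError(Exception):
    """Local stand-in for Astrum's exception of the same name."""


def get_task(tasks: dict[str, object], task_id: str, registry_name: str = "default") -> object:
    try:
        return tasks[task_id]
    except KeyError as exc:
        # "from exc" records the KeyError as __cause__ on the new exception.
        raise TaskRegistrationError(f"Task '{task_id}' is not registered in '{registry_name}'.") from exc


try:
    get_task({}, "missing", "demo")
except TaskRegistrationError as err:
    print(err, "| caused by", type(err.__cause__).__name__)
```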

get_all_tasks

get_all_tasks() -> dict[str, RegisteredTask]
Source code in: src/astrum/decorators.py
def get_all_tasks(self) -> dict[str, RegisteredTask]:
    return self._tasks.copy()

clear

clear() -> None
Source code in: src/astrum/decorators.py
def clear(self) -> None:
    self._tasks.clear()

build_task_orders

build_task_orders(target_tasks: list[str] | None = None, config: AstrumConfig | None = None) -> list[TaskOrder]
Source code in: src/astrum/decorators.py
def build_task_orders(self, target_tasks: list[str] | None = None, config: AstrumConfig | None = None) -> list[TaskOrder]:
    cfg = config or AstrumConfig()

    all_task_orders = [TaskOrder(task_name=task_id) for task_id in self._tasks]
    all_task_func_map = {task_id: registered.function for task_id, registered in self._tasks.items()}
    generated_transports = auto_generate_data_transports(all_task_orders, all_task_func_map)
    for generated in generated_transports:
        registered_task = self._tasks[generated.task_id]
        merged_data = _merge_task_data(registered_task.data, generated)
        self._tasks[generated.task_id] = RegisteredTask(
            task_id=registered_task.task_id,
            function=registered_task.function,
            depends_on=registered_task.depends_on,
            retry=registered_task.retry,
            data=merged_data,
        )
        setattr(registered_task.function, "_scheduler_task_data", merged_data)

    selected_task_ids = self._collect_task_ids(target_tasks)
    order_map = {task_id: TaskOrder(task_name=task_id) for task_id in selected_task_ids}

    for task_id in selected_task_ids:
        registered_task = self._tasks[task_id]
        order_map[task_id].dependencies = [order_map[dependency] for dependency in registered_task.depends_on if dependency in order_map]

    task_orders = [order_map[task_id] for task_id in selected_task_ids]
    task_func_map = {tid: self._tasks[tid].function for tid in selected_task_ids}

    # Validate and complete the data-transport relationships
    task_transports: list[TaskData] = [self._tasks[tid].data for tid in selected_task_ids if self._tasks[tid].data is not None]
    resolve_task_data(
        task_orders,
        task_transports,
        allow_no_dir_definition=cfg.allow_no_dir_definition,
        infer_via_ast=cfg.infer_via_ast,
        silence_warnings=cfg.silence_warnings,
    )
    autocast_data_transports_path(task_transports, task_orders)

    # Automatically sync the from_tasks inferred from data transports back into the task graph's dependencies
    if cfg.auto_sync_dependencies:
        for task_id, task_order in order_map.items():
            dt = next((d for d in task_transports if d.task_id == task_id), None)
            if dt and dt.from_tasks:
                for from_id in dt.from_tasks:
                    if from_id in order_map and order_map[from_id] not in task_order.dependencies:
                        task_order.dependencies.append(order_map[from_id])

    # Type-safety validation (final_check)
    if not cfg.skip_type_check:
        errors = final_check(
            task_transports,
            task_orders,
            task_func_map,
            skip_type_check=cfg.skip_type_check,
            infer_via_ast=cfg.infer_via_ast,
            strict_topology=cfg.strict_topology,
        )
        if errors:
            raise TaskRegistrationError("Data transport validation failed with the following errors:\n- " + "\n- ".join(errors))

    ExecutionPlanner(task_orders).validate()

    if cfg.visualize:
        from .data_transport import visualize_data_transport

        visualize_data_transport(task_transports, task_orders)

    return task_orders
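_collect_task_ids and ExecutionPlanner.validate are not shown on this page. Suppose the first returns the requested targets plus all of their transitive dependencies, and the second rejects cycles. Under those assumptions, the two steps could be sketched as follows. collect_task_ids and topological_order are hypothetical helpers. The edge filter mirrors the "if dependency in order_map" check in build_task_orders, which ignores dependencies outside the selection.

```python
from __future__ import annotations

from collections import deque


def collect_task_ids(deps: dict[str, tuple[str, ...]], targets: list[str] | None) -> list[str]:
    """Targets plus everything they transitively depend on, in registration order."""
    if targets is None:
        return list(deps)
    needed: set[str] = set()
    stack = list(targets)
    while stack:
        tid = stack.pop()
        if tid in needed:
            continue
        if tid not in deps:
            raise KeyError(f"unknown task {tid!r}")
        needed.add(tid)
        stack.extend(deps[tid])
    return [tid for tid in deps if tid in needed]


def topological_order(deps: dict[str, tuple[str, ...]], selected: list[str]) -> list[str]:
    """Kahn's algorithm over the selected subgraph; raises ValueError on a cycle."""
    chosen = set(selected)
    indegree = {tid: 0 for tid in selected}
    children: dict[str, list[str]] = {tid: [] for tid in selected}
    for tid in selected:
        for dep in deps[tid]:
            if dep in chosen:  # edges leaving the selection are ignored
                indegree[tid] += 1
                children[dep].append(tid)
    ready = deque(tid for tid in selected if indegree[tid] == 0)
    order: list[str] = []
    while ready:
        tid = ready.popleft()
        order.append(tid)
        for child in children[tid]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    if len(order) != len(selected):
        raise ValueError("dependency cycle among: " + ", ".join(sorted(chosen - set(order))))
    return order


deps = {"extract": (), "clean": ("extract",), "report": ("clean",), "audit": ()}
selected = collect_task_ids(deps, ["report"])
print(selected, topological_order(deps, selected))
```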

build_tasks

build_tasks(target_tasks: list[str] | None = None) -> list[tuple[str, Callable[..., Awaitable[Any] | Any]]]

Return a list of (task_id, callable) pairs.

Before the refactor, this method called func() immediately and froze the task as a no-argument coroutine. It now keeps the function reference and lets DynamicScheduler decide, during scheduling, when to call it and how to inject its parameters.

Source code in: src/astrum/decorators.py
def build_tasks(self, target_tasks: list[str] | None = None) -> list[tuple[str, Callable[..., Awaitable[Any] | Any]]]:
    """Return a list of (task_id, callable) pairs.

    Before the refactor, this would immediately call ``func()`` and freeze the
    task as a no-argument coroutine; after the refactor, it keeps the function
    reference and lets :class:`DynamicScheduler` decide when to call it and how
    to inject parameters during scheduling.
    """

    selected_task_ids = self._collect_task_ids(target_tasks)
    return [(task_id, self._tasks[task_id].function) for task_id in selected_task_ids]
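Keeping the function reference means the caller decides when to invoke a task and with which arguments. The same code path can then serve both plain functions and coroutine functions, as the Callable[..., Awaitable[Any] | Any] type above allows. The following is a minimal sketch. invoke and the rule that passes upstream output by parameter name are hypothetical and do not describe how DynamicScheduler actually injects parameters.

```python
from __future__ import annotations

import asyncio
import inspect
from typing import Any, Callable

# A task may be a plain function or a coroutine function.
TaskFn = Callable[..., Any]


async def invoke(func: TaskFn, **kwargs: Any) -> Any:
    # The stored callable is only called here, at scheduling time.
    result = func(**kwargs)
    # Coroutine functions return an awaitable; plain functions return the value.
    if inspect.isawaitable(result):
        result = await result
    return result


async def extract() -> list[int]:
    return [1, 2, 3]


def total(rows: list[int]) -> int:
    return sum(rows)


async def main() -> int:
    # Same shape as build_tasks() output: (task_id, callable) pairs, nothing called yet.
    tasks: list[tuple[str, TaskFn]] = [("extract", extract), ("total", total)]
    funcs = dict(tasks)
    rows = await invoke(funcs["extract"])
    # Hypothetical injection rule: pass the upstream result by parameter name.
    return await invoke(funcs["total"], rows=rows)


print(asyncio.run(main()))  # 6
```

By contrast, calling an async function up front yields a coroutine object that can be awaited only once, which is one reason to defer the call.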

build_scheduler

build_scheduler(target_tasks: list[str] | None = None, *, config: AstrumConfig | None = None, ignore_tail_task: list[str] | None = None, concurrency_context: Semaphore | None = None, silence: bool = True, visualize: bool = False) -> DynamicScheduler
Source code in: src/astrum/decorators.py
def build_scheduler(
    self,
    target_tasks: list[str] | None = None,
    *,
    config: AstrumConfig | None = None,
    # The following are legacy parameters kept for backward compatibility; prefer config
    ignore_tail_task: list[str] | None = None,
    concurrency_context: asyncio.Semaphore | None = None,
    silence: bool = True,
    visualize: bool = False,
) -> DynamicScheduler:
    # Use config if the caller passed one; otherwise build it from the loose parameters
    if config is None:
        config = AstrumConfig(
            visualize=visualize,
            silence=silence,
            ignore_tail_task=ignore_tail_task or [],
        )

    task_orders = self.build_task_orders(target_tasks, config=config)
    tasks = self.build_tasks(target_tasks)

    selected_task_ids = self._collect_task_ids(target_tasks)
    task_data_refs: dict[str, TaskData] = {task_id: self._tasks[task_id].data for task_id in selected_task_ids if self._tasks[task_id].data is not None}
    task_retries = {task_id: self._tasks[task_id].retry for task_id in selected_task_ids if self._tasks[task_id].retry > 0}

    sem = concurrency_context or config.build_semaphore()

    return DynamicScheduler(
        tasks=tasks,
        task_order=task_orders,
        task_data_refs=task_data_refs or None,
        has_data_path=bool(task_data_refs),
        ignore_tail_task=config.ignore_tail_task or ignore_tail_task,
        concurrency_context=sem,
        task_retries=task_retries or None,
        silence=config.silence,
    )
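build_scheduler passes DynamicScheduler a per-task retry map, which includes only tasks with retry > 0, together with one shared semaphore. The sketch below shows one way those two inputs can combine. run_with_retry is hypothetical. It assumes retry=N means up to N additional attempts after the first failure, with no backoff, which may not match DynamicScheduler.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable


async def run_with_retry(
    name: str,
    func: Callable[[], Awaitable[Any]],
    sem: asyncio.Semaphore,
    retries: dict[str, int],
) -> Any:
    # Assumption: retry=N means up to N extra attempts; tasks absent from the map get none.
    attempts = retries.get(name, 0) + 1
    for attempt in range(1, attempts + 1):
        async with sem:  # the shared semaphore caps how many task bodies run at once
            try:
                return await func()
            except Exception:
                if attempt == attempts:
                    raise
        # The slot is released before the next attempt; this sketch has no backoff.


calls = {"flaky": 0}


async def flaky() -> str:
    calls["flaky"] += 1
    if calls["flaky"] < 3:
        raise RuntimeError("transient")
    return "ok"


async def main() -> str:
    sem = asyncio.Semaphore(2)
    return await run_with_retry("flaky", flaky, sem, {"flaky": 2})


result = asyncio.run(main())
print(result, calls["flaky"])  # ok 3
```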

run async

run(target_tasks: list[str] | None = None, config: AstrumConfig | None = None)
Source code in: src/astrum/decorators.py
async def run(self, target_tasks: list[str] | None = None, config: AstrumConfig | None = None):
    scheduler = self.build_scheduler(target_tasks, config=config)
    return await scheduler.execute()
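run is a thin wrapper: it builds the scheduler, then awaits scheduler.execute(). As a rough mental model of DAG execution, here is a hypothetical execute_dag. It starts each task once all of its dependencies have finished and caps concurrency with a semaphore. It is not DynamicScheduler.execute, whose return value and data injection are not documented here.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable


async def execute_dag(
    funcs: dict[str, Callable[[], Awaitable[Any]]],
    deps: dict[str, tuple[str, ...]],
    limit: int = 4,
) -> dict[str, Any]:
    """Hypothetical stand-in for scheduler.execute(); assumes an acyclic graph."""
    sem = asyncio.Semaphore(limit)
    running: dict[str, asyncio.Task[Any]] = {}

    async def run_one(name: str) -> Any:
        # Wait for every upstream task before taking a concurrency slot.
        await asyncio.gather(*(running[dep] for dep in deps.get(name, ())))
        async with sem:
            return await funcs[name]()

    # No await inside this loop, so every task is registered before any starts.
    for name in funcs:
        running[name] = asyncio.create_task(run_one(name))
    results = await asyncio.gather(*running.values())
    return dict(zip(running, results))


order: list[str] = []


def make(name: str, delay: float) -> Callable[[], Awaitable[str]]:
    async def body() -> str:
        await asyncio.sleep(delay)
        order.append(name)  # records completion order
        return name.upper()

    return body


funcs = {"report": make("report", 0), "extract": make("extract", 0.02), "clean": make("clean", 0)}
deps = {"clean": ("extract",), "report": ("clean",)}
results = asyncio.run(execute_dag(funcs, deps))
print(order)  # ['extract', 'clean', 'report']
```

Each task waits for its dependencies before acquiring the semaphore. As a result, a task blocked on upstream work never holds a slot that its own dependencies need.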

astrum.decorators.RegisteredTask dataclass

RegisteredTask(task_id: str, function: Callable[..., Awaitable[Any] | Any], depends_on: tuple[str, ...], retry: int, data: TaskData | None = None)

Metadata captured by the decorator DAG builder.