Guide¶
This page introduces Astrum through two questions: how to organize tasks, and how to pass data between them. The examples are intentionally small so you can copy and adapt them.
Module-level @task¶
The most direct style registers functions in the default namespace and calls run().
from astrum import task, run
@task("extract")
async def extract() -> dict:
return {"rows": 100}
@task("load", depends_on=["extract"])
async def load() -> None:
print("loaded")
report = await run(target_tasks=["load"])
Direct namespace=...¶
In real projects, give each workflow a namespace to avoid task name collisions across modules.
from astrum import task, run, clear_registry
@task("extract", namespace="daily_report")
async def extract() -> None:
...
@task("load", depends_on=["extract"], namespace="daily_report")
async def load() -> None:
...
report = await run(target_tasks=["load"], namespace="daily_report")
clear_registry("daily_report")
with use_namespace¶
If a group of tasks belongs to the same namespace, use the context manager to avoid repeating namespace=.
from astrum import task, run, use_namespace
with use_namespace("analytics"):
@task("load_csv")
async def load_csv() -> None:
...
@task("clean_csv", depends_on=["load_csv"])
async def clean_csv() -> None:
...
report = await run(target_tasks=["clean_csv"], namespace="analytics")
SchedulerRegistry¶
Create an explicit registry when you do not want to rely on the global registry. This style is useful for libraries, tests, and code that packages multiple workflows.
from astrum import SchedulerRegistry
workflow = SchedulerRegistry("billing")
@workflow.task("fetch_invoice")
async def fetch_invoice() -> dict:
return {"invoice_id": "INV-001"}
@workflow.task("send_invoice", depends_on=["fetch_invoice"])
async def send_invoice() -> None:
print("sent")
report = await workflow.run(target_tasks=["send_invoice"])
Manual DAGs¶
Decorator mode fits most application code. If you want to separate task functions from graph structure, create TaskOrder objects and pass them to DynamicScheduler.
from astrum import DynamicScheduler, TaskOrder
async def extract() -> None:
...
async def load() -> None:
...
extract_order = TaskOrder("extract")
load_order = TaskOrder("load", dependencies=[extract_order])
scheduler = DynamicScheduler(
tasks=[("extract", extract), ("load", load)],
task_order=[extract_order, load_order],
)
report = await scheduler.execute()
Pass function references such as extract, not already-created coroutine objects such as extract().
Annotation-driven data transport¶
Prefer Ref and F for declaring where arguments come from. F("task", "field") reads a field from an upstream task result and injects it into the current parameter.
from astrum import F, Ref, task, run, AstrumConfig
@task("load_order")
async def load_order() -> dict:
return {"order_id": "A-001", "amount": 128}
@task("format_order")
async def format_order(
order_id: Ref[str, F("load_order", "order_id")],
amount: Ref[int, F("load_order", "amount")],
) -> dict:
return {"message": f"order {order_id}: ${amount}"}
report = await run(
target_tasks=["format_order"],
config=AstrumConfig(skip_type_check=True),
)
Common locators:
F("source", "name"): read a dict key or object attribute.F("source", 0): read a list/tuple index.F("source"): pass the whole upstream result.
Explicit TaskData data transport¶
Use explicit TaskData when data relationships are generated dynamically, or when you need precise control over sources, targets, and type constraints.
from astrum import AstrumConfig, run, task
from astrum.data_transport import DTRela, DataItem, TaskData
@task("load_user", data=TaskData())
async def load_user() -> dict:
return {"name": "Alice"}
@task(
"greet",
data=TaskData(
input_data_item=[
DataItem(
allow_data_model=str,
from_relation=DTRela(key="name", related_task="load_user"),
to_relation=DTRela(key="name", related_task="greet"),
)
]
),
)
async def greet(name: str) -> None:
print(f"hello {name}")
report = await run(
target_tasks=["greet"],
config=AstrumConfig(skip_type_check=True),
)
For day-to-day application code, prefer annotation-driven transport. Explicit TaskData is better for framework integration, legacy migration, and dynamic DAGs.
Inspect the execution plan¶
build_scheduler() builds a scheduler without running it. You can inspect the stage plan first, then call execute().
from astrum import build_scheduler
scheduler = build_scheduler(target_tasks=["greet"])
plan = scheduler.get_execute_timeline()
print(plan.get_visualization_table())
report = await scheduler.execute()
Retries and failures¶
@task(..., retry=N) retries a failed task up to N times. If a task ultimately fails, Astrum cancels parallel work that should no longer continue and records the reason in ExecutionReport.error_summary.
@task("flaky", retry=2)
async def flaky() -> None:
...
report = await run(target_tasks=["flaky"])
for stat in report.task_statistics:
print(stat.task_name, stat.status, stat.attempt_count)
Configuration¶
AstrumConfig controls scheduler behavior in one object.