Skip to content

Adding Tasks

This page explains how to enqueue tasks, what add_task() returns, and the options available at call time.

How add_task works

Once you have a task function and a route that injects ManagedBackgroundTasks, enqueue work by calling add_task():

@app.post("/signup")
def signup(email: str, tasks=Depends(task_manager.get_tasks)):
    task_id = tasks.add_task(send_email, email)
    return {"task_id": task_id}

add_task() returns a task_id string immediately. The task itself runs after the response is sent.

Positional and keyword arguments

Pass arguments the same way you would call the function directly:

tasks.add_task(send_email, "user@example.com")                 # positional
tasks.add_task(send_email, address="user@example.com")         # keyword
tasks.add_task(resize_image, image_id, width=800, height=600)  # mixed

Using the task_id to track status

The returned task_id can be stored in your database and used later to query task status via the API:

@app.post("/report")
def request_report(user_id: int, tasks=Depends(task_manager.get_tasks)):
    task_id = tasks.add_task(generate_report, user_id)
    return {"task_id": task_id}

@app.get("/report/{task_id}")
def report_status(task_id: str):
    record = task_manager.store.get(task_id)
    if record is None:
        raise HTTPException(status_code=404)
    return record.to_dict()

You can also query via the built-in API route at GET /tasks/{task_id} when TaskAdmin is mounted.

Idempotency keys

Some operations should only run once, even if a client retries the request. Pass an idempotency_key to prevent duplicate execution:

@app.post("/invoice/{invoice_id}/send")
def send_invoice(invoice_id: int, tasks=Depends(task_manager.get_tasks)):
    task_id = tasks.add_task(
        email_invoice,
        invoice_id,
        idempotency_key=f"invoice-{invoice_id}",
    )
    return {"task_id": task_id}

If a non-failed task with the same key already exists, add_task() returns its existing task_id without enqueuing the function again. No duplicate task is created.

Note

Idempotency checks are performed in-process first (fast, no I/O). When a shared backend is configured and multiple instances are running, the check also covers tasks from other instances.

Tip

A failed task does not block a re-run. If a task fails and you want to retry it, the existing key will not prevent a new attempt.

Tags

Tags are key/value labels you attach to a task at enqueue time. They travel with the task through its entire lifecycle and are forwarded to every log and lifecycle event, making it straightforward to filter or aggregate by label in downstream systems.

@app.post("/invoice")
def create_invoice(user_id: int, plan: str, tasks=Depends(task_manager.get_tasks)):
    task_id = tasks.add_task(
        process_invoice,
        user_id,
        tags={"user_id": str(user_id), "plan": plan, "source": "api"},
    )
    return {"task_id": task_id}

Inside the task function, tags are accessible via get_task_context():

from fastapi_taskflow import get_task_context, task_log

@task_manager.task(retries=2)
def process_invoice(user_id: int) -> None:
    ctx = get_task_context()
    plan = ctx.tags.get("plan", "free") if ctx else "free"
    task_log("Processing invoice", user_id=user_id, plan=plan)

See Task Context for the full get_task_context() API.

Overriding eager and priority per call

The eager and priority values set on the decorator apply to every call. You can override them for a specific enqueue without changing the decorator:

# Normally this task runs after the response.
# For this one call, dispatch it immediately.
task_id = tasks.add_task(send_email, email, eager=True)

# Route this call through the priority queue at level 9.
task_id = tasks.add_task(send_alert, message, priority=9)

eager=True: The task is dispatched via asyncio.create_task immediately when add_task() is called, before FastAPI sends the response. Use this when you need the task to start before the response goes out.

priority: Routes the task through the priority queue instead of the standard background task list. Higher integers run first. The conventional range is 1 (lowest) to 10 (highest), but any integer is accepted. Tasks with the same priority execute in arrival order (FIFO).

Info

When priority is set, the eager flag is ignored. The priority queue provides its own non-blocking dispatch path.

Adding tasks outside a route

If you need to enqueue a task from a startup handler, a management script, or anywhere else without a request context, construct ManagedBackgroundTasks directly and dispatch with asyncio.create_task:

import asyncio
from fastapi_taskflow import ManagedBackgroundTasks

async def on_startup():
    tasks = ManagedBackgroundTasks(task_manager)
    task_id = tasks.add_task(sync_database, eager=True)

Note

Pass eager=True (or priority=...) when adding tasks outside a route. Without a request lifecycle to trigger Starlette's background runner, the task would sit in the queue and never be dispatched.

For most situations, the route-based injection patterns are simpler and preferred. Direct construction is only necessary when no request context is available.