Defining Tasks

This page explains how to define tasks, what configuration options are available, and how the task lifecycle works.

Why use the decorator?

You can pass any plain function to add_task() and it will be tracked with a UUID and run as a background task. But without the decorator, you get no retries, no persist-on-startup behavior, and no custom name in the dashboard.

The decorator is how you attach execution configuration to a function once, so you never have to repeat it at call sites.

Basic definition

from fastapi_taskflow import TaskManager

task_manager = TaskManager()

@task_manager.task()
def send_email(address: str) -> None:
    ...

Without any arguments, the task runs once with no retries. This is the right starting point for most tasks.

Decorator options

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| retries | int | 0 | Additional attempts after the first failure |
| delay | float | 0.0 | Seconds to wait before the first retry |
| backoff | float | 1.0 | Multiplier applied to delay on each retry |
| persist | bool | False | Save this task record for requeue on startup |
| name | str | function name | Override the display name in the dashboard and logs |
| requeue_on_interrupt | bool | False | Save as PENDING if interrupted at shutdown, then requeue on next startup |
| eager | bool | False | Dispatch via asyncio.create_task immediately when add_task() is called, before the response is sent |
| priority | int \| None | None | Route through the priority queue instead of the standard task list. Higher values run first |
| executor | "async" \| "thread" \| "process" \| None | None | Force a specific executor. None auto-detects: async for coroutines, thread for sync functions |

When would you use each option?

  • retries + delay + backoff: Any task that calls an external service and should survive transient failures.
  • persist: Tasks that must not be lost if the process restarts before they complete.
  • name: When the function name is unclear or generated, and you want a human-readable label in the dashboard.
  • requeue_on_interrupt: For idempotent tasks where running from scratch after an unclean shutdown is acceptable.
  • eager: When you need the task to start before the HTTP response goes out (for example, to guarantee ordering).
  • priority: When some tasks are more time-sensitive than others and you want coarse control over execution order.
  • executor: When you need explicit control over how a task runs, in particular "process" for CPU-bound work.

Retry behavior

When a task raises an exception, fastapi-taskflow waits delay seconds and tries again. On each subsequent retry, the wait time is multiplied by backoff.

@task_manager.task(retries=3, delay=1.0, backoff=2.0)
def send_email(address: str) -> None:
    ...

With retries=3, delay=1.0, and backoff=2.0, the wait times are:

| Attempt | Wait before this attempt |
| --- | --- |
| 1st retry | 1.0 s |
| 2nd retry | 2.0 s (1.0 × 2.0) |
| 3rd retry | 4.0 s (2.0 × 2.0) |

Set backoff=1.0 (the default) for a constant retry interval.
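
The schedule above follows a simple geometric rule: the wait before retry n is delay multiplied by backoff n - 1 times. The sketch below reproduces that arithmetic in plain Python. The helper name retry_waits is hypothetical and not part of fastapi-taskflow; it only illustrates the calculation described here.

```python
def retry_waits(retries: int, delay: float = 0.0, backoff: float = 1.0) -> list[float]:
    """Return the wait in seconds before each retry (illustrative helper only)."""
    # Retry 1 waits `delay`; each later retry multiplies the previous wait by `backoff`.
    return [delay * backoff**n for n in range(retries)]


print(retry_waits(3, delay=1.0, backoff=2.0))  # [1.0, 2.0, 4.0]
print(retry_waits(3, delay=1.0))               # [1.0, 1.0, 1.0]
```

In this example the three retries add up to 7 seconds of waiting before the task would be marked FAILED, which is worth keeping in mind when choosing values for longer retry chains.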

Tip

For tasks that call external APIs, retries=3, delay=1.0, backoff=2.0 is a good starting point. It gives the remote service time to recover without hammering it.

Async and sync tasks

The decorator works identically for both async and sync functions.

  • Async functions are awaited directly on the event loop (AsyncExecutor).
  • Sync functions are offloaded to a thread pool so they do not block request handling (ThreadExecutor).
  • CPU-bound functions can be routed to a separate OS process with executor="process" (ProcessExecutor).

@task_manager.task(retries=1, delay=0.5)
async def process_webhook(payload: dict) -> None:
    await some_http_client.post("/endpoint", json=payload)

@task_manager.task(retries=2, delay=1.0)
def send_notification(user_id: int) -> None:
    # runs in a thread, never blocks the event loop
    push_service.send(user_id)

@task_manager.task(executor="process")
def generate_report(report_id: int) -> bytes:
    # runs in a separate OS process, bypasses the GIL
    return render_pdf(report_id)

No extra configuration is needed for async or sync tasks. fastapi-taskflow detects whether the function is a coroutine and dispatches accordingly. Set executor="process" explicitly for CPU-bound work.
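
To make the async/sync distinction concrete, here is a minimal standard-library sketch of the general technique: await coroutine functions directly and push sync functions onto a worker thread. It is not fastapi-taskflow's implementation. The dispatch helper and the two sample functions are hypothetical, and asyncio.to_thread requires Python 3.9 or later.

```python
import asyncio
import inspect
from typing import Any, Callable


async def dispatch(func: Callable[..., Any], *args: Any) -> Any:
    """Await coroutine functions; run sync functions in a worker thread."""
    if inspect.iscoroutinefunction(func):
        return await func(*args)
    # to_thread keeps the event loop free while the blocking call runs.
    return await asyncio.to_thread(func, *args)


async def fetch(x: int) -> int:  # stand-in for an async task
    await asyncio.sleep(0)
    return x * 2


def compute(x: int) -> int:  # stand-in for a blocking sync task
    return x + 1


async def main() -> tuple[int, int]:
    return await dispatch(fetch, 10), await dispatch(compute, 10)


results = asyncio.run(main())
print(results)  # (20, 11)
```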

How executors are selected

flowchart TD
    A[add_task called] --> B{executor= set?}
    B -- yes --> C{which executor?}
    B -- no --> D{async def?}
    D -- yes --> E[AsyncExecutor]
    D -- no --> F[ThreadExecutor]
    C -- async --> E
    C -- thread --> F
    C -- process --> G{eager=True?}
    G -- yes --> H[warn + fallback to async/thread]
    G -- no --> I[ProcessExecutor]

See Process Executor for the constraints and configuration specific to process tasks.
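
The decision tree in the flowchart can be read as a small function. The sketch below is an illustration, not the library's code: select_executor and the string return values are hypothetical, and it assumes that the eager fallback picks async or thread based on the function type, as the auto-detect branch does.

```python
import inspect
import warnings
from typing import Any, Callable, Optional


def select_executor(
    func: Callable[..., Any],
    executor: Optional[str] = None,
    eager: bool = False,
) -> str:
    """Mirror the flowchart: explicit executor wins, otherwise auto-detect."""
    auto = "async" if inspect.iscoroutinefunction(func) else "thread"
    if executor is None:
        return auto
    if executor == "process" and eager:
        # Assumption: the fallback follows the function type.
        warnings.warn("executor='process' with eager=True; falling back to " + auto)
        return auto
    return executor


async def async_task() -> None: ...


def sync_task() -> None: ...


print(select_executor(async_task))                   # async
print(select_executor(sync_task))                    # thread
print(select_executor(sync_task, "process"))         # process
```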

Task lifecycle

Every task moves through these states:

stateDiagram-v2
    [*] --> PENDING : add_task() called
    PENDING --> RUNNING : execution starts
    RUNNING --> SUCCESS : function returned
    RUNNING --> FAILED : all retries exhausted
    RUNNING --> RUNNING : retry attempt
    RUNNING --> INTERRUPTED : shutdown during execution (requeue_on_interrupt=False)
    RUNNING --> PENDING : shutdown during execution (requeue_on_interrupt=True)
    FAILED --> [*] : POST /tasks/{id}/retry creates new task
    INTERRUPTED --> [*] : POST /tasks/{id}/retry creates new task

PENDING: The task has been enqueued and is waiting to run.

RUNNING: Execution has started. If a retry is triggered, the task stays in RUNNING for the next attempt.

SUCCESS: The function returned without raising an exception.

FAILED: The function raised an exception and all retries were exhausted.

INTERRUPTED: The process shut down while the task was mid-execution and requeue_on_interrupt was False (or not set). The task did not complete.

INTERRUPTED tasks are visible in the dashboard and queryable via the API. They are not retried automatically. Use this status to identify tasks that need manual follow-up after an unclean shutdown.

POST /tasks/{task_id}/retry re-enqueues a FAILED or INTERRUPTED task as a brand new task with a fresh UUID. The original record is preserved in history.
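
As a compact way to reason about the lifecycle, the transitions from the state diagram can be written as a lookup table. This is a sketch for illustration only: the Status enum, its values, and can_transition are hypothetical names and may not match the library's own types. FAILED and INTERRUPTED have no outgoing transitions because a manual retry creates a new task rather than changing the original record.

```python
from enum import Enum


class Status(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    INTERRUPTED = "interrupted"


# Allowed moves, taken directly from the state diagram above.
ALLOWED = {
    Status.PENDING: {Status.RUNNING},
    Status.RUNNING: {
        Status.RUNNING,      # retry attempt
        Status.SUCCESS,      # function returned
        Status.FAILED,       # all retries exhausted
        Status.INTERRUPTED,  # shutdown, requeue_on_interrupt=False
        Status.PENDING,      # shutdown, requeue_on_interrupt=True
    },
    Status.SUCCESS: set(),
    Status.FAILED: set(),
    Status.INTERRUPTED: set(),
}


def can_transition(current: Status, new: Status) -> bool:
    """Return True if the diagram permits moving from `current` to `new`."""
    return new in ALLOWED[current]


print(can_transition(Status.PENDING, Status.RUNNING))  # True
print(can_transition(Status.FAILED, Status.RUNNING))   # False
```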

Warning

Only set requeue_on_interrupt=True on tasks that are idempotent: functions where running from scratch (even after partial execution) produces the correct result. If your task sends an email or charges a card, requeuing it after an interrupt could cause duplicate actions.

Calling the function directly

The decorator returns the original function unchanged. You can still call it normally in tests or other contexts:

@task_manager.task(retries=3)
def send_email(address: str) -> None:
    ...

# In a test - no task tracking, just the function
send_email("test@example.com")
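
For readers curious how a decorator can attach configuration while leaving the function callable as-is, the pattern looks roughly like this. It is a minimal sketch of the general technique, not fastapi-taskflow's source: MiniRegistry and its configs attribute are hypothetical names.

```python
from typing import Any, Callable, Dict, TypeVar

F = TypeVar("F", bound=Callable[..., Any])


class MiniRegistry:
    """Stores per-function config and hands the function back unchanged."""

    def __init__(self) -> None:
        self.configs: Dict[Callable[..., Any], Dict[str, Any]] = {}

    def task(self, **config: Any) -> Callable[[F], F]:
        def decorator(func: F) -> F:
            self.configs[func] = config  # remember the options
            return func                  # no wrapper: same object comes back
        return decorator


registry = MiniRegistry()


@registry.task(retries=3)
def send_email(address: str) -> str:
    return f"sent to {address}"


print(send_email("test@example.com"))  # sent to test@example.com
print(registry.configs[send_email])    # {'retries': 3}
```

Because no wrapper is introduced, a direct call behaves like calling the undecorated function, which is what keeps unit tests free of task-tracking side effects.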

Unregistered functions

You can pass a plain function (without the decorator) to add_task(). It will be tracked with a UUID and run with default settings: no retries, no persistence. Use this for fire-and-forget work where you only need basic tracking.

def plain_work() -> None:
    ...

tasks.add_task(plain_work)  # tracked, no retries