TaskManager
TaskManager is the central object in fastapi-taskflow: it holds the task registry and in-memory store, manages the snapshot scheduler and observer chain, and exposes the @task() and @schedule() decorators along with FastAPI dependency helpers.
Guide: Task Manager covers setup patterns and injection strategies. This page documents every parameter and method signature.
Constructor
from fastapi_taskflow import TaskManager

task_manager = TaskManager(
    snapshot_db="tasks.db",
    snapshot_interval=30.0,
    # ...
)
TaskManager(
    snapshot_db: str | None = None,
    snapshot_backend: SnapshotBackend | None = None,
    snapshot_interval: float = 60.0,
    requeue_pending: bool = False,
    merged_list_ttl: float = 5.0,
    loggers: list[TaskObserver] | None = None,
    log_file: str | None = None,
    log_file_max_bytes: int = 10485760,
    log_file_backup_count: int = 5,
    log_file_mode: str = "rotate",
    log_lifecycle: bool = False,
    encrypt_args_key: bytes | str | None = None,
    max_concurrent_tasks: int | None = None,
    max_sync_threads: int | None = None,
    max_process_workers: int | None = None,
    process_shutdown_timeout: float = 30.0,
    retention_days: float | None = None,
)
Persistence parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| snapshot_db | str \| None | None | Path to a SQLite file. Creates a SqliteBackend automatically. Ignored when snapshot_backend is also provided. |
| snapshot_backend | SnapshotBackend \| None | None | A pre-constructed backend instance (for example RedisBackend or PostgresBackend). Takes precedence over snapshot_db. |
| snapshot_interval | float | 60.0 | Seconds between periodic flushes of completed tasks to the backend. |
| requeue_pending | bool | False | When True, unfinished tasks are saved at shutdown and re-dispatched on the next startup. |
| merged_list_ttl | float | 5.0 | How long, in seconds, to cache the backend task list used by merged_list(). Only the backend portion is cached; in-memory tasks are always current. Useful in multi-instance deployments. |
| retention_days | float \| None | None | Automatically delete terminal task records (success, failed, cancelled) older than this many days. Pruning runs approximately every 6 hours inside the snapshot loop. None disables automatic pruning. Can also be set via TaskAdmin(retention_days=...). |
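The caching rule described for merged_list_ttl can be pictured with a small sketch. This is not the library's implementation: `MergedListCache`, `fetch_backend`, `fetch_memory`, and the injectable `clock` are hypothetical names, used only to show the backend portion being reused until the TTL expires while the in-memory portion is read fresh on every call.

```python
import time
from typing import Callable, List, Optional


class MergedListCache:
    """Hypothetical sketch: cache only the backend portion of a merged list."""

    def __init__(
        self,
        fetch_backend: Callable[[], List[str]],
        fetch_memory: Callable[[], List[str]],
        ttl: float = 5.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch_backend = fetch_backend
        self._fetch_memory = fetch_memory
        self._ttl = ttl
        self._clock = clock
        self._cached: Optional[List[str]] = None
        self._fetched_at = 0.0

    def merged_list(self) -> List[str]:
        now = self._clock()
        # Refresh the backend portion only when it is missing or older than the TTL.
        if self._cached is None or now - self._fetched_at >= self._ttl:
            self._cached = list(self._fetch_backend())
            self._fetched_at = now
        # The in-memory portion is never cached, so it is always current.
        return list(self._fetch_memory()) + self._cached
```

With a short TTL, several instances sharing one backend would each hit it at most once per TTL window, at the cost of seeing other instances' tasks slightly late.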
Logging parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| loggers | list[TaskObserver] \| None | None | Observer instances that receive every task_log() call and lifecycle transition. All observers run independently; an error in one never affects the others or the task. |
| log_file | str \| None | None | Shorthand for adding a single FileLogger. Equivalent to loggers=[FileLogger(log_file)]. Ignored when loggers already contains a FileLogger for the same path. |
| log_file_max_bytes | int | 10485760 | Maximum file size before rotation (10 MB). Only used with log_file. Has no effect in "watched" mode. |
| log_file_backup_count | int | 5 | Number of rotated backup files to keep. Only used with log_file. |
| log_file_mode | str | "rotate" | "rotate" uses RotatingFileHandler for single-process setups. "watched" uses WatchedFileHandler for multi-process deployments with external rotation (for example, logrotate). Only used with log_file. |
| log_lifecycle | bool | False | Also write task lifecycle transitions (running, success, failed) when using the log_file shorthand. |
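The two log_file_mode values correspond to two handlers from the standard library's logging.handlers module. The sketch below shows how the log_file_* parameters could map onto them; `build_file_handler` is a hypothetical helper written for illustration, not a function exported by fastapi-taskflow.

```python
import logging
from logging.handlers import RotatingFileHandler, WatchedFileHandler


def build_file_handler(
    path: str,
    mode: str = "rotate",
    max_bytes: int = 10485760,
    backup_count: int = 5,
) -> logging.Handler:
    """Hypothetical helper mirroring the log_file_* parameters."""
    if mode == "rotate":
        # The process itself rolls the file over once it reaches max_bytes.
        return RotatingFileHandler(path, maxBytes=max_bytes, backupCount=backup_count)
    if mode == "watched":
        # An external tool rotates the file; the handler reopens it when it changes.
        return WatchedFileHandler(path)
    raise ValueError(f"unknown log_file_mode: {mode!r}")
```

The size and backup-count arguments are only passed in the "rotate" branch, which matches the note that log_file_max_bytes has no effect in "watched" mode.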
Concurrency parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| max_concurrent_tasks | int \| None | None | Maximum number of async tasks running concurrently on the event loop. When set, an asyncio.Semaphore is acquired before each execution and released on completion. None removes the limit. |
| max_sync_threads | int \| None | None | Size of the dedicated thread pool for sync task functions. When set, sync tasks run in an isolated ThreadPoolExecutor instead of the default asyncio pool, preventing task bursts from starving sync request handlers. None uses asyncio.to_thread. |
| max_process_workers | int \| None | None | Number of worker processes in the ProcessPoolExecutor used by executor='process' tasks. None uses os.cpu_count(). Has no effect unless at least one task is registered with executor='process'. |
| process_shutdown_timeout | float | 30.0 | Seconds to wait for in-flight process tasks to finish at shutdown before terminating workers forcefully. Only applies when executor='process' tasks are registered. |
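The semaphore pattern mentioned for max_concurrent_tasks can be demonstrated with plain asyncio. The following is a minimal sketch, not the library's code: `run_limited` and `worker` are hypothetical names, and the short sleep stands in for real task work.

```python
import asyncio


async def run_limited(n_tasks: int, limit: int) -> int:
    """Run n_tasks coroutines under a semaphore and return the peak concurrency."""
    semaphore = asyncio.Semaphore(limit)
    running = 0
    peak = 0

    async def worker() -> None:
        nonlocal running, peak
        # Acquired before execution, released when the block exits.
        async with semaphore:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # stand-in for real task work
            running -= 1

    await asyncio.gather(*(worker() for _ in range(n_tasks)))
    return peak
```

Calling `asyncio.run(run_limited(5, 2))` starts five workers but never lets more than two run at once; the remaining three wait on the semaphore.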
Encryption parameter
| Parameter | Type | Default | Description |
|---|---|---|---|
| encrypt_args_key | bytes \| str \| None | None | A Fernet key for encrypting task args and kwargs at rest. When set, arguments are encrypted at enqueue time and decrypted only when the executor is about to call the function. Accepts a URL-safe base64 string or raw bytes from Fernet.generate_key(). Requires pip install "fastapi-taskflow[encryption]". |
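A Fernet key is 32 random bytes encoded as URL-safe base64, which is why both a string and a bytes value are acceptable for encrypt_args_key. The sketch below checks that shape using only the standard library; `normalize_fernet_key` is a hypothetical helper for illustration and is not necessarily how fastapi-taskflow validates the key.

```python
import base64
import binascii
from typing import Union


def normalize_fernet_key(key: Union[bytes, str]) -> bytes:
    """Hypothetical check that a key has the shape of Fernet.generate_key() output."""
    raw = key.encode("ascii") if isinstance(key, str) else key
    try:
        decoded = base64.urlsafe_b64decode(raw)
    except (binascii.Error, ValueError) as exc:
        raise ValueError("key is not valid URL-safe base64") from exc
    if len(decoded) != 32:
        raise ValueError("a Fernet key must decode to exactly 32 bytes")
    return raw
```

A check like this only confirms the key's format. It says nothing about how the key was generated or stored, so keys should still come from Fernet.generate_key() and be kept out of source control.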
Decorators
task()
Registers a function as a managed background task. The decorated function is returned unchanged and can still be called directly.
@task_manager.task(
    retries=3,
    delay=1.0,
    backoff=2.0,
    persist=False,
    name=None,
    requeue_on_interrupt=False,
    eager=False,
    priority=None,
    executor=None,
)
def my_task(user_id: int) -> None:
    ...
| Parameter | Type | Default | Description |
|---|---|---|---|
| retries | int | 0 | Additional attempts after the first failure. A value of 3 means up to 4 total attempts. |
| delay | float | 0.0 | Seconds to wait before the first retry. |
| backoff | float | 1.0 | Multiplier applied to delay on each subsequent retry. With delay=1.0, use 2.0 for exponential backoff (1 s, 2 s, 4 s, ...). |
| persist | bool | False | Save this task record to the backend for requeue on restart. Scoped to this function only. |
| name | str \| None | function name | Override the display name in logs and the dashboard. |
| requeue_on_interrupt | bool | False | When True and requeue_pending=True on the manager, a task interrupted at shutdown is reset to PENDING and re-dispatched on next startup. Only set this for idempotent functions that are safe to restart from scratch. |
| eager | bool | False | Dispatch via asyncio.create_task immediately when add_task() is called, before FastAPI sends the response. Per-call eager on add_task() overrides this value. |
| priority | int \| None | None | Route through the priority queue instead of Starlette's background task list. Higher values run first. The conventional range is 1 (lowest) to 10 (highest). Per-call priority on add_task() overrides this value. |
| executor | "async" \| "thread" \| "process" \| None | None | Force a specific executor. "async" requires a coroutine. "thread" requires a plain function. "process" routes to a ProcessPoolExecutor and requires a module-level function with picklable arguments. None auto-detects from the function signature. |
Raises ValueError at decoration time if the function is incompatible with the requested executor (for example, executor='async' on a sync function, or executor='process' on a lambda or nested function).
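The interaction of retries, delay, and backoff is easiest to see as a list of waits. The sketch below assumes the wait before the n-th retry is delay multiplied by backoff raised to the power n-1, which is how the table reads; the library's exact arithmetic is not shown on this page, and `retry_delays` is a hypothetical helper.

```python
from typing import List


def retry_delays(retries: int, delay: float, backoff: float) -> List[float]:
    """Wait before each retry: delay, delay*backoff, delay*backoff**2, ..."""
    return [delay * backoff**attempt for attempt in range(retries)]


# Three retries with a 1 s base delay and a 2x multiplier.
print(retry_delays(retries=3, delay=1.0, backoff=2.0))  # [1.0, 2.0, 4.0]
```

With the default backoff of 1.0 every retry waits the same delay, and with the default retries of 0 the list is empty, so a failing task is not retried at all.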
Example:
@task_manager.task(retries=3, delay=1.0, backoff=2.0)
def send_email(address: str) -> None:
    ...

@task_manager.task(executor="process")
def generate_report(report_id: int) -> bytes:
    return render_pdf(report_id)
See also: Process Executor for constraints and configuration specific to executor='process'.
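The decoration-time checks described for the executor option can be approximated with the inspect module. This is a sketch under assumptions: `resolve_executor` is a hypothetical function, and detecting lambdas and nested functions through `__qualname__` is one common heuristic, not necessarily the test the library applies.

```python
import inspect
from typing import Callable, Optional


def resolve_executor(fn: Callable, executor: Optional[str] = None) -> str:
    """Hypothetical decoration-time check for the executor option."""
    is_coroutine = inspect.iscoroutinefunction(fn)
    if executor is None:
        # Auto-detect: coroutine functions run on the loop, others in a thread.
        return "async" if is_coroutine else "thread"
    if executor not in ("async", "thread", "process"):
        raise ValueError(f"unknown executor: {executor!r}")
    if executor == "async" and not is_coroutine:
        raise ValueError("executor='async' requires a coroutine function")
    if executor == "thread" and is_coroutine:
        raise ValueError("executor='thread' requires a plain function")
    if executor == "process":
        qualname = getattr(fn, "__qualname__", "")
        # Lambdas and nested functions cannot be pickled by reference.
        if "<lambda>" in qualname or "<locals>" in qualname:
            raise ValueError("executor='process' requires a module-level function")
    return executor
```

Failing at decoration time means a misconfigured task is reported when the module is imported, before any request tries to enqueue it.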
schedule()
Registers a function as a periodic background task. Exactly one of every or cron must be provided.
The decorated function is also added to the task registry so it can be enqueued manually via add_task() in addition to running on schedule.
| Parameter | Type | Default | Description |
|---|---|---|---|
| every | float \| None | None | Interval in seconds between runs (for example 300 for every 5 minutes). Mutually exclusive with cron. |
| cron | str \| None | None | Five-field cron expression (for example "0 * * * *" for every hour). Requires pip install "fastapi-taskflow[scheduler]". Mutually exclusive with every. |
| retries | int | 0 | Additional attempts after the first failure. |
| delay | float | 0.0 | Seconds to wait before the first retry. |
| backoff | float | 1.0 | Multiplier applied to delay on each retry. |
| name | str \| None | function name | Override the display name in logs and the dashboard. |
| run_on_startup | bool | False | When True, fire the task on the first scheduler tick immediately after startup, rather than waiting for the first interval or cron slot. |
| timezone | str | "UTC" | IANA timezone name used when evaluating cron expressions (for example "America/New_York"). Ignored when every is used. |
| executor | "async" \| "thread" \| "process" \| None | None | Force a specific executor for each firing. Same constraints as on @task(). None auto-detects from the function signature. |
Raises ValueError if neither or both of every and cron are provided, or if executor is incompatible with the function. Raises ImportError if cron is used and croniter is not installed.
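The "exactly one of every or cron" rule amounts to an exclusive-or check. A minimal sketch of that validation follows; `check_schedule_args` and its return values are hypothetical and only illustrate the rule stated above.

```python
from typing import Optional


def check_schedule_args(
    every: Optional[float] = None,
    cron: Optional[str] = None,
) -> str:
    """Hypothetical validation: exactly one of every or cron must be given."""
    # Both None or both set are equally invalid.
    if (every is None) == (cron is None):
        raise ValueError("provide exactly one of 'every' or 'cron'")
    return "interval" if every is not None else "cron"
```

Comparing against None rather than relying on truthiness keeps an explicitly passed value from being mistaken for "not provided".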
Examples:
@task_manager.schedule(every=300, retries=1)
async def cleanup_expired_sessions() -> None:
    ...

@task_manager.schedule(cron="0 9 * * *", timezone="America/New_York")
async def morning_report() -> None:
    ...

@task_manager.schedule(every=3600, executor="process")
def rebuild_search_index() -> None:
    ...
See also: Scheduled Tasks API for ScheduledEntry fields and the TaskRecord.source value.
Methods
get_tasks(background_tasks)
FastAPI dependency that returns a ManagedBackgroundTasks instance sharing the native request task list.
@app.post("/signup")
def signup(email: str, tasks=Depends(task_manager.get_tasks)):
    task_id = tasks.add_task(send_email, email)
    return {"task_id": task_id}
background_tasks
Property alias for get_tasks. Reads more naturally when naming the dependency parameter:
@app.post("/signup")
def signup(
    email: str,
    background_tasks: ManagedBackgroundTasks = Depends(task_manager.background_tasks),
):
    task_id = background_tasks.add_task(send_email, email)
    return {"task_id": task_id}
init_app(app)
Registers startup and shutdown hooks on app. Use this when you want lifecycle management without mounting the dashboard or any other routes.
from fastapi import FastAPI
from fastapi_taskflow import TaskManager
task_manager = TaskManager(snapshot_db="tasks.db")
app = FastAPI()
task_manager.init_app(app)
Calling init_app() more than once on the same app is safe. Hooks are only registered the first time.
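One way to make repeated init_app() calls harmless is to remember which apps have already been wired up. The sketch below illustrates that idea with stand-in classes; `FakeApp`, `SketchManager`, and the hook lists are hypothetical and do not reflect how fastapi-taskflow or FastAPI store lifecycle hooks internally.

```python
from typing import Callable, List, Set


class FakeApp:
    """Stand-in for an application object that stores lifecycle hooks."""

    def __init__(self) -> None:
        self.startup_hooks: List[Callable[[], None]] = []
        self.shutdown_hooks: List[Callable[[], None]] = []


class SketchManager:
    """Hypothetical manager showing idempotent hook registration."""

    def __init__(self) -> None:
        self._initialised: Set[int] = set()

    def startup(self) -> None:
        pass  # placeholder for real startup work

    def shutdown(self) -> None:
        pass  # placeholder for real shutdown work

    def init_app(self, app: FakeApp) -> None:
        # A second call for the same app returns early, so hooks are added once.
        if id(app) in self._initialised:
            return
        self._initialised.add(id(app))
        app.startup_hooks.append(self.startup)
        app.shutdown_hooks.append(self.shutdown)
```

Without the guard, each extra call would register another pair of hooks, and startup work such as requeueing pending tasks would run more than once.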
TaskAdmin calls init_app() internally, so you do not need to call it yourself when using TaskAdmin.
startup()
Starts the snapshot scheduler, requeues pending tasks, and initialises the observer chain. Called automatically by init_app() (and therefore by TaskAdmin).
Call this directly only when managing the lifecycle yourself without init_app():
from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi_taskflow import TaskManager

task_manager = TaskManager(snapshot_db="tasks.db")

@asynccontextmanager
async def lifespan(app):
    await task_manager.startup()
    yield
    await task_manager.shutdown()

app = FastAPI(lifespan=lifespan)
Prefer init_app() when possible. It registers both hooks for you and is safe to call multiple times.
shutdown()
Stops the scheduler, flushes completed and pending tasks to the backend, closes the observer chain, shuts down the sync thread pool, and waits for in-flight process tasks up to process_shutdown_timeout. Called automatically by init_app() (and therefore by TaskAdmin).
install(app)
Patches fastapi.dependencies.utils.BackgroundTasks so that bare BackgroundTasks annotations in route signatures receive a ManagedBackgroundTasks instance. The patch is process-wide.
task_manager.install(app)

@app.post("/signup")
def signup(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(send_email, email)
    # background_tasks is now ManagedBackgroundTasks
After this call, all three injection patterns return a ManagedBackgroundTasks instance:
- background_tasks: BackgroundTasks (zero migration)
- background_tasks: ManagedBackgroundTasks (explicit type)
- tasks = Depends(task_manager.get_tasks) (explicit dependency)
install() is called automatically when TaskAdmin(auto_install=True) is set or when init_app() is called. For route-scoped injection, use Depends(task_manager.get_tasks) instead.
Public attributes
| Attribute | Type | Description |
|---|---|---|
| registry | TaskRegistry | The registered functions and their TaskConfig objects. |
| store | TaskStore | In-memory task state for the current process. |
| logger | LoggerChain \| None | Active observer chain. Present when loggers was set or log_file was configured. None otherwise. |
| fernet | Fernet \| None | Active Fernet encryptor. Present when encrypt_args_key was set. None otherwise. |