TaskAdmin¶
TaskAdmin mounts task observability routes onto a FastAPI app and manages the snapshot scheduler lifecycle.
Guide: Task Admin covers authentication patterns, retention configuration, and dashboard setup. This page documents every parameter, the routes mounted, and the SSE event format.
Constructor¶
TaskAdmin(
app: FastAPI,
task_manager: TaskManager,
path: str = "/tasks",
display_func_args: bool = False,
auto_install: bool = False,
auth: tuple | list | TaskAuthBackend | None = None,
token_expiry: int = 86400,
secret_key: str | None = None,
poll_interval: float = 30.0,
title: str = "fastapi-taskflow",
retention_days: float | None = None,
)
All setup happens in the constructor. No reference to the TaskAdmin instance needs to be kept after construction.
| Parameter | Type | Default | Description |
|---|---|---|---|
app |
FastAPI |
required | The FastAPI application to mount routes onto. |
task_manager |
TaskManager |
required | The task manager instance whose tasks will be exposed. |
path |
str |
"/tasks" |
URL prefix for all mounted routes. |
display_func_args |
bool |
False |
Include task arguments in the dashboard task list. Disable this if arguments may contain sensitive data. |
auto_install |
bool |
False |
Call task_manager.install(app) automatically, enabling bare BackgroundTasks injection across all routes. |
auth |
tuple \| list \| TaskAuthBackend \| None |
None |
Credentials for dashboard and API authentication. None disables auth entirely. Accepts a (username, password) tuple, a list of such tuples, or a custom TaskAuthBackend instance. |
token_expiry |
int |
86400 |
Seconds before an issued session token expires. |
secret_key |
str \| None |
None |
HMAC signing key for session tokens. Auto-generated if None and auth is configured. Set an explicit value for sessions that survive restarts. |
poll_interval |
float |
30.0 |
Seconds between dashboard poll requests when SSE is unavailable. |
title |
str |
"fastapi-taskflow" |
Display name shown in the dashboard header and on the login page. |
retention_days |
float \| None |
None |
Override the retention policy on task_manager. Terminal records older than this many days are pruned approximately every 6 hours. None leaves the manager's existing setting unchanged. |
Authentication¶
Pass an auth argument to require login before accessing the dashboard or any task API endpoint.
Single user:
Multiple users:
Custom backend:
from fastapi_taskflow import TaskAuthBackend
class DBAuthBackend(TaskAuthBackend):
def authenticate_user(self, username: str, password: str) -> bool:
return verify_against_database(username, password)
TaskAdmin(app, task_manager, auth=DBAuthBackend())
Custom token lifetime and signing key:
TaskAdmin(
app,
task_manager,
auth=("admin", "secret"),
secret_key="your-secret-key",
token_expiry=3600,
)
Note
When secret_key is not provided and auth is configured, a random key is generated at startup. Sessions are therefore invalidated on every restart. Set an explicit secret_key if you need sessions to persist across restarts.
Lifecycle¶
TaskAdmin always registers FastAPI startup and shutdown hooks that delegate to TaskManager.startup() and TaskManager.shutdown(). You do not need a separate lifespan handler.
Startup (in order):
- Loads persisted task history from the backend (if configured).
- Requeues pending tasks if
requeue_pending=True. - Starts the periodic snapshot flush loop.
- Initialises the observer chain.
Shutdown (in order):
- Stops the flush loop.
- Flushes completed tasks to the backend.
- Saves unfinished tasks if
requeue_pending=True. - Closes the observer chain.
- Shuts down the sync thread pool (if
max_sync_threadswas set).
Steps that do not apply (for example, no backend configured or no loggers) are skipped automatically.
Without TaskAdmin¶
If you are not using TaskAdmin, use init_app() to register lifecycle hooks without mounting any routes:
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi_taskflow import TaskManager
task_manager = TaskManager(snapshot_db="tasks.db")
@asynccontextmanager
async def lifespan(app):
await task_manager.startup()
yield
await task_manager.shutdown()
app = FastAPI(lifespan=lifespan)
Routes mounted¶
All paths are relative to path (default /tasks):
| Method | Path | Description |
|---|---|---|
GET |
/tasks |
List all tasks. |
GET |
/tasks/metrics |
Aggregated statistics across all tasks. |
GET |
/tasks/audit |
Audit log of retry and cancel actions. |
GET |
/tasks/{task_id} |
Full detail for a single task record. |
POST |
/tasks/{task_id}/retry |
Retry a failed or interrupted task. |
POST |
/tasks/{task_id}/cancel |
Cancel a pending or running task. |
POST |
/tasks/bulk-retry |
Retry a specific list of task IDs. |
POST |
/tasks/retry-failed |
Retry all failed tasks within a time window. |
DELETE |
/tasks/history |
Delete completed task history older than a given time window. |
GET |
/tasks/dashboard |
Live HTML dashboard. |
GET |
/tasks/dashboard/stream |
SSE stream (unauthenticated). |
GET |
/tasks/auth/login |
Login page (auth only). |
POST |
/tasks/auth/login |
Submit credentials (auth only). |
GET |
/tasks/auth/logout |
Clear session and redirect to login (auth only). |
Cancelling tasks¶
POST /tasks/{task_id}/cancel accepts tasks in pending or running states.
Pending tasks are cancelled immediately: the status is set to cancelled in the store and flushed to the backend if one is configured.
Running async tasks receive a cancellation signal via asyncio.Task.cancel(). The executor catches the resulting CancelledError, sets the status to cancelled, and flushes the record. The status transition happens asynchronously, so the API response may still show running immediately after the call returns.
Running sync tasks (functions dispatched to a thread pool) have their asyncio awaitable cancelled, which stops the event loop from waiting for the result. The underlying OS thread cannot be interrupted and runs to completion. The task is marked cancelled once the executor handles the CancelledError raised at the await point.
Returns 400 if the task is already in any other terminal or non-running state.
SSE event format¶
The dashboard stream at GET /tasks/dashboard/stream sends Server-Sent Events. Each event is a JSON object serialised from TaskRecord.to_dict().
Example event payload:
{
"task_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
"func_name": "send_email",
"status": "success",
"created_at": "2026-04-30T10:00:00",
"start_time": "2026-04-30T10:00:01",
"end_time": "2026-04-30T10:00:02",
"duration": 1.02,
"retries_used": 0,
"error": null,
"logs": ["2026-04-30T10:00:01 Sending to user@example.com"],
"stacktrace": null,
"tags": {},
"source": "manual",
"priority": null
}
Events are emitted whenever a task changes state. The stream endpoint does not require authentication even when auth is configured, so the dashboard page itself can subscribe from the browser after the page load is authenticated.