Changelog¶
v0.8.1¶
Bug fixes¶
- Fixed a process hang at shutdown when tasks were still running.
- Fixed running tasks showing as
CANCELLEDinstead ofINTERRUPTEDafter a server shutdown. - Fixed requeued tasks not showing their executor in the dashboard after a restart.
v0.8.0¶
Adds a process executor for CPU-bound tasks, PostgreSQL and MySQL snapshot backends, readable duration formatting in the dashboard, and bug fixes.
Process executor¶
- Added
executor='process'on@task_manager.task()and@task_manager.schedule(). Tasks marked with it run in a separate OS process viaProcessPoolExecutor, bypassing the GIL for true CPU parallelism. max_process_workersandprocess_shutdown_timeoutonTaskManagerconfigure pool size and graceful shutdown wait.- cloudpickle is used automatically when installed (
pip install "fastapi-taskflow[process]"), enabling lambdas and closures as task arguments. Falls back to standard pickle without it. TaskArgumentErroris raised atadd_task()time when arguments cannot be serialized, rather than failing silently inside the worker.TaskRecordnow carries anexecutorfield recording which executor ran the task (async,thread, orprocess). Shown in the dashboard detail panel with a color-coded badge.
PostgreSQL and MySQL backends¶
- Added
PostgresBackendfor PostgreSQL. Install withpip install "fastapi-taskflow[postgres]". - Added
MySQLBackendfor MySQL and MariaDB. Install withpip install "fastapi-taskflow[mysql]". - Both backends support the full feature set: task history, pending requeue, idempotency keys, scheduled task locking, and multi-instance deployments.
- Tables are created automatically on first startup. No migrations to run.
- Both are included in
pip install "fastapi-taskflow[all]".
Dashboard improvements¶
- Duration values now display in a readable format. Values under 1 second show as
ms, above that as seconds, thenXm Xsfor minutes, andXh Xmfor hours. Applies to the task table, detail panel, and all stat cards (Avg, Min, Max, P95). - Task logs in the detail panel no longer scroll inside a fixed-height box. They expand with the panel.
- Error and stacktrace sections in the detail panel also expand fully instead of clipping.
Bug fixes¶
- Fixed select-all checkbox on the main tasks table incorrectly picking up Dead Letters tab checkboxes, causing
POST /tasks/undefined/retry404 errors when retrying selected tasks.
v0.7.0¶
Adds priority queues, eager dispatch, and Dead Letter Queue bulk replay.
Priority queues¶
- Added
priorityparameter to@task_manager.task()andadd_task(). Tasks with a priority value bypass Starlette's background task list and enter a dedicatedasyncio.PriorityQueuedrained by a worker coroutine. - Higher values run first. Equal-priority tasks execute in arrival (FIFO) order. The conventional range is 1 (lowest) to 10 (highest); any integer is accepted.
- Per-call
priorityonadd_task()overrides the decorator-level default. TaskRecord.priorityrecords the priority at enqueue time. Shown in the dashboard task table and detail panel with a color-coded badge.
Eager dispatch¶
- Added
eagerparameter to@task_manager.task()andadd_task(). WhenTrue, the task is dispatched viaasyncio.create_taskthe momentadd_task()is called, before FastAPI sends the response. - Per-call
eageronadd_task()overrides the decorator-level default.
Dead Letter Queue bulk replay¶
- Added
POST /tasks/bulk-retry. Accepts a JSON body{"task_ids": [...]}and re-enqueues each listed task that isfailedorinterrupted. Tasks not found, not retryable, or whose function is no longer registered are counted inskippedwithout raising an error. - Added
POST /tasks/retry-failed. Accepts asincequery param (<N>h,<N>d, orall) and an optionalfunc_namefilter. Re-enqueues all matched failed tasks created within that window. - Both endpoints return
{dispatched, skipped, results}and record abulk_retryentry in the audit log. - Dead Letters tab toolbar: a time-window selector (1h, 6h, 24h, 7d, all) and a Replay window button. Clicking opens a confirmation modal before anything is dispatched.
- Per-row checkboxes on Dead Letters rows. Checking one or more rows shows a bulk bar with a Replay selected button. Selection clears automatically after dispatch.
v0.6.1¶
Fixed a bug where the distributed schedule lock was issued with a zero-second TTL for sub-second intervals, causing both instances to fire the same entry on every tick. The lock TTL now has a minimum of 1 second.
v0.6.0¶
Adds a periodic task scheduler with interval and cron triggers, distributed locking for multi-instance deployments, dashboard differentiation between scheduled and manual runs, and TaskManager.init_app() for lifecycle registration without TaskAdmin.
Scheduled tasks¶
- Added
@task_manager.schedule(every=, cron=)decorator. Registers a function as both a periodic task and a normal task registry entry so it can also be enqueued manually viaadd_task(). every=accepts a float (seconds). No extra dependencies required.cron=accepts a standard five-field cron expression. Requirespip install "fastapi-taskflow[scheduler]"(croniter>=1.4).retries,delay,backoff, andnamework the same as on@task_manager.task().run_on_startup=Truefires the task on the first scheduler tick instead of waiting for the first interval or cron slot.PeriodicSchedulerruns a singleasyncio.Taskloop ticking every second. Started and stopped automatically byTaskManager.startup()andTaskManager.shutdown().
Distributed lock¶
- Added
acquire_schedule_lock(key, ttl)toSnapshotBackend. Default implementation always returnsTrue.SqliteBackendandRedisBackendimplement proper distributed locking so only one instance fires each scheduled entry per interval.
TaskRecord source¶
- Added
sourcefield toTaskRecord. Value is"manual"for tasks enqueued viaadd_task()(default, no behaviour change) and"scheduled"for tasks fired by the scheduler. sourceis included inTaskRecord.to_dict()and the REST API response.
Dashboard¶
- Scheduled task rows show a
scheduledbadge next to the function name. - New Schedules tab lists all registered entries, their trigger (interval or cron expression), and the next scheduled run time.
- Dashboard tabs restructured: "Tasks" renamed to "View" (task run table); new "Tasks" tab lists all registered functions with their config as inline badges.
- Added
DELETE /tasks/history?value=N&unit=<min|hour|day>to delete completed tasks older than a given time window from both the in-memory store and the backend. - Added
TaskStore.delete_completed_before(cutoff)to remove terminal records from the in-memory store whereend_time < cutoff. - Added
cancelledstatus with a dedicated badge and metrics card;POST /tasks/{task_id}/cancelnow works for both pending and running tasks — async tasks are cancelled immediately, sync tasks stop being awaited while the underlying thread completes. - Added Dead Letters tab (failed tasks only), auth-gated Audit tab, drag-resizable detail panel, P95 duration in function analytics, and cancel button in the detail panel for pending and running tasks.
Retention¶
- Added
retention_daysparameter toTaskManagerandTaskAdmin. When set, terminal records (success,failed,cancelled) whoseend_timeis older than the configured number of days are pruned automatically every ~6 hours during the snapshot loop.TaskAdmin(retention_days=...)overrides the manager's setting at mount time. Pending and running tasks are never deleted.
Lifecycle¶
- Added
TaskManager.init_app(app). Registers startup and shutdown hooks on a FastAPI app without requiringTaskAdmin.TaskAdmincalls this internally. Calling it more than once on the same app is safe.
v0.5.1¶
Dashboard login screen updated to match the teal theme. TaskAdmin gains a title parameter to customise the display name shown in the dashboard header and login page.
v0.5.0¶
Adds pluggable observability, task context, tags, argument encryption, trace propagation, and opt-in concurrency controls for async and sync tasks.
Concurrency controls¶
- Added
max_concurrent_tasksonTaskManager. When set, anasyncio.Semaphorecaps how many async tasks hold event loop time simultaneously. Tasks waiting for a slot are parked without blocking the event loop or delaying request handlers. Defaults toNone(no limit, existing behaviour unchanged). - Added
max_sync_threadsonTaskManager. When set, sync task functions run in a dedicatedThreadPoolExecutorinstead of the default asyncio thread pool, preventing sync task bursts from exhausting threads needed by sync request handlers. Defaults toNone(existing behaviour unchanged). - Both parameters are opt-in. Neither changes any existing behaviour when not set.
Pluggable observability¶
- Added
TaskObserverABC. Implement it to send task log and lifecycle events to any destination. - Added
FileLogger,StdoutLogger, andInMemoryLoggerbuilt-in implementations. TaskManageraccepts aloggers: list[TaskObserver]parameter. All observers run independently viaLoggerChain; an error in one never affects the others.task_log()gainslevel=("debug","info","warning","error") and**extrakeyword fields. Extra fields are forwarded toLogEvent.extrafor structured log consumers. The oldtask_log(message)signature is unchanged.FileLoggerandStdoutLoggersupportmin_level=filtering so low-severity entries can be suppressed without changing the task code.InMemoryLoggercaptureslog_eventsandlifecycle_eventslists for test assertions.- The
log_fileshorthand onTaskManagerremains fully supported.
Task context¶
- Added
get_task_context(). Returns aTaskContextdataclass (task_id,func_name,attempt,tags) from inside any code path invoked during task execution, including helper functions. TaskContextis available in both sync and async tasks.
Tags¶
add_task()accepts an optionaltags: dict[str, str]parameter. Tags are attached to theTaskRecordand forwarded to everyLogEventandLifecycleEvent.
Argument encryption¶
- Added
encrypt_args_keyparameter toTaskManager. When set, taskargsandkwargsare encrypted with Fernet (AES-128-CBC + HMAC) atadd_task()time and decrypted only inside the executor just before the function is called. They are never stored in plain text. - Requires
pip install "fastapi-taskflow[encryption]". SqliteBackendandRedisBackendstore the encrypted payload in a newencrypted_payloadcolumn/field with automatic migration.
task_log outside managed context¶
task_log()no longer silently drops calls made outside a managed task. It now forwards to the stdlib loggerfastapi_taskflow.taskat the matching level. Task functions work correctly in all call contexts (routes, scripts, cron jobs, direct test calls) without code changes.
Trace context propagation¶
add_task()captures the caller'scontextvarscontext snapshot. On Python 3.11+, the task function runs inside this context so OpenTelemetry spans and other trace state flow from the enqueue site into background execution transparently. On Python 3.10,task_log()andget_task_context()still work correctly; full trace propagation requires Python 3.11+.
v0.4.1¶
- Added docstrings with
Args:andReturns:descriptions to all public classes, methods, and functions across the codebase. - Fixed
TaskStore._notify_changeto usecall_soon_threadsafesotask_log()entries from sync tasks trigger live dashboard updates correctly.
v0.4.0¶
File logging¶
- Added
log_fileparameter toTaskManager. When set, everytask_log()call and retry separator is written to a plain text file in addition to being stored on the task record. - Each line has the format
[task_id] [func_name] 2024-01-01T12:00:00 message. log_lifecycle=Truewrites an additional line for each status transition (RUNNING,SUCCESS,FAILED,INTERRUPTED).- Automatic rotation via
RotatingFileHandler(default). External rotation viaWatchedFileHandlerwithlog_file_mode="watched". - For same-host multi-process deployments, use separate files per instance or
log_file_mode="watched"with logrotate. For multi-host Redis deployments, each host writes its own file.
v0.3.0¶
Adds resilience and multi-instance support.
Resilience¶
- Added
INTERRUPTEDstatus for tasks that were mid-execution at shutdown. They are saved to history and visible in the dashboard but not re-executed automatically. - Added
requeue_on_interrupt=Trueon@task_manager.task()for idempotent tasks that are safe to restart from scratch. - Completed tasks are flushed to the backend immediately on
SUCCESS, closing the crash recovery gap between completion and the next periodic flush.
Idempotency¶
add_task()accepts an optionalidempotency_key. If a non-failed task with the same key exists, the originaltask_idis returned and the task is not run again. Works in-process and cross-instance via the shared backend.
Multi-instance¶
- Requeue claiming is now atomic per task (SQLite
DELETErowcount, RedisHDEL). Only one instance dispatches each pending task on startup. SqliteBackendenables WAL mode for safer concurrent multi-process access.- Dashboard merges completed task history from all instances via the shared backend, with a cached backend read (default 5s TTL) to avoid excess I/O.
- Added
poll_intervalonTaskAdmin(default 30s) to control how often the SSE stream refreshes from the backend for cross-instance updates.
Task retry¶
- Added
POST /tasks/{task_id}/retryendpoint. Re-enqueues afailedorinterruptedtask using the original function, args, and kwargs. Returns a new task record with a fresh task ID. The original record is left unchanged in history. - The dashboard detail panel shows a retry button for failed and interrupted tasks. Interrupted tasks include a warning that the function may have already partially executed.
- Returns
400if the task status does not allow retry,404if not found, and409if the function is no longer registered in the process.
Dashboard¶
- Added time-range filter (last 1h, 6h, 24h, 7d). Filter and sort preferences are persisted in
localStorage. - Added pause/resume button. While paused, incoming SSE events are buffered and a badge shows how many new tasks arrived. Resuming applies the latest state immediately. Pause state survives page reloads.
- Added bulk retry. Checkboxes appear on failed and interrupted rows. A toolbar shows the selection count and a single Retry button dispatches all selected tasks in parallel.
interruptedtasks now have a dedicated amber status badge and metrics card.
v0.2.0¶
Task logging¶
- Added
task_log(message)function. Call it inside any task to emit timestamped log entries that are stored on the task record and shown in the dashboard. - Retry attempts are separated automatically with a
--- Retry N ---marker so per-attempt logs are easy to read.
Error visibility¶
- Failed tasks now capture the full Python traceback in a
stacktracefield onTaskRecord. - The dashboard detail panel now has three tabs: Details, Logs, and Error. The Logs and Error tabs are only shown when the task has data for them.
Storage¶
TaskRecordgains two new fields:logs: list[str]andstacktrace: str | None.SqliteBackendstores both fields as new columns with automatic migration for existing databases.RedisBackendstores both fields as hash entries. Old records without these fields load cleanly with empty defaults.
v0.1.0¶
First public release.
Core¶
@task_manager.task()decorator withretries,delay,backoff,persist, andnameoptionsManagedBackgroundTaskssubclass of FastAPI'sBackgroundTaskswith UUID-based task tracking- In-memory
TaskStorewith full task lifecycle:PENDING,RUNNING,SUCCESS,FAILED - Both sync and async task functions supported
Injection¶
task_manager.get_tasksFastAPI dependencytask_manager.background_tasksproperty aliastask_manager.install(app)patches FastAPI's internal injection so bareBackgroundTasksannotations receive a managed instanceTaskAdmin(auto_install=True)convenience flag
Observability¶
GET /tasks— list all in-memory tasksGET /tasks/{task_id}— single task detailGET /tasks/metrics— success rate, total, duration averagesGET /tasks/dashboard— live admin dashboard over SSEdisplay_func_argsoption to store and show task arguments in the dashboard
Persistence¶
- Pluggable
SnapshotBackendABC SqliteBackend— default, zero extra dependencies, includesquery()for history lookupsRedisBackend— available viapip install "fastapi-taskflow[redis]"- Periodic snapshot scheduler managed automatically by
TaskAdmin - Final flush on shutdown
Requeue¶
requeue_pending=TrueonTaskManagersaves unfinished tasks on shutdown- Tasks are matched by function name and re-dispatched on next startup
- Unrecognised functions are skipped with a warning log
- Pending store is cleared after successful requeue
Authentication¶
TaskAdminnow acceptsauth,secret_key, andtoken_expiryto protect the dashboard and task API with cookie-based authentication; supports a single credential tuple, a list of tuples, or a customTaskAuthBackendinstance.