# Observability
fastapi-taskflow includes a pluggable observer system. Observers receive structured events for every `task_log()` call and every task lifecycle transition (RUNNING, SUCCESS, FAILED, INTERRUPTED). Multiple observers can run simultaneously with full error isolation: a failing observer never affects the task or other observers.
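The isolation guarantee can be pictured as a small dispatch loop. The sketch below is illustrative only, not the library's actual implementation: each observer runs inside its own `try`/`except`, so a crashing observer cannot affect the task or its peers.

```python
def dispatch(observers, event):
    # Invoke every observer; swallow individual failures so one bad
    # observer cannot break the task or the remaining observers.
    for observer in observers:
        try:
            observer(event)
        except Exception:
            pass  # a real dispatcher would record the observer's own failure

seen = []

def good_observer(event):
    seen.append(event)

def crashing_observer(event):
    raise RuntimeError("observer crashed")

dispatch([crashing_observer, good_observer], "RUNNING")
print(seen)  # → ['RUNNING']
```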
## Built-in observers
### FileLogger
Writes log and lifecycle events to a rotating plain text file.
```python
from fastapi_taskflow import FileLogger, TaskManager

task_manager = TaskManager(
    snapshot_db="tasks.db",
    loggers=[FileLogger("tasks.log", log_lifecycle=True)],
)
```
Each log entry has the format:
Lifecycle entries (when `log_lifecycle=True`):

```
[abc12345] [send_email] 2026-01-01T12:00:00 -- RUNNING
[abc12345] [send_email] 2026-01-01T12:00:01 -- SUCCESS
```
FileLogger parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `path` | `str` | required | File path to write to. Created if it does not exist. |
| `max_bytes` | `int` | `10485760` | Maximum file size (10 MB) before rotating. Ignored in watched mode. |
| `backup_count` | `int` | `5` | Number of rotated backup files to keep. Ignored in watched mode. |
| `mode` | `str` | `"rotate"` | `"rotate"` for automatic rotation; `"watched"` for external rotation via logrotate. |
| `log_lifecycle` | `bool` | `False` | Also write lifecycle transitions. |
| `min_level` | `str` | `"info"` | Minimum log level to write. Entries below this level are silently dropped. |
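The two modes map onto familiar standard-library rotation strategies. The sketch below uses Python's `logging.handlers` as an analogy for the behavior described in the table; it says nothing about `FileLogger`'s actual internals:

```python
import logging.handlers
import os
import tempfile

log_path = os.path.join(tempfile.mkdtemp(), "tasks.log")

# mode="rotate": the writer rotates the file itself once the size limit is
# exceeded, keeping a fixed number of backups (tasks.log.1, tasks.log.2, ...).
rotating = logging.handlers.RotatingFileHandler(
    log_path, maxBytes=10 * 1024 * 1024, backupCount=5
)

# mode="watched": the writer reopens the file whenever an external tool
# (e.g. logrotate) moves it aside; size and backup settings do not apply.
watched = logging.handlers.WatchedFileHandler(log_path)

rotating.close()
watched.close()
```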
### StdoutLogger
Prints log and lifecycle events to stdout. Useful in containers where stdout is captured by the logging agent (Fluentd, Logstash, CloudWatch Logs).
```python
from fastapi_taskflow import StdoutLogger, TaskManager

task_manager = TaskManager(
    loggers=[StdoutLogger(log_lifecycle=True)],
)
```
The output format matches `FileLogger`.
StdoutLogger parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `log_lifecycle` | `bool` | `False` | Also print lifecycle transitions. |
| `min_level` | `str` | `"info"` | Minimum log level to print. |
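The `min_level` threshold amounts to a simple ordering check. A sketch, assuming the levels are ordered debug < info < warning < error as the level names in this document suggest:

```python
# Assumed severity ordering; entries below min_level are dropped.
LEVEL_ORDER = {"debug": 0, "info": 1, "warning": 2, "error": 3}

def should_emit(level: str, min_level: str) -> bool:
    return LEVEL_ORDER[level] >= LEVEL_ORDER[min_level]

print(should_emit("debug", "warning"))  # → False
print(should_emit("error", "warning"))  # → True
```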
### InMemoryLogger
Captures events in memory. Designed for tests: assert exactly what log entries and lifecycle events were emitted.
```python
from fastapi_taskflow import InMemoryLogger, TaskManager

mem_logger = InMemoryLogger()
task_manager = TaskManager(loggers=[mem_logger])
```
Access captured events after the task runs:
```python
# LogEvent and LifecycleEvent objects
for event in mem_logger.log_events:
    print(event.message, event.level, event.extra)

for event in mem_logger.lifecycle_events:
    print(event.func_name, event.status.value, event.duration)

# Clear for the next test
mem_logger.clear()
```
InMemoryLogger parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `min_level` | `str` | `"debug"` | Minimum log level to capture. The default captures all levels. |
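In a test, the capture-and-assert flow looks like the sketch below. `FakeInMemoryLogger` and `FakeLogEvent` are minimal stand-ins written here for illustration, not the classes the library ships:

```python
from dataclasses import dataclass, field

@dataclass
class FakeLogEvent:
    # Stand-in mirroring a LogEvent's message/level/extra fields
    message: str
    level: str = "info"
    extra: dict = field(default_factory=dict)

class FakeInMemoryLogger:
    """Minimal stand-in for InMemoryLogger, for illustration only."""

    def __init__(self):
        self.log_events = []

    def on_log(self, event):
        self.log_events.append(event)

    def clear(self):
        self.log_events.clear()

logger = FakeInMemoryLogger()
logger.on_log(FakeLogEvent("Processing invoice", extra={"user_id": 7}))

assert logger.log_events[0].message == "Processing invoice"
assert logger.log_events[0].extra == {"user_id": 7}

logger.clear()
assert logger.log_events == []
```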
## Multiple observers
Pass a list to `loggers=` to fan out to multiple observers simultaneously. All run independently: an error in one never affects the others.
```python
task_manager = TaskManager(
    snapshot_db="tasks.db",
    loggers=[
        FileLogger("tasks.log", log_lifecycle=True),
        StdoutLogger(log_lifecycle=True, min_level="warning"),
    ],
)
```
## Tags
Attach key/value labels to a task at enqueue time with `tags=`. Tags flow through to every `LogEvent` and `LifecycleEvent` emitted for that task, so downstream systems can filter logs by label without parsing message strings.
```python
@app.post("/invoice")
def create_invoice(user_id: int, tasks=Depends(task_manager.get_tasks)):
    task_id = tasks.add_task(
        process_invoice,
        user_id,
        tags={"user_id": str(user_id), "source": "invoice_api"},
    )
    return {"task_id": task_id}
```
Tags are available inside the task via `get_task_context()`:
```python
from fastapi_taskflow import get_task_context, task_log

@task_manager.task(retries=2)
def process_invoice(user_id: int) -> None:
    ctx = get_task_context()
    source = ctx.tags.get("source", "unknown") if ctx else "unknown"
    task_log("Processing invoice", user_id=user_id, source=source)
```
## The `log_file` shorthand
If you only need a single FileLogger, use the log_file shorthand on TaskManager instead of constructing FileLogger manually:
```python
# shorthand
task_manager = TaskManager(
    snapshot_db="tasks.db",
    log_file="tasks.log",
    log_file_max_bytes=5 * 1024 * 1024,
    log_file_backup_count=3,
    log_file_mode="rotate",
    log_lifecycle=True,
)

# equivalent
task_manager = TaskManager(
    snapshot_db="tasks.db",
    loggers=[
        FileLogger(
            "tasks.log",
            max_bytes=5 * 1024 * 1024,
            backup_count=3,
            mode="rotate",
            log_lifecycle=True,
        )
    ],
)
```
The shorthand is kept for backwards compatibility. Use `loggers=` directly when you need multiple observers or more control.
## Custom observers
Implement the TaskObserver ABC to send events anywhere:
```python
from fastapi_taskflow import TaskManager, TaskObserver
from fastapi_taskflow.loggers.base import LifecycleEvent, LogEvent

class MyObserver(TaskObserver):
    async def on_log(self, event: LogEvent) -> None:
        # event.task_id, event.func_name, event.message,
        # event.level, event.timestamp, event.attempt,
        # event.tags, event.extra
        await my_log_service.send(event.message, tags=event.tags)

    async def on_lifecycle(self, event: LifecycleEvent) -> None:
        # event.status, event.duration, event.error, event.stacktrace
        if event.status.value == "failed":
            await alert_service.notify(event.func_name, event.error)

    async def startup(self) -> None:
        await my_log_service.connect()

    async def close(self) -> None:
        await my_log_service.disconnect()

task_manager = TaskManager(loggers=[MyObserver()])
```
`startup()` is called when `TaskAdmin` mounts the app. `close()` is called on shutdown. Both default to no-ops in the base class.
## Event reference
### LogEvent
Emitted for every task_log() call.
| Field | Type | Description |
|---|---|---|
| `task_id` | `str` | UUID of the task. |
| `func_name` | `str` | Task function name. |
| `message` | `str` | The log message. |
| `level` | `str` | Log level string (`"debug"`, `"info"`, `"warning"`, `"error"`). |
| `timestamp` | `datetime` | UTC time the entry was created. |
| `attempt` | `int` | Zero-based retry index. |
| `tags` | `dict[str, str]` | Tags attached at enqueue time. |
| `extra` | `dict` | Arbitrary extra fields from `task_log(**extra)`. |
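Reconstructed from the table above, a `LogEvent` is shaped roughly like the dataclass below. This is a sketch of the field layout, not the library's actual definition; the defaults are illustrative assumptions.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone

@dataclass
class LogEventSketch:
    # Field names and types taken from the LogEvent table; defaults assumed.
    task_id: str
    func_name: str
    message: str
    level: str = "info"
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    attempt: int = 0
    tags: dict = field(default_factory=dict)
    extra: dict = field(default_factory=dict)

event = LogEventSketch("abc12345", "send_email", "Processing invoice")
print(event.level, event.attempt)  # → info 0
```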
### LifecycleEvent
Emitted on each status transition.
| Field | Type | Description |
|---|---|---|
| `task_id` | `str` | UUID of the task. |
| `func_name` | `str` | Task function name. |
| `status` | `TaskStatus` | New status (RUNNING, SUCCESS, FAILED, INTERRUPTED). |
| `timestamp` | `datetime` | UTC time of the transition. |
| `attempt` | `int` | Zero-based retry index. |
| `retries_used` | `int` | Total retries consumed. |
| `duration` | `float \| None` | Seconds from start to end. Present on SUCCESS and FAILED. |
| `error` | `str \| None` | Error message string. Present on FAILED. |
| `stacktrace` | `str \| None` | Formatted traceback. Present on FAILED. |
| `tags` | `dict[str, str]` | Tags attached at enqueue time. |
## See also
- Task Logging
- Task Context
- File Logging - rotation options and multi-process deployment