Loggers and Observers¶
This page documents the built-in observer implementations, the TaskObserver ABC, and the event types delivered to observers.
fastapi-taskflow ships three built-in observer implementations. All implement the TaskObserver ABC and can be combined in any order via the loggers= parameter on TaskManager.
The Observability guide covers observer configuration, combining loggers, and writing custom observers.
FileLogger¶
FileLogger writes log and lifecycle events to a plain text file with automatic rotation.
Constructor¶
FileLogger(
    path: str,
    max_bytes: int = 10485760,
    backup_count: int = 5,
    mode: Literal["rotate", "watched"] = "rotate",
    log_lifecycle: bool = False,
    min_level: str = "info",
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| path | str | required | File path to write to. The file is created if it does not exist. |
| max_bytes | int | 10485760 | Maximum file size in bytes before rotating to a new file; the default is 10 MiB. Has no effect in "watched" mode. |
| backup_count | int | 5 | Number of rotated backup files to keep alongside the active log file. Has no effect in "watched" mode. |
| mode | str | "rotate" | "rotate" uses Python's RotatingFileHandler, suitable for single-process deployments. "watched" uses WatchedFileHandler, suitable for multi-process deployments where an external tool such as logrotate manages rotation. |
| log_lifecycle | bool | False | When True, also write lifecycle transitions (RUNNING, SUCCESS, FAILED, INTERRUPTED) to the file in addition to task_log() entries. |
| min_level | str | "info" | Minimum log level to write. Entries below this level are silently dropped. Accepts "debug", "info", "warning", or "error". |
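As a rough illustration of the two modes, the sketch below maps a mode string to the standard-library handlers named in the table. The helper make_handler is hypothetical and is not part of fastapi-taskflow; it only shows that max_bytes and backup_count apply to the rotating handler and go unused in watched mode.

```python
import logging
from logging.handlers import RotatingFileHandler, WatchedFileHandler


def make_handler(
    path: str,
    mode: str = "rotate",
    max_bytes: int = 10 * 1024 * 1024,  # 10485760 bytes, the documented default
    backup_count: int = 5,
) -> logging.Handler:
    """Hypothetical helper: pick a stdlib handler for the given mode."""
    if mode == "rotate":
        # Rotates in-process once the file would exceed max_bytes.
        return RotatingFileHandler(path, maxBytes=max_bytes, backupCount=backup_count)
    if mode == "watched":
        # Reopens the file if an external tool (e.g. logrotate) moved it;
        # max_bytes and backup_count are deliberately unused here.
        return WatchedFileHandler(path)
    raise ValueError(f"unknown mode: {mode!r}")
```

Rotation inside the process is simple for a single worker, while the watched handler leaves rotation to an external tool, which is why the table recommends it for multi-process deployments.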
Output format¶
Log entry (from task_log()):
Lifecycle entry (when log_lifecycle=True):
Thread safety¶
A single FileLogger instance is safe for concurrent use across multiple threads (sync tasks run in a thread pool) and the asyncio event loop within one process. For multi-process or multi-host deployments, see the File Logging guide.
Example:
from fastapi_taskflow import TaskManager, FileLogger

task_manager = TaskManager(
    loggers=[FileLogger("tasks.log", log_lifecycle=True, min_level="debug")],
)
StdoutLogger¶
StdoutLogger prints log and lifecycle events to stdout. Useful in containers where stdout is captured by the logging agent.
Constructor¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| log_lifecycle | bool | False | When True, also print lifecycle transitions in addition to task_log() entries. |
| min_level | str | "info" | Minimum log level to print. Entries below this level are silently dropped. |
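The min_level threshold can be pictured as a comparison on an ordered set of levels. The sketch below is an assumption about how such filtering might work, not the library's implementation; LEVEL_ORDER and should_emit are hypothetical names, and the numeric ranks are arbitrary.

```python
# Assumed ordering of the four documented levels, lowest to highest.
LEVEL_ORDER = {"debug": 10, "info": 20, "warning": 30, "error": 40}


def should_emit(level: str, min_level: str = "info") -> bool:
    """Return True when an entry at `level` passes the `min_level` threshold."""
    return LEVEL_ORDER[level] >= LEVEL_ORDER[min_level]


# With min_level="warning", only warning and error entries pass.
kept = [lvl for lvl in LEVEL_ORDER if should_emit(lvl, min_level="warning")]
print(kept)  # ['warning', 'error']
```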
Output format¶
Output matches FileLogger:
Example:
from fastapi_taskflow import TaskManager, StdoutLogger

task_manager = TaskManager(
    loggers=[StdoutLogger(log_lifecycle=True)],
)
InMemoryLogger¶
InMemoryLogger captures events in memory. It is designed for use in tests where you want to assert on log output and lifecycle transitions without writing to disk or stdout.
Constructor¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| min_level | str | "debug" | Minimum log level to capture. The default captures all levels. |
Attributes¶
| Attribute | Type | Description |
|---|---|---|
| log_events | list[LogEvent] | All captured log events in order, each emitted by a task_log() call. |
| lifecycle_events | list[LifecycleEvent] | All captured lifecycle transition events in order. |
Methods¶
clear() -> None¶
Empties both log_events and lifecycle_events. Call this between test cases to reset state.
Example:
from fastapi_taskflow import InMemoryLogger, TaskManager, task_log

mem = InMemoryLogger()
task_manager = TaskManager(loggers=[mem])

@task_manager.task()
def my_task() -> None:
    task_log("hello")

# ... run the task in a test ...

assert mem.log_events[0].message == "hello"
assert mem.lifecycle_events[-1].status.value == "success"

mem.clear()
assert len(mem.log_events) == 0
TaskObserver ABC¶
TaskObserver is the base class for all custom observers. Subclass it and implement on_log() and/or on_lifecycle() to send task events to any destination, such as a file, a metrics system, or a tracing backend.
from fastapi_taskflow import TaskObserver
from fastapi_taskflow.loggers.base import LogEvent, LifecycleEvent

class TaskObserver(ABC):
    def __init__(self, min_level: str = "info") -> None: ...
    async def on_log(self, event: LogEvent) -> None: ...
    async def on_lifecycle(self, event: LifecycleEvent) -> None: ...
    async def startup(self) -> None: ...
    async def close(self) -> None: ...
Methods to implement¶
on_log(event: LogEvent) -> None¶
Called for every task_log() entry emitted by a running task. The base class implementation is a no-op; override it to process log entries.
on_lifecycle(event: LifecycleEvent) -> None¶
Called on every task status transition: RUNNING, SUCCESS, FAILED, and INTERRUPTED. The base class implementation is a no-op; override it to process lifecycle events.
Both methods are async, so implementations can await network calls, database writes, or any other async I/O. Sync-only destinations can use asyncio.to_thread inside the method body.
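A minimal sketch of the asyncio.to_thread approach for a sync-only destination follows. FakeLogEvent and BlockingSinkObserver are hypothetical stand-ins so the snippet runs without the library; a real observer would subclass TaskObserver and receive a LogEvent.

```python
import asyncio
import time
from dataclasses import dataclass


@dataclass
class FakeLogEvent:
    """Stand-in carrying only the fields this sketch needs."""
    task_id: str
    message: str


class BlockingSinkObserver:
    """Hypothetical observer; a real one would subclass TaskObserver."""

    def __init__(self) -> None:
        self.sent: list[str] = []

    def _blocking_send(self, line: str) -> None:
        # Placeholder for a sync-only client call (e.g. a blocking SDK).
        time.sleep(0.01)
        self.sent.append(line)

    async def on_log(self, event: FakeLogEvent) -> None:
        # Run the blocking call in a worker thread so the event loop stays free.
        await asyncio.to_thread(self._blocking_send, f"{event.task_id} {event.message}")


observer = BlockingSinkObserver()
asyncio.run(observer.on_log(FakeLogEvent(task_id="t-1", message="hello")))
print(observer.sent)  # ['t-1 hello']
```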
Errors raised inside either method are caught by LoggerChain and logged to stderr. They never propagate to the task or affect its outcome.
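The isolation described above can be pictured as a fan-out loop that catches each observer's exceptions separately. This is only a sketch of the behaviour, not LoggerChain's actual code; dispatch_log and both observer classes are hypothetical, and a plain dict stands in for the event.

```python
import asyncio
import sys
import traceback


class BrokenObserver:
    async def on_log(self, event: dict) -> None:
        raise RuntimeError("exporter unavailable")


class CollectingObserver:
    def __init__(self) -> None:
        self.events: list[dict] = []

    async def on_log(self, event: dict) -> None:
        self.events.append(event)


async def dispatch_log(observers: list, event: dict) -> None:
    """Hypothetical fan-out: one failing observer must not affect the others."""
    for obs in observers:
        try:
            await obs.on_log(event)
        except Exception:
            # Report to stderr and carry on; nothing propagates to the caller.
            traceback.print_exc(file=sys.stderr)


collector = CollectingObserver()
asyncio.run(dispatch_log([BrokenObserver(), collector], {"message": "hello"}))
print(len(collector.events))  # 1
```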
Optional methods¶
startup() -> None¶
Called at app startup before any tasks run. Use this to open connections or initialise exporters. Defaults to a no-op; override only if needed.
close() -> None¶
Called at app shutdown. Use this to flush buffered events and release held resources. Defaults to a no-op; override only if needed.
Example custom observer:
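from prometheus_client import Counter

from fastapi_taskflow import TaskObserver
from fastapi_taskflow.loggers.base import LifecycleEvent

# Illustrative metric definition; the metric name and labels are assumptions.
TASK_COUNTER = Counter(
    "taskflow_task_transitions_total",
    "Task lifecycle transitions by function and status",
    ["func", "status"],
)

class PrometheusObserver(TaskObserver):
    async def on_lifecycle(self, event: LifecycleEvent) -> None:
        TASK_COUNTER.labels(
            func=event.func_name,
            status=event.status.value,
        ).inc()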
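The optional startup() and close() hooks matter most for observers that hold state. The sketch below buffers messages and flushes the remainder on close. BufferedObserver, its flush_size parameter, and the delivered list are hypothetical; the class does not subclass TaskObserver so that the snippet runs on its own, and a plain dict stands in for the event.

```python
import asyncio


class BufferedObserver:
    """Hypothetical observer; a real one would subclass TaskObserver."""

    def __init__(self, flush_size: int = 100) -> None:
        self.flush_size = flush_size
        self.buffer: list[str] = []
        self.delivered: list[str] = []  # stands in for a remote destination
        self.started = False

    async def startup(self) -> None:
        # Open connections or initialise exporters here.
        self.started = True

    async def on_log(self, event: dict) -> None:
        self.buffer.append(event["message"])
        if len(self.buffer) >= self.flush_size:
            await self._flush()

    async def close(self) -> None:
        # Flush whatever is still buffered before shutdown.
        await self._flush()
        self.started = False

    async def _flush(self) -> None:
        self.delivered.extend(self.buffer)
        self.buffer.clear()


async def main() -> BufferedObserver:
    obs = BufferedObserver(flush_size=2)
    await obs.startup()
    for msg in ("a", "b", "c"):
        await obs.on_log({"message": msg})
    await obs.close()
    return obs


obs = asyncio.run(main())
print(obs.delivered)  # ['a', 'b', 'c']
```

Without the flush in close(), the last partial batch ("c" here) would be lost at shutdown.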
LogEvent¶
LogEvent carries a single structured log entry from task_log() to each observer.
@dataclass
class LogEvent:
    task_id: str
    func_name: str
    message: str
    level: str
    timestamp: datetime
    attempt: int
    tags: dict[str, str]
    extra: dict[str, Any]
| Field | Type | Description |
|---|---|---|
| task_id | str | UUID of the task that emitted this entry. |
| func_name | str | Name of the task function. |
| message | str | The log message, prefixed with a UTC timestamp in YYYY-MM-DDTHH:MM:SS format. |
| level | str | Severity: "debug", "info", "warning", or "error". |
| timestamp | datetime | UTC datetime when the entry was created. |
| attempt | int | Zero-based retry index. 0 means the first run. |
| tags | dict[str, str] | Key/value labels attached to the task at enqueue time. |
| extra | dict[str, Any] | Arbitrary structured data passed as keyword arguments to task_log(). |
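To show how these fields might be consumed, here is a sketch that serialises an event to one JSON line. The LogEvent class below is a local stand-in that mirrors the documented fields so the snippet runs without the library; to_json_line and the sample values are hypothetical.

```python
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any


@dataclass
class LogEvent:
    """Local stand-in mirroring the fields documented above."""
    task_id: str
    func_name: str
    message: str
    level: str
    timestamp: datetime
    attempt: int
    tags: dict[str, str] = field(default_factory=dict)
    extra: dict[str, Any] = field(default_factory=dict)


def to_json_line(event: LogEvent) -> str:
    """Serialise an event to a single JSON line."""
    payload = asdict(event)
    # datetime is not JSON-native, so store it as an ISO 8601 string.
    payload["timestamp"] = event.timestamp.isoformat()
    return json.dumps(payload, sort_keys=True)


event = LogEvent(
    task_id="task-1",
    func_name="import_rows",
    message="imported batch",
    level="info",
    timestamp=datetime(2024, 1, 1, tzinfo=timezone.utc),
    attempt=0,
    tags={"tenant": "acme"},
    extra={"rows": 3},
)
line = to_json_line(event)
print(line)
```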
LifecycleEvent¶
LifecycleEvent carries a task status transition to each observer. Emitted on every state change: RUNNING, SUCCESS, FAILED, and INTERRUPTED.
@dataclass
class LifecycleEvent:
    task_id: str
    func_name: str
    status: TaskStatus
    timestamp: datetime
    attempt: int
    retries_used: int
    duration: float | None
    error: str | None
    stacktrace: str | None
    tags: dict[str, str]
| Field | Type | Description |
|---|---|---|
| task_id | str | UUID of the task. |
| func_name | str | Name of the task function. |
| status | TaskStatus | The new status value after this transition. |
| timestamp | datetime | UTC datetime when the transition occurred. |
| attempt | int | Zero-based retry index at the time of the transition. |
| retries_used | int | Total retry attempts consumed so far. |
| duration | float \| None | Elapsed seconds from start_time to this transition. None on the RUNNING transition. |
| error | str \| None | String form of the last exception. Set only on FAILED; None otherwise. |
| stacktrace | str \| None | Full traceback of the last exception. Set only on FAILED; None otherwise. |
| tags | dict[str, str] | Key/value labels attached to the task at enqueue time. |
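As an example of working with these fields, the sketch below tallies transitions per status and totals the durations that are set. TaskStatus and LifecycleEvent here are local stand-ins with a subset of the documented fields; of the enum values, only "success" is confirmed by the InMemoryLogger example on this page, and the rest are assumptions. The summarise helper is hypothetical.

```python
from collections import Counter
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Optional


class TaskStatus(Enum):
    # Assumed values; only "success" is confirmed elsewhere on this page.
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    INTERRUPTED = "interrupted"


@dataclass
class LifecycleEvent:
    """Local stand-in with a subset of the documented fields."""
    task_id: str
    func_name: str
    status: TaskStatus
    timestamp: datetime
    duration: Optional[float] = None
    error: Optional[str] = None


def summarise(events: list[LifecycleEvent]) -> tuple[Counter, float]:
    """Count events per status and total the durations that are set."""
    counts = Counter(e.status.value for e in events)
    # duration is None on the RUNNING transition, so skip those events.
    total = sum(e.duration for e in events if e.duration is not None)
    return counts, total


now = datetime.now(timezone.utc)
events = [
    LifecycleEvent("t1", "send_email", TaskStatus.RUNNING, now),
    LifecycleEvent("t1", "send_email", TaskStatus.SUCCESS, now, duration=0.5),
    LifecycleEvent("t2", "send_email", TaskStatus.FAILED, now, duration=1.5, error="boom"),
]
counts, total = summarise(events)
print(counts["failed"], total)  # 1 2.0
```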
See also¶
- task_log for the function that produces LogEvent instances
- Observability guide for combining observers and filtering by level
- File Logging guide for multi-process rotation patterns