Backends¶
A backend is the storage layer for api-shield. It persists route state and the audit log. All backends implement the ShieldBackend abstract base class, so you can swap them with a one-line change and nothing else in your application needs to change.
| Backend | Storage | Survives restart | Live push (SSE) | Best for |
|---|---|---|---|---|
MemoryBackend |
In-process dict | No | Yes (asyncio.Queue) | Development, testing |
FileBackend |
JSON file on disk | Yes | No (polling fallback) | Single-instance, simple deployments |
RedisBackend |
Redis | Yes | Yes (pub/sub) | Multi-instance, production |
ShieldBackend (ABC)¶
The contract all backends must implement. If you are building a custom backend, subclass this.
shield.core.backends.base.ShieldBackend
¶
Bases: ABC
Contract that all storage backends must implement.
Backends are responsible for persisting route state and audit logs.
The subscribe() method is optional — backends that don't support
pub/sub should raise NotImplementedError and the dashboard will
fall back to polling.
get_state
abstractmethod
async
¶
Return the current state for path.
Raises KeyError if no state has been registered for path.
set_state
abstractmethod
async
¶
Persist state for path, overwriting any existing entry.
delete_state
abstractmethod
async
¶
Remove all state for path.
No-op if path is not registered.
get_audit_log
abstractmethod
async
¶
Return audit entries, newest first.
If path is given, return only entries for that route. limit caps the number of entries returned.
set_global_config
async
¶
Persist config as the global maintenance configuration.
startup
async
¶
Called by ShieldEngine on startup.
Override to open database connections, create tables, or perform any other async initialisation your backend requires. The default implementation is a no-op, so built-in backends (MemoryBackend, FileBackend, RedisBackend) require no changes.
shutdown
async
¶
Called by ShieldEngine on shutdown.
Override to close database connections or release resources. The default implementation is a no-op.
subscribe
async
¶
Stream live RouteState changes as they occur.
Backends that support pub/sub (e.g. Redis) should override this.
Backends that do not support it raise NotImplementedError,
and the dashboard will fall back to polling list_states().
try_claim_webhook_dispatch
async
¶
Attempt to claim exclusive webhook dispatch rights for dedup_key.
Called by ShieldEngine._fire_webhooks before dispatching to any
registered webhook URLs. Returns True if this instance should
fire the webhooks, False if another instance has already claimed
the right for the same event.
The default implementation always returns True — single-instance
backends (MemoryBackend, FileBackend) never have concurrent
instances so deduplication is unnecessary.
RedisBackend overrides this with a SET NX command to ensure
only one instance fires webhooks per unique event across an entire
multi-instance deployment.
Parameters¶
dedup_key:
A deterministic string that uniquely identifies this event
(derived from event + path + serialised RouteState).
ttl_seconds:
How long the claim key lives in the backend. After this window
the key expires, allowing re-delivery if the claiming instance
crashed before it could dispatch. Defaults to 60 seconds.
subscribe_global_config
async
¶
Stream a signal whenever the global maintenance config changes.
Yields None on each remote change so callers can invalidate their
in-process cache and re-fetch from the backend.
Backends that support this (e.g. RedisBackend) override this
method. Others raise NotImplementedError — ShieldEngine.start()
checks for this and simply skips starting the listener, so the engine
falls back to the per-process cache behaviour without any error.
get_registered_paths
async
¶
Return the set of all registered path keys for deduplication.
Used by ShieldEngine.register_batch() to detect already-registered
routes without re-querying the full state list. The default
implementation derives the set from list_states(); backends that
store routes under transformed keys (e.g. ShieldServerBackend which
adds a service prefix) should override this to return local/plain keys.
write_rate_limit_hit
async
¶
Append a rate limit hit record to the backend log.
Default implementation is a no-op — backends that support persistent
hit logs (FileBackend, RedisBackend) override this.
MemoryBackend provides an in-memory list implementation.
get_rate_limit_hits
async
¶
Return recent rate limit hits, newest first.
When path is given, return only hits for that route. Default implementation returns an empty list — override in backends that store hits persistently.
set_rate_limit_policy
async
¶
Persist a rate limit policy for path/method.
policy_data is a JSON-serialisable dict matching the
RateLimitPolicy schema. Overwrites any existing policy for
the same path/method pair.
Default is a no-op. MemoryBackend, FileBackend, and
RedisBackend override this to provide real persistence.
get_rate_limit_policies
async
¶
Return all persisted rate limit policies.
Each item is a JSON-serialisable dict matching the
RateLimitPolicy schema. Returns an empty list by default.
delete_rate_limit_policy
async
¶
Remove the persisted rate limit policy for path/method.
No-op if no policy is stored for that pair. Default implementation is a no-op.
get_global_rate_limit_policy
async
¶
Return the persisted global rate limit policy dict, or None.
set_global_rate_limit_policy
async
¶
Persist policy_data as the global rate limit policy.
delete_global_rate_limit_policy
async
¶
Remove the persisted global rate limit policy.
get_service_rate_limit_policy
async
¶
Return the persisted per-service rate limit policy dict, or None.
set_service_rate_limit_policy
async
¶
Persist policy_data as the rate limit policy for service.
delete_service_rate_limit_policy
async
¶
Remove the persisted rate limit policy for service.
get_all_service_rate_limit_policies
async
¶
Return all persisted per-service rate limit policies as {service: policy_data}.
subscribe_rate_limit_policy
async
¶
Stream rate limit policy changes as they occur.
Each yielded dict has the shape::
{"action": "set", "key": "GET:/api/orders", "policy": {...}}
{"action": "delete", "key": "GET:/api/orders"}
Backends that support pub/sub (e.g. RedisBackend) override this.
Others raise NotImplementedError — ShieldEngine.start() treats
that as "single-instance mode" and skips the listener task.
load_all_flags
async
¶
Return all stored feature flags.
Returns a list of FeatureFlag objects. The default
implementation uses an in-memory store. Override for persistent
backends.
save_flag
async
¶
Persist flag (a FeatureFlag instance) by its key.
Default implementation keeps flags in memory. Override for persistent backends.
delete_flag
async
¶
Remove the flag with flag_key from storage.
No-op if the flag does not exist.
load_all_segments
async
¶
Return all stored segments.
Returns a list of Segment objects. The default
implementation uses an in-memory store. Override for persistent
backends.
save_segment
async
¶
Persist segment (a Segment instance) by its key.
Default implementation keeps segments in memory. Override for persistent backends.
delete_segment
async
¶
Remove the segment with segment_key from storage.
No-op if the segment does not exist.
MemoryBackend¶
Stores all state in a Python dict in the current process. No installation required and no configuration needed — the default choice for getting started.
shield.core.backends.memory.MemoryBackend
¶
Bases: ShieldBackend
Backend that stores all state in-process.
Default backend. Ideal for single-instance apps and testing. State is lost when the process restarts.
Audit log is stored in a deque (O(1) append/evict) with a parallel
per-path index (dict[path, list[AuditEntry]]) so that filtered
queries — get_audit_log(path=...) — are O(k) where k is the number
of entries for that specific path, not O(total entries).
Rate limit hits are aggregated: consecutive blocks from the same
(path, method, key) are grouped into a single RateLimitHit
entry whose count increments and last_hit_at advances on every
subsequent block. A new group is started once max_rl_hits_per_group
is reached. This prevents a flood of 429s from a single client from
saturating the hit log.
get_state
async
¶
Return the current state for path.
Raises KeyError if no state has been registered for path.
write_audit
async
¶
Append entry to the audit log, capping at 1000 entries.
When the cap is reached the oldest entry is evicted from both the ordered deque and the per-path index in O(1) / O(k) time respectively, where k is the number of entries for the evicted path (≪ total entries).
get_audit_log
async
¶
Return audit entries, newest first, optionally filtered by path.
When path is provided the per-path index is used — O(k) where k is the number of entries for that route — instead of scanning all 1000 entries (O(N)).
write_rate_limit_hit
async
¶
Append a rate limit hit record, evicting the oldest when the cap is reached.
get_rate_limit_hits
async
¶
Return rate limit hits, newest first, optionally filtered by path.
set_rate_limit_policy
async
¶
Persist policy_data for path/method and notify subscribers.
delete_rate_limit_policy
async
¶
Remove the persisted rate limit policy for path/method and notify subscribers.
subscribe_rate_limit_policy
async
¶
Yield rate limit policy change events as they occur.
Usage¶
from shield.core.engine import ShieldEngine
# MemoryBackend is the default — no need to pass it explicitly
engine = ShieldEngine()
# Or explicitly
from shield.core.backends.memory import MemoryBackend
engine = ShieldEngine(backend=MemoryBackend())
Characteristics¶
- State is stored in a Python
dictand lost on process restart. subscribe()is implemented viaasyncio.Queue, enabling live SSE updates in the admin dashboard.- The audit log is capped at 1000 entries (oldest entries are discarded).
Not for production
MemoryBackend state is reset every time the process restarts. If you restart your server, all runtime state changes (routes disabled via CLI, maintenance mode set via dashboard) are lost. Use FileBackend or RedisBackend in production.
FileBackend¶
Reads and writes a JSON file using aiofiles. State survives process restarts and can be shared between processes on the same machine by pointing them at the same file.
shield.core.backends.file.FileBackend
¶
Bases: ShieldBackend
Backend that persists state to a file via aiofiles.
Survives process restarts. Suitable for simple single-instance deployments.
All reads are served from an in-memory cache populated on first access. Writes update the cache immediately then schedule a debounced disk flush — meaning rapid bursts of state changes (e.g. startup route registration) result in a single file write rather than N writes.
The file format is auto-detected from the extension:
| Extension | Format | Extra dependency |
|---|---|---|
.json |
JSON | (none — stdlib) |
.yaml / .yml |
YAML | pip install pyyaml |
.toml |
TOML | pip install tomli-w |
Data structure (shown as JSON — equivalent across all formats)::
{
"states": { "GET:/payments": { ...RouteState... }, ... },
"audit": [ { ...AuditEntry... }, ... ]
}
subscribe() raises NotImplementedError — use polling.
Parameters¶
path: Path to the state file. Created automatically if absent. The extension determines the serialisation format.
Raises¶
ValueError
If the file extension is not one of .json, .yaml,
.yml, or .toml.
get_state
async
¶
Return the current state for path from the in-memory cache.
Raises KeyError if no state has been registered for path.
Zero file I/O after the cache is warm.
set_state
async
¶
Update the in-memory cache and flush to disk immediately.
State changes are written synchronously so that a second
FileBackend instance (e.g. the CLI) reading the same file sees
the update right away. Unlike write_audit, state mutations
are not debounced — durability is more important than batching here.
delete_state
async
¶
Remove state for path from cache and flush to disk immediately.
No-op if path is not registered.
list_states
async
¶
Return all registered route states from the in-memory cache.
Zero file I/O after the cache is warm.
write_audit
async
¶
Append entry to the in-memory audit log (capped at 1000 entries) and schedule a debounced disk flush.
get_audit_log
async
¶
Return audit entries, newest first, optionally filtered by path.
Served entirely from the in-memory cache — zero file I/O.
write_rate_limit_hit
async
¶
Append a rate limit hit record, evicting the oldest when the cap is reached.
get_rate_limit_hits
async
¶
Return rate limit hits, newest first, optionally filtered by path.
set_rate_limit_policy
async
¶
Persist policy_data for path/method and flush to disk.
delete_rate_limit_policy
async
¶
Remove the persisted rate limit policy for path/method.
shutdown
async
¶
Flush any pending write and release resources.
Cancels the debounce timer and performs a synchronous flush so that in-flight state changes are not lost on graceful shutdown.
Usage¶
from shield.core.backends.file import FileBackend
from shield.core.engine import ShieldEngine
engine = ShieldEngine(backend=FileBackend(path="shield-state.json"))
Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
path |
str |
required | File path for the JSON state file. Relative to the working directory of the process. |
File format¶
{
"states": {
"GET:/payments": {
"path": "GET:/payments",
"status": "maintenance",
"reason": "DB migration",
"window": null
}
},
"audit": [
{
"id": "...",
"timestamp": "2025-06-01T02:00:00Z",
"path": "GET:/payments",
"action": "maintenance",
"actor": "alice",
"platform": "cli"
}
]
}
Characteristics¶
- File writes go through an
asyncio.Lockto prevent concurrent write corruption. subscribe()raisesNotImplementedError; the admin dashboard falls back to polling every few seconds.- Supports JSON, YAML, and TOML files — the format is detected from the file extension.
Commit the .gitignore entry
Add the state file path to .gitignore to avoid accidentally committing runtime state to version control. The file is machine-generated and changes frequently.
RedisBackend¶
Uses redis.asyncio for fully async, multi-instance deployments. State is shared across all app instances, and the subscribe() method enables real-time SSE push in the admin dashboard via Redis pub/sub.
shield.core.backends.redis.RedisBackend
¶
Bases: ShieldBackend
Backend that stores all state in Redis.
Supports multi-instance deployments. subscribe() uses Redis
pub/sub so that state changes made by one instance are immediately
visible in the dashboard on any other instance.
Parameters¶
url:
Redis connection URL (e.g. "redis://localhost:6379/0").
get_state
async
¶
Return the current state for path.
Raises KeyError if no state has been registered for path.
set_state
async
¶
Persist state for path, update the route-index, and publish to
shield:changes.
The state key and the route-index entry are written atomically in a
single pipeline so list_states() can never see a state key that
is missing from the index (or vice-versa).
delete_state
async
¶
Remove state for path and remove it from the route-index.
No-op if path is not registered.
list_states
async
¶
Return all registered route states.
Uses SMEMBERS shield:route-index + MGET instead of the
dangerous KEYS shield:state:* pattern. KEYS is an O(keyspace)
blocking command that can freeze a busy Redis server; SMEMBERS on
the dedicated route-index set is safe to use in production.
write_audit
async
¶
Append entry to both the global audit list and the per-path list.
Both lists are capped at 1000 entries via LTRIM. Writing to a
per-path list means get_audit_log(path=X) can fetch exactly the
required entries directly — no full-list fetch-then-filter in Python.
get_audit_log
async
¶
Return audit entries, newest first.
When path is provided the per-path list is used — fetches exactly
limit entries via a single LRANGE call, eliminating the
fetch-all-then-filter pattern of the previous implementation.
set_global_config
async
¶
Persist config and broadcast a cache-invalidation signal.
Calls the base implementation (which stores the config via
set_state and publishes to shield:changes), then
additionally publishes a lightweight "1" signal to
shield:global_invalidate. Any other instance running
subscribe_global_config() receives this signal and
immediately drops its in-process GlobalMaintenanceConfig
cache so that the next check() call re-reads from Redis.
The extra publish is best-effort: a Redis error here is logged and swallowed so that a transient Redis blip never prevents a global maintenance toggle from taking effect locally.
try_claim_webhook_dispatch
async
¶
Use Redis SET NX to claim exclusive webhook dispatch rights.
The key shield:webhook:dedup:{dedup_key} is written with
NX (only if absent) and a TTL. The instance that wins the
atomic write fires the webhooks; all others receive None from
Redis and skip dispatch.
Fails open: a Redis error logs a warning and returns True so
that webhooks are over-delivered rather than silently dropped.
Parameters¶
dedup_key:
Deterministic key identifying the event (hash of event + path
+ serialised state — computed by ShieldEngine).
ttl_seconds:
Key TTL. After this window the key expires, allowing
re-delivery if the winning instance crashed mid-dispatch.
write_rate_limit_hit
async
¶
Append a rate limit hit record, evicting the oldest when the cap is reached.
get_rate_limit_hits
async
¶
Return rate limit hits, newest first, optionally filtered by path.
set_rate_limit_policy
async
¶
Persist policy_data for path/method in Redis and broadcast to
all other instances via shield:rl-policy-change.
get_rate_limit_policies
async
¶
Return all persisted rate limit policies from Redis.
delete_rate_limit_policy
async
¶
Remove the persisted rate limit policy for path/method from Redis and broadcast the deletion to all other instances.
subscribe_global_config
async
¶
Yield None whenever the global maintenance config changes.
Subscribes to shield:global_invalidate. Each message
arrival means another instance has written a new
GlobalMaintenanceConfig to Redis and this instance should
drop its in-process cache.
The generator runs indefinitely — callers (ShieldEngine)
are expected to run it inside a cancellable asyncio.Task.
subscribe_rate_limit_policy
async
¶
Yield policy-change events whenever another instance sets or deletes a rate limit policy.
Each yielded dict has one of two shapes::
{"action": "set", "key": "GET:/api/orders", "policy": {...}}
{"action": "delete", "key": "GET:/api/orders"}
The generator runs indefinitely inside a cancellable asyncio.Task
managed by ShieldEngine.
Installation¶
Usage¶
from shield.core.backends.redis import RedisBackend
from shield.core.engine import ShieldEngine
engine = ShieldEngine(backend=RedisBackend(url="redis://localhost:6379/0"))
Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
url |
str |
required | Redis connection URL. Supports redis://, rediss:// (TLS), and redis+unix://. |
Key schema¶
| Redis key | Type | Contents |
|---|---|---|
shield:state:{path} |
String | JSON-serialised RouteState |
shield:audit |
List | JSON-serialised AuditEntry items (LPUSH, LTRIM to 1000) |
shield:global |
String | JSON-serialised GlobalMaintenanceConfig |
shield:changes |
Pub/sub channel | Published on every set_state() for SSE live updates |
Characteristics¶
subscribe()is implemented via Redis pub/sub onshield:changes, enabling live SSE updates in the admin dashboard.- Uses connection pooling via
redis.asyncio.ConnectionPoolfor efficiency under load. - Redis connection errors are handled gracefully — the backend surfaces them as exceptions and the engine fails open. Read more in ShieldEngine: fail-open.
Use the lifespan context manager
RedisBackend opens a connection pool on startup() and closes it on shutdown(). Always wrap the engine in the lifespan context manager to ensure clean teardown. Read more in ShieldEngine: lifecycle.
Writing a custom backend¶
Subclass ShieldBackend and implement the six required async methods. The contract is intentionally minimal.
from shield.core.backends.base import ShieldBackend
from shield.core.models import AuditEntry, RouteState
class MyBackend(ShieldBackend):
async def get_state(self, path: str) -> RouteState:
"""Return stored state. Must raise KeyError if path is not found."""
...
async def set_state(self, path: str, state: RouteState) -> None:
"""Persist state, overwriting any existing entry for this path."""
...
async def delete_state(self, path: str) -> None:
"""Remove state for this path. No-op if not found."""
...
async def list_states(self) -> list[RouteState]:
"""Return all registered route states."""
...
async def write_audit(self, entry: AuditEntry) -> None:
"""Append an entry to the audit log."""
...
async def get_audit_log(
self, path: str | None = None, limit: int = 100
) -> list[AuditEntry]:
"""Return audit entries newest-first, optionally filtered by path."""
...
Contract rules
| Rule | Detail |
|---|---|
get_state() must raise KeyError |
The engine uses this to distinguish "not registered" from "registered but active" |
| Let errors bubble up | The engine wraps every backend call and handles errors (fail-open). Do not swallow exceptions. |
| All methods must be async | Use your storage library's async client to avoid blocking the event loop |
subscribe() is optional |
Override it if your storage supports pub/sub; otherwise the default raises NotImplementedError and the dashboard falls back to polling |
Serialisation helpers
Use the Pydantic v2 model methods to convert between RouteState/AuditEntry and JSON:
Lifecycle hooks (startup and shutdown)
Override startup() and shutdown() to manage connections:
See Building your own backend for a complete working SQLite example.