Backends

A backend is the storage layer for api-shield. It persists route state and the audit log. All backends implement the ShieldBackend abstract base class, so you can swap them with a one-line change and nothing else in your application needs to change.

from shield.core.backends.base import ShieldBackend

| Backend | Storage | Survives restart | Live push (SSE) | Best for |
|---|---|---|---|---|
| MemoryBackend | In-process dict | No | Yes (asyncio.Queue) | Development, testing |
| FileBackend | JSON file on disk | Yes | No (polling fallback) | Single-instance, simple deployments |
| RedisBackend | Redis | Yes | Yes (pub/sub) | Multi-instance, production |

ShieldBackend (ABC)

The contract all backends must implement. If you are building a custom backend, subclass this.

shield.core.backends.base.ShieldBackend

Bases: ABC

Contract that all storage backends must implement.

Backends are responsible for persisting route state and audit logs. The subscribe() method is optional — backends that don't support pub/sub should raise NotImplementedError and the dashboard will fall back to polling.

get_state abstractmethod async

get_state(path)

Return the current state for path.

Raises KeyError if no state has been registered for path.

set_state abstractmethod async

set_state(path, state)

Persist state for path, overwriting any existing entry.

delete_state abstractmethod async

delete_state(path)

Remove all state for path.

No-op if path is not registered.

list_states abstractmethod async

list_states()

Return all registered route states.

write_audit abstractmethod async

write_audit(entry)

Append entry to the audit log.

get_audit_log abstractmethod async

get_audit_log(path=None, limit=100)

Return audit entries, newest first.

If path is given, return only entries for that route. limit caps the number of entries returned.

get_global_config async

get_global_config()

Return the current global maintenance configuration.

set_global_config async

set_global_config(config)

Persist config as the global maintenance configuration.

startup async

startup()

Called by ShieldEngine on startup.

Override to open database connections, create tables, or perform any other async initialisation your backend requires. The default implementation is a no-op, so built-in backends (MemoryBackend, FileBackend, RedisBackend) require no changes.

shutdown async

shutdown()

Called by ShieldEngine on shutdown.

Override to close database connections or release resources. The default implementation is a no-op.

subscribe async

subscribe()

Stream live RouteState changes as they occur.

Backends that support pub/sub (e.g. Redis) should override this. Backends that do not support it raise NotImplementedError, and the dashboard will fall back to polling list_states().
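
The fallback described above can be sketched as follows. This is a minimal illustration, not the dashboard's actual code: PollingOnlyBackend, watch, poll_interval, and max_polls are hypothetical names, and states are plain strings rather than RouteState objects.

```python
import asyncio
from typing import AsyncIterator


class PollingOnlyBackend:
    """Hypothetical stand-in for a backend without pub/sub support."""

    def __init__(self) -> None:
        self.states = {"GET:/payments": "maintenance"}

    async def list_states(self) -> list:
        return sorted(self.states.values())

    async def subscribe(self) -> AsyncIterator[str]:
        raise NotImplementedError("pub/sub not supported")
        yield  # unreachable; makes this function an async generator


async def watch(backend, on_update, poll_interval: float = 0.01, max_polls: int = 3) -> str:
    """Prefer live push; fall back to polling list_states() when unsupported."""
    try:
        async for state in backend.subscribe():
            on_update([state])
        return "push"
    except NotImplementedError:
        # The error surfaces on the first iteration of the generator.
        for _ in range(max_polls):
            on_update(await backend.list_states())
            await asyncio.sleep(poll_interval)
        return "polling"
```

Because subscribe() is an async generator here, the NotImplementedError is raised when iteration starts, not when subscribe() is called, so the try block has to wrap the loop itself.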

try_claim_webhook_dispatch async

try_claim_webhook_dispatch(dedup_key, ttl_seconds=60)

Attempt to claim exclusive webhook dispatch rights for dedup_key.

Called by ShieldEngine._fire_webhooks before dispatching to any registered webhook URLs. Returns True if this instance should fire the webhooks, False if another instance has already claimed the right for the same event.

The default implementation always returns True — single-instance backends (MemoryBackend, FileBackend) never have concurrent instances so deduplication is unnecessary.

RedisBackend overrides this with a SET NX command to ensure only one instance fires webhooks per unique event across an entire multi-instance deployment.

Parameters

dedup_key: A deterministic string that uniquely identifies this event (derived from event + path + serialised RouteState).

ttl_seconds: How long the claim key lives in the backend. After this window the key expires, allowing re-delivery if the claiming instance crashed before it could dispatch. Defaults to 60 seconds.
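
To make the claim semantics concrete, here is a small in-memory sketch of "set if absent, with expiry". It is an illustration only: InMemoryClaimStore and its injectable clock are hypothetical, the method is synchronous for brevity (the real hook is async), and a single-process dict does not provide the cross-instance guarantee that RedisBackend gets from SET NX.

```python
import time


class InMemoryClaimStore:
    """Hypothetical stand-in that mimics 'set if absent, with expiry' semantics."""

    def __init__(self, clock=time.monotonic) -> None:
        self._expires_at = {}   # dedup_key -> expiry timestamp
        self._clock = clock     # injectable so the expiry logic can be tested

    def try_claim(self, dedup_key: str, ttl_seconds: int = 60) -> bool:
        now = self._clock()
        expiry = self._expires_at.get(dedup_key)
        if expiry is not None and expiry > now:
            return False        # a live claim exists; the caller should skip dispatch
        # No claim, or the previous claim expired: take it and start a new TTL window.
        self._expires_at[dedup_key] = now + ttl_seconds
        return True
```

The first caller for a given dedup_key gets True, later callers get False until the TTL window has passed, after which the event can be claimed again.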

subscribe_global_config async

subscribe_global_config()

Stream a signal whenever the global maintenance config changes.

Yields None on each remote change so callers can invalidate their in-process cache and re-fetch from the backend.

Backends that support this (e.g. RedisBackend) override this method. Others raise NotImplementedError; ShieldEngine.start() checks for this and simply skips starting the listener, so the engine falls back to the per-process cache behaviour without any error.

get_registered_paths async

get_registered_paths()

Return the set of all registered path keys for deduplication.

Used by ShieldEngine.register_batch() to detect already-registered routes without re-querying the full state list. The default implementation derives the set from list_states(); backends that store routes under transformed keys (e.g. ShieldServerBackend which adds a service prefix) should override this to return local/plain keys.

write_rate_limit_hit async

write_rate_limit_hit(hit)

Append a rate limit hit record to the backend log.

Default implementation is a no-op — backends that support persistent hit logs (FileBackend, RedisBackend) override this. MemoryBackend provides an in-memory list implementation.

get_rate_limit_hits async

get_rate_limit_hits(path=None, limit=100)

Return recent rate limit hits, newest first.

When path is given, return only hits for that route. Default implementation returns an empty list — override in backends that store hits persistently.

set_rate_limit_policy async

set_rate_limit_policy(path, method, policy_data)

Persist a rate limit policy for path/method.

policy_data is a JSON-serialisable dict matching the RateLimitPolicy schema. Overwrites any existing policy for the same path/method pair.

Default is a no-op. MemoryBackend, FileBackend, and RedisBackend override this to provide real persistence.

get_rate_limit_policies async

get_rate_limit_policies()

Return all persisted rate limit policies.

Each item is a JSON-serialisable dict matching the RateLimitPolicy schema. Returns an empty list by default.

delete_rate_limit_policy async

delete_rate_limit_policy(path, method)

Remove the persisted rate limit policy for path/method.

No-op if no policy is stored for that pair. Default implementation is a no-op.

get_global_rate_limit_policy async

get_global_rate_limit_policy()

Return the persisted global rate limit policy dict, or None.

set_global_rate_limit_policy async

set_global_rate_limit_policy(policy_data)

Persist policy_data as the global rate limit policy.

delete_global_rate_limit_policy async

delete_global_rate_limit_policy()

Remove the persisted global rate limit policy.

get_service_rate_limit_policy async

get_service_rate_limit_policy(service)

Return the persisted per-service rate limit policy dict, or None.

set_service_rate_limit_policy async

set_service_rate_limit_policy(service, policy_data)

Persist policy_data as the rate limit policy for service.

delete_service_rate_limit_policy async

delete_service_rate_limit_policy(service)

Remove the persisted rate limit policy for service.

get_all_service_rate_limit_policies async

get_all_service_rate_limit_policies()

Return all persisted per-service rate limit policies as {service: policy_data}.

subscribe_rate_limit_policy async

subscribe_rate_limit_policy()

Stream rate limit policy changes as they occur.

Each yielded dict has the shape:

{"action": "set",    "key": "GET:/api/orders", "policy": {...}}
{"action": "delete", "key": "GET:/api/orders"}

Backends that support pub/sub (e.g. RedisBackend) override this. Others raise NotImplementedError; ShieldEngine.start() treats that as "single-instance mode" and skips the listener task.
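
A consumer of these events might keep a local cache in sync roughly like this. The function name apply_policy_event and the cache layout are assumptions made for illustration; only the two event shapes come from the description above.

```python
from typing import Any, Dict


def apply_policy_event(cache: Dict[str, Dict[str, Any]], event: Dict[str, Any]) -> None:
    """Apply one policy-change event to a local {key: policy} cache in place."""
    action = event["action"]
    key = event["key"]
    if action == "set":
        cache[key] = event["policy"]
    elif action == "delete":
        cache.pop(key, None)  # deleting an unknown key is a no-op
    else:
        raise ValueError(f"unknown action: {action!r}")
```

Rejecting unknown actions, rather than ignoring them, is a design choice of this sketch; a listener that must tolerate newer event types could log and skip instead.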

load_all_flags async

load_all_flags()

Return all stored feature flags.

Returns a list of FeatureFlag objects. The default implementation uses an in-memory store. Override for persistent backends.

save_flag async

save_flag(flag)

Persist flag (a FeatureFlag instance) by its key.

Default implementation keeps flags in memory. Override for persistent backends.

delete_flag async

delete_flag(flag_key)

Remove the flag with flag_key from storage.

No-op if the flag does not exist.

load_all_segments async

load_all_segments()

Return all stored segments.

Returns a list of Segment objects. The default implementation uses an in-memory store. Override for persistent backends.

save_segment async

save_segment(segment)

Persist segment (a Segment instance) by its key.

Default implementation keeps segments in memory. Override for persistent backends.

delete_segment async

delete_segment(segment_key)

Remove the segment with segment_key from storage.

No-op if the segment does not exist.


MemoryBackend

Stores all state in a Python dict in the current process. No installation required and no configuration needed — the default choice for getting started.

shield.core.backends.memory.MemoryBackend

MemoryBackend(max_rl_hit_entries=_DEFAULT_MAX_RL_HIT_ENTRIES)

Bases: ShieldBackend

Backend that stores all state in-process.

Default backend. Ideal for single-instance apps and testing. State is lost when the process restarts.

Audit log is stored in a deque (O(1) append/evict) with a parallel per-path index (dict[path, list[AuditEntry]]) so that filtered queries — get_audit_log(path=...) — are O(k) where k is the number of entries for that specific path, not O(total entries).

Rate limit hits are aggregated: consecutive blocks from the same (path, method, key) are grouped into a single RateLimitHit entry whose count increments and last_hit_at advances on every subsequent block. A new group is started once max_rl_hits_per_group is reached. This prevents a flood of 429s from a single client from saturating the hit log.
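
The grouping rule can be sketched as below. This is not MemoryBackend's code: HitGroup is a hypothetical stand-in for RateLimitHit, timestamps are plain floats, and the default group size of 100 is an assumed value.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class HitGroup:
    """Hypothetical stand-in for an aggregated rate limit hit entry."""
    path: str
    method: str
    key: str
    count: int
    last_hit_at: float


def record_hit(log: List[HitGroup], path: str, method: str, key: str,
               now: float, max_hits_per_group: int = 100) -> None:
    """Fold a block into the newest group when it matches, else start a new group."""
    last = log[-1] if log else None
    if (
        last is not None
        and (last.path, last.method, last.key) == (path, method, key)
        and last.count < max_hits_per_group
    ):
        last.count += 1          # same client, same route: bump the counter
        last.last_hit_at = now   # and advance the timestamp
    else:
        log.append(HitGroup(path, method, key, count=1, last_hit_at=now))
```

Only the newest entry is compared, so a block from a different client in between starts a fresh group, which matches the "consecutive blocks" wording above.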

get_state async

get_state(path)

Return the current state for path.

Raises KeyError if no state has been registered for path.

set_state async

set_state(path, state)

Persist state for path and notify any subscribers.

delete_state async

delete_state(path)

Remove state for path. No-op if not registered.

list_states async

list_states()

Return all registered route states.

write_audit async

write_audit(entry)

Append entry to the audit log, capping at 1000 entries.

When the cap is reached the oldest entry is evicted from both the ordered deque and the per-path index in O(1) / O(k) time respectively, where k is the number of entries for the evicted path (≪ total entries).

get_audit_log async

get_audit_log(path=None, limit=100)

Return audit entries, newest first, optionally filtered by path.

When path is provided the per-path index is used — O(k) where k is the number of entries for that route — instead of scanning all 1000 entries (O(N)).
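
The deque-plus-index layout can be illustrated with a small sketch. IndexedAuditLog is a hypothetical class, and entries are simplified to (path, message) tuples instead of AuditEntry objects.

```python
from collections import deque
from typing import Dict, List, Optional, Tuple

Entry = Tuple[str, str]  # (path, message); a simplified stand-in for AuditEntry


class IndexedAuditLog:
    """Hypothetical sketch: a capped deque plus a per-path index."""

    def __init__(self, max_entries: int = 1000) -> None:
        self._max = max_entries
        self._entries = deque()                   # all entries, oldest first
        self._by_path: Dict[str, List[Entry]] = {}

    def write(self, path: str, message: str) -> None:
        if len(self._entries) >= self._max:
            old = self._entries.popleft()         # O(1) eviction of the oldest entry
            bucket = self._by_path[old[0]]
            bucket.pop(0)                         # oldest entry for that path, O(k)
            if not bucket:
                del self._by_path[old[0]]
        entry = (path, message)
        self._entries.append(entry)
        self._by_path.setdefault(path, []).append(entry)

    def read(self, path: Optional[str] = None, limit: int = 100) -> List[Entry]:
        # A filtered read touches only that path's bucket, never the full log.
        source = self._entries if path is None else self._by_path.get(path, [])
        return list(reversed(source))[:limit]     # newest first
```

The globally oldest entry is always the oldest entry in its own path bucket, which is why evicting index 0 of the bucket keeps the two structures consistent.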

subscribe async

subscribe()

Yield RouteState objects as they are updated.

write_rate_limit_hit async

write_rate_limit_hit(hit)

Append a rate limit hit record, evicting the oldest when the cap is reached.

get_rate_limit_hits async

get_rate_limit_hits(path=None, limit=100)

Return rate limit hits, newest first, optionally filtered by path.

set_rate_limit_policy async

set_rate_limit_policy(path, method, policy_data)

Persist policy_data for path/method and notify subscribers.

get_rate_limit_policies async

get_rate_limit_policies()

Return all persisted rate limit policies.

delete_rate_limit_policy async

delete_rate_limit_policy(path, method)

Remove the persisted rate limit policy for path/method and notify subscribers.

subscribe_rate_limit_policy async

subscribe_rate_limit_policy()

Yield rate limit policy change events as they occur.

Usage

main.py
from shield.core.engine import ShieldEngine

# MemoryBackend is the default — no need to pass it explicitly
engine = ShieldEngine()

# Or explicitly
from shield.core.backends.memory import MemoryBackend
engine = ShieldEngine(backend=MemoryBackend())

Characteristics

  • State is stored in a Python dict and lost on process restart.
  • subscribe() is implemented via asyncio.Queue, enabling live SSE updates in the admin dashboard.
  • The audit log is capped at 1000 entries (oldest entries are discarded).

Not for production

MemoryBackend state is reset every time the process restarts. If you restart your server, all runtime state changes (routes disabled via CLI, maintenance mode set via dashboard) are lost. Use FileBackend or RedisBackend in production.


FileBackend

Reads and writes a state file (JSON, YAML, or TOML) using aiofiles. State survives process restarts and can be shared between processes on the same machine by pointing them at the same file.

shield.core.backends.file.FileBackend

FileBackend(path, max_rl_hit_entries=10000)

Bases: ShieldBackend

Backend that persists state to a file via aiofiles.

Survives process restarts. Suitable for simple single-instance deployments.

All reads are served from an in-memory cache populated on first access. Writes update the cache immediately then schedule a debounced disk flush — meaning rapid bursts of state changes (e.g. startup route registration) result in a single file write rather than N writes.
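
A debounced flush can be sketched with a restartable asyncio task. This is a generic illustration of the technique, not FileBackend's implementation: DebouncedWriter, its delay value, and the flush callback are all hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class DebouncedWriter:
    """Hypothetical sketch: coalesce a burst of changes into one flush."""

    def __init__(self, flush: Callable[[], Awaitable[None]], delay: float = 0.05) -> None:
        self._flush = flush      # async callable that performs the real write
        self._delay = delay
        self._task: Optional[asyncio.Task] = None

    def schedule(self) -> None:
        """Restart the timer; only the last call in a burst triggers a flush."""
        if self._task is not None and not self._task.done():
            self._task.cancel()
        self._task = asyncio.create_task(self._run())

    async def _run(self) -> None:
        await asyncio.sleep(self._delay)
        # A production version would also avoid cancelling a write in progress.
        await self._flush()


async def demo() -> int:
    writes = 0

    async def flush() -> None:
        nonlocal writes
        writes += 1

    writer = DebouncedWriter(flush, delay=0.01)
    for _ in range(10):          # burst of ten changes
        writer.schedule()
    await asyncio.sleep(0.2)     # give the timer time to fire
    return writes
```

Running demo() schedules ten changes back to back and counts how many flushes happen once the timer has fired.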

The file format is auto-detected from the extension:

| Extension | Format | Extra dependency |
|---|---|---|
| .json | JSON | None (stdlib) |
| .yaml / .yml | YAML | pip install pyyaml |
| .toml | TOML | pip install tomli-w |

Data structure (shown as JSON; equivalent across all formats):

{
    "states": { "GET:/payments": { ...RouteState... }, ... },
    "audit":  [ { ...AuditEntry... }, ... ]
}

subscribe() raises NotImplementedError — use polling.

Parameters

path: Path to the state file. Created automatically if absent. The extension determines the serialisation format.

Raises

ValueError If the file extension is not one of .json, .yaml, .yml, or .toml.
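
Extension-based detection might look like the following sketch. The helper name detect_format and the case-insensitive match are assumptions; only the accepted extensions and the ValueError come from the description above.

```python
from pathlib import Path

# Extensions listed in the format table above, mapped to a format name.
_FORMATS = {".json": "json", ".yaml": "yaml", ".yml": "yaml", ".toml": "toml"}


def detect_format(path: str) -> str:
    """Map a state-file path to a format name, rejecting unknown extensions."""
    suffix = Path(path).suffix.lower()  # lower-casing is an assumption of this sketch
    try:
        return _FORMATS[suffix]
    except KeyError:
        raise ValueError(
            f"unsupported extension {suffix!r}; expected one of {sorted(_FORMATS)}"
        ) from None
```

Failing at construction time, before any file is touched, means a typo in the file name surfaces immediately instead of on the first write.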

get_state async

get_state(path)

Return the current state for path from the in-memory cache.

Raises KeyError if no state has been registered for path. Zero file I/O after the cache is warm.

set_state async

set_state(path, state)

Update the in-memory cache and flush to disk immediately.

State changes are written synchronously so that a second FileBackend instance (e.g. the CLI) reading the same file sees the update right away. Unlike write_audit, state mutations are not debounced — durability is more important than batching here.

delete_state async

delete_state(path)

Remove state for path from cache and flush to disk immediately.

No-op if path is not registered.

list_states async

list_states()

Return all registered route states from the in-memory cache.

Zero file I/O after the cache is warm.

write_audit async

write_audit(entry)

Append entry to the in-memory audit log (capped at 1000 entries) and schedule a debounced disk flush.

get_audit_log async

get_audit_log(path=None, limit=100)

Return audit entries, newest first, optionally filtered by path.

Served entirely from the in-memory cache — zero file I/O.

subscribe async

subscribe()

Not supported — raises NotImplementedError.

write_rate_limit_hit async

write_rate_limit_hit(hit)

Append a rate limit hit record, evicting the oldest when the cap is reached.

get_rate_limit_hits async

get_rate_limit_hits(path=None, limit=100)

Return rate limit hits, newest first, optionally filtered by path.

set_rate_limit_policy async

set_rate_limit_policy(path, method, policy_data)

Persist policy_data for path/method and flush to disk.

get_rate_limit_policies async

get_rate_limit_policies()

Return all persisted rate limit policies.

delete_rate_limit_policy async

delete_rate_limit_policy(path, method)

Remove the persisted rate limit policy for path/method.

shutdown async

shutdown()

Flush any pending write and release resources.

Cancels the debounce timer and performs a synchronous flush so that in-flight state changes are not lost on graceful shutdown.

Usage

main.py
from shield.core.backends.file import FileBackend
from shield.core.engine import ShieldEngine

engine = ShieldEngine(backend=FileBackend(path="shield-state.json"))

Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| path | str | required | File path for the state file (JSON, YAML, or TOML, chosen by extension). Relative paths resolve against the working directory of the process. |

File format

{
  "states": {
    "GET:/payments": {
      "path": "GET:/payments",
      "status": "maintenance",
      "reason": "DB migration",
      "window": null
    }
  },
  "audit": [
    {
      "id": "...",
      "timestamp": "2025-06-01T02:00:00Z",
      "path": "GET:/payments",
      "action": "maintenance",
      "actor": "alice",
      "platform": "cli"
    }
  ]
}

Characteristics

  • File writes go through an asyncio.Lock to prevent concurrent write corruption.
  • subscribe() raises NotImplementedError; the admin dashboard falls back to polling every few seconds.
  • Supports JSON, YAML, and TOML files — the format is detected from the file extension.

Commit the .gitignore entry

Add the state file path to .gitignore to avoid accidentally committing runtime state to version control. The file is machine-generated and changes frequently.


RedisBackend

Uses redis.asyncio for fully async, multi-instance deployments. State is shared across all app instances, and the subscribe() method enables real-time SSE push in the admin dashboard via Redis pub/sub.

shield.core.backends.redis.RedisBackend

RedisBackend(url='redis://localhost:6379/0', max_rl_hit_entries=10000)

Bases: ShieldBackend

Backend that stores all state in Redis.

Supports multi-instance deployments. subscribe() uses Redis pub/sub so that state changes made by one instance are immediately visible in the dashboard on any other instance.

Parameters

url: Redis connection URL (e.g. "redis://localhost:6379/0").

get_state async

get_state(path)

Return the current state for path.

Raises KeyError if no state has been registered for path.

set_state async

set_state(path, state)

Persist state for path, update the route-index, and publish to shield:changes.

The state key and the route-index entry are written atomically in a single pipeline so list_states() can never see a state key that is missing from the index (or vice-versa).

delete_state async

delete_state(path)

Remove state for path and remove it from the route-index.

No-op if path is not registered.

list_states async

list_states()

Return all registered route states.

Uses SMEMBERS shield:route-index + MGET instead of the dangerous KEYS shield:state:* pattern. KEYS is an O(keyspace) blocking command that can freeze a busy Redis server; SMEMBERS on the dedicated route-index set is safe to use in production.
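
The index-then-MGET read path can be sketched against any client that exposes async smembers() and mget(), as redis.asyncio does. FakeRedis below is a hypothetical in-memory stand-in so the example runs without a server; with a real client the sketch assumes decode_responses=True so that set members are str.

```python
import asyncio
import json


class FakeRedis:
    """Hypothetical in-memory stand-in exposing only smembers() and mget()."""

    def __init__(self) -> None:
        self.sets = {}      # set name -> set of members
        self.strings = {}   # key -> JSON string

    async def smembers(self, name):
        return set(self.sets.get(name, set()))

    async def mget(self, keys):
        return [self.strings.get(k) for k in keys]


async def list_states(client) -> list:
    """Read the index set, then fetch every state in one MGET round trip."""
    paths = sorted(await client.smembers("shield:route-index"))
    if not paths:
        return []
    raw = await client.mget([f"shield:state:{p}" for p in paths])
    # Skipping missing values is a defensive choice of this sketch.
    return [json.loads(item) for item in raw if item is not None]
```

The read costs two round trips regardless of how many routes exist, and neither command scans the keyspace.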

write_audit async

write_audit(entry)

Append entry to both the global audit list and the per-path list.

Both lists are capped at 1000 entries via LTRIM. Writing to a per-path list means get_audit_log(path=X) can fetch exactly the required entries directly — no full-list fetch-then-filter in Python.

get_audit_log async

get_audit_log(path=None, limit=100)

Return audit entries, newest first.

When path is provided the per-path list is used — fetches exactly limit entries via a single LRANGE call, eliminating the fetch-all-then-filter pattern of the previous implementation.

subscribe async

subscribe()

Yield RouteState objects as they are updated via pub/sub.

set_global_config async

set_global_config(config)

Persist config and broadcast a cache-invalidation signal.

Calls the base implementation (which stores the config via set_state and publishes to shield:changes), then additionally publishes a lightweight "1" signal to shield:global_invalidate. Any other instance running subscribe_global_config() receives this signal and immediately drops its in-process GlobalMaintenanceConfig cache so that the next check() call re-reads from Redis.

The extra publish is best-effort: a Redis error here is logged and swallowed so that a transient Redis blip never prevents a global maintenance toggle from taking effect locally.

try_claim_webhook_dispatch async

try_claim_webhook_dispatch(dedup_key, ttl_seconds=60)

Use Redis SET NX to claim exclusive webhook dispatch rights.

The key shield:webhook:dedup:{dedup_key} is written with NX (only if absent) and a TTL. The instance that wins the atomic write fires the webhooks; all others receive None from Redis and skip dispatch.

Fails open: a Redis error logs a warning and returns True so that webhooks are over-delivered rather than silently dropped.

Parameters

dedup_key: Deterministic key identifying the event (hash of event + path + serialised state, computed by ShieldEngine).

ttl_seconds: Key TTL. After this window the key expires, allowing re-delivery if the winning instance crashed mid-dispatch.

write_rate_limit_hit async

write_rate_limit_hit(hit)

Append a rate limit hit record, evicting the oldest when the cap is reached.

get_rate_limit_hits async

get_rate_limit_hits(path=None, limit=100)

Return rate limit hits, newest first, optionally filtered by path.

set_rate_limit_policy async

set_rate_limit_policy(path, method, policy_data)

Persist policy_data for path/method in Redis and broadcast to all other instances via shield:rl-policy-change.

get_rate_limit_policies async

get_rate_limit_policies()

Return all persisted rate limit policies from Redis.

delete_rate_limit_policy async

delete_rate_limit_policy(path, method)

Remove the persisted rate limit policy for path/method from Redis and broadcast the deletion to all other instances.

subscribe_global_config async

subscribe_global_config()

Yield None whenever the global maintenance config changes.

Subscribes to shield:global_invalidate. Each message arrival means another instance has written a new GlobalMaintenanceConfig to Redis and this instance should drop its in-process cache.

The generator runs indefinitely — callers (ShieldEngine) are expected to run it inside a cancellable asyncio.Task.

subscribe_rate_limit_policy async

subscribe_rate_limit_policy()

Yield policy-change events whenever another instance sets or deletes a rate limit policy.

Each yielded dict has one of two shapes:

{"action": "set",    "key": "GET:/api/orders", "policy": {...}}
{"action": "delete", "key": "GET:/api/orders"}

The generator runs indefinitely inside a cancellable asyncio.Task managed by ShieldEngine.

Installation

uv add "api-shield[redis]"

Usage

main.py
from shield.core.backends.redis import RedisBackend
from shield.core.engine import ShieldEngine

engine = ShieldEngine(backend=RedisBackend(url="redis://localhost:6379/0"))

Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| url | str | "redis://localhost:6379/0" | Redis connection URL. Supports redis://, rediss:// (TLS), and redis+unix://. |

Key schema

| Redis key | Type | Contents |
|---|---|---|
| shield:state:{path} | String | JSON-serialised RouteState |
| shield:audit | List | JSON-serialised AuditEntry items (LPUSH, LTRIM to 1000) |
| shield:global | String | JSON-serialised GlobalMaintenanceConfig |
| shield:changes | Pub/sub channel | Published on every set_state() for SSE live updates |

Characteristics

  • subscribe() is implemented via Redis pub/sub on shield:changes, enabling live SSE updates in the admin dashboard.
  • Uses connection pooling via redis.asyncio.ConnectionPool for efficiency under load.
  • Redis connection errors are handled gracefully — the backend surfaces them as exceptions and the engine fails open. Read more in ShieldEngine: fail-open.

Use the lifespan context manager

RedisBackend opens a connection pool on startup() and closes it on shutdown(). Always wrap the engine in the lifespan context manager to ensure clean teardown. Read more in ShieldEngine: lifecycle.
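
The general pattern behind this advice is to pair startup() with shutdown() in a try/finally. The engine's own lifespan API is not shown in this section, so the sketch below uses hypothetical names (RecordingBackend, backend_lifespan) and calls the backend hooks directly purely to show the guarantee.

```python
import asyncio
from contextlib import asynccontextmanager


class RecordingBackend:
    """Hypothetical stand-in that records lifecycle calls."""

    def __init__(self) -> None:
        self.events = []

    async def startup(self) -> None:
        self.events.append("startup")

    async def shutdown(self) -> None:
        self.events.append("shutdown")


@asynccontextmanager
async def backend_lifespan(backend):
    """Pair startup() with shutdown(), even if the body raises."""
    await backend.startup()
    try:
        yield backend
    finally:
        await backend.shutdown()
```

With this shape, an exception raised while serving still reaches shutdown(), so pooled connections are not leaked on a crash path.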


Writing a custom backend

Subclass ShieldBackend and implement the six required async methods. The contract is intentionally minimal.

my_backend.py
from shield.core.backends.base import ShieldBackend
from shield.core.models import AuditEntry, RouteState


class MyBackend(ShieldBackend):

    async def get_state(self, path: str) -> RouteState:
        """Return stored state. Must raise KeyError if path is not found."""
        ...

    async def set_state(self, path: str, state: RouteState) -> None:
        """Persist state, overwriting any existing entry for this path."""
        ...

    async def delete_state(self, path: str) -> None:
        """Remove state for this path. No-op if not found."""
        ...

    async def list_states(self) -> list[RouteState]:
        """Return all registered route states."""
        ...

    async def write_audit(self, entry: AuditEntry) -> None:
        """Append an entry to the audit log."""
        ...

    async def get_audit_log(
        self, path: str | None = None, limit: int = 100
    ) -> list[AuditEntry]:
        """Return audit entries newest-first, optionally filtered by path."""
        ...
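
For orientation, here is what the six methods might look like over plain Python containers. It is a sketch only: State and Audit are hypothetical stand-ins for RouteState and AuditEntry, and DictBackend does not subclass ShieldBackend, so the example runs without the library installed.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class State:   # hypothetical stand-in for RouteState
    path: str
    status: str


@dataclass
class Audit:   # hypothetical stand-in for AuditEntry
    path: str
    action: str


class DictBackend:
    """Sketch of the six required methods over a dict and a list."""

    def __init__(self) -> None:
        self._states: Dict[str, State] = {}
        self._audit: List[Audit] = []

    async def get_state(self, path: str) -> State:
        return self._states[path]        # dict lookup raises KeyError when absent

    async def set_state(self, path: str, state: State) -> None:
        self._states[path] = state

    async def delete_state(self, path: str) -> None:
        self._states.pop(path, None)     # no-op when the path is unknown

    async def list_states(self) -> List[State]:
        return list(self._states.values())

    async def write_audit(self, entry: Audit) -> None:
        self._audit.append(entry)

    async def get_audit_log(self, path: Optional[str] = None, limit: int = 100) -> List[Audit]:
        # Walk the log backwards so results come out newest first.
        entries = [e for e in reversed(self._audit) if path is None or e.path == path]
        return entries[:limit]
```

A real backend would subclass ShieldBackend and use the library's models; the container operations above only illustrate the expected behaviour of each method.
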

Contract rules

| Rule | Detail |
|---|---|
| get_state() must raise KeyError | The engine uses this to distinguish "not registered" from "registered but active" |
| Let errors bubble up | The engine wraps every backend call and handles errors (fail-open). Do not swallow exceptions. |
| All methods must be async | Use your storage library's async client to avoid blocking the event loop |
| subscribe() is optional | Override it if your storage supports pub/sub; otherwise the default raises NotImplementedError and the dashboard falls back to polling |

Serialisation helpers

Use the Pydantic v2 model methods to convert between RouteState/AuditEntry and JSON:

# Serialise to a JSON string
json_str = state.model_dump_json()

# Deserialise from a JSON string
state = RouteState.model_validate_json(json_str)

Lifecycle hooks (startup and shutdown)

Override startup() and shutdown() to manage connections:

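class MyBackend(ShieldBackend):

    async def startup(self) -> None:
        # create_connection() is a placeholder for your storage client's connect call
        self._conn = await create_connection()

    async def shutdown(self) -> None:
        # Release the connection opened in startup()
        await self._conn.close()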

See Building your own backend for a complete working SQLite example.