Scheduled job — daily fan-out
Cron-driven job that fans out across all users using list_users + as_user
A scheduled handler runs once across the entire installation under system context — there is no single ctx.user. The canonical pattern is to enumerate every user who has data in the relevant collection using ctx.store.list_users(), obtain a user-scoped context with ctx.as_user(), and isolate each user's work inside a try/except so one failure never aborts the rest of the fan-out. This pattern is used directly in web-tools/handlers_schedule.py.
from __future__ import annotations

import datetime
import logging

from imperal_sdk import Extension
from imperal_sdk.types import ActionResult

log = logging.getLogger(__name__)

ext = Extension(
    "billing-sync",
    version="1.0.0",
    display_name="Billing Sync",
    description="Billing Sync — keeps your billing records in sync with Stripe.",
    icon="icon.svg",
    actions_explicit=True,
)


# ── Daily fan-out: summarise pending invoices per user ───────────────────────
@ext.schedule("daily_summary", cron="0 8 * * *")
async def daily_summary(ctx) -> None:
    """Daily at 08:00 UTC: notify each user about their pending invoice count."""
    now = datetime.datetime.now(datetime.timezone.utc)
    notified = 0

    # ctx.store.list_users yields one imperal_id per user who has data in the
    # collection. It is an async iterator, so it is safe for large installations.
    async for user_id in ctx.store.list_users("bs_subscriptions"):
        # ctx.as_user scopes ALL store / notify / http calls to this user.
        # Raises RuntimeError outside system context; ValueError for empty id.
        user_ctx = ctx.as_user(user_id)

        # Per-user isolation: one bad user never aborts the rest of the job.
        try:
            # Check the user is still subscribed (idempotency guard).
            subs = await user_ctx.store.query(
                "bs_subscriptions",
                where={"active": True},
                limit=1,
            )
            if not subs.data:
                continue  # unsubscribed: skip silently

            # Timestamp guard: skip if we already notified within the last 20 h.
            last_doc = await user_ctx.store.get("bs_daily_runs", user_id)
            if last_doc:
                last_ts_str: str = last_doc.data.get("notified_at", "")
                if last_ts_str:
                    last_ts = datetime.datetime.fromisoformat(last_ts_str)
                    if last_ts.tzinfo is None:
                        last_ts = last_ts.replace(tzinfo=datetime.timezone.utc)
                    if (now - last_ts).total_seconds() < 72000:  # 20 h
                        continue  # too recent: skip

            # Fetch the count and notify the user.
            count_result = await user_ctx.store.query(
                "bs_invoices",
                where={"status": "pending"},
                limit=200,
            )
            count = len(count_result.data)  # capped at 200 by the query limit
            if count > 0:
                await user_ctx.notify(
                    f"You have {count} pending invoice(s) waiting for review."
                )
                # Record the run timestamp so the timestamp guard works next time.
                await user_ctx.store.update(
                    "bs_daily_runs",
                    user_id,
                    {"notified_at": now.isoformat()},
                )
                notified += 1
        except Exception as exc:  # noqa: BLE001
            log.warning("daily_summary: user %s failed: %s", user_id, exc)

    if notified:
        await ctx.log(f"daily_summary: notified {notified} user(s)", level="info")
Walk-through
Why system context has no real ctx.user.
The handler is not triggered by a user request, so ctx.user is only a placeholder: ctx.user.imperal_id is "__system__" and ctx.tenant is None. Using ctx.user.imperal_id as a real user ID (for example, passing it to ctx.store.query(...)) silently queries the system namespace and returns nothing. Always fan out via ctx.store.list_users() first.
ctx.store.list_users(collection).
Yields one user_id string per user who has at least one document in collection. It is a lazy async iterator backed by a cursor scan — it does not load all users into memory at once. The collection name must match the one your extension writes to. If the collection is empty the iterator yields nothing.
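To make the "lazy async iterator" behaviour concrete, here is a minimal sketch that does not use the SDK at all. The names list_users_stub, take_first, pages, and fetch_log are hypothetical stand-ins invented for illustration; the real ctx.store.list_users implementation is not shown in this recipe and may page differently. The sketch only demonstrates the general property described above: a page is fetched when the consumer reaches it, and an empty source yields nothing.

```python
from __future__ import annotations

import asyncio
from typing import AsyncIterator


async def list_users_stub(
    pages: list[list[str]], fetch_log: list[int]
) -> AsyncIterator[str]:
    """Hypothetical stand-in for a cursor-backed user listing (not the SDK)."""
    for index, page in enumerate(pages):
        fetch_log.append(index)  # record that this page was "fetched"
        await asyncio.sleep(0)   # simulate an awaitable store round trip
        for user_id in page:
            yield user_id


async def take_first(pages: list[list[str]], fetch_log: list[int]) -> str | None:
    """Consume only the first user id, then close the iterator."""
    users = list_users_stub(pages, fetch_log)
    try:
        async for user_id in users:
            return user_id
        return None  # empty source: the loop body never ran
    finally:
        await users.aclose()


fetched: list[int] = []
first = asyncio.run(take_first([["u1", "u2"], ["u3"]], fetched))
```

After this runs, only the first page has been touched, which is why iterating with async for stays cheap even when the installation has many users.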
ctx.as_user(user_id).
Returns a fully scoped Context where every store, notify, http, and cache call is isolated to that user. It does not make a network call. Raises RuntimeError if called outside system context; raises ValueError if user_id is empty or "__system__".
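When unit-testing a scheduled handler without the SDK, a small test double can mirror the error contract described above. The sketch below is an assumption-laden illustration: FakeContext and SYSTEM_ID are hypothetical names, and the class is not the SDK's Context. It only reproduces the documented rules (RuntimeError outside system context, ValueError for an empty or "__system__" id, no network call).

```python
from __future__ import annotations

from dataclasses import dataclass

SYSTEM_ID = "__system__"  # placeholder id that system context reports


@dataclass(frozen=True)
class FakeContext:
    """Hypothetical test double; NOT the imperal_sdk Context class."""

    user_id: str = SYSTEM_ID

    def as_user(self, user_id: str) -> FakeContext:
        # Only a system-scoped context may hand out user-scoped contexts.
        if self.user_id != SYSTEM_ID:
            raise RuntimeError("as_user() is only available in system context")
        # Reject ids that cannot identify a real user.
        if not user_id or user_id == SYSTEM_ID:
            raise ValueError("user_id must be a non-empty, non-system id")
        # Pure object construction: no I/O happens here.
        return FakeContext(user_id=user_id)


system_ctx = FakeContext()
scoped_ctx = system_ctx.as_user("u1")
```

Because the double is frozen and as_user returns a new object, the system context itself is never mutated, which matches the idea of obtaining a separate scoped context per user inside the loop.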
Per-user try/except isolation.
Without it, any exception from one user (network blip, corrupt document, store timeout) propagates out of the loop and aborts all remaining users. Catch at the per-user boundary, log a warning, and continue. The try/except in the example wraps every store and notify call made for a user, so one failed query does not break the loop.
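The isolation pattern can be tried outside the SDK with plain asyncio. In this sketch, process_user and fan_out are hypothetical helpers, and the failure for "u2" is simulated; it shows only the control flow, in which a per-user try/except lets the loop carry on after an error.

```python
from __future__ import annotations

import asyncio
import logging

log = logging.getLogger(__name__)


async def process_user(user_id: str) -> None:
    """Hypothetical per-user work; fails for one user to simulate an error."""
    if user_id == "u2":
        raise TimeoutError("store timeout")


async def fan_out(user_ids: list[str]) -> tuple[list[str], list[str]]:
    """Process every user and return (succeeded, failed) id lists."""
    succeeded: list[str] = []
    failed: list[str] = []
    for user_id in user_ids:
        try:
            await process_user(user_id)
        except Exception as exc:  # noqa: BLE001 - isolate each user
            log.warning("user %s failed: %s", user_id, exc)
            failed.append(user_id)
        else:
            succeeded.append(user_id)
    return succeeded, failed


succeeded, failed = asyncio.run(fan_out(["u1", "u2", "u3"]))
```

Here "u3" is still processed even though "u2" raised. Removing the try/except would stop the loop at "u2" and leave every later user unprocessed.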
Timestamp guard (idempotency).
The web-kernel may re-run a scheduled handler after a worker restart or transient failure. Storing a notified_at timestamp and skipping any user whose timestamp is younger than a minimum interval (20 hours in the example, slightly less than the 24-hour schedule) prevents double-notification and double-writes. The pattern (from web-tools/handlers_schedule.py) reads the sentinel before doing any work, skips if still within the interval, and writes the new timestamp after the work is done.
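The guard logic is easier to test when pulled into a pure function. The following is a minimal sketch under that assumption: recently_notified and MIN_INTERVAL are hypothetical names, not part of the SDK or of web-tools/handlers_schedule.py. It mirrors the comparison used in the recipe, including treating a naive timestamp as UTC.

```python
from __future__ import annotations

import datetime

MIN_INTERVAL = datetime.timedelta(hours=20)  # same 20 h window as the recipe


def recently_notified(
    notified_at: str,
    now: datetime.datetime,
    min_interval: datetime.timedelta = MIN_INTERVAL,
) -> bool:
    """Return True if notified_at is less than min_interval before now."""
    if not notified_at:
        return False  # no sentinel yet: first run for this user
    last_ts = datetime.datetime.fromisoformat(notified_at)
    if last_ts.tzinfo is None:
        # Treat naive timestamps as UTC, as the recipe does.
        last_ts = last_ts.replace(tzinfo=datetime.timezone.utc)
    return now - last_ts < min_interval


run_time = datetime.datetime(2024, 1, 2, 8, 0, tzinfo=datetime.timezone.utc)
skip_rerun = recently_notified("2024-01-02T07:59:00+00:00", run_time)
skip_next_day = recently_notified("2024-01-01T08:00:00+00:00", run_time)
```

A re-run one minute after the first notification is skipped, while the regular run 24 hours later goes ahead. Choosing a window shorter than the schedule period leaves room for small drift in the start time.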
ActionResult is not needed here.
@ext.schedule handlers return None — the web-kernel does not surface a return value to any user. ActionResult.success / .error is not expected. Raise or log errors; do not return them.
Cross-links
@ext.schedule reference
Full parameter reference, cron format table, system context availability table, and all common pitfalls.
Scheduled tasks concept
Why scheduled tasks exist, how system context works, and when to use them versus skeletons.
Recipe — webhook handler
The event-driven counterpart: receive an external HTTP call instead of waking on a timer.