Imperal Docs
Recipes

Scheduled job — daily fan-out

Cron-driven job that fans out across all users using list_users + as_user

A scheduled handler runs once across the entire installation under system context — there is no single ctx.user. The canonical pattern is to enumerate every user who has data in the relevant collection using ctx.store.list_users(), obtain a user-scoped context with ctx.as_user(), and isolate each user's work inside a try/except so one failure never aborts the rest of the fan-out. This pattern is used directly in web-tools/handlers_schedule.py.


handlers_schedule.py — complete minimal example
from __future__ import annotations

import datetime
import logging

from imperal_sdk import Extension
from imperal_sdk.types import ActionResult

log = logging.getLogger(__name__)

ext = Extension(
    "billing-sync",
    version="1.0.0",
    display_name="Billing Sync",
    description="Billing Sync — keeps your billing records in sync with Stripe.",
    icon="icon.svg",
    actions_explicit=True,
)

# ── Daily fan-out — summarise unread items per user ──────────────────────────

@ext.schedule("daily_summary", cron="0 8 * * *")
async def daily_summary(ctx) -> None:
    """Daily at 08:00 UTC: notify each user about their unread item count."""
    now = datetime.datetime.now(datetime.timezone.utc)
    notified = 0

    # ctx.store.list_users yields one imperal_id per user who has data in the
    # collection. It is an async iterator — safe for large installations.
    async for user_id in ctx.store.list_users("bs_subscriptions"):
        # ctx.as_user scopes ALL store / notify / http calls to this user.
        # Raises RuntimeError outside system context; ValueError for empty id.
        user_ctx = ctx.as_user(user_id)

        # Per-user isolation: one bad user never aborts the rest of the job.
        try:
            # Check the user is still subscribed (idempotency guard).
            subs = await user_ctx.store.query(
                "bs_subscriptions",
                where={"active": True},
                limit=1,
            )
            if not subs.data:
                continue  # unsubscribed — skip silently

            # Timestamp guard: skip if we already notified within the last 20 h.
            last_doc = await user_ctx.store.get("bs_daily_runs", user_id)
            if last_doc:
                last_ts_str: str = last_doc.data.get("notified_at", "")
                if last_ts_str:
                    last_ts = datetime.datetime.fromisoformat(last_ts_str)
                    if last_ts.tzinfo is None:
                        last_ts = last_ts.replace(tzinfo=datetime.timezone.utc)
                    if (now - last_ts).total_seconds() < 72000:  # 20 h
                        continue  # too recent — skip

            # Fetch the count and notify the user.
            count_result = await user_ctx.store.query(
                "bs_invoices",
                where={"status": "pending"},
                limit=200,
            )
            count = len(count_result.data)
            if count > 0:
                await user_ctx.notify(
                    f"You have {count} pending invoice(s) waiting for review."
                )

            # Record the run timestamp so the timestamp guard works next time.
            await user_ctx.store.update(
                "bs_daily_runs",
                user_id,
                {"notified_at": now.isoformat()},
            )
            notified += 1

        except Exception as exc:  # noqa: BLE001
            log.warning("daily_summary: user %s failed: %s", user_id, exc)

    if notified:
        await ctx.log(f"daily_summary: notified {notified} user(s)", level="info")

Walk-through

Why system context has no ctx.user. The handler is not triggered by a user request. ctx.user.imperal_id is "__system__" and ctx.tenant is None. Using ctx.user.imperal_id as a real user ID — for example passing it to ctx.store.query(...) — silently queries the system namespace and returns nothing. Always fan out via ctx.store.list_users() first.

ctx.store.list_users(collection). Yields one user_id string per user who has at least one document in collection. It is a lazy async iterator backed by a cursor scan — it does not load all users into memory at once. The collection name must match the one your extension writes to. If the collection is empty the iterator yields nothing.

ctx.as_user(user_id). Returns a fully scoped Context where every store, notify, http, and cache call is isolated to that user. It does not make a network call. Raises RuntimeError if called outside system context; raises ValueError if user_id is empty or "__system__".

Per-user try/except isolation. Without it, any exception from one user (network blip, corrupt document, store timeout) raises out of the loop and aborts all remaining users. Catch at the per-user boundary, log a warning, and continue. The outer try/except in the example wraps the entire per-user block — one failed query does not break the loop.

Timestamp guard (idempotency). The web-kernel may re-run a scheduled handler after a worker restart or transient failure. Storing a notified_at timestamp and comparing it to the interval prevents double-notification and double-writes. The pattern (from web-tools/handlers_schedule.py) reads the sentinel before doing any work, skips if still within the interval, and writes the new timestamp after the work is done.

ActionResult is optional here. @ext.schedule handlers return None — the web-kernel does not surface a return value to any user. ActionResult.success / .error is not expected. Raise or log errors; do not return them.


On this page