@ext.schedule reference

Cron-driven background jobs — system context, fan-out pattern, retry semantics

@ext.schedule registers a cron-driven background job that the web-kernel runs on a recurring timer. Unlike @chat.function (user-triggered) or @ext.skeleton (per-user ambient probe), a scheduled handler runs once across the entire installation — in system context. There is no ctx.user representing an individual user. The canonical pattern is to fan out across all users explicitly.

Use scheduled tasks for time-driven work that cannot wait for a user to ask: purging stale data, pre-warming caches, running health checks against external services, or aggregating metrics.

Where it lives

@ext.schedule is a method on the Extension instance, not on ChatExtension. Register it from the same file that holds your ext = Extension(...), or import ext from app.py into a dedicated handlers_schedule.py module — the pattern used in every production extension.

from imperal_sdk import Extension

ext = Extension(
    "my-app",
    version="1.0.0",
    display_name="My App",
    description="My App — AI-powered tool for managing your resources.",
    icon="icon.svg",
    actions_explicit=True,
)

@ext.schedule("hourly_cleanup", cron="0 * * * *")
async def hourly_cleanup(ctx) -> None:
    """Runs every hour under system context."""
    async for user_id in ctx.store.list_users("my_collection"):
        user_ctx = ctx.as_user(user_id)
        await user_ctx.store.delete("temp_items", "stale")

Signature

def schedule(
    self,
    name: str,
    cron: str,
) -> Callable:
    ...

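Both parameters are required and can be passed positionally or by keyword; the examples on this page pass name positionally and cron by keyword. There are no keyword-only arguments.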


Kwargs reference

name

Type: str (required)

  • Use snake_case that describes the job, not a timing — "monitor_runner" not "every_hour".
  • The name appears in the Developer Portal, in the manifest, and in web-kernel logs. Make it recognisable.
  • Each @ext.schedule in an extension must use a different name.

cron

Type: str (required)

The cron format is the standard Unix five-field format:

 ┌───── minute       (0–59)
 │ ┌─── hour         (0–23)
 │ │ ┌─ day of month (1–31)
 │ │ │ ┌ month       (1–12)
 │ │ │ │ ┌ day of week (0–7, where 0 and 7 = Sunday)
 │ │ │ │ │
 * * * * *

Common patterns:

Expression        Meaning
"* * * * *"       Every minute
"*/3 * * * *"     Every 3 minutes
"0 * * * *"       Every hour (on the hour)
"0 */4 * * *"     Every 4 hours
"0 0 * * *"       Daily at midnight UTC
"0 2 * * 1"       Every Monday at 02:00 UTC
"0 9 1 * *"       First day of every month at 09:00 UTC

All cron times are UTC

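The web-kernel interprets cron expressions in UTC. There is no timezone parameter. If a job must run at a local business hour, convert that local time to UTC yourself and write the UTC hour in the cron expression. A fixed expression does not follow daylight-saving changes, so the local firing time shifts by an hour when the offset changes.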
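
As a rough illustration of that conversion, the helper below turns a local wall-clock time and a fixed UTC offset into a daily five-field expression. The name local_to_utc_cron is hypothetical and not part of the SDK. The sketch assumes a fixed offset and covers only daily schedules, because a shift across midnight would also move the day-of-month and day-of-week fields.

```python
def local_to_utc_cron(hour: int, minute: int, utc_offset_hours: float) -> str:
    """Return a daily cron expression (in UTC) for a local wall-clock time.

    Hypothetical helper for illustration; utc_offset_hours is the local
    zone's offset from UTC (e.g. -5 for UTC-5, 5.5 for UTC+05:30).
    """
    if not (0 <= hour <= 23 and 0 <= minute <= 59):
        raise ValueError("hour must be 0-23 and minute 0-59")
    local_minutes = hour * 60 + minute
    offset_minutes = round(utc_offset_hours * 60)
    # Local time = UTC + offset, so UTC = local - offset, wrapped to one day.
    utc_minutes = (local_minutes - offset_minutes) % (24 * 60)
    utc_hour, utc_minute = divmod(utc_minutes, 60)
    return f"{utc_minute} {utc_hour} * * *"


print(local_to_utc_cron(9, 0, -5))   # 0 14 * * *
print(local_to_utc_cron(9, 0, 5.5))  # 30 3 * * *
```

The resulting string can be passed as the cron argument of @ext.schedule.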


Internal storage and manifest

Internally the decorator stores:

_schedules[name] = ScheduleDef(name=name, func=func, cron=cron)

The manifest includes a schedules array:

{
  "schedules": [
    { "name": "monitor_runner", "cron": "0 * * * *" }
  ]
}

No synthetic tool is emitted for @ext.schedule — unlike @ext.panel or @ext.webhook, there is no __schedule__{name} entry in the manifest's tools array. The web-kernel dispatches scheduled jobs through a separate scheduling path.
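
To make the storage and manifest shape concrete, here is a minimal stand-in registry. MiniExtension and its manifest() method are hypothetical and are not the imperal_sdk classes; only the ScheduleDef fields and the schedules array mirror what is described above. Raising on a duplicate name is an assumption about how the uniqueness rule might be enforced.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass
class ScheduleDef:
    name: str
    func: Callable[..., Any]
    cron: str


class MiniExtension:
    """Hypothetical stand-in; NOT the imperal_sdk Extension class."""

    def __init__(self) -> None:
        self._schedules: Dict[str, ScheduleDef] = {}

    def schedule(self, name: str, cron: str) -> Callable:
        def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
            # Assumption: duplicate names are rejected at registration time.
            if name in self._schedules:
                raise ValueError(f"duplicate schedule name: {name!r}")
            self._schedules[name] = ScheduleDef(name=name, func=func, cron=cron)
            return func  # the handler itself is returned unchanged

        return decorator

    def manifest(self) -> Dict[str, List[Dict[str, str]]]:
        # Only name and cron are serialised; the function object is not.
        return {
            "schedules": [
                {"name": s.name, "cron": s.cron} for s in self._schedules.values()
            ]
        }


ext = MiniExtension()


@ext.schedule("monitor_runner", cron="0 * * * *")
async def monitor_runner(ctx) -> None:
    pass


print(ext.manifest())
```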


System context

Scheduled handlers run under a system context. The ctx object is fully constructed, but ctx.user represents the system actor rather than a real user:

  • ctx.user.imperal_id equals "__system__"
  • ctx.user.role is "system"
  • ctx.user.email is ""
  • ctx.tenant is None — there is no single tenant

This means ctx.store.query(...), ctx.store.get(...), and similar calls issued directly on ctx operate in the system namespace, not in any user's partition. Direct store reads on the system context will not return user data.

Available ctx attributes in scheduled handlers

Attribute                          Available?   Notes
ctx.user                           Yes          imperal_id == "__system__"; not a real user
ctx.tenant                         No (None)    No single tenant in system context
ctx.store                          Yes          System namespace; use list_users + as_user for user data
ctx.store.list_users(collection)   Yes          System-context only; yields user IDs
ctx.store.query_all(collection)    Yes          System-context only; all docs across all users
ctx.as_user(user_id)               Yes          Returns a user-scoped Context for fan-out
ctx.http                           Yes          Outbound HTTP; no per-user auth state
ctx.config                         Yes          Admin-configured extension settings
ctx.ai                             Yes          LLM completions
ctx.billing                        Yes          Limits checks
ctx.notify                         Yes          Use user_ctx.notify(...) after fan-out
ctx.cache                          Yes          Scoped to system; use user_ctx.cache for user data
ctx.storage                        Yes          Shared file storage
ctx.extensions                     Yes          IPC calls to other extensions
ctx.log(msg)                       Yes          Structured log in extension dashboard
ctx.progress(pct)                  No           Only meaningful in user-facing tool calls
ctx.skeleton                       No           Guarded; raises SkeletonAccessForbidden outside @ext.skeleton
ctx.time                           Yes          Web-kernel-injected time context

Do not read ctx.user in scheduled handlers

ctx.user.imperal_id is "__system__" in scheduled context. Any code path that treats ctx.user.imperal_id as a real user ID will silently write or read from the system namespace. Always fan out via ctx.store.list_users() and operate on the returned user_ctx.
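
One way to catch this mistake early is a guard in shared per-user helpers. The sketch below is an assumption-laden illustration: is_system_context and require_user_context are hypothetical names, and the SimpleNamespace objects merely stand in for real contexts. Only the "__system__" sentinel comes from the behaviour described above.

```python
from types import SimpleNamespace

SYSTEM_ID = "__system__"  # sentinel documented for scheduled handlers


def is_system_context(ctx) -> bool:
    """True when ctx carries the system actor instead of a real user."""
    return getattr(ctx.user, "imperal_id", None) == SYSTEM_ID


def require_user_context(ctx) -> str:
    """Return the real user ID, refusing to run under the system actor."""
    if is_system_context(ctx):
        raise RuntimeError(
            "per-user helper called with system context; "
            "fan out with ctx.as_user(user_id) first"
        )
    return ctx.user.imperal_id


# Stand-in objects for demonstration only.
system_ctx = SimpleNamespace(user=SimpleNamespace(imperal_id=SYSTEM_ID))
user_ctx = SimpleNamespace(user=SimpleNamespace(imperal_id="u_alice"))

print(is_system_context(system_ctx))   # True
print(require_user_context(user_ctx))  # u_alice
```

Calling require_user_context at the top of any helper that is shared between chat functions and scheduled jobs turns a silent system-namespace write into an immediate error.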


Fan-out pattern

The canonical pattern for scheduled tasks that must act on behalf of every user is:

  1. Call ctx.store.list_users(collection) — async iterator that yields user IDs for every user who has at least one document in collection.
  2. For each user ID, call ctx.as_user(user_id) to get a user_ctx scoped to that user.
  3. Use user_ctx.store, user_ctx.cache, user_ctx.notify, etc. for all per-user operations.

import logging
from imperal_sdk import Extension

log = logging.getLogger(__name__)
ext = Extension(
    "my-app",
    version="1.0.0",
    display_name="My App",
    description="My App — AI-powered tool for managing your resources.",
    icon="icon.svg",
    actions_explicit=True,
)


@ext.schedule("daily_digest", cron="0 8 * * *")
async def daily_digest(ctx) -> None:
    """Send a daily digest to each user at 08:00 UTC."""
    async for user_id in ctx.store.list_users("subscriptions"):
        user_ctx = ctx.as_user(user_id)
        try:
            sub = await user_ctx.store.query(
                "subscriptions", where={"active": True}, limit=1
            )
            if not sub.data:
                continue
            count = await user_ctx.store.count("items", where={"unread": True})
            if count > 0:
                await user_ctx.notify(f"You have {count} unread item(s).")
        except Exception as exc:
            log.warning("daily_digest: user %s failed: %s", user_id, exc)

ctx.store.list_users(collection)

  • System-context only. Raises RuntimeError if called on a non-system context.
  • Yields each user_id as a string — the same imperal_id that ctx.user.imperal_id would hold for a real user call.
  • Iteration is lazy — documents are fetched in pages. Safe for installations with thousands of users.
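
The lazy, paged behaviour can be pictured with a small async generator. Everything in this sketch is a stand-in: fetch_page, list_users_paged, the page size, and the in-memory USERS list are hypothetical and say nothing about how the SDK pages internally. It only shows why breaking out of the loop early avoids fetching later pages.

```python
import asyncio
from typing import AsyncIterator, List, Optional, Tuple

USERS = [f"u_{i}" for i in range(7)]  # pretend backing collection
fetch_log: List[int] = []             # records which page cursors were fetched


async def fetch_page(cursor: int, size: int) -> Tuple[List[str], Optional[int]]:
    """Return one page of user IDs and the next cursor (None when done)."""
    fetch_log.append(cursor)
    page = USERS[cursor:cursor + size]
    next_cursor = cursor + size if cursor + size < len(USERS) else None
    return page, next_cursor


async def list_users_paged(size: int = 3) -> AsyncIterator[str]:
    """Yield user IDs one at a time, fetching a new page only when needed."""
    cursor: Optional[int] = 0
    while cursor is not None:
        page, cursor = await fetch_page(cursor, size)
        for user_id in page:
            yield user_id


async def first_two() -> List[str]:
    seen: List[str] = []
    async for user_id in list_users_paged():
        seen.append(user_id)
        if len(seen) == 2:
            break  # stop early; the remaining pages are never requested
    return seen


result = asyncio.run(first_two())
print(result, fetch_log)  # ['u_0', 'u_1'] [0]
```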

ctx.as_user(user_id)

  • Returns a Context with ctx.user.imperal_id == user_id.
  • The returned context is fully functional: user_ctx.store, user_ctx.cache, user_ctx.notify, user_ctx.http all work normally, scoped to that user.
  • Raises RuntimeError if called outside system context.
  • Raises ValueError if user_id is empty or "__system__".

Idempotency

The web-kernel may fire a scheduled handler more than once in rare cases — for example, after a worker restart during a job or after a transient failure followed by a retry. Design handlers to be safe to run more than once:

  • Check before writing. Read existing state before creating or updating records. If the record already reflects the expected outcome, skip the write.
  • Use timestamps as guards. Store last_run_at on each per-user document and skip if the elapsed time is less than the scheduled interval.
  • Prefer upserts over creates. ctx.store.update(...) on an existing doc is idempotent if the payload is the same.

The web-tools monitor runner pattern (from web-tools/handlers_schedule.py) demonstrates the timestamp-guard approach:

import asyncio
import datetime
import logging
from imperal_sdk import Extension

log = logging.getLogger(__name__)
ext = Extension(
    "web-tools",
    version="1.0.0",
    display_name="Web Tools",
    description="Web Tools — monitor websites and APIs with AI assistance.",
    icon="icon.svg",
    actions_explicit=True,
)


@ext.schedule("wt_monitor_runner", cron="0 * * * *")
async def run_scheduled_monitors(ctx) -> None:
    """Hourly: fan-out across all users, scan overdue monitors."""
    now = datetime.datetime.now(datetime.timezone.utc)
    run_count = 0
    async for user_id in ctx.store.list_users("wt_monitors"):
        user_ctx = ctx.as_user(user_id)
        try:
            page = await user_ctx.store.query(
                "wt_monitors", where={"enabled": True}, limit=100
            )
            for mon in page.data:
                try:
                    last_run = mon.data.get("last_run_at")
                    interval_h = mon.data.get("interval_hours", 24)
                    if last_run:
                        last_dt = datetime.datetime.fromisoformat(last_run)
                        if last_dt.tzinfo is None:
                            last_dt = last_dt.replace(tzinfo=datetime.timezone.utc)
                        if (now - last_dt).total_seconds() / 3600 < interval_h:
                            continue  # not yet overdue — skip
                    run_count += 1
                    await user_ctx.store.update("wt_monitors", mon.id, {
                        "last_run_at": now.isoformat(),
                    })
                except Exception as exc:
                    log.warning("monitor %s failed: %s", mon.id, exc)
        except Exception as exc:
            log.warning("user %s failed: %s", user_id, exc)
    if run_count:
        log.info("ran %d monitor(s)", run_count)
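
The timestamp guard in the handler above can be pulled out into a pure function, which makes the skip logic easy to unit-test without a store. This is a sketch: is_overdue is a hypothetical helper name, and treating a missing last_run_at as overdue mirrors the handler's behaviour of running monitors that have never run.

```python
import datetime
from typing import Optional


def is_overdue(
    last_run_at: Optional[str],
    interval_hours: float,
    now: datetime.datetime,
) -> bool:
    """Return True when a job should run again.

    last_run_at is an ISO 8601 string or None; naive timestamps are
    interpreted as UTC, as in the handler above.
    """
    if not last_run_at:
        return True  # never ran before
    last_dt = datetime.datetime.fromisoformat(last_run_at)
    if last_dt.tzinfo is None:
        last_dt = last_dt.replace(tzinfo=datetime.timezone.utc)
    elapsed_hours = (now - last_dt).total_seconds() / 3600
    return elapsed_hours >= interval_hours


now = datetime.datetime(2024, 1, 1, 12, 0, tzinfo=datetime.timezone.utc)
print(is_overdue(None, 24, now))                         # True
print(is_overdue("2024-01-01T11:30:00+00:00", 1, now))   # False
print(is_overdue("2024-01-01T10:00:00", 1, now))         # True
```

Because a repeated firing sees the freshly written last_run_at, the second run skips the monitor instead of scanning it twice.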

Production examples

Hourly monitor runner across all users

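Adapted from web-tools/handlers_schedule.py. This variant collects the user IDs from ctx.store.list_users up front, then scans users concurrently with asyncio.gather under a semaphore cap of 10. Per-user error isolation remains the defining pattern: each user failure logs a warning and the job continues with the remaining users, so one bad user never aborts the whole run.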

import asyncio
import datetime
import logging
from imperal_sdk import Extension

log = logging.getLogger(__name__)
ext = Extension(
    "web-tools",
    version="1.4.0",
    display_name="Web Tools",
    description="Web Tools — monitor websites and APIs with AI assistance.",
    icon="icon.svg",
    actions_explicit=True,
)


@ext.schedule("wt_monitor_runner", cron="0 * * * *")
async def run_scheduled_monitors(ctx) -> None:
    """Hourly: fan-out, timestamp-guard, scan overdue monitors."""
    now = datetime.datetime.now(datetime.timezone.utc)
    sem = asyncio.Semaphore(10)  # cap concurrent user scans

    async def _scan_user(user_id: str) -> int:
        async with sem:
            user_ctx = ctx.as_user(user_id)
            ran = 0
            try:
                page = await user_ctx.store.query(
                    "wt_monitors", where={"enabled": True}, limit=100
                )
                for mon in page.data:
                    last_run = mon.data.get("last_run_at")
                    interval_h = mon.data.get("interval_hours", 24)
                    if last_run:
                        last_dt = datetime.datetime.fromisoformat(last_run)
                        if last_dt.tzinfo is None:
                            last_dt = last_dt.replace(tzinfo=datetime.timezone.utc)
                        if (now - last_dt).total_seconds() / 3600 < interval_h:
                            continue
                    await user_ctx.store.update("wt_monitors", mon.id, {
                        "last_run_at": now.isoformat(),
                    })
                    ran += 1
            except Exception as exc:
                log.warning("wt_monitor_runner: user %s error: %s", user_id, exc)
            return ran

    user_ids = [uid async for uid in ctx.store.list_users("wt_monitors")]
    results = await asyncio.gather(*[_scan_user(uid) for uid in user_ids])
    total = sum(results)
    if total:
        log.info("wt_monitor_runner: scanned %d monitor(s) across %d user(s)", total, len(user_ids))
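
The semaphore cap can be checked in isolation. The sketch below uses only asyncio; run_bounded and the sleeping worker are hypothetical stand-ins for the per-user scan, and the recorded peak shows that no more than the configured number of workers are inside the guarded section at once.

```python
import asyncio
from typing import List, Tuple


async def run_bounded(user_ids: List[str], limit: int) -> Tuple[List[str], int]:
    """Run one worker per user, with at most `limit` running concurrently."""
    sem = asyncio.Semaphore(limit)
    active = 0  # workers currently inside the semaphore
    peak = 0    # highest value `active` ever reached

    async def worker(user_id: str) -> str:
        nonlocal active, peak
        async with sem:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stands in for store and HTTP calls
            active -= 1
            return user_id.upper()

    # gather returns results in the same order as the inputs.
    results = await asyncio.gather(*(worker(uid) for uid in user_ids))
    return list(results), peak


results, peak = asyncio.run(run_bounded([f"u{i}" for i in range(25)], limit=10))
print(len(results), peak)
```

Note the trade-off in the production handler: building the user_ids list materialises every ID in memory before any work starts, whereas the plain async for loop stays lazy but processes users one at a time.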

Inbox cache pre-warmer

Adapted from mail-client/panels_schedule.py. Runs every 3 minutes to pre-populate the inbox cache so panel loads are instant. Combines fan-out with user_ctx.cache.get; the page fetch and the cache write are elided behind the comment in the inner loop.

import logging
from datetime import datetime, timezone
from imperal_sdk import Extension

log = logging.getLogger("mail")
ext = Extension(
    "mail-client",
    version="4.3.0",
    display_name="Mail Client",
    description="Mail Client — read and send email with AI assistance.",
    icon="icon.svg",
    actions_explicit=True,
)


@ext.schedule("inbox_warmup", cron="*/3 * * * *")
async def inbox_warmup(ctx) -> None:
    """Pre-warm inbox page 2 for all users every 3 minutes."""
    async for user_id in ctx.store.list_users("mail_accounts"):
        user_ctx = ctx.as_user(user_id)
        try:
            accounts_page = await user_ctx.store.query(
                "mail_accounts", where={"is_active": True}
            )
            for acc in accounts_page.data:
                try:
                    cached = await user_ctx.cache.get(
                        f"inbox_manifest:{acc.data['email']}", object
                    )
                    if not cached:
                        continue  # manifest not built yet — skip
                    # fetch page 2 and write into cache
                    await user_ctx.store.update("mail_accounts", acc.id, {
                        "cache_warmed_at": datetime.now(timezone.utc).isoformat(),
                    })
                except Exception as exc:
                    log.debug("inbox_warmup account error: %s", exc)
        except Exception as exc:
            log.debug("inbox_warmup user error: %s", exc)

Periodic external API sync

Some scheduled tasks do not need fan-out, for example syncing a shared configuration or purging a global staging area; those can operate directly on ctx (system namespace) or on a known fixed collection. The example below is not one of them: OAuth tokens are stored per user, so it fans out and refreshes each active account through user_ctx.http.

import logging
from imperal_sdk import Extension

log = logging.getLogger(__name__)
ext = Extension(
    "ad-sync",
    version="1.0.0",
    display_name="Ad Sync",
    description="Ad Sync — sync campaign data from external ad platforms.",
    icon="icon.svg",
    actions_explicit=True,
)


@ext.schedule("platform_token_refresh", cron="0 */6 * * *")
async def refresh_platform_tokens(ctx) -> None:
    """Refresh OAuth tokens for all connected ad accounts every 6 hours."""
    async for user_id in ctx.store.list_users("ad_accounts"):
        user_ctx = ctx.as_user(user_id)
        try:
            page = await user_ctx.store.query("ad_accounts", where={"active": True})
            for acc in page.data:
                expires_at = acc.data.get("token_expires_at", "")
                if not expires_at:
                    continue
                resp = await user_ctx.http.post(
                    "https://api.ad-platform.example/oauth/token",
                    json={
                        "grant_type": "refresh_token",
                        "refresh_token": acc.data.get("refresh_token", ""),
                    },
                )
                if resp.ok:
                    body = resp.json()
                    await user_ctx.store.update("ad_accounts", acc.id, {
                        "access_token": body.get("access_token", ""),
                        "token_expires_at": body.get("expires_at", ""),
                    })
        except Exception as exc:
            log.warning("token_refresh: user %s failed: %s", user_id, exc)

V20 effects requirement

@ext.schedule handlers are not @chat.function handlers and are not subject to the V14–V24 validators that govern chat functions. There is no effects= kwarg on @ext.schedule — the decorator only accepts name and cron.

However, if your scheduled handler calls other extensions via ctx.extensions.emit(...) or writes data that triggers downstream chat flows, document those side-effects in the job's docstring and in your extension's CLAUDE.md or operator notes. This is an operational convention, not an SDK enforcement.


Common pitfalls

Reading ctx.user directly

from imperal_sdk import Extension

ext = Extension(
    "my-app",
    display_name="My App",
    description="My App — manage resources with AI assistance.",
    icon="icon.svg",
    actions_explicit=True,
)


# WRONG — ctx.user.imperal_id is "__system__" in scheduled context
@ext.schedule("broken_job", cron="0 * * * *")
async def broken_job(ctx) -> None:
    items = await ctx.store.query("items", where={"owner": ctx.user.imperal_id})
    # This queries the system namespace — returns nothing for real users


# CORRECT — fan out, then use user_ctx
@ext.schedule("correct_job", cron="0 * * * *")
async def correct_job(ctx) -> None:
    async for user_id in ctx.store.list_users("items"):
        user_ctx = ctx.as_user(user_id)
        items = await user_ctx.store.query("items")
        # operates correctly in the user's namespace

Missing per-user error isolation

import logging
from imperal_sdk import Extension

log = logging.getLogger(__name__)
ext = Extension(
    "my-app",
    display_name="My App",
    description="My App — manage resources with AI assistance.",
    icon="icon.svg",
    actions_explicit=True,
)


# WRONG — one failing user aborts all remaining users
@ext.schedule("fragile_job", cron="0 * * * *")
async def fragile_job(ctx) -> None:
    async for user_id in ctx.store.list_users("items"):
        user_ctx = ctx.as_user(user_id)
        await user_ctx.store.delete("items", "stale_item_id")  # may raise


# CORRECT — isolate exceptions per user
@ext.schedule("resilient_job", cron="0 * * * *")
async def resilient_job(ctx) -> None:
    async for user_id in ctx.store.list_users("items"):
        user_ctx = ctx.as_user(user_id)
        try:
            await user_ctx.store.delete("items", "stale_item_id")
        except Exception as exc:
            log.warning("resilient_job: user %s failed: %s", user_id, exc)
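
When a job touches many users, it can help to collect failures and log one summary instead of relying only on per-user warnings. The following is a sketch with stand-ins: fan_out, fake_list_users, and delete_stale are hypothetical names, and the async generator merely imitates the shape of ctx.store.list_users.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, Dict, List, Tuple


async def fake_list_users() -> AsyncIterator[str]:
    """Stand-in for the user ID iterator."""
    for user_id in ("u_alice", "u_bob", "u_carol"):
        yield user_id


async def delete_stale(user_id: str) -> None:
    """Stand-in per-user task; fails for one user to show isolation."""
    if user_id == "u_bob":
        raise KeyError("stale_item_id")


async def fan_out(
    user_ids: AsyncIterator[str],
    work: Callable[[str], Awaitable[None]],
) -> Tuple[List[str], Dict[str, str]]:
    """Run `work` for every user; return (succeeded IDs, {failed ID: error})."""
    succeeded: List[str] = []
    failed: Dict[str, str] = {}
    async for user_id in user_ids:
        try:
            await work(user_id)
            succeeded.append(user_id)
        except Exception as exc:  # isolate the failure to this user
            failed[user_id] = repr(exc)
    return succeeded, failed


ok, failed = asyncio.run(fan_out(fake_list_users(), delete_stale))
print(ok)      # ['u_alice', 'u_carol']
print(failed)  # {'u_bob': "KeyError('stale_item_id')"}
```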

Cron fires on every minute instead of hourly

from imperal_sdk import Extension

ext = Extension(
    "my-app",
    display_name="My App",
    description="My App — manage resources with AI assistance.",
    icon="icon.svg",
    actions_explicit=True,
)


# WRONG — "* * * * *" fires every single minute, not hourly
@ext.schedule("too_frequent", cron="* * * * *")  # intended: hourly; actually: every minute
async def too_frequent(ctx) -> None:  # type: ignore[misc]
    pass  # no-op for illustration


# CORRECT — "0 * * * *" fires at minute 0 of every hour (once per hour)
@ext.schedule("hourly", cron="0 * * * *")
async def hourly_job(ctx) -> None:
    pass  # hourly

Validate your cron expressions

Use a cron expression tester (e.g. crontab.guru) before deploying. An accidentally high-frequency schedule can generate thousands of ICNLI Worker runs per day.
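
For a quick local sanity check, the firing count per day can be estimated from the minute and hour fields alone. This is a simplified sketch, not the web-kernel's parser: runs_per_day and expand_field are hypothetical helpers that understand only *, numbers, ranges, lists, and /step, and they ignore the three day fields, so the result is the count on a day when the job is active.

```python
from typing import Set


def expand_field(field: str, lo: int, hi: int) -> Set[int]:
    """Expand one cron field into the set of values it matches."""
    values: Set[int] = set()
    for part in field.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            start, end = lo, hi
        elif "-" in base:
            first, last = base.split("-")
            start, end = int(first), int(last)
        else:
            start = int(base)
            # "5/15" means "from 5 to the maximum, every 15".
            end = hi if step_text else start
        if step < 1 or not (lo <= start <= end <= hi):
            raise ValueError(f"invalid cron field: {field!r}")
        values.update(range(start, end + 1, step))
    return values


def runs_per_day(cron: str) -> int:
    """Estimate firings per active day from the minute and hour fields."""
    fields = cron.split()
    if len(fields) != 5:
        raise ValueError("expected five space-separated fields")
    minutes = expand_field(fields[0], 0, 59)
    hours = expand_field(fields[1], 0, 23)
    return len(minutes) * len(hours)


for expr in ("* * * * *", "*/3 * * * *", "0 * * * *", "0 */4 * * *"):
    print(expr, "->", runs_per_day(expr))
```

For the four expressions above this prints 1440, 480, 24, and 6 respectively, which makes the cost of a mistyped minute field obvious before deployment.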


Registering @ext.schedule on ChatExtension

from imperal_sdk import Extension, ChatExtension

ext = Extension(
    "my-app",
    display_name="My App",
    description="My App — manage resources with AI assistance.",
    icon="icon.svg",
    actions_explicit=True,
)
chat = ChatExtension(
    ext,
    tool_name="my_app_chat",
    description="My App AI assistant.",
)


# WRONG — @ext.schedule must be called on the Extension instance, not ChatExtension
# chat.schedule(...)  # AttributeError — no such method


# CORRECT
@ext.schedule("my_job", cron="0 * * * *")
async def my_job(ctx) -> None:
    pass

Testing

Import your handler function directly and pass a MockContext configured as a system context. Set user_id="__system__" to match the production context. Wire MockStore.list_users_result with a list of user IDs to simulate the fan-out iteration.

import pytest
from imperal_sdk.testing import MockContext, MockStore

# Assume your schedule handler is in handlers_schedule.py:
# from handlers_schedule import daily_digest

@pytest.mark.asyncio
async def test_daily_digest_skips_users_without_subscriptions():
    ctx = MockContext(user_id="__system__", tool_type="system")
    ctx.store = MockStore()
    # Simulate two users in the "subscriptions" collection
    await ctx.store.create("subscriptions", {"active": True, "user_id": "u_alice"})
    await ctx.store.create("subscriptions", {"active": True, "user_id": "u_bob"})

    # Call the handler directly
    # await daily_digest(ctx)

    # Assert per-user notifications were sent via ctx.notify
    # assert len(ctx.notify.sent) == 2
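
If the SDK mocks are not at hand, the fan-out logic can still be exercised with hand-written fakes. The sketch below does not use imperal_sdk.testing at all: every Fake* class and the unread_digest handler are hypothetical stand-ins that copy only the call shapes used on this page (ctx.store.list_users, ctx.as_user, user_ctx.store.count, user_ctx.notify).

```python
import asyncio
from typing import AsyncIterator, Dict, List, Optional, Tuple


class FakeUserStore:
    def __init__(self, unread: int) -> None:
        self._unread = unread

    async def count(self, collection: str, where: Optional[dict] = None) -> int:
        return self._unread


class FakeUserContext:
    def __init__(self, user_id: str, unread: int, outbox: List[Tuple[str, str]]) -> None:
        self.store = FakeUserStore(unread)
        self._user_id = user_id
        self._outbox = outbox

    async def notify(self, message: str) -> None:
        self._outbox.append((self._user_id, message))


class FakeSystemStore:
    def __init__(self, user_ids: List[str]) -> None:
        self._user_ids = user_ids

    async def list_users(self, collection: str) -> AsyncIterator[str]:
        for user_id in self._user_ids:
            yield user_id


class FakeSystemContext:
    def __init__(self, unread_by_user: Dict[str, int]) -> None:
        self._unread = unread_by_user
        self.store = FakeSystemStore(list(unread_by_user))
        self.outbox: List[Tuple[str, str]] = []  # every notification sent

    def as_user(self, user_id: str) -> FakeUserContext:
        return FakeUserContext(user_id, self._unread[user_id], self.outbox)


async def unread_digest(ctx) -> None:
    """Simplified handler under test: notify users who have unread items."""
    async for user_id in ctx.store.list_users("items"):
        user_ctx = ctx.as_user(user_id)
        count = await user_ctx.store.count("items", where={"unread": True})
        if count > 0:
            await user_ctx.notify(f"You have {count} unread item(s).")


ctx = FakeSystemContext({"u_alice": 3, "u_bob": 0})
asyncio.run(unread_digest(ctx))
print(ctx.outbox)  # [('u_alice', 'You have 3 unread item(s).')]
```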
