@ext.schedule reference
Cron-driven background jobs — system context, fan-out pattern, retry semantics
@ext.schedule registers a cron-driven background job that the web-kernel runs on a recurring timer. Unlike @chat.function (user-triggered) or @ext.skeleton (per-user ambient probe), a scheduled handler runs once across the entire installation — in system context. There is no ctx.user representing an individual user. The canonical pattern is to fan out across all users explicitly.
Use scheduled tasks for time-driven work that cannot wait for a user to ask: purging stale data, pre-warming caches, running health checks against external services, or aggregating metrics.
Where it lives
@ext.schedule is a method on the Extension instance, not on ChatExtension. Register it from the same file that holds your ext = Extension(...), or import ext from app.py into a dedicated handlers_schedule.py module — the pattern used in every production extension.
from imperal_sdk import Extension
ext = Extension(
"my-app",
version="1.0.0",
display_name="My App",
description="My App — AI-powered tool for managing your resources.",
icon="icon.svg",
actions_explicit=True,
)
@ext.schedule("hourly_cleanup", cron="0 * * * *")
async def hourly_cleanup(ctx) -> None:
"""Runs every hour under system context."""
async for user_id in ctx.store.list_users("my_collection"):
user_ctx = ctx.as_user(user_id)
await user_ctx.store.delete("temp_items", "stale")Signature
def schedule(
self,
name: str,
cron: str,
) -> Callable:
...Both parameters are positional. There are no keyword-only arguments.
Kwargs reference
name
Prop
Type
- Use
snake_casethat describes the job, not a timing —"monitor_runner"not"every_hour". - The name appears in the Developer Portal, in the manifest, and in web-kernel logs. Make it recognisable.
- Each
@ext.schedulein an extension must use a differentname.
cron
Prop
Type
The cron format is the standard Unix five-field format:
┌───── minute (0–59)
│ ┌─── hour (0–23)
│ │ ┌─ day of month (1–31)
│ │ │ ┌ month (1–12)
│ │ │ │ ┌ day of week (0–7, where 0 and 7 = Sunday)
│ │ │ │ │
* * * * *Common patterns:
| Expression | Meaning |
|---|---|
"* * * * *" | Every minute |
"*/3 * * * *" | Every 3 minutes |
"0 * * * *" | Every hour (on the hour) |
"0 */4 * * *" | Every 4 hours |
"0 0 * * *" | Daily at midnight UTC |
"0 2 * * 1" | Every Monday at 02:00 UTC |
"0 9 1 * *" | First day of every month at 09:00 UTC |
All cron times are UTC
The web-kernel interprets cron expressions in UTC. There is no timezone parameter. If a job must run at a local business hour, convert the UTC offset manually in the cron expression.
Internal storage and manifest
Internally the decorator stores:
_schedules[name] = ScheduleDef(name=name, func=func, cron=cron)The manifest includes a schedules array:
{
"schedules": [
{ "name": "monitor_runner", "cron": "0 * * * *" }
]
}No synthetic tool is emitted for @ext.schedule — unlike @ext.panel or @ext.webhook, there is no __schedule__{name} entry in the manifest's tools array. The web-kernel dispatches scheduled jobs through a separate scheduling path.
System context
Scheduled handlers run under a system context. The ctx object is fully constructed, but ctx.user represents the system actor rather than a real user:
ctx.user.imperal_idequals"__system__"ctx.user.roleis"system"ctx.user.emailis""ctx.tenantisNone— there is no single tenant
This means ctx.store.query(...), ctx.store.get(...), and similar calls issued directly on ctx operate in the system namespace, not in any user's partition. Direct store reads on the system context will not return user data.
Available ctx attributes in scheduled handlers
| Attribute | Available? | Notes |
|---|---|---|
ctx.user | Yes | imperal_id == "__system__" — not a real user |
ctx.tenant | No — None | No single tenant in system context |
ctx.store | Yes | System namespace; use list_users + as_user for user data |
ctx.store.list_users(collection) | Yes | System-context only; yields user IDs |
ctx.store.query_all(collection) | Yes | System-context only; all docs across all users |
ctx.as_user(user_id) | Yes | Returns a user-scoped Context for fan-out |
ctx.http | Yes | Outbound HTTP — no per-user auth state |
ctx.config | Yes | Admin-configured extension settings |
ctx.ai | Yes | LLM completions |
ctx.billing | Yes | Limits checks |
ctx.notify | Yes | Use user_ctx.notify(...) after fan-out |
ctx.cache | Yes | Scoped to system — use user_ctx.cache for user data |
ctx.storage | Yes | Shared file storage |
ctx.extensions | Yes | IPC calls to other extensions |
ctx.log(msg) | Yes | Structured log in extension dashboard |
ctx.progress(pct) | No | Only meaningful in user-facing tool calls |
ctx.skeleton | No | Guarded — raises SkeletonAccessForbidden outside @ext.skeleton |
ctx.time | Yes | Web-kernel-injected time context |
Do not read ctx.user in scheduled handlers
ctx.user.imperal_id is "__system__" in scheduled context. Any code path that
treats ctx.user.imperal_id as a real user ID will silently write or read from the
system namespace. Always fan out via ctx.store.list_users() and operate on the
returned user_ctx.
Fan-out pattern
The canonical pattern for scheduled tasks that must act on behalf of every user is:
- Call
ctx.store.list_users(collection)— async iterator that yields user IDs for every user who has at least one document incollection. - For each user ID, call
ctx.as_user(user_id)to get auser_ctxscoped to that user. - Use
user_ctx.store,user_ctx.cache,user_ctx.notify, etc. for all per-user operations.
import logging
from imperal_sdk import Extension
log = logging.getLogger(__name__)
ext = Extension(
"my-app",
version="1.0.0",
display_name="My App",
description="My App — AI-powered tool for managing your resources.",
icon="icon.svg",
actions_explicit=True,
)
@ext.schedule("daily_digest", cron="0 8 * * *")
async def daily_digest(ctx) -> None:
"""Send a daily digest to each user at 08:00 UTC."""
async for user_id in ctx.store.list_users("subscriptions"):
user_ctx = ctx.as_user(user_id)
try:
sub = await user_ctx.store.query(
"subscriptions", where={"active": True}, limit=1
)
if not sub.data:
continue
count = await user_ctx.store.count("items", where={"unread": True})
if count > 0:
await user_ctx.notify(f"You have {count} unread item(s).")
except Exception as exc:
log.warning("daily_digest: user %s failed: %s", user_id, exc)ctx.store.list_users(collection)
Prop
Type
- System-context only. Raises
RuntimeErrorif called on a non-system context. - Yields each
user_idas a string — the sameimperal_idthatctx.user.imperal_idwould hold for a real user call. - Iteration is lazy — documents are fetched in pages. Safe for installations with thousands of users.
ctx.as_user(user_id)
Prop
Type
- Returns a
Contextwithctx.user.imperal_id == user_id. - The returned context is fully functional:
user_ctx.store,user_ctx.cache,user_ctx.notify,user_ctx.httpall work normally, scoped to that user. - Raises
RuntimeErrorif called outside system context. - Raises
ValueErrorifuser_idis empty or"__system__".
Idempotency
The web-kernel may fire a scheduled handler more than once in rare cases — for example, after a worker restart during a job or after a transient failure followed by a retry. Design handlers to be safe to run more than once:
- Check before writing. Read existing state before creating or updating records. If the record already reflects the expected outcome, skip the write.
- Use timestamps as guards. Store
last_run_aton each per-user document and skip if the elapsed time is less than the scheduled interval. - Prefer upserts over creates.
ctx.store.update(...)on an existing doc is idempotent if the payload is the same.
The web-tools monitor runner pattern (from web-tools/handlers_schedule.py) demonstrates the timestamp-guard approach:
import asyncio
import datetime
import logging
from imperal_sdk import Extension
log = logging.getLogger(__name__)
ext = Extension(
"web-tools",
version="1.0.0",
display_name="Web Tools",
description="Web Tools — monitor websites and APIs with AI assistance.",
icon="icon.svg",
actions_explicit=True,
)
@ext.schedule("wt_monitor_runner", cron="0 * * * *")
async def run_scheduled_monitors(ctx) -> None:
"""Hourly: fan-out across all users, scan overdue monitors."""
now = datetime.datetime.now(datetime.timezone.utc)
run_count = 0
async for user_id in ctx.store.list_users("wt_monitors"):
user_ctx = ctx.as_user(user_id)
try:
page = await user_ctx.store.query(
"wt_monitors", where={"enabled": True}, limit=100
)
for mon in page.data:
try:
last_run = mon.data.get("last_run_at")
interval_h = mon.data.get("interval_hours", 24)
if last_run:
last_dt = datetime.datetime.fromisoformat(last_run)
if last_dt.tzinfo is None:
last_dt = last_dt.replace(tzinfo=datetime.timezone.utc)
if (now - last_dt).total_seconds() / 3600 < interval_h:
continue # not yet overdue — skip
run_count += 1
await user_ctx.store.update("wt_monitors", mon.id, {
"last_run_at": now.isoformat(),
})
except Exception as exc:
log.warning("monitor %s failed: %s", mon.id, exc)
except Exception as exc:
log.warning("user %s failed: %s", user_id, exc)
if run_count:
log.info("ran %d monitor(s)", run_count)Production examples
Daily backfill across all users
Adapted from web-tools/handlers_schedule.py. The outer async for fan-out and per-user error isolation are the defining pattern. Each user failure logs a warning and continues to the next user — one bad user never aborts the whole job.
import asyncio
import datetime
import logging
from imperal_sdk import Extension
log = logging.getLogger(__name__)
ext = Extension(
"web-tools",
version="1.4.0",
display_name="Web Tools",
description="Web Tools — monitor websites and APIs with AI assistance.",
icon="icon.svg",
actions_explicit=True,
)
@ext.schedule("wt_monitor_runner", cron="0 * * * *")
async def run_scheduled_monitors(ctx) -> None:
"""Hourly: fan-out, timestamp-guard, scan overdue monitors."""
now = datetime.datetime.now(datetime.timezone.utc)
sem = asyncio.Semaphore(10) # cap concurrent user scans
async def _scan_user(user_id: str) -> int:
async with sem:
user_ctx = ctx.as_user(user_id)
ran = 0
try:
page = await user_ctx.store.query(
"wt_monitors", where={"enabled": True}, limit=100
)
for mon in page.data:
last_run = mon.data.get("last_run_at")
interval_h = mon.data.get("interval_hours", 24)
if last_run:
last_dt = datetime.datetime.fromisoformat(last_run)
if last_dt.tzinfo is None:
last_dt = last_dt.replace(tzinfo=datetime.timezone.utc)
if (now - last_dt).total_seconds() / 3600 < interval_h:
continue
await user_ctx.store.update("wt_monitors", mon.id, {
"last_run_at": now.isoformat(),
})
ran += 1
except Exception as exc:
log.warning("wt_monitor_runner: user %s error: %s", user_id, exc)
return ran
user_ids = [uid async for uid in ctx.store.list_users("wt_monitors")]
results = await asyncio.gather(*[_scan_user(uid) for uid in user_ids])
total = sum(results)
if total:
log.info("wt_monitor_runner: scanned %d monitor(s) across %d user(s)", total, len(user_ids))Inbox cache pre-warmer
Adapted from mail-client/panels_schedule.py. Runs every 3 minutes to pre-populate the inbox cache so panel loads are instant. Combines fan-out with ctx.cache.get / ctx.cache.set.
import logging
from datetime import datetime, timezone
from imperal_sdk import Extension
log = logging.getLogger("mail")
ext = Extension(
"mail-client",
version="4.3.0",
display_name="Mail Client",
description="Mail Client — read and send email with AI assistance.",
icon="icon.svg",
actions_explicit=True,
)
@ext.schedule("inbox_warmup", cron="*/3 * * * *")
async def inbox_warmup(ctx) -> None:
"""Pre-warm inbox page 2 for all users every 3 minutes."""
async for user_id in ctx.store.list_users("mail_accounts"):
user_ctx = ctx.as_user(user_id)
try:
accounts_page = await user_ctx.store.query(
"mail_accounts", where={"is_active": True}
)
for acc in accounts_page.data:
try:
cached = await user_ctx.cache.get(
f"inbox_manifest:{acc.data['email']}", object
)
if not cached:
continue # manifest not built yet — skip
# fetch page 2 and write into cache
await user_ctx.store.update("mail_accounts", acc.id, {
"cache_warmed_at": datetime.now(timezone.utc).isoformat(),
})
except Exception as exc:
log.debug("inbox_warmup account error: %s", exc)
except Exception as exc:
log.debug("inbox_warmup user error: %s", exc)Periodic external API sync
Single-user cleanup variant. Some scheduled tasks do not need fan-out — for example, syncing a shared configuration or purging a global staging area. In this case, operate directly on ctx (system namespace) or on a known fixed collection.
import logging
from imperal_sdk import Extension
log = logging.getLogger(__name__)
ext = Extension(
"ad-sync",
version="1.0.0",
display_name="Ad Sync",
description="Ad Sync — sync campaign data from external ad platforms.",
icon="icon.svg",
actions_explicit=True,
)
@ext.schedule("platform_token_refresh", cron="0 */6 * * *")
async def refresh_platform_tokens(ctx) -> None:
"""Refresh OAuth tokens for all connected ad accounts every 6 hours."""
async for user_id in ctx.store.list_users("ad_accounts"):
user_ctx = ctx.as_user(user_id)
try:
page = await user_ctx.store.query("ad_accounts", where={"active": True})
for acc in page.data:
expires_at = acc.data.get("token_expires_at", "")
if not expires_at:
continue
resp = await user_ctx.http.post(
"https://api.ad-platform.example/oauth/token",
json={
"grant_type": "refresh_token",
"refresh_token": acc.data.get("refresh_token", ""),
},
)
if resp.ok:
body = resp.json()
await user_ctx.store.update("ad_accounts", acc.id, {
"access_token": body.get("access_token", ""),
"token_expires_at": body.get("expires_at", ""),
})
except Exception as exc:
log.warning("token_refresh: user %s failed: %s", user_id, exc)V20 effects requirement
@ext.schedule handlers are not @chat.function handlers and are not subject to the V14–V24 validators that govern chat functions. There is no effects= kwarg on @ext.schedule — the decorator only accepts name and cron.
However, if your scheduled handler calls other extensions via ctx.extensions.emit(...) or writes data that triggers downstream chat flows, document those side-effects in the job's docstring and in your extension's CLAUDE.md or operator notes. This is an operational convention, not an SDK enforcement.
Common pitfalls
Reading ctx.user directly
from imperal_sdk import Extension
ext = Extension(
"my-app",
display_name="My App",
description="My App — manage resources with AI assistance.",
icon="icon.svg",
actions_explicit=True,
)
# WRONG — ctx.user.imperal_id is "__system__" in scheduled context
@ext.schedule("broken_job", cron="0 * * * *")
async def broken_job(ctx) -> None:
items = await ctx.store.query("items", where={"owner": ctx.user.imperal_id})
# This queries the system namespace — returns nothing for real users
# CORRECT — fan out, then use user_ctx
@ext.schedule("correct_job", cron="0 * * * *")
async def correct_job(ctx) -> None:
async for user_id in ctx.store.list_users("items"):
user_ctx = ctx.as_user(user_id)
items = await user_ctx.store.query("items")
# operates correctly in the user's namespaceMissing per-user error isolation
import logging
from imperal_sdk import Extension
log = logging.getLogger(__name__)
ext = Extension(
"my-app",
display_name="My App",
description="My App — manage resources with AI assistance.",
icon="icon.svg",
actions_explicit=True,
)
# WRONG — one failing user aborts all remaining users
@ext.schedule("fragile_job", cron="0 * * * *")
async def fragile_job(ctx) -> None:
async for user_id in ctx.store.list_users("items"):
user_ctx = ctx.as_user(user_id)
await user_ctx.store.delete("items", "stale_item_id") # may raise
# CORRECT — isolate exceptions per user
@ext.schedule("resilient_job", cron="0 * * * *")
async def resilient_job(ctx) -> None:
async for user_id in ctx.store.list_users("items"):
user_ctx = ctx.as_user(user_id)
try:
await user_ctx.store.delete("items", "stale_item_id")
except Exception as exc:
log.warning("resilient_job: user %s failed: %s", user_id, exc)Cron fires on every minute instead of hourly
from imperal_sdk import Extension
ext = Extension(
"my-app",
display_name="My App",
description="My App — manage resources with AI assistance.",
icon="icon.svg",
actions_explicit=True,
)
# WRONG — "* * * * *" fires every single minute, not hourly
@ext.schedule("too_frequent", cron="* * * * *") # intended: hourly; actually: every minute
async def too_frequent(ctx) -> None: # type: ignore[misc]
pass # no-op for illustration
# CORRECT — "0 * * * *" fires at minute 0 of every hour (once per hour)
@ext.schedule("hourly", cron="0 * * * *")
async def hourly_job(ctx) -> None:
pass # hourlyValidate your cron expressions
Use a cron expression tester (e.g. crontab.guru) before deploying. An accidentally high-frequency schedule can generate thousands of ICNLI Worker runs per day.
Registering @ext.schedule on ChatExtension
from imperal_sdk import Extension, ChatExtension
ext = Extension(
"my-app",
display_name="My App",
description="My App — manage resources with AI assistance.",
icon="icon.svg",
actions_explicit=True,
)
chat = ChatExtension(
ext,
tool_name="my_app_chat",
description="My App AI assistant.",
)
# WRONG — @ext.schedule must be called on the Extension instance, not ChatExtension
# chat.schedule(...) # AttributeError — no such method
# CORRECT
@ext.schedule("my_job", cron="0 * * * *")
async def my_job(ctx) -> None:
passTesting
Import your handler function directly and pass a MockContext configured as a system context. Set user_id="__system__" to match the production context. Wire MockStore.list_users_result with a list of user IDs to simulate the fan-out iteration.
import pytest
from imperal_sdk.testing import MockContext, MockStore
# Assume your schedule handler is in handlers_schedule.py:
# from handlers_schedule import daily_digest
@pytest.mark.asyncio
async def test_daily_digest_skips_users_without_subscriptions():
ctx = MockContext(user_id="__system__", tool_type="system")
ctx.store = MockStore()
# Simulate two users in the "subscriptions" collection
await ctx.store.create("subscriptions", {"active": True, "user_id": "u_alice"})
await ctx.store.create("subscriptions", {"active": True, "user_id": "u_bob"})
# Call the handler directly
# await daily_digest(ctx)
# Assert per-user notifications were sent via ctx.notify
# assert len(ctx.notify.sent) == 2Cross-references
Scheduled tasks concept
Why scheduled tasks exist, how system context works, and when to use them versus skeletons and webhooks.
@ext.skeleton reference
Per-user ambient probes — the complement to scheduled jobs for surfacing live state.
@chat.function reference
User-triggered actions — the synchronous counterpart to scheduled tasks.
Decorators reference
Quick reference for all SDK decorators in one place.