Imperal Docs
Recipes

Webhook handler — Stripe-style HMAC verification

External HTTP receiver with manual HMAC-SHA256 signature verification (per S4)

Security first. The SDK provides no built-in signature helpersecret_header only declares which header carries the signature in the manifest. Your handler must retrieve the shared secret from ctx.config, compute the expected HMAC-SHA256 digest over the raw body, and compare using hmac.compare_digest (constant-time — == is timing-vulnerable). After verification, guard against replay with an idempotency check before writing any state.

Never skip verification

Any caller on the public internet can reach your webhook endpoint. Without HMAC verification, an attacker who knows your app ID and path can trigger arbitrary writes. Always verify before acting on the payload.


handlers_webhook.py — complete minimal example
from __future__ import annotations

import hashlib
import hmac
import json
import logging

from imperal_sdk import Extension

log = logging.getLogger(__name__)

ext = Extension(
    "billing-sync",
    version="1.0.0",
    display_name="Billing Sync",
    description="Billing Sync — keeps your billing records in sync with Stripe.",
    icon="icon.svg",
    actions_explicit=True,
)

# ── Stripe-style HMAC-SHA256 webhook ─────────────────────────────────────────
#
# Stripe's Stripe-Signature header format: "t=<timestamp>,v1=<hex_digest>"
# The signed payload is: "<timestamp>.<raw_body>"
# Tolerance window: reject events older than 5 minutes (replay protection).

_TOLERANCE_SECONDS = 300


@ext.webhook("stripe", method="POST", secret_header="Stripe-Signature")
async def handle_stripe(
    ctx,
    headers: dict,
    body: str,
    query_params: dict,
) -> dict:
    """Verify Stripe HMAC-SHA256, check idempotency, then process payment events."""

    # 1. Retrieve the shared secret from admin config — never hardcode.
    #    ctx.config.require raises if the key is not set, keeping the fail loud.
    stripe_secret = await ctx.config.require("stripe_webhook_secret")

    # 2. Read and parse the Stripe-Signature header.
    sig_header = headers.get("stripe-signature", "")
    if not sig_header:
        await ctx.log("stripe webhook: missing Stripe-Signature header", level="warning")
        return {"status_code": 400, "error": "missing signature"}

    # Parse "t=<ts>,v1=<sig>[,v1=<sig2>]" into a dict of lists.
    parts: dict[str, list[str]] = {}
    for chunk in sig_header.split(","):
        if "=" not in chunk:
            continue
        k, v = chunk.split("=", 1)
        parts.setdefault(k.strip(), []).append(v.strip())

    timestamp_strs = parts.get("t", [])
    signatures = parts.get("v1", [])
    if not timestamp_strs or not signatures:
        return {"status_code": 400, "error": "malformed Stripe-Signature"}

    try:
        event_timestamp = int(timestamp_strs[0])
    except ValueError:
        return {"status_code": 400, "error": "non-integer timestamp"}

    # 3. Replay protection: reject events outside the tolerance window.
    import time as _time
    now = int(_time.time())
    if abs(now - event_timestamp) > _TOLERANCE_SECONDS:
        await ctx.log("stripe webhook: replay rejected (timestamp out of tolerance)", level="warning")
        return {"status_code": 400, "error": "timestamp out of tolerance"}

    # 4. Reconstruct the signed payload and compute the expected digest.
    signed_payload = f"{event_timestamp}.{body}"
    expected = hmac.new(
        stripe_secret.encode(),
        signed_payload.encode(),
        hashlib.sha256,
    ).hexdigest()

    # 5. Constant-time comparison — mandatory; '==' is timing-vulnerable.
    if not any(hmac.compare_digest(expected, sig) for sig in signatures):
        await ctx.log("stripe webhook: signature mismatch", level="warning")
        return {"status_code": 401, "error": "invalid signature"}

    # 6. Parse the verified payload.
    event = json.loads(body)
    event_id: str = event.get("id", "")
    event_type: str = event.get("type", "")

    if not event_id:
        return {"status_code": 400, "error": "missing event id"}

    # 7. Idempotency check — Stripe retries on non-2xx; guard against duplicates.
    existing = await ctx.store.get("bs_stripe_events", event_id)
    if existing:
        await ctx.log(f"stripe webhook: duplicate {event_id} — skipped")
        return {"status": "already_processed"}

    # 8. Process the event by type.
    await ctx.log(f"stripe webhook: processing {event_type} ({event_id})")

    if event_type == "invoice.payment_succeeded":
        customer_id: str = (
            event.get("data", {}).get("object", {}).get("customer", "")
        )
        # Look up the internal user who connected this Stripe customer.
        customer_doc = await ctx.store.get("bs_stripe_customers", customer_id)
        if customer_doc:
            imperal_id: str = customer_doc.data.get("imperal_id", "")
            if imperal_id:
                # Obtain a user-scoped context to write into the user's partition.
                user_ctx = ctx.as_user(imperal_id)
                await user_ctx.notify("Your payment was processed successfully.")

    # 9. Record the event so step 7 short-circuits on retry.
    await ctx.store.create("bs_stripe_events", {"id": event_id, "type": event_type})
    return {"status": "ok"}

Walk-through

Why hmac.compare_digest and not ==. String equality (==) short-circuits as soon as two bytes differ. An attacker can exploit the resulting timing difference to guess the correct signature one byte at a time. hmac.compare_digest always inspects every byte regardless of where the mismatch occurs, making the comparison constant-time and timing-attack-resistant. Use it for every signature comparison — including plain shared secrets.

ctx.config.require("key"). Fetches an admin-configured setting and raises immediately if the key is missing. Prefer require over .get for secrets: a loud KeyError at startup is safer than a None comparison that silently accepts all requests.

Raw body — do not decode before verifying. Stripe (and most providers) sign the raw request body as received. Call json.loads(body) only after the HMAC check passes. Decoding first risks altering whitespace or encoding and breaking the digest.

headers keys are lowercase. The platform normalises all incoming header names to lowercase before passing them to your handler. Always read with the lowercase form: headers.get("stripe-signature", ""), not "Stripe-Signature".

Replay protection. The timestamp embedded in the Stripe-Signature header prevents replaying a captured request hours later. Reject events outside a tolerance window (_TOLERANCE_SECONDS = 300 here). Adjust for your provider's recommendation.

Idempotency before writes. External providers retry failed deliveries. Steps 7–9 implement the standard guard: read for an existing record keyed by event_id before writing, return "already_processed" on a hit, and write the sentinel only after all work succeeds. This makes the handler safe to receive twice.

ctx.as_user(imperal_id) for user-scoped writes. ctx.user.imperal_id is "__webhook__" in webhook context — there is no logged-in user. To write into a specific user's store partition, resolve their imperal_id from a previously stored mapping (step 8), then call ctx.as_user(...) exactly as in the scheduled fan-out pattern.

Return {"status_code": N, ...} to control HTTP status. The platform serialises your return dict as JSON with status 200 by default. Include "status_code" in the dict to override: {"status_code": 401, "error": "..."} returns HTTP 401 to the caller.


On this page