Skip to main content

Adding a custom event and handler

Evolve uses event-driven messaging for asynchronous communication between services. This guide walks through adding a new internal event type, publishing it from a service, and consuming it in a handler.

1. Define the event schema

Events are validated at runtime using Zod schemas. Define your event type in the shared schemas directory:

// packages/schemas/src/events/inventory-low.ts
import { z } from "zod";

export const inventoryLowEventSchema = z.object({
type: z.literal("inventory-low"),
sku: z.string(),
availableQuantity: z.number(),
threshold: z.number(),
storeKey: z.string(),
origin: z.string(),
timestamp: z.string().datetime({ offset: true }),
});

export type InventoryLowEvent = z.infer<typeof inventoryLowEventSchema>;

2. Publish the event

Use publishMessage from @evolve-packages/messaging to publish the event. The factory inspects the target URL to select the correct cloud provider (SQS, Service Bus, Pub/Sub, or Redis for local development):

import { publishMessage } from "@evolve-packages/messaging";
import { inventoryLowEventSchema } from "@evolve-packages/schemas";

const publish = publishMessage(config.INTERNAL_EVENTS_TARGET);

const event = inventoryLowEventSchema.parse({
type: "inventory-low",
sku: variant.sku,
availableQuantity: variant.quantity,
threshold: 10,
storeKey: storeContext.storeKey,
origin: "catalog-commercetools",
timestamp: new Date().toISOString(),
});

await publish(event, {
type: event.type,
origin: event.origin,
messageGroupId: variant.sku,
});

The type and origin are passed as message attributes for routing. The messageGroupId ensures ordering per SKU on transports that support it.

3. Create the event handler

Add a handler function that processes the event. The handler receives the event payload directly (not wrapped in a .data property). Use a switch on event.type to dispatch:

import type { CloudEventsPayload } from "@evolve-packages/messaging";

const handleInternalEvent = async (event: CloudEventsPayload) => {
switch (event.type) {
case "inventory-low":
return handleInventoryLow(event);
// ... other event types
}
};

const handleInventoryLow = async (event) => {
await notifyOpsTeam(event.sku, event.availableQuantity);
};

4. Wire the cloud trigger

Each cloud provider has a handler factory that wraps the trigger into the common callback signature:

AWS (SQS + Lambda):

import { createSQSHandler } from "@evolve-packages/messaging";
import { lambdaHandlerFactory } from "@evolve-packages/observability/lambda";

const sqsHandler = createSQSHandler(handleInternalEvent);
export const handler = lambdaHandlerFactory(
"Internal event handler",
() => sqsHandler,
);

Azure (Service Bus):

import { createServiceBusHandler } from "@evolve-packages/messaging";

const serviceBusHandler = createServiceBusHandler(
handleInternalEvent,
(message) => (message as CloudEvent<unknown>).data,
);

app.serviceBusQueue(`${config.COMPONENT_NAME}-events`, {
handler: serviceBusHandler,
connection: "SERVICE_BUS_CONNECTION_STRING",
queueName: config.QUEUE_NAME,
cardinality: "one",
});

GCP (Pub/Sub):

import { createPubSubHandler } from "@evolve-packages/messaging";

const pubsubHandler = createPubSubHandler((event) => {
return handleInternalEvent(event as CloudEventsPayload);
});

await pubsubHandler.start({
path: "/internal-events",
httpHost: config.HTTP_HOST,
httpPort: config.HTTP_PORT,
});

5. Configure the queue in Terraform

Create the cloud queue and subscribe it to the internal event bus. For AWS:

resource "aws_sqs_queue" "inventory_events" {
name = "${var.component_name}-inventory-events"
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.inventory_events_dlq.arn
maxReceiveCount = 4
})
}

Always include a dead-letter queue for messages that fail after the maximum number of delivery attempts.

6. Test locally

For local development, the messaging package uses Redis as its transport. The startDevelopmentListener polls the queue with brPop so you can test the full publish/consume cycle without cloud infrastructure:

import { startDevelopmentListener } from "@evolve-packages/messaging";

startDevelopmentListener(
config.REDIS_URL,
async (event) => {
await handleInternalEvent(event as CloudEventsPayload);
},
{ continueOnError: true, dlq: `${config.COMPONENT_NAME}-dlq` },
);

Further reading