event-driven
Skill from cosmix/claude-loom
Enables scalable, loosely-coupled systems by implementing event-driven architectures with message queues, pub/sub patterns, and distributed transaction management across various messaging platforms.
Installation
curl -fsSL https://raw.githubusercontent.com/cosmix/loom/main/install.sh | bash
git clone https://github.com/cosmix/loom.git
bash install.sh
Skill Details
Event-driven architecture patterns including message queues, pub/sub, event sourcing, CQRS, and sagas. Use when implementing async messaging, distributed transactions, event stores, command query separation, domain events, integration events, data streaming, choreography, orchestration, or integrating with RabbitMQ, Kafka, Apache Pulsar, AWS SQS, AWS SNS, NATS, event buses, or message brokers.
# Event-Driven Architecture
Overview
Event-driven architecture (EDA) enables loosely coupled, scalable systems by communicating through events rather than direct calls. This skill covers message queues, pub/sub patterns, event sourcing, CQRS, distributed transaction management with sagas, and data streaming with Kafka.
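To make the "events rather than direct calls" idea concrete, here is a minimal in-process sketch. `InMemoryEventBus` and `EventHandlerFn` are hypothetical names invented for this illustration; a real system would put a broker such as RabbitMQ or Kafka where the `Map` is.

```typescript
// Minimal in-process event bus (illustrative only; not a real broker client).
type EventHandlerFn<T> = (payload: T) => void;

class InMemoryEventBus {
  private handlers = new Map<string, EventHandlerFn<unknown>[]>();

  // Register a handler for an event type.
  subscribe<T>(type: string, handler: EventHandlerFn<T>): void {
    const list = this.handlers.get(type) ?? [];
    list.push(handler as EventHandlerFn<unknown>);
    this.handlers.set(type, list);
  }

  // Deliver the payload to every subscriber; returns how many were notified.
  publish<T>(type: string, payload: T): number {
    const list = this.handlers.get(type) ?? [];
    for (const handler of list) handler(payload);
    return list.length;
  }
}

// The publisher only knows the event name, not who consumes it.
const bus = new InMemoryEventBus();
const log: string[] = [];
bus.subscribe<{ orderId: string }>("order.created", (e) => log.push(`inventory:${e.orderId}`));
bus.subscribe<{ orderId: string }>("order.created", (e) => log.push(`email:${e.orderId}`));
const notified = bus.publish("order.created", { orderId: "123" });
```

Adding a third consumer requires no change to the publishing code, which is the loose coupling the overview describes.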
Available Agents
- senior-software-engineer (Opus) - Architecture design, pattern selection, distributed system design
- software-engineer (Sonnet) - Event handler implementation, consumer/producer code
- senior-software-engineer (Opus) - Event security, authorization patterns, message encryption
- senior-software-engineer (Opus) - Message broker setup, Kafka clusters, queue configuration
Key Concepts
Message Queues
RabbitMQ Implementation:
```typescript
import amqp, { Channel, Connection } from "amqplib";
interface QueueConfig {
name: string;
durable: boolean;
deadLetterExchange?: string;
messageTtl?: number;
maxRetries?: number;
}
class RabbitMQClient {
private connection: Connection | null = null;
private channel: Channel | null = null;
async connect(url: string): Promise<void> {
this.connection = await amqp.connect(url);
this.channel = await this.connection.createChannel();
// Handle connection errors
this.connection.on("error", (err) => {
console.error("RabbitMQ connection error:", err);
this.reconnect(url);
});
}
async setupQueue(config: QueueConfig): Promise<void> {
if (!this.channel) throw new Error("Not connected");
const options: amqp.Options.AssertQueue = {
durable: config.durable,
arguments: {},
};
if (config.deadLetterExchange) {
options.arguments!["x-dead-letter-exchange"] = config.deadLetterExchange;
}
if (config.messageTtl) {
options.arguments!["x-message-ttl"] = config.messageTtl;
}
await this.channel.assertQueue(config.name, options);
}
async publish(
queue: string,
message: unknown,
options?: PublishOptions
): Promise<void> {
if (!this.channel) throw new Error("Not connected");
const content = Buffer.from(JSON.stringify(message));
const publishOptions: amqp.Options.Publish = {
persistent: true,
messageId: options?.messageId || crypto.randomUUID(),
timestamp: Date.now(),
headers: options?.headers,
};
this.channel.sendToQueue(queue, content, publishOptions);
}
async consume<T>(
queue: string,
handler: (
message: T,
ack: () => void,
nack: (requeue?: boolean) => void
) => Promise<void>,
options?: ConsumeOptions
): Promise<void> {
if (!this.channel) throw new Error("Not connected");
await this.channel.prefetch(options?.prefetch || 10);
await this.channel.consume(queue, async (msg) => {
if (!msg) return;
try {
const content: T = JSON.parse(msg.content.toString());
const retryCount =
(msg.properties.headers?.["x-retry-count"] as number) || 0;
await handler(
content,
() => this.channel!.ack(msg),
(requeue = false) => {
if (requeue && retryCount < (options?.maxRetries || 3)) {
// Ack the original and republish with an incremented retry count
// (nack-ing here would also dead-letter the original, duplicating it)
this.channel!.ack(msg);
this.publish(queue, content, {
headers: { "x-retry-count": retryCount + 1 },
});
} else {
this.channel!.nack(msg, false, false); // Send to DLQ
}
}
);
} catch (error) {
console.error("Message processing error:", error);
this.channel!.nack(msg, false, false);
}
});
}
}
```
AWS SQS Implementation:
```typescript
import {
SQSClient,
SendMessageCommand,
ReceiveMessageCommand,
DeleteMessageCommand,
} from "@aws-sdk/client-sqs";
interface SQSMessage<T> {
id: string;
body: T;
receiptHandle: string;
approximateReceiveCount: number;
}
class SQSQueue<T> {
private client: SQSClient;
private queueUrl: string;
constructor(queueUrl: string, region: string = "us-east-1") {
this.client = new SQSClient({ region });
this.queueUrl = queueUrl;
}
async send(
message: T,
options?: { delaySeconds?: number; deduplicationId?: string }
): Promise<string> {
const command = new SendMessageCommand({
QueueUrl: this.queueUrl,
MessageBody: JSON.stringify(message),
DelaySeconds: options?.delaySeconds,
MessageDeduplicationId: options?.deduplicationId,
MessageGroupId: options?.deduplicationId ? "default" : undefined,
});
const response = await this.client.send(command);
return response.MessageId!;
}
async receive(
maxMessages: number = 10,
waitTimeSeconds: number = 20
): Promise<SQSMessage<T>[]> {
const command = new ReceiveMessageCommand({
QueueUrl: this.queueUrl,
MaxNumberOfMessages: maxMessages,
WaitTimeSeconds: waitTimeSeconds,
AttributeNames: ["ApproximateReceiveCount"],
});
const response = await this.client.send(command);
return (response.Messages || []).map((msg) => ({
id: msg.MessageId!,
body: JSON.parse(msg.Body!) as T,
receiptHandle: msg.ReceiptHandle!,
approximateReceiveCount: parseInt(
msg.Attributes?.ApproximateReceiveCount || "1"
),
}));
}
async delete(receiptHandle: string): Promise<void> {
const command = new DeleteMessageCommand({
QueueUrl: this.queueUrl,
ReceiptHandle: receiptHandle,
});
await this.client.send(command);
}
async processMessages(
handler: (message: T) => Promise<void>,
options?: { maxRetries?: number; pollInterval?: number }
): Promise<void> {
const maxRetries = options?.maxRetries || 3;
while (true) {
const messages = await this.receive();
await Promise.all(
messages.map(async (msg) => {
try {
await handler(msg.body);
await this.delete(msg.receiptHandle);
} catch (error) {
console.error(`Error processing message ${msg.id}:`, error);
if (msg.approximateReceiveCount >= maxRetries) {
// Message will go to DLQ after visibility timeout
console.warn(`Message ${msg.id} exceeded max retries`);
}
// Don't delete - will be reprocessed after visibility timeout
}
})
);
if (messages.length === 0 && options?.pollInterval) {
await new Promise((r) => setTimeout(r, options.pollInterval));
}
}
}
}
```
Pub/Sub Patterns
Kafka Implementation:
```typescript
import { Kafka, Producer, Consumer, EachMessagePayload } from "kafkajs";
interface Event<T = unknown> {
id: string;
type: string;
timestamp: Date;
source: string;
data: T;
metadata?: Record<string, unknown>;
}
class KafkaEventBus {
private kafka: Kafka;
private producer: Producer | null = null;
private consumers: Map<string, Consumer> = new Map();
constructor(config: { brokers: string[]; clientId: string }) {
this.kafka = new Kafka({
clientId: config.clientId,
brokers: config.brokers,
});
}
async connect(): Promise<void> {
this.producer = this.kafka.producer({
idempotent: true,
maxInFlightRequests: 5,
});
await this.producer.connect();
}
async publish<T>(
topic: string,
event: Omit<Event<T>, "id" | "timestamp">
): Promise<void> {
if (!this.producer) throw new Error("Producer not connected");
const fullEvent: Event<T> = {
...event,
id: crypto.randomUUID(),
timestamp: new Date(),
};
await this.producer.send({
topic,
messages: [
{
key:
event.data && typeof event.data === "object" && "id" in event.data
? String((event.data as { id: unknown }).id)
: fullEvent.id,
value: JSON.stringify(fullEvent),
headers: {
"event-type": event.type,
"event-source": event.source,
},
},
],
});
}
async subscribe<T>(
topics: string[],
groupId: string,
handler: (event: Event<T>) => Promise<void>,
options?: { fromBeginning?: boolean }
): Promise<void> {
const consumer = this.kafka.consumer({ groupId });
await consumer.connect();
for (const topic of topics) {
await consumer.subscribe({
topic,
fromBeginning: options?.fromBeginning,
});
}
this.consumers.set(groupId, consumer);
await consumer.run({
eachMessage: async ({
topic,
partition,
message,
}: EachMessagePayload) => {
try {
const event: Event<T> = JSON.parse(message.value!.toString());
await handler(event);
} catch (error) {
console.error(
`Error processing message from ${topic}:${partition}:`,
error
);
throw error; // Will trigger retry based on consumer config
}
},
});
}
async disconnect(): Promise<void> {
await this.producer?.disconnect();
for (const consumer of this.consumers.values()) {
await consumer.disconnect();
}
}
}
// Usage
const eventBus = new KafkaEventBus({
brokers: ["localhost:9092"],
clientId: "order-service",
});
await eventBus.connect();
// Publish
await eventBus.publish("orders", {
type: "order.created",
source: "order-service",
data: { orderId: "123", items: [], total: 99.99 },
});
// Subscribe
await eventBus.subscribe(
["orders"],
"inventory-service",
async (event) => {
if (event.type === "order.created") {
await reserveInventory(event.data);
}
}
);
```
NATS Implementation:
```typescript
import {
connect,
NatsConnection,
StringCodec,
JetStreamManager,
JetStreamClient,
} from "nats";
class NATSEventBus {
private nc: NatsConnection | null = null;
private js: JetStreamClient | null = null;
private sc = StringCodec();
async connect(servers: string[]): Promise<void> {
this.nc = await connect({ servers });
// Setup JetStream for persistence
const jsm = await this.nc.jetstreamManager();
this.js = this.nc.jetstream();
// Create stream if not exists
try {
await jsm.streams.add({
name: "EVENTS",
subjects: ["events.*"],
retention: "limits",
max_msgs: 1000000,
max_age: 7 * 24 * 60 * 60 * 1_000_000_000, // 7 days in nanoseconds
});
} catch (e) {
// Stream might already exist
}
}
async publish(subject: string, data: unknown): Promise<void> {
if (!this.js) throw new Error("Not connected");
await this.js.publish(
`events.${subject}`,
this.sc.encode(JSON.stringify(data))
);
}
async subscribe(
subject: string,
durableName: string,
handler: (data: unknown) => Promise<void>
): Promise<void> {
if (!this.js) throw new Error("Not connected");
const consumer = await this.js.consumers
.get("EVENTS", durableName)
.catch(async () => {
// Create consumer if not exists
const jsm = await this.nc!.jetstreamManager();
await jsm.consumers.add("EVENTS", {
durable_name: durableName,
filter_subject: `events.${subject}`,
ack_policy: "explicit",
max_deliver: 3,
});
return this.js!.consumers.get("EVENTS", durableName);
});
const messages = await consumer.consume();
for await (const msg of messages) {
try {
const data = JSON.parse(this.sc.decode(msg.data));
await handler(data);
msg.ack();
} catch (error) {
console.error("Error processing message:", error);
msg.nak();
}
}
}
}
```
Event Sourcing
```typescript
interface DomainEvent {
id: string;
aggregateId: string;
aggregateType: string;
type: string;
version: number;
timestamp: Date;
data: unknown;
metadata: {
userId?: string;
correlationId?: string;
causationId?: string;
};
}
interface EventStore {
append(events: DomainEvent[]): Promise<void>;
getEvents(aggregateId: string, fromVersion?: number): Promise<DomainEvent[]>;
getEventsByType(type: string, fromTimestamp?: Date): Promise<DomainEvent[]>;
}
// PostgreSQL Event Store
class PostgresEventStore implements EventStore {
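// `Pool` is assumed to come from the "pg" package; `EventBus` is an assumed
// interface exposing publish(event), used below to feed projections.
constructor(private pool: Pool, private eventBus: EventBus) {}
async append(events: DomainEvent[]): Promise<void> {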
const client = await this.pool.connect();
try {
await client.query("BEGIN");
for (const event of events) {
// Optimistic concurrency check
const { rows } = await client.query(
"SELECT MAX(version) as max_version FROM events WHERE aggregate_id = $1",
[event.aggregateId]
);
const currentVersion = rows[0]?.max_version || 0;
if (event.version !== currentVersion + 1) {
throw new ConcurrencyError(
`Expected version ${currentVersion + 1}, got ${event.version}`
);
}
await client.query(
`INSERT INTO events (id, aggregate_id, aggregate_type, type, version, timestamp, data, metadata)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
[
event.id,
event.aggregateId,
event.aggregateType,
event.type,
event.version,
event.timestamp,
JSON.stringify(event.data),
JSON.stringify(event.metadata),
]
);
}
await client.query("COMMIT");
// Publish to event bus for projections
for (const event of events) {
await this.eventBus.publish(event);
}
} catch (error) {
await client.query("ROLLBACK");
throw error;
} finally {
client.release();
}
}
async getEvents(
aggregateId: string,
fromVersion: number = 0
): Promise<DomainEvent[]> {
const { rows } = await this.pool.query(
`SELECT * FROM events
WHERE aggregate_id = $1 AND version > $2
ORDER BY version ASC`,
[aggregateId, fromVersion]
);
return rows.map(this.rowToEvent);
}
}
// Aggregate base class
abstract class Aggregate {
private _id: string;
protected _version: number = 0; // protected so SnapshotAggregate can restore it
private _uncommittedEvents: DomainEvent[] = [];
get id(): string {
return this._id;
}
get version(): number {
return this._version;
}
constructor(id: string) {
this._id = id;
}
protected apply(
event: Omit<
DomainEvent,
"id" | "aggregateId" | "aggregateType" | "version" | "timestamp"
>
): void {
const domainEvent: DomainEvent = {
...event,
id: crypto.randomUUID(),
aggregateId: this._id,
aggregateType: this.constructor.name,
version: this._version + 1,
timestamp: new Date(),
};
this.when(domainEvent);
this._version = domainEvent.version;
this._uncommittedEvents.push(domainEvent);
}
protected abstract when(event: DomainEvent): void;
loadFromHistory(events: DomainEvent[]): void {
for (const event of events) {
this.when(event);
this._version = event.version;
}
}
getUncommittedEvents(): DomainEvent[] {
return [...this._uncommittedEvents];
}
clearUncommittedEvents(): void {
this._uncommittedEvents = [];
}
}
// Example: Order Aggregate
class Order extends Aggregate {
private status:
| "pending"
| "confirmed"
| "shipped"
| "delivered"
| "cancelled" = "pending";
private items: OrderItem[] = [];
private total: number = 0;
static create(id: string, customerId: string, items: OrderItem[]): Order {
const order = new Order(id);
order.apply({
type: "OrderCreated",
data: { customerId, items },
metadata: {},
});
return order;
}
confirm(): void {
if (this.status !== "pending") {
throw new Error("Can only confirm pending orders");
}
this.apply({
type: "OrderConfirmed",
data: { confirmedAt: new Date() },
metadata: {},
});
}
cancel(reason: string): void {
if (["shipped", "delivered", "cancelled"].includes(this.status)) {
throw new Error("Cannot cancel order in current status");
}
this.apply({
type: "OrderCancelled",
data: { reason, cancelledAt: new Date() },
metadata: {},
});
}
protected when(event: DomainEvent): void {
switch (event.type) {
case "OrderCreated":
const data = event.data as { items: OrderItem[] };
this.items = data.items;
this.total = data.items.reduce(
(sum, item) => sum + item.price * item.quantity,
0
);
this.status = "pending";
break;
case "OrderConfirmed":
this.status = "confirmed";
break;
case "OrderCancelled":
this.status = "cancelled";
break;
}
}
}
```
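The core idea behind `loadFromHistory` is that current state is a fold over past events. The sketch below isolates that idea with deliberately simplified, hypothetical types (`OrderEvent`, `OrderState`, `evolve`, and `replay` are illustrative names, not part of the classes above).

```typescript
// Hypothetical, simplified event shapes (not the DomainEvent interface above).
type OrderEvent =
  | { type: "OrderCreated"; items: { price: number; quantity: number }[] }
  | { type: "OrderConfirmed" }
  | { type: "OrderCancelled"; reason: string };

interface OrderState {
  status: "none" | "pending" | "confirmed" | "cancelled";
  total: number;
  version: number;
}

const emptyOrder: OrderState = { status: "none", total: 0, version: 0 };

// Pure transition: current state + one event -> next state.
function evolve(state: OrderState, event: OrderEvent): OrderState {
  const version = state.version + 1;
  switch (event.type) {
    case "OrderCreated": {
      const total = event.items.reduce((sum, i) => sum + i.price * i.quantity, 0);
      return { status: "pending", total, version };
    }
    case "OrderConfirmed":
      return { ...state, status: "confirmed", version };
    case "OrderCancelled":
      return { ...state, status: "cancelled", version };
  }
}

// Rebuild state by folding over history, optionally starting from a snapshot.
function replay(events: OrderEvent[], from: OrderState = emptyOrder): OrderState {
  return events.reduce(evolve, from);
}

const orderHistory: OrderEvent[] = [
  { type: "OrderCreated", items: [{ price: 10, quantity: 2 }, { price: 5, quantity: 1 }] },
  { type: "OrderConfirmed" },
];
const rebuilt = replay(orderHistory);
```

Because `evolve` is pure, replaying from a stored intermediate state plus the later events yields the same result as replaying the full history.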
CQRS (Command Query Responsibility Segregation)
```typescript
// Commands
interface Command {
type: string;
payload: unknown;
metadata: {
userId: string;
correlationId: string;
timestamp: Date;
};
}
interface CommandHandler<T extends Command = Command> {
handle(command: T): Promise<void>;
}
// Command Bus
class CommandBus {
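private handlers: Map<string, CommandHandler> = new Map();
register<T extends Command>(type: string, handler: CommandHandler<T>): void {
this.handlers.set(type, handler as CommandHandler);
}
async dispatch(command: Command): Promise<void> {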
const handler = this.handlers.get(command.type);
if (!handler) {
throw new Error(`No handler registered for command: ${command.type}`);
}
await handler.handle(command);
}
}
// Queries
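// TResult is a phantom type parameter that ties a query to its result type
interface Query<TResult> {
type: string;
params: unknown;
}
interface QueryHandler<TQuery extends Query<TResult>, TResult> {
handle(query: TQuery): Promise<TResult>;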
}
// Query Bus
class QueryBus {
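private handlers: Map<string, QueryHandler<Query<unknown>, unknown>> =
new Map();
register<TQuery extends Query<TResult>, TResult>(
type: string,
handler: QueryHandler<TQuery, TResult>
): void {
this.handlers.set(type, handler as QueryHandler<Query<unknown>, unknown>);
}
async execute<TResult>(query: Query<TResult>): Promise<TResult> {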
const handler = this.handlers.get(query.type);
if (!handler) {
throw new Error(`No handler registered for query: ${query.type}`);
}
return handler.handle(query) as Promise<TResult>;
}
}
// Read Model (Projection)
interface OrderReadModel {
id: string;
customerId: string;
customerName: string;
status: string;
items: Array<{
productId: string;
productName: string;
quantity: number;
price: number;
}>;
total: number;
createdAt: Date;
updatedAt: Date;
}
class OrderProjection {
constructor(private db: Database, private eventBus: EventBus) {
this.setupSubscriptions();
}
private setupSubscriptions(): void {
this.eventBus.subscribe("OrderCreated", this.onOrderCreated.bind(this));
this.eventBus.subscribe("OrderConfirmed", this.onOrderConfirmed.bind(this));
this.eventBus.subscribe("OrderCancelled", this.onOrderCancelled.bind(this));
}
private async onOrderCreated(event: DomainEvent): Promise<void> {
const data = event.data as OrderCreatedData;
// Enrich with customer data
const customer = await this.db.customers.findById(data.customerId);
// Enrich with product data
const items = await Promise.all(
data.items.map(async (item) => {
const product = await this.db.products.findById(item.productId);
return {
...item,
productName: product.name,
};
})
);
await this.db.orderReadModels.create({
id: event.aggregateId,
customerId: data.customerId,
customerName: customer.name,
status: "pending",
items,
total: items.reduce((sum, i) => sum + i.price * i.quantity, 0),
createdAt: event.timestamp,
updatedAt: event.timestamp,
});
}
private async onOrderConfirmed(event: DomainEvent): Promise<void> {
await this.db.orderReadModels.update(event.aggregateId, {
status: "confirmed",
updatedAt: event.timestamp,
});
}
private async onOrderCancelled(event: DomainEvent): Promise<void> {
await this.db.orderReadModels.update(event.aggregateId, {
status: "cancelled",
updatedAt: event.timestamp,
});
}
}
```
Saga Pattern for Distributed Transactions
```typescript
interface SagaStep<TData> {
name: string;
execute: (data: TData) => Promise<void>;
compensate: (data: TData) => Promise<void>;
}
interface SagaDefinition<TData> {
name: string;
steps: SagaStep<TData>[];
}
interface SagaInstance {
id: string;
sagaName: string;
data: unknown;
currentStep: number;
status: "running" | "completed" | "compensating" | "failed";
completedSteps: string[];
error?: string;
startedAt: Date;
updatedAt: Date;
}
class SagaOrchestrator {
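private sagas: Map<string, SagaDefinition<unknown>> = new Map();
// SagaStore is assumed to persist SagaInstance records via save(instance)
constructor(private store: SagaStore) {}
register<TData>(saga: SagaDefinition<TData>): void {
this.sagas.set(saga.name, saga as SagaDefinition<unknown>);
}
async start<TData>(sagaName: string, data: TData): Promise<string> {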
const saga = this.sagas.get(sagaName);
if (!saga) throw new Error(`Saga not found: ${sagaName}`);
const instance: SagaInstance = {
id: crypto.randomUUID(),
sagaName,
data,
currentStep: 0,
status: "running",
completedSteps: [],
startedAt: new Date(),
updatedAt: new Date(),
};
await this.store.save(instance);
await this.executeNextStep(instance, saga);
return instance.id;
}
private async executeNextStep(
instance: SagaInstance,
saga: SagaDefinition<unknown>
): Promise<void> {
if (instance.currentStep >= saga.steps.length) {
instance.status = "completed";
await this.store.save(instance);
return;
}
const step = saga.steps[instance.currentStep];
try {
await step.execute(instance.data);
instance.completedSteps.push(step.name);
instance.currentStep++;
instance.updatedAt = new Date();
await this.store.save(instance);
await this.executeNextStep(instance, saga);
} catch (error) {
instance.status = "compensating";
instance.error = error instanceof Error ? error.message : String(error);
await this.store.save(instance);
await this.compensate(instance, saga);
}
}
private async compensate(
instance: SagaInstance,
saga: SagaDefinition<unknown>
): Promise<void> {
// Execute compensations in reverse order
for (let i = instance.completedSteps.length - 1; i >= 0; i--) {
const stepName = instance.completedSteps[i];
const step = saga.steps.find((s) => s.name === stepName);
if (step) {
try {
await step.compensate(instance.data);
} catch (error) {
console.error(`Compensation failed for step ${stepName}:`, error);
// Continue with other compensations
}
}
}
instance.status = "failed";
instance.updatedAt = new Date();
await this.store.save(instance);
}
}
// Example: Order Fulfillment Saga
interface OrderFulfillmentData {
orderId: string;
customerId: string;
items: Array<{ productId: string; quantity: number; price: number }>;
paymentId?: string;
shipmentId?: string;
}
const orderFulfillmentSaga: SagaDefinition<OrderFulfillmentData> = {
name: "order-fulfillment",
steps: [
{
name: "reserve-inventory",
execute: async (data) => {
await inventoryService.reserve(data.items);
},
compensate: async (data) => {
await inventoryService.release(data.items);
},
},
{
name: "process-payment",
execute: async (data) => {
const total = data.items.reduce(
(sum, i) => sum + i.price * i.quantity,
0
);
const payment = await paymentService.charge(data.customerId, total);
data.paymentId = payment.id;
},
compensate: async (data) => {
if (data.paymentId) {
await paymentService.refund(data.paymentId);
}
},
},
{
name: "create-shipment",
execute: async (data) => {
const shipment = await shippingService.createShipment(
data.orderId,
data.items
);
data.shipmentId = shipment.id;
},
compensate: async (data) => {
if (data.shipmentId) {
await shippingService.cancelShipment(data.shipmentId);
}
},
},
{
name: "confirm-order",
execute: async (data) => {
await orderService.confirm(data.orderId);
},
compensate: async (data) => {
await orderService.cancel(data.orderId, "Saga compensation");
},
},
],
};
```
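The key behaviour of the orchestrator above is that a failure triggers compensation of the already completed steps in reverse order. The following self-contained sketch shows only that behaviour, with no persistence; `Step`, `SagaResult`, `runSaga`, and the demo steps are hypothetical names made up for illustration, and the services are replaced by a simple trace array.

```typescript
// Hypothetical step shape for this sketch; mirrors the execute/compensate pair above.
interface Step<T> {
  name: string;
  execute: (ctx: T) => Promise<void>;
  compensate: (ctx: T) => Promise<void>;
}

interface SagaResult {
  status: "completed" | "failed";
  compensated: string[]; // names of undone steps, in the order they were undone
  error?: string;
}

async function runSaga<T>(steps: Step<T>[], ctx: T): Promise<SagaResult> {
  const done: Step<T>[] = [];
  for (const step of steps) {
    try {
      await step.execute(ctx);
      done.push(step);
    } catch (err) {
      const compensated: string[] = [];
      // Undo completed steps, most recent first.
      for (const finished of done.reverse()) {
        await finished.compensate(ctx);
        compensated.push(finished.name);
      }
      return {
        status: "failed",
        compensated,
        error: err instanceof Error ? err.message : String(err),
      };
    }
  }
  return { status: "completed", compensated: [] };
}

// Demo: the third step fails, so payment and inventory are rolled back.
const trace: string[] = [];
const demoSteps: Step<{ orderId: string }>[] = [
  {
    name: "reserve-inventory",
    execute: async () => { trace.push("reserve"); },
    compensate: async () => { trace.push("release"); },
  },
  {
    name: "process-payment",
    execute: async () => { trace.push("charge"); },
    compensate: async () => { trace.push("refund"); },
  },
  {
    name: "create-shipment",
    execute: async () => { throw new Error("carrier unavailable"); },
    compensate: async () => { trace.push("cancel-shipment"); },
  },
];
```

Calling `runSaga(demoSteps, { orderId: "123" })` resolves with a failed status after refunding the payment and then releasing the inventory. The failing step itself is not compensated, because it never completed.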
Idempotency and Exactly-Once Delivery
```typescript
interface IdempotencyKey {
key: string;
response?: unknown;
createdAt: Date;
expiresAt: Date;
}
class IdempotencyService {
constructor(private redis: Redis) {}
async process<T>(
key: string,
operation: () => Promise<T>,
ttlSeconds: number = 86400 // 24 hours
): Promise<T> {
const lockKey = `idempotency:lock:${key}`;
const dataKey = `idempotency:data:${key}`;
// Try to acquire lock
const locked = await this.redis.set(lockKey, "1", "EX", 30, "NX");
if (!locked) {
// Another process is handling this request, wait for result
return this.waitForResult<T>(dataKey);
}
try {
// Check if already processed
const existing = await this.redis.get(dataKey);
if (existing) {
return JSON.parse(existing) as T;
}
// Execute operation
const result = await operation();
// Store result
await this.redis.setex(dataKey, ttlSeconds, JSON.stringify(result));
return result;
} finally {
await this.redis.del(lockKey);
}
}
private async waitForResult<T>(
dataKey: string,
maxWaitMs: number = 30000
): Promise<T> {
const startTime = Date.now();
while (Date.now() - startTime < maxWaitMs) {
const data = await this.redis.get(dataKey);
if (data) {
return JSON.parse(data) as T;
}
await new Promise((r) => setTimeout(r, 100));
}
throw new Error("Timeout waiting for idempotent operation result");
}
}
// Message deduplication for consumers
class DeduplicatingConsumer<T> {
constructor(
private redis: Redis,
private windowSeconds: number = 3600 // 1 hour dedup window
) {}
async process(
messageId: string,
handler: () => Promise<T>
): Promise<{ result: T; duplicate: boolean }> {
const dedupKey = `dedup:${messageId}`;
// Check if already processed
const existing = await this.redis.get(dedupKey);
if (existing) {
return { result: JSON.parse(existing) as T, duplicate: true };
}
// Process message
const result = await handler();
// Mark as processed
await this.redis.setex(
dedupKey,
this.windowSeconds,
JSON.stringify(result)
);
return { result, duplicate: false };
}
}
```
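Most brokers deliver at least once, so "exactly once" in practice usually means making the consumer idempotent. Here is a minimal, synchronous sketch of that effect, with a `Map` standing in for Redis; `InMemoryDeduplicator` and the message ID are hypothetical, and unlike the Redis versions above it has no TTL and no cross-process locking.

```typescript
// In-memory stand-in for the Redis-backed deduplication shown above.
class InMemoryDeduplicator<T> {
  private results = new Map<string, T>();

  // Runs the handler once per messageId; repeats return the cached result.
  process(messageId: string, handler: () => T): { result: T; duplicate: boolean } {
    if (this.results.has(messageId)) {
      return { result: this.results.get(messageId) as T, duplicate: true };
    }
    const result = handler();
    this.results.set(messageId, result);
    return { result, duplicate: false };
  }
}

// Simulate a broker redelivering the same message.
let charges = 0;
const dedup = new InMemoryDeduplicator<number>();
const chargeOnce = () => dedup.process("msg-42", () => ++charges);
const first = chargeOnce();
const second = chargeOnce(); // redelivery: handler is skipped
```

The side effect (the charge) happens once even though the message is processed twice, which is the property the Redis-based classes aim for across processes.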
Dead Letter Queues
```typescript
interface DeadLetterMessage {
id: string;
originalQueue: string;
originalMessage: unknown;
error: string;
failedAt: Date;
retryCount: number;
lastRetryAt?: Date;
}
class DeadLetterQueueManager {
constructor(
private dlqStore: DLQStore,
private originalQueue: MessageQueue
) {}
async moveToDeadLetter(
message: unknown,
originalQueue: string,
error: Error,
retryCount: number
): Promise<void> {
const dlqMessage: DeadLetterMessage = {
id: crypto.randomUUID(),
originalQueue,
originalMessage: message,
error: error.message,
failedAt: new Date(),
retryCount,
};
await this.dlqStore.save(dlqMessage);
// Alert on DLQ growth
const dlqSize = await this.dlqStore.count(originalQueue);
if (dlqSize > 100) {
await this.alerting.warn({
title: "DLQ Size Warning",
message: `Dead letter queue for ${originalQueue} has ${dlqSize} messages`,
});
}
}
async retry(messageId: string): Promise<void> {
const dlqMessage = await this.dlqStore.get(messageId);
if (!dlqMessage) throw new Error("Message not found in DLQ");
try {
await this.originalQueue.publish(
dlqMessage.originalQueue,
dlqMessage.originalMessage
);
await this.dlqStore.delete(messageId);
} catch (error) {
dlqMessage.lastRetryAt = new Date();
dlqMessage.retryCount++;
await this.dlqStore.save(dlqMessage);
throw error;
}
}
async retryAll(queue: string): Promise<{ success: number; failed: number }> {
const messages = await this.dlqStore.getByQueue(queue);
let success = 0;
let failed = 0;
for (const message of messages) {
try {
await this.retry(message.id);
success++;
} catch {
failed++;
}
}
return { success, failed };
}
async purge(queue: string, olderThan?: Date): Promise<number> {
return this.dlqStore.deleteByQueue(queue, olderThan);
}
}
```
Data Streaming with Kafka
Stream Processing:
```typescript
import { Kafka, CompressionTypes } from "kafkajs";
interface StreamRecord<T> {
key: string;
value: T;
timestamp: number;
partition: number;
offset: string;
}
class KafkaStreamProcessor {
private kafka: Kafka;
constructor(brokers: string[]) {
this.kafka = new Kafka({
clientId: "stream-processor",
brokers,
});
}
// Stateful stream aggregation
async aggregateStream<TInput, TState>(
inputTopic: string,
outputTopic: string,
groupId: string,
initialState: TState,
aggregator: (state: TState, record: TInput) => TState,
windowMs: number = 60000
): Promise<void> {
const consumer = this.kafka.consumer({ groupId });
// kafkajs takes the compression codec per send() call, not on the producer
const producer = this.kafka.producer();
await consumer.connect();
await producer.connect();
await consumer.subscribe({ topic: inputTopic });
const stateByKey = new Map<string, TState>();
const windowTimers = new Map<string, ReturnType<typeof setTimeout>>();
await consumer.run({
eachMessage: async ({ message }) => {
const key = message.key?.toString() || "default";
const value: TInput = JSON.parse(message.value!.toString());
// Get or initialize state (?? keeps falsy states such as 0)
const currentState = stateByKey.get(key) ?? initialState;
const newState = aggregator(currentState, value);
stateByKey.set(key, newState);
// Clear existing window timer
const existingTimer = windowTimers.get(key);
if (existingTimer) clearTimeout(existingTimer);
// Set new window timer to emit aggregated state
const timer = setTimeout(async () => {
const finalState = stateByKey.get(key);
await producer.send({
topic: outputTopic,
compression: CompressionTypes.GZIP,
messages: [
{
key,
value: JSON.stringify(finalState),
timestamp: Date.now().toString(),
},
],
});
stateByKey.delete(key);
windowTimers.delete(key);
}, windowMs);
windowTimers.set(key, timer);
},
});
}
// Stream joins
async joinStreams<TLeft, TRight, TResult>(
leftTopic: string,
rightTopic: string,
outputTopic: string,
groupId: string,
joiner: (left: TLeft, right: TRight) => TResult,
windowMs: number = 30000
): Promise<void> {
const consumer = this.kafka.consumer({ groupId });
const producer = this.kafka.producer();
await consumer.connect();
await producer.connect();
await consumer.subscribe({ topics: [leftTopic, rightTopic] });
const leftCache = new Map<string, { data: TLeft; timestamp: number }>();
const rightCache = new Map<string, { data: TRight; timestamp: number }>();
await consumer.run({
eachMessage: async ({ topic, message }) => {
const key = message.key?.toString() || "default";
const timestamp = parseInt(message.timestamp);
const now = Date.now();
// Clean old entries
this.cleanOldEntries(leftCache, now, windowMs);
this.cleanOldEntries(rightCache, now, windowMs);
if (topic === leftTopic) {
const leftData: TLeft = JSON.parse(message.value!.toString());
leftCache.set(key, { data: leftData, timestamp });
// Try to join with right
const rightEntry = rightCache.get(key);
if (
rightEntry &&
Math.abs(timestamp - rightEntry.timestamp) <= windowMs
) {
const result = joiner(leftData, rightEntry.data);
await producer.send({
topic: outputTopic,
messages: [{ key, value: JSON.stringify(result) }],
});
}
} else {
const rightData: TRight = JSON.parse(message.value!.toString());
rightCache.set(key, { data: rightData, timestamp });
// Try to join with left
const leftEntry = leftCache.get(key);
if (
leftEntry &&
Math.abs(timestamp - leftEntry.timestamp) <= windowMs
) {
const result = joiner(leftEntry.data, rightData);
await producer.send({
topic: outputTopic,
messages: [{ key, value: JSON.stringify(result) }],
});
}
}
},
});
}
private cleanOldEntries<T>(
cache: Map<string, { data: T; timestamp: number }>,
now: number,
windowMs: number
): void {
for (const [key, entry] of cache.entries()) {
if (now - entry.timestamp > windowMs) {
cache.delete(key);
}
}
}
}
// Kafka Streams-style operations
class KafkaStream<T> {
constructor(private kafka: Kafka, private topic: string) {}
// Map transformation
map<U>(mapper: (value: T) => U): KafkaStream<U> {
const outputTopic = `${this.topic}-mapped`;
this.processStream(outputTopic, async (record) => ({
key: record.key,
value: mapper(record.value),
}));
return new KafkaStream<U>(this.kafka, outputTopic);
}
// Filter transformation
filter(predicate: (value: T) => boolean): KafkaStream<T> {
const outputTopic = `${this.topic}-filtered`;
this.processStream(outputTopic, async (record) =>
predicate(record.value) ? record : null
);
return new KafkaStream<T>(this.kafka, outputTopic);
}
// Group by key and aggregate
groupBy<K, V>(
keyExtractor: (value: T) => K,
aggregator: (key: K, values: T[]) => V,
windowMs: number = 60000
): KafkaStream<V> {
const outputTopic = `${this.topic}-grouped`;
const groups = new Map<string, T[]>();
const timers = new Map<string, ReturnType<typeof setTimeout>>();
this.processStream(outputTopic, async (record, producer) => {
const key = String(keyExtractor(record.value));
const values = groups.get(key) || [];
values.push(record.value);
groups.set(key, values);
const existingTimer = timers.get(key);
if (existingTimer) clearTimeout(existingTimer);
const timer = setTimeout(async () => {
const groupValues = groups.get(key) || [];
const result = aggregator(keyExtractor(record.value), groupValues);
await producer.send({
topic: outputTopic,
messages: [{ key, value: JSON.stringify(result) }],
});
groups.delete(key);
timers.delete(key);
}, windowMs);
timers.set(key, timer);
return null; // Don't emit immediately
});
return new KafkaStream<V>(this.kafka, outputTopic);
}
private async processStream(
outputTopic: string,
processor: (
record: StreamRecord<T>,
producer: any
) => Promise<{ key: string; value: any } | null>
): Promise<void> {
const consumer = this.kafka.consumer({
groupId: `${this.topic}-processor`,
});
const producer = this.kafka.producer();
await consumer.connect();
await producer.connect();
await consumer.subscribe({ topic: this.topic });
await consumer.run({
eachMessage: async ({ message, partition }) => {
const record: StreamRecord<T> = {
key: message.key?.toString() || "default",
value: JSON.parse(message.value!.toString()),
timestamp: parseInt(message.timestamp),
partition,
offset: message.offset,
};
const result = await processor(record, producer);
if (result) {
await producer.send({
topic: outputTopic,
messages: [
{
key: result.key,
value: JSON.stringify(result.value),
},
],
});
}
},
});
}
}
```
Event Sourcing Patterns
Snapshots for Performance:
```typescript
interface Snapshot {
aggregateId: string;
version: number;
state: unknown;
timestamp: Date;
}
class SnapshotStore {
constructor(private db: Database) {}
async save(snapshot: Snapshot): Promise<void> {
await this.db.snapshots.upsert({
aggregateId: snapshot.aggregateId,
version: snapshot.version,
state: JSON.stringify(snapshot.state),
timestamp: snapshot.timestamp,
});
}
async getLatest(aggregateId: string): Promise<Snapshot | null> {
const row = await this.db.snapshots.findOne(
{ aggregateId },
{ orderBy: { version: "desc" } }
);
return row
? {
aggregateId: row.aggregateId,
version: row.version,
state: JSON.parse(row.state),
timestamp: row.timestamp,
}
: null;
}
}
// Enhanced aggregate with snapshots
abstract class SnapshotAggregate extends Aggregate {
private static SNAPSHOT_FREQUENCY = 100; // Snapshot every 100 events
async load(
eventStore: EventStore,
snapshotStore: SnapshotStore
): Promise<void> {
// Try to load from snapshot first
const snapshot = await snapshotStore.getLatest(this.id);
if (snapshot) {
this.applySnapshot(snapshot.state);
this._version = snapshot.version;
// Load events since snapshot
const events = await eventStore.getEvents(this.id, snapshot.version);
this.loadFromHistory(events);
} else {
// No snapshot, load all events
const events = await eventStore.getEvents(this.id);
this.loadFromHistory(events);
}
}
async save(
eventStore: EventStore,
snapshotStore: SnapshotStore
): Promise<void> {
const events = this.getUncommittedEvents();
await eventStore.append(events);
this.clearUncommittedEvents();
// Create snapshot if needed
if (this.version % SnapshotAggregate.SNAPSHOT_FREQUENCY === 0) {
await snapshotStore.save({
aggregateId: this.id,
version: this.version,
state: this.createSnapshot(),
timestamp: new Date(),
});
}
}
protected abstract createSnapshot(): unknown;
protected abstract applySnapshot(state: unknown): void;
}
```
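The snapshot approach relies on one property: a snapshot plus the events recorded after it must rebuild the same state as a full replay. The sketch below checks that property in isolation. The `AccountEvent` and `AccountState` shapes and the `rehydrate` helper are hypothetical and not part of the classes above, and the sketch assumes every event carries its own version number.

```typescript
// Hypothetical event and state shapes, used only for this illustration.
type AccountEvent =
  | { type: "Deposited"; version: number; amount: number }
  | { type: "Withdrawn"; version: number; amount: number };

interface AccountState {
  balance: number;
  version: number;
}

interface AccountSnapshot {
  version: number;
  state: AccountState;
}

// Pure reducer: applies a single event to the current state.
function applyEvent(state: AccountState, event: AccountEvent): AccountState {
  const delta = event.type === "Deposited" ? event.amount : -event.amount;
  return { balance: state.balance + delta, version: event.version };
}

// Rebuilds state from an optional snapshot plus only the events after it.
function rehydrate(
  events: AccountEvent[],
  snapshot: AccountSnapshot | null
): AccountState {
  const start: AccountState = snapshot
    ? { ...snapshot.state }
    : { balance: 0, version: 0 };
  return events
    .filter((event) => event.version > start.version)
    .reduce(applyEvent, start);
}

const history: AccountEvent[] = [
  { type: "Deposited", version: 1, amount: 100 },
  { type: "Withdrawn", version: 2, amount: 30 },
  { type: "Deposited", version: 3, amount: 5 },
];

// Full replay from the first event.
const fullReplay = rehydrate(history, null);

// Replay starting from a snapshot taken at version 2.
const snapshotAtV2: AccountSnapshot = {
  version: 2,
  state: { balance: 70, version: 2 },
};
const fromSnapshot = rehydrate(history, snapshotAtV2);

console.log(fullReplay, fromSnapshot);
```

Both calls end at a balance of 75 and version 3. The snapshot path only has to apply one event instead of three, which is where the saving comes from on long-lived aggregates.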
Event Upcasting (Schema Migration):
```typescript
interface EventUpcaster {
  eventType: string;
  fromVersion: number;
  toVersion: number;
  upcast: (event: DomainEvent) => DomainEvent;
}

class EventStoreWithUpcasting implements EventStore {
  private upcasters: Map<string, EventUpcaster[]> = new Map();

  // Wraps the underlying store (assumed to implement the same EventStore interface)
  constructor(private rawEventStore: EventStore) {}

  registerUpcaster(upcaster: EventUpcaster): void {
    const existing = this.upcasters.get(upcaster.eventType) || [];
    existing.push(upcaster);
    // Keep upcasters ordered so V1 -> V2 runs before V2 -> V3
    existing.sort((a, b) => a.fromVersion - b.fromVersion);
    this.upcasters.set(upcaster.eventType, existing);
  }

  // Writes pass straight through; upcasting only applies on read
  async append(events: DomainEvent[]): Promise<void> {
    await this.rawEventStore.append(events);
  }

  async getEvents(
    aggregateId: string,
    fromVersion: number = 0
  ): Promise<DomainEvent[]> {
    const rawEvents = await this.rawEventStore.getEvents(
      aggregateId,
      fromVersion
    );
    return rawEvents.map((event) => this.upcastEvent(event));
  }

  private upcastEvent(event: DomainEvent): DomainEvent {
    const upcasters = this.upcasters.get(event.type) || [];
    let currentEvent = event;
    for (const upcaster of upcasters) {
      // Events stored before versioning was introduced count as version 1
      const eventVersion = (currentEvent.data as any)?.schemaVersion || 1;
      if (eventVersion === upcaster.fromVersion) {
        currentEvent = upcaster.upcast(currentEvent);
      }
    }
    return currentEvent;
  }
}

// Example upcaster
const orderCreatedV1toV2: EventUpcaster = {
  eventType: "OrderCreated",
  fromVersion: 1,
  toVersion: 2,
  upcast: (event) => ({
    ...event,
    data: {
      ...(event.data as any),
      // V2 added shipping address separate from billing
      shippingAddress: (event.data as any).address,
      billingAddress: (event.data as any).address,
      schemaVersion: 2,
    },
  }),
};
```
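Because upcasters are sorted by `fromVersion`, an old event can pass through several of them in a single read. The following standalone sketch shows a V1 event being lifted to V3. The `VersionedEvent` shape, the `upcastToLatest` helper, and the V3 `currency` field are all hypothetical and chosen only for the illustration.

```typescript
// Hypothetical, simplified event shape for this sketch.
interface VersionedEvent {
  type: string;
  data: { schemaVersion?: number; [key: string]: unknown };
}

interface Upcaster {
  eventType: string;
  fromVersion: number;
  upcast: (event: VersionedEvent) => VersionedEvent;
}

// Applies every matching upcaster in ascending version order.
function upcastToLatest(
  event: VersionedEvent,
  upcasters: Upcaster[]
): VersionedEvent {
  const chain = upcasters
    .filter((u) => u.eventType === event.type)
    .sort((a, b) => a.fromVersion - b.fromVersion);
  let current = event;
  for (const upcaster of chain) {
    // Events without a schemaVersion are treated as version 1.
    const version = current.data.schemaVersion ?? 1;
    if (version === upcaster.fromVersion) {
      current = upcaster.upcast(current);
    }
  }
  return current;
}

const v1toV2: Upcaster = {
  eventType: "OrderCreated",
  fromVersion: 1,
  upcast: (event) => ({
    ...event,
    data: {
      ...event.data,
      shippingAddress: event.data.address,
      billingAddress: event.data.address,
      schemaVersion: 2,
    },
  }),
};

// Hypothetical V3 change: an explicit currency with an assumed default.
const v2toV3: Upcaster = {
  eventType: "OrderCreated",
  fromVersion: 2,
  upcast: (event) => ({
    ...event,
    data: { ...event.data, currency: "USD", schemaVersion: 3 },
  }),
};

// A V1 event as it might sit in the store: no schemaVersion field at all.
const stored: VersionedEvent = {
  type: "OrderCreated",
  data: { orderId: "o-1", address: "1 Main St" },
};

// Registration order does not matter because the chain is sorted first.
const upgraded = upcastToLatest(stored, [v2toV3, v1toV2]);
console.log(upgraded.data);
```

The stored event is never rewritten. Upcasting happens on read, so the event log stays append-only while handlers only ever see the latest schema.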
Saga Patterns
Choreography vs Orchestration:
```typescript
// CHOREOGRAPHY: Services react to events independently
class OrderService {
  constructor(private eventBus: EventBus) {}

  async onOrderCreated(event: OrderCreatedEvent): Promise<void> {
    // Publish event, other services react
    await this.eventBus.publish("order.created", {
      orderId: event.orderId,
      customerId: event.customerId,
      items: event.items,
    });
  }
}

class InventoryService {
  constructor(private eventBus: EventBus) {
    // Listen and react to order events
    this.eventBus.subscribe("order.created", this.reserveInventory.bind(this));
  }

  private async reserveInventory(event: OrderCreatedEvent): Promise<void> {
    try {
      await this.reserve(event.items);
      // Publish success event
      await this.eventBus.publish("inventory.reserved", {
        orderId: event.orderId,
      });
    } catch (error) {
      // Publish failure event
      await this.eventBus.publish("inventory.reservation-failed", {
        orderId: event.orderId,
        // `error` is `unknown` under strict settings, so narrow before reading it
        reason: error instanceof Error ? error.message : String(error),
      });
    }
  }
}

class PaymentService {
  constructor(private eventBus: EventBus) {
    // Wait for inventory before processing payment
    this.eventBus.subscribe(
      "inventory.reserved",
      this.processPayment.bind(this)
    );
  }

  private async processPayment(event: InventoryReservedEvent): Promise<void> {
    // Process payment and publish result...
  }
}
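// ORCHESTRATION: Central coordinator controls flow
class OrderFulfillmentOrchestrator {
  async fulfillOrder(orderId: string): Promise<void> {
    // Undo actions for the steps that have actually completed so far
    const compensations: Array<() => Promise<unknown>> = [];
    try {
      // Step 1: Reserve inventory
      await this.inventoryService.reserve(orderId);
      compensations.push(() => this.inventoryService.release(orderId));
      // Step 2: Process payment
      await this.paymentService.charge(orderId);
      compensations.push(() => this.paymentService.refund(orderId));
      // Step 3: Create shipment
      await this.shippingService.createShipment(orderId);
      compensations.push(() => this.shippingService.cancelShipment(orderId));
      // Step 4: Confirm order
      await this.orderService.confirm(orderId);
    } catch (error) {
      // Explicit compensation
      await this.compensate(orderId, compensations);
    }
  }

  private async compensate(
    orderId: string,
    compensations: Array<() => Promise<unknown>>
  ): Promise<void> {
    // Undo only the completed steps, in reverse order, so that a failed
    // reservation never triggers a refund for a payment that was not taken
    for (const undo of [...compensations].reverse()) {
      await undo();
    }
    await this.orderService.cancel(orderId);
  }
}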
```
Saga State Machine:
```typescript
type SagaState =
  | "STARTED"
  | "INVENTORY_RESERVED"
  | "PAYMENT_PROCESSED"
  | "SHIPPED"
  | "COMPLETED"
  | "COMPENSATING"
  | "FAILED";

interface SagaStateMachine<TData> {
  state: SagaState;
  data: TData;
  transitions: Map<SagaState, SagaTransition<TData>>;
}

interface SagaTransition<TData> {
  onEnter: (data: TData) => Promise<void>;
  onSuccess: SagaState;
  onFailure: SagaState;
  compensate?: (data: TData) => Promise<void>;
}

class StatefulSagaOrchestrator<TData> {
  async execute(saga: SagaStateMachine<TData>): Promise<void> {
    let currentState = saga.state;
    while (currentState !== "COMPLETED" && currentState !== "FAILED") {
      const transition = saga.transitions.get(currentState);
      if (!transition)
        throw new Error(`No transition for state: ${currentState}`);
      try {
        await transition.onEnter(saga.data);
        currentState = transition.onSuccess;
        saga.state = currentState;
        await this.persistSaga(saga); // Save state
      } catch (error) {
        // Compensation
        if (transition.compensate) {
          await transition.compensate(saga.data);
        }
        currentState = transition.onFailure;
        saga.state = currentState;
        await this.persistSaga(saga);
      }
    }
  }

  private async persistSaga(saga: SagaStateMachine<TData>): Promise<void> {
    // Save saga state for recovery
    await this.sagaStore.save({
      id: (saga.data as any).orderId,
      state: saga.state,
      data: saga.data,
      updatedAt: new Date(),
    });
  }
}
```
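The orchestrator above takes a transition table but does not show one. The sketch below is one way such a table might be wired, including a `COMPENSATING` state that undoes an earlier step when a later one fails. It uses a reduced state set, a simplified `Step` shape, and a hypothetical `runSaga` runner without persistence, so treat it as an illustration of the control flow and not as the orchestrator itself.

```typescript
// All names below are hypothetical and exist only for this illustration.
type State =
  | "STARTED"
  | "INVENTORY_RESERVED"
  | "COMPENSATING"
  | "COMPLETED"
  | "FAILED";

interface Step {
  onEnter: () => Promise<void>;
  onSuccess: State;
  onFailure: State;
}

// Minimal runner with the same loop shape as the orchestrator,
// minus persistence. It records every state the saga moves into.
async function runSaga(
  transitions: Map<State, Step>,
  initial: State,
  trace: State[]
): Promise<State> {
  let state = initial;
  while (state !== "COMPLETED" && state !== "FAILED") {
    const step = transitions.get(state);
    if (!step) throw new Error(`No transition for state: ${state}`);
    try {
      await step.onEnter();
      state = step.onSuccess;
    } catch {
      state = step.onFailure;
    }
    trace.push(state);
  }
  return state;
}

// In-memory stand-in for an inventory service.
const inventory = { held: false };

const transitions = new Map<State, Step>([
  [
    "STARTED",
    {
      onEnter: async () => {
        inventory.held = true; // reserve stock
      },
      onSuccess: "INVENTORY_RESERVED",
      onFailure: "FAILED", // nothing to undo yet
    },
  ],
  [
    "INVENTORY_RESERVED",
    {
      onEnter: async () => {
        throw new Error("card declined"); // simulated payment failure
      },
      onSuccess: "COMPLETED",
      onFailure: "COMPENSATING", // stock is held, so it must be released
    },
  ],
  [
    "COMPENSATING",
    {
      onEnter: async () => {
        inventory.held = false; // release stock
      },
      onSuccess: "FAILED",
      onFailure: "FAILED",
    },
  ],
]);

const trace: State[] = [];
const outcome = runSaga(transitions, "STARTED", trace);
outcome.then((finalState) => console.log(finalState, trace, inventory));
```

Routing the failure through `COMPENSATING` keeps the undo logic in the table itself, so a saga that is reloaded from the store in that state can resume the compensation where it stopped.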
Best Practices
- Event Design
  - Events should be immutable and represent facts
  - Use past tense naming (OrderCreated, not CreateOrder)
  - Include all necessary data; avoid references to mutable state
  - Version your events for schema evolution
- Idempotency
  - Always design consumers to be idempotent
  - Use unique message IDs for deduplication
  - Store processing state to handle retries
- Error Handling
  - Implement dead letter queues for failed messages
  - Set reasonable retry limits with exponential backoff
  - Monitor DLQ size and alert on growth
- Ordering
  - Use partition keys for ordering guarantees in Kafka
  - Understand at-least-once vs exactly-once semantics
  - Design for out-of-order message handling when needed
- Monitoring
  - Track message lag, processing time, and error rates
  - Set up alerts for consumer lag
  - Monitor event store growth and query performance
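The idempotency and error-handling points above can be sketched together. The example below is a minimal in-memory illustration: `IdempotentConsumer`, `backoffDelayMs`, and `deliverWithRetry` are hypothetical names, the processed-ID set would normally live in a durable store, and the retry loop records the delays it would wait instead of actually sleeping.

```typescript
// Hypothetical message shape: only the unique id matters for deduplication.
interface Message {
  id: string;
  payload: unknown;
}

// Skips any message whose id has already been processed successfully.
class IdempotentConsumer {
  private processed = new Set<string>();
  private handler: (message: Message) => void;

  constructor(handler: (message: Message) => void) {
    this.handler = handler;
  }

  // Returns true if the handler ran, false if the message was a duplicate.
  handle(message: Message): boolean {
    if (this.processed.has(message.id)) return false;
    this.handler(message);
    // Mark as processed only after the handler succeeds, so a failed
    // attempt can be retried.
    this.processed.add(message.id);
    return true;
  }
}

// Exponential backoff: base * 2^attempt, capped at maxMs.
function backoffDelayMs(attempt: number, baseMs = 100, maxMs = 30000): number {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

// Tries once plus up to maxRetries retries, then dead-letters the message.
// Returns the delays that would be waited between attempts.
function deliverWithRetry(
  message: Message,
  consumer: IdempotentConsumer,
  maxRetries: number,
  deadLetters: Message[]
): number[] {
  const scheduledDelays: number[] = [];
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      consumer.handle(message);
      return scheduledDelays;
    } catch {
      if (attempt < maxRetries) scheduledDelays.push(backoffDelayMs(attempt));
    }
  }
  deadLetters.push(message);
  return scheduledDelays;
}

// Duplicate delivery: the side effect happens once.
const counter = { charges: 0 };
const chargeConsumer = new IdempotentConsumer(() => {
  counter.charges += 1;
});
const chargeMessage: Message = { id: "msg-1", payload: { orderId: "o-1" } };
const firstDelivery = chargeConsumer.handle(chargeMessage);
const secondDelivery = chargeConsumer.handle(chargeMessage);

// Persistent failure: retries back off, then the message is dead-lettered.
const deadLetters: Message[] = [];
const failingConsumer = new IdempotentConsumer(() => {
  throw new Error("downstream unavailable");
});
const delays = deliverWithRetry(
  { id: "msg-2", payload: null },
  failingConsumer,
  3,
  deadLetters
);
console.log(counter.charges, delays, deadLetters.length);
```

With three retries the recorded delays are 100, 200, and 400 ms before the message lands in the dead letter list. A production consumer would usually add jitter to the delay and persist the processed IDs in the same transaction as the side effect.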
Examples
Complete Order Processing Flow
```typescript
// 1. API receives order request
app.post("/orders", async (req, res) => {
  const command: CreateOrderCommand = {
    type: "CreateOrder",
    payload: req.body,
    metadata: {
      userId: req.user.id,
      correlationId: req.headers["x-correlation-id"] as string,
      timestamp: new Date(),
    },
  };
  await commandBus.dispatch(command);
  res.status(202).json({ message: "Order creation initiated" });
});

// 2. Command handler creates aggregate and persists events
class CreateOrderHandler implements CommandHandler<CreateOrderCommand> {
  async handle(command: CreateOrderCommand): Promise<void> {
    const order = Order.create(
      crypto.randomUUID(),
      command.payload.customerId,
      command.payload.items
    );
    await this.eventStore.append(order.getUncommittedEvents());
  }
}

// 3. Event published to Kafka, projections update read models
// 4. Saga orchestrator starts fulfillment process
// 5. Each saga step publishes events that update projections