Job Types and Queue
```typescript
// types.ts
/**
 * Lifecycle states recorded on a job.
 *
 * The queue code sets `pending` at enqueue time, `running` while a handler
 * executes, `retrying` when a failed job is rescheduled, and `success` /
 * `failed` as terminal states. `cancelled` is declared for callers but is not
 * assigned anywhere in the visible code.
 */
type JobStatus =
  | 'pending'
  | 'running'
  | 'success'
  | 'failed'
  | 'retrying'
  | 'cancelled';
/**
 * A unit of background work tracked by the queue.
 *
 * @typeParam T - Shape of the job-specific payload. Defaults to `unknown` so
 *   that code which handles jobs of any type can refer to plain `Job`.
 */
interface Job<T = unknown> {
  /** Unique identifier assigned when the job is enqueued. */
  id: string;
  /** Job type name; used to look up the registered handler. */
  type: string;
  /** Handler-specific input data. */
  payload: T;
  /** Current lifecycle state. */
  status: JobStatus;
  /** Number of handler runs started so far. */
  attempts: number;
  /** The job is retried only while `attempts` is below this value. */
  maxAttempts: number;
  /** When the job was enqueued. */
  createdAt: Date;
  /** Earliest time at which the job may run. */
  scheduledFor: Date;
  /** When the most recent handler run started. */
  startedAt?: Date;
  /** When the job reached a terminal state. */
  completedAt?: Date;
  /** Message of the most recent failure, if any. */
  error?: string;
  /** Value returned by the handler on success. */
  result?: unknown;
}

/**
 * Function that performs the work for one job type.
 *
 * @typeParam T - Payload type the handler accepts.
 * @param payload - The job's payload.
 * @param job - The full job record, e.g. for reading `id` or `attempts`.
 * @returns A promise whose resolved value is stored as the job's `result`;
 *   a rejection counts as a failed attempt.
 */
interface JobHandler<T = unknown> {
  (payload: T, job: Job<T>): Promise<unknown>;
}
```
Queue Implementation (Redis)
```typescript
// queue.ts
import { Redis } from 'ioredis';
import { v4 as uuid } from 'uuid';
/**
 * Redis-backed background job queue with delayed scheduling, retries with
 * exponential backoff, and a dead-letter list.
 *
 * Redis keys used:
 * - `jobs:pending`   sorted set of JSON-serialized jobs, scored by due time (ms since epoch)
 * - `jobs:completed` hash of job id -> JSON-serialized successful job
 * - `jobs:dlq`       list of JSON-serialized jobs that could not be completed
 */
class JobQueue {
  private readonly redis: Redis;
  private readonly handlers = new Map<string, JobHandler>();
  /** Set by `stop()`; checked by `process()` at the top of every iteration. */
  private stopRequested = false;

  constructor(redis: Redis) {
    this.redis = redis;
  }

  /**
   * Registers the handler that runs jobs of the given type.
   * Registering the same type again replaces the previous handler.
   */
  register<T>(type: string, handler: JobHandler<T>): void {
    // Handlers for different payload types share one map, so the payload type
    // is erased here; it is the caller's job to enqueue matching payloads.
    this.handlers.set(type, handler as JobHandler);
  }

  /**
   * Adds a job to the pending set.
   *
   * @param type - Job type; must match a type passed to `register` on the worker.
   * @param payload - JSON-serializable data handed to the handler.
   * @param options - `delay` in milliseconds before the job becomes due
   *   (default 0); `maxAttempts` before the job is dead-lettered (default 3,
   *   also used when 0 is passed).
   * @returns The id of the new job.
   */
  async enqueue<T>(
    type: string,
    payload: T,
    options: { delay?: number; maxAttempts?: number } = {}
  ): Promise<string> {
    const job: Job<T> = {
      id: uuid(),
      type,
      payload,
      status: 'pending',
      attempts: 0,
      maxAttempts: options.maxAttempts || 3,
      createdAt: new Date(),
      scheduledFor: new Date(Date.now() + (options.delay || 0)),
    };
    await this.redis.zadd(
      'jobs:pending',
      job.scheduledFor.getTime(),
      JSON.stringify(job)
    );
    return job.id;
  }

  /**
   * Runs the worker loop: pops the earliest-scheduled job and executes it,
   * one job at a time, until `stop()` is called.
   *
   * The returned promise rejects if a Redis command fails.
   */
  async process(): Promise<void> {
    while (!this.stopRequested) {
      // Blocks for up to 1 second; resolves to null on timeout so the stop
      // flag is re-checked regularly even when the queue is empty.
      const popped = await this.redis.bzpopmin('jobs:pending', 1);
      if (!popped) continue;

      const raw = popped[1];
      const job = JobQueue.deserialize(raw);
      const dueAt = job.scheduledFor.getTime();
      const waitMs = dueAt - Date.now();
      if (waitMs > 0) {
        // Not ready yet: put it back unchanged, then pause so that a queue
        // holding only future jobs is not popped and re-added in a tight
        // loop. The pause is capped at 1 second so earlier jobs enqueued in
        // the meantime are still picked up promptly.
        await this.redis.zadd('jobs:pending', dueAt, raw);
        await JobQueue.sleep(Math.min(waitMs, 1000));
        continue;
      }
      await this.executeJob(job);
    }
  }

  /**
   * Asks `process()` to return once the current iteration (including any job
   * that is already running) has finished. A stopped queue is not restarted.
   */
  stop(): void {
    this.stopRequested = true;
  }

  /** Runs one job through its handler and records the outcome in Redis. */
  private async executeJob(job: Job): Promise<void> {
    const handler = this.handlers.get(job.type);
    if (!handler) {
      const message = `No handler for job type: ${job.type}`;
      console.error(message);
      // The job has already been removed from the pending set; dead-letter it
      // so it is not silently lost.
      job.status = 'failed';
      job.error = message;
      job.completedAt = new Date();
      await this.redis.lpush('jobs:dlq', JSON.stringify(job));
      return;
    }

    job.status = 'running';
    job.attempts++;
    job.startedAt = new Date();

    let result: unknown;
    try {
      result = await handler(job.payload, job);
    } catch (error) {
      await this.recordFailure(job, error);
      return;
    }

    // Persisted outside the try block: a Redis write error must not be
    // mistaken for a handler failure and trigger a duplicate run.
    job.result = result;
    job.status = 'success';
    job.completedAt = new Date();
    await this.redis.hset('jobs:completed', job.id, JSON.stringify(job));
  }

  /** Reschedules a failed job with backoff, or dead-letters it when attempts are exhausted. */
  private async recordFailure(job: Job, error: unknown): Promise<void> {
    job.error = error instanceof Error ? error.message : String(error);
    if (job.attempts < job.maxAttempts) {
      job.status = 'retrying';
      const backoff = Math.pow(2, job.attempts) * 1000; // Exponential backoff: 2s, 4s, 8s, ...
      job.scheduledFor = new Date(Date.now() + backoff);
      await this.redis.zadd('jobs:pending', job.scheduledFor.getTime(), JSON.stringify(job));
    } else {
      job.status = 'failed';
      job.completedAt = new Date();
      // Move to dead letter queue
      await this.redis.lpush('jobs:dlq', JSON.stringify(job));
    }
  }

  /**
   * Parses a serialized job and turns its date fields back into `Date`
   * objects; JSON.stringify stores them as ISO strings.
   */
  private static deserialize(raw: string): Job {
    const job = JSON.parse(raw) as Job;
    job.createdAt = new Date(job.createdAt);
    job.scheduledFor = new Date(job.scheduledFor);
    if (job.startedAt !== undefined) job.startedAt = new Date(job.startedAt);
    if (job.completedAt !== undefined) job.completedAt = new Date(job.completedAt);
    return job;
  }

  /** Resolves after `ms` milliseconds. */
  private static sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}
export { JobQueue, Job, JobHandler };
```
Job Handlers
```typescript
// handlers/email.ts
import { JobHandler } from '../queue';
/** Payload for `send-email` jobs. */
interface SendEmailPayload {
  /** Recipient address. */
  to: string;
  /** Subject line of the message. */
  subject: string;
  /** Template identifier passed to `renderTemplate`. */
  template: string;
  /** Values passed to `renderTemplate` alongside the template. */
  data: Record<string, unknown>;
}
/**
 * Job handler that renders an email template and sends the result.
 *
 * Any rejection from `renderTemplate` or `emailProvider.send` propagates to
 * the caller, which treats it as a failed attempt.
 *
 * @returns `{ sent: true, to }` once the provider call has resolved.
 */
export const sendEmailHandler: JobHandler<SendEmailPayload> = async (payload) => {
  const { to, subject, template, data } = payload;
  // Render template
  const html = await renderTemplate(template, data);
  // Send via email provider
  await emailProvider.send({
    to,
    subject,
    html,
  });
  return { sent: true, to };
};
// handlers/webhook.ts
/** Payload for `webhook` jobs. */
interface WebhookPayload {
  /** Endpoint that receives the POST request. */
  url: string;
  /** Event name, sent in the `X-Webhook-Event` header. */
  event: string;
  /** Request body; serialized with `JSON.stringify`. */
  data: unknown;
}
/**
 * Job handler that delivers a webhook by POSTing `payload.data` as JSON to
 * `payload.url`.
 *
 * The job id is sent as `X-Webhook-Delivery`; it stays the same across
 * retries of one job, so receivers can use it to de-duplicate.
 *
 * @returns `{ status }` with the HTTP status code of a successful response.
 * @throws Error when the response status is outside the 2xx range
 *   (`response.ok` is false), so the attempt is counted as failed.
 */
export const webhookHandler: JobHandler<WebhookPayload> = async (payload, job) => {
  const response = await fetch(payload.url, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'X-Webhook-Event': payload.event,
      'X-Webhook-Delivery': job.id,
    },
    body: JSON.stringify(payload.data),
  });
  if (!response.ok) {
    throw new Error(`Webhook failed: ${response.status}`);
  }
  return { status: response.status };
};
```
Worker Process
```typescript
// worker.ts
import { Redis } from 'ioredis';
import { JobQueue } from './queue';
import { sendEmailHandler } from './handlers/email';
import { webhookHandler } from './handlers/webhook';
// REDIS_URL is optional; without it ioredis uses its default connection settings.
const redisUrl = process.env.REDIS_URL;
const redis = redisUrl ? new Redis(redisUrl) : new Redis();
const queue = new JobQueue(redis);

// Register handlers
queue.register('send-email', sendEmailHandler);
queue.register('webhook', webhookHandler);

// Graceful shutdown
// How long a job that is already running gets to finish after SIGTERM.
const SHUTDOWN_GRACE_MS = 10000;
let isShuttingDown = false;
process.on('SIGTERM', () => {
  if (isShuttingDown) return; // ignore repeated signals
  console.log('Received SIGTERM, shutting down gracefully...');
  isShuttingDown = true;
  // Installing a SIGTERM listener removes Node's default exit-on-signal
  // behaviour, and nothing else in this file reads isShuttingDown, so this
  // timer is what actually ends the process.
  setTimeout(() => {
    process.exit(0);
  }, SHUTDOWN_GRACE_MS).unref();
});

// Start processing
console.log('Worker started, waiting for jobs...');
queue.process().catch((error: unknown) => {
  // Without this the rejection would be unhandled; log it and exit non-zero
  // so a process supervisor can restart the worker.
  console.error('Worker loop crashed:', error);
  process.exit(1);
});
```