SSE Server
```typescript
// sse-server.ts
import { Request, Response } from 'express';
import { EventEmitter } from 'events';
interface SSEClient {
id: string;
userId?: string;
res: Response;
lastEventId: number;
}
class SSEServer {
private clients = new Map();
private eventId = 0;
private emitter = new EventEmitter();
private heartbeatInterval: NodeJS.Timeout;
constructor(heartbeatMs = 30000) {
// Send heartbeats to keep connections alive
this.heartbeatInterval = setInterval(() => {
this.broadcast({ type: 'heartbeat', data: { timestamp: Date.now() } });
}, heartbeatMs);
}
connect(req: Request, res: Response, userId?: string): string {
const clientId = crypto.randomUUID();
// Set SSE headers
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Accel-Buffering', 'no'); // Disable nginx buffering
res.flushHeaders();
// Get last event ID for replay
const lastEventId = parseInt(req.headers['last-event-id'] as string) || 0;
const client: SSEClient = {
id: clientId,
userId,
res,
lastEventId,
};
this.clients.set(clientId, client);
// Send connection confirmation
this.sendToClient(client, {
type: 'connected',
data: { clientId },
});
// Handle disconnect
req.on('close', () => {
this.clients.delete(clientId);
this.emitter.emit('disconnect', clientId);
});
this.emitter.emit('connect', clientId, userId);
return clientId;
}
send(clientId: string, event: SSEEvent): boolean {
const client = this.clients.get(clientId);
if (!client) return false;
return this.sendToClient(client, event);
}
sendToUser(userId: string, event: SSEEvent): number {
let sent = 0;
for (const client of this.clients.values()) {
if (client.userId === userId) {
if (this.sendToClient(client, event)) sent++;
}
}
return sent;
}
broadcast(event: SSEEvent): number {
let sent = 0;
for (const client of this.clients.values()) {
if (this.sendToClient(client, event)) sent++;
}
return sent;
}
private sendToClient(client: SSEClient, event: SSEEvent): boolean {
try {
const id = ++this.eventId;
const data = JSON.stringify(event.data);
let message = '';
if (event.type) message += event: ${event.type}\n;
message += id: ${id}\n;
message += data: ${data}\n\n;
client.res.write(message);
return true;
} catch {
// Client disconnected
this.clients.delete(client.id);
return false;
}
}
getClientCount(): number {
return this.clients.size;
}
getUserClients(userId: string): string[] {
return Array.from(this.clients.values())
.filter(c => c.userId === userId)
.map(c => c.id);
}
onConnect(handler: (clientId: string, userId?: string) => void): void {
this.emitter.on('connect', handler);
}
onDisconnect(handler: (clientId: string) => void): void {
this.emitter.on('disconnect', handler);
}
close(): void {
clearInterval(this.heartbeatInterval);
for (const client of this.clients.values()) {
client.res.end();
}
this.clients.clear();
}
}
interface SSEEvent {
type?: string;
data: unknown;
}
export { SSEServer, SSEEvent, SSEClient };
```
Express Routes
```typescript
// sse-routes.ts
import { Router } from 'express';
import { SSEServer } from './sse-server';
const router = Router();
const sse = new SSEServer();
// SSE endpoint
router.get('/events', (req, res) => {
const userId = req.user?.id; // From auth middleware
sse.connect(req, res, userId);
});
// Send notification to specific user
router.post('/notify/:userId', (req, res) => {
const { userId } = req.params;
const { type, message } = req.body;
const sent = sse.sendToUser(userId, {
type: 'notification',
data: { type, message, timestamp: Date.now() },
});
res.json({ sent });
});
// Broadcast to all connected clients
router.post('/broadcast', (req, res) => {
const { type, data } = req.body;
const sent = sse.broadcast({ type, data });
res.json({ sent, total: sse.getClientCount() });
});
export { router as sseRouter, sse };
```
Progress Streaming
```typescript
// progress-stream.ts
import { SSEServer } from './sse-server';
interface ProgressUpdate {
taskId: string;
progress: number; // 0-100
status: 'pending' | 'running' | 'completed' | 'failed';
message?: string;
}
class ProgressTracker {
constructor(private sse: SSEServer) {}
async trackTask(
taskId: string,
userId: string,
task: (update: (progress: number, message?: string) => void) => Promise
): Promise {
const sendUpdate = (update: ProgressUpdate) => {
this.sse.sendToUser(userId, {
type: 'progress',
data: update,
});
};
sendUpdate({ taskId, progress: 0, status: 'running' });
try {
const result = await task((progress, message) => {
sendUpdate({ taskId, progress, status: 'running', message });
});
sendUpdate({ taskId, progress: 100, status: 'completed' });
return result;
} catch (error) {
sendUpdate({
taskId,
progress: 0,
status: 'failed',
message: error instanceof Error ? error.message : 'Unknown error',
});
throw error;
}
}
}
// Usage
const tracker = new ProgressTracker(sse);
app.post('/process', async (req, res) => {
const taskId = crypto.randomUUID();
const userId = req.user.id;
// Return immediately with task ID
res.json({ taskId });
// Process in background with progress updates
tracker.trackTask(taskId, userId, async (update) => {
for (let i = 0; i <= 100; i += 10) {
await doSomeWork();
update(i, Processing step ${i / 10});
}
});
});
```