sse-streaming

from dadbodgeoff/drift

What it does

Streams real-time server updates to clients using Server-Sent Events (SSE) with automatic reconnection and heartbeats.

Installation

Install the npm package:
npm install -g driftdetect
To install the latest published version explicitly:
npm install -g driftdetect@latest
Install the MCP server package:
npm install -g driftdetect-mcp
Claude Desktop configuration (add this to your claude_desktop_config.json):
{ "mcpServers": { "drift": { "command": "driftdetect-mcp" } } ...
Extracted from docs: dadbodgeoff/drift
Installs: 4
Added: Feb 4, 2026

Skill Details

SKILL.md

Implement Server-Sent Events (SSE) for real-time updates with automatic reconnection and heartbeats. Use when building live dashboards, notifications, progress indicators, or any feature needing server-to-client push.

Overview

# Server-Sent Events (SSE)

Real-time server-to-client streaming with automatic reconnection.

When to Use This Skill

  • Live dashboards and monitoring
  • Real-time notifications
  • Progress indicators for long operations
  • Live feeds (activity, chat, updates)
  • AI streaming responses

SSE vs WebSockets

| Feature | SSE | WebSocket |
|---------|-----|-----------|
| Direction | Server → Client | Bidirectional |
| Protocol | HTTP | WS |
| Reconnection | Automatic | Manual |
| Complexity | Simple | Complex |
| Browser Support | Native | Native |
| Through Proxies | Easy | Can be tricky |

Use SSE when: You only need server-to-client push

Use WebSocket when: You need bidirectional communication
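
Because SSE runs over plain HTTP, each event on the wire is just a few `field: value` text lines followed by a blank line. The sketch below shows that framing in isolation. `SseFrame` and `formatSseFrame` are hypothetical names introduced for illustration and are not part of the skill's code.

```typescript
// Hypothetical helper: serializes one event into the text/event-stream format.
interface SseFrame {
  event?: string; // optional event name; clients see "message" when omitted
  id?: string;    // optional ID, sent back as Last-Event-ID on reconnect
  data: string;   // payload; may span several lines
}

function formatSseFrame(frame: SseFrame): string {
  const lines: string[] = [];
  if (frame.event !== undefined) lines.push(`event: ${frame.event}`);
  if (frame.id !== undefined) lines.push(`id: ${frame.id}`);
  // Every line of the payload needs its own "data:" field.
  for (const line of frame.data.split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`);
  }
  // A blank line terminates the event.
  return lines.join('\n') + '\n\n';
}
```

JSON payloads produced with `JSON.stringify` contain no raw newlines, so they fit on a single `data:` line. The per-line split only matters for free-form text.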

TypeScript Implementation

SSE Server

```typescript

// sse-server.ts
import { Request, Response } from 'express';
import { EventEmitter } from 'events';
import { randomUUID } from 'crypto';

interface SSEClient {
  id: string;
  userId?: string;
  res: Response;
  lastEventId: number;
}

class SSEServer {
  private clients = new Map<string, SSEClient>();
  private eventId = 0;
  private emitter = new EventEmitter();
  private heartbeatInterval: NodeJS.Timeout;

  constructor(heartbeatMs = 30000) {
    // Send heartbeats to keep connections alive
    this.heartbeatInterval = setInterval(() => {
      this.broadcast({ type: 'heartbeat', data: { timestamp: Date.now() } });
    }, heartbeatMs);
  }

  connect(req: Request, res: Response, userId?: string): string {
    const clientId = randomUUID();

    // Set SSE headers
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    res.setHeader('X-Accel-Buffering', 'no'); // Disable nginx buffering
    res.flushHeaders();

    // Record the last event ID the client saw. Actually replaying missed
    // events also requires buffering recent events, which this class does not do.
    const lastEventId = parseInt(req.headers['last-event-id'] as string, 10) || 0;

    const client: SSEClient = {
      id: clientId,
      userId,
      res,
      lastEventId,
    };

    this.clients.set(clientId, client);

    // Send connection confirmation
    this.sendToClient(client, {
      type: 'connected',
      data: { clientId },
    });

    // Handle disconnect
    req.on('close', () => {
      this.clients.delete(clientId);
      this.emitter.emit('disconnect', clientId);
    });

    this.emitter.emit('connect', clientId, userId);
    return clientId;
  }

  // Send to a single connection
  send(clientId: string, event: SSEEvent): boolean {
    const client = this.clients.get(clientId);
    if (!client) return false;
    return this.sendToClient(client, event);
  }

  // Send to every connection that belongs to a user (e.g. multiple tabs)
  sendToUser(userId: string, event: SSEEvent): number {
    let sent = 0;
    for (const client of this.clients.values()) {
      if (client.userId === userId) {
        if (this.sendToClient(client, event)) sent++;
      }
    }
    return sent;
  }

  broadcast(event: SSEEvent): number {
    let sent = 0;
    for (const client of this.clients.values()) {
      if (this.sendToClient(client, event)) sent++;
    }
    return sent;
  }

  private sendToClient(client: SSEClient, event: SSEEvent): boolean {
    try {
      const id = ++this.eventId;
      const data = JSON.stringify(event.data);

      // Wire format: optional "event:", then "id:" and "data:", then a blank line
      let message = '';
      if (event.type) message += `event: ${event.type}\n`;
      message += `id: ${id}\n`;
      message += `data: ${data}\n\n`;

      client.res.write(message);
      return true;
    } catch {
      // Write failed; treat the client as disconnected
      this.clients.delete(client.id);
      return false;
    }
  }

  getClientCount(): number {
    return this.clients.size;
  }

  getUserClients(userId: string): string[] {
    return Array.from(this.clients.values())
      .filter(c => c.userId === userId)
      .map(c => c.id);
  }

  onConnect(handler: (clientId: string, userId?: string) => void): void {
    this.emitter.on('connect', handler);
  }

  onDisconnect(handler: (clientId: string) => void): void {
    this.emitter.on('disconnect', handler);
  }

  // Stop heartbeats and end every open response
  close(): void {
    clearInterval(this.heartbeatInterval);
    for (const client of this.clients.values()) {
      client.res.end();
    }
    this.clients.clear();
  }
}

interface SSEEvent {
  type?: string;
  data: unknown;
}

export { SSEServer };
export type { SSEEvent, SSEClient };

```
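
The server above reads `Last-Event-ID` but keeps no history, so a reconnecting client cannot be sent what it missed. One possible way to close that gap is a small bounded buffer of recent events. This is a sketch only: `ReplayBuffer`, `BufferedEvent`, and the default capacity of 100 are assumptions for illustration, not part of the skill.

```typescript
// Hypothetical replay buffer; not part of the SSEServer class above.
interface BufferedEvent {
  id: number;
  type?: string;
  data: unknown;
}

class ReplayBuffer {
  private events: BufferedEvent[] = [];
  private nextId = 1;
  private capacity: number;

  constructor(capacity = 100) {
    this.capacity = capacity;
  }

  // Store an event under the next ID and evict the oldest one when full.
  push(type: string | undefined, data: unknown): BufferedEvent {
    const event: BufferedEvent = { id: this.nextId++, type, data };
    this.events.push(event);
    if (this.events.length > this.capacity) this.events.shift();
    return event;
  }

  // Events the client has not seen yet, oldest first.
  since(lastEventId: number): BufferedEvent[] {
    return this.events.filter(e => e.id > lastEventId);
  }
}
```

In `connect`, the events returned by `since(lastEventId)` could be written to the new response before live events start flowing. For that to work, broadcast events need one shared ID per event rather than a fresh ID per client write.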

Express Routes

```typescript

// sse-routes.ts
import { Router } from 'express';
import { SSEServer } from './sse-server';

const router = Router();
const sse = new SSEServer();

// SSE endpoint
router.get('/events', (req, res) => {
  const userId = req.user?.id; // From auth middleware
  sse.connect(req, res, userId);
});

// Send notification to specific user
router.post('/notify/:userId', (req, res) => {
  const { userId } = req.params;
  const { type, message } = req.body;

  const sent = sse.sendToUser(userId, {
    type: 'notification',
    data: { type, message, timestamp: Date.now() },
  });

  res.json({ sent });
});

// Broadcast to all connected clients
router.post('/broadcast', (req, res) => {
  const { type, data } = req.body;
  const sent = sse.broadcast({ type, data });
  res.json({ sent, total: sse.getClientCount() });
});

export { router as sseRouter, sse };

```

Progress Streaming

```typescript

// progress-stream.ts
import { randomUUID } from 'crypto';
import { SSEServer } from './sse-server';

interface ProgressUpdate {
  taskId: string;
  progress: number; // 0-100
  status: 'pending' | 'running' | 'completed' | 'failed';
  message?: string;
}

class ProgressTracker {
  constructor(private sse: SSEServer) {}

  async trackTask<T>(
    taskId: string,
    userId: string,
    task: (update: (progress: number, message?: string) => void) => Promise<T>
  ): Promise<T> {
    const sendUpdate = (update: ProgressUpdate) => {
      this.sse.sendToUser(userId, {
        type: 'progress',
        data: update,
      });
    };

    sendUpdate({ taskId, progress: 0, status: 'running' });

    try {
      // The task reports progress through the callback it is given
      const result = await task((progress, message) => {
        sendUpdate({ taskId, progress, status: 'running', message });
      });

      sendUpdate({ taskId, progress: 100, status: 'completed' });
      return result;
    } catch (error) {
      sendUpdate({
        taskId,
        progress: 0,
        status: 'failed',
        message: error instanceof Error ? error.message : 'Unknown error',
      });
      throw error;
    }
  }
}

// Usage (assumes an Express `app`, the shared `sse` instance exported from
// sse-routes.ts, and a `doSomeWork()` helper defined elsewhere)
const tracker = new ProgressTracker(sse);

app.post('/process', async (req, res) => {
  const taskId = randomUUID();
  const userId = req.user.id;

  // Return immediately with task ID
  res.json({ taskId });

  // Process in background with progress updates
  tracker
    .trackTask(taskId, userId, async (update) => {
      for (let i = 0; i <= 100; i += 10) {
        await doSomeWork();
        update(i, `Processing step ${i / 10}`);
      }
    })
    // trackTask rethrows after notifying the client; catch here so the
    // un-awaited promise does not become an unhandled rejection
    .catch((error) => console.error(`Task ${taskId} failed:`, error));
});

```

Python Implementation

```python

# sse_server.py
import asyncio
import json
import time
import uuid
from typing import Dict, Optional, AsyncGenerator
from dataclasses import dataclass

from fastapi import Request
from fastapi.responses import StreamingResponse


@dataclass
class SSEClient:
    id: str
    user_id: Optional[str]
    queue: asyncio.Queue


class SSEServer:
    def __init__(self, heartbeat_seconds: int = 30):
        self.clients: Dict[str, SSEClient] = {}
        self.event_id = 0
        self.heartbeat_seconds = heartbeat_seconds
        self._heartbeat_task: Optional[asyncio.Task] = None

    async def start(self):
        self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())

    async def stop(self):
        if self._heartbeat_task:
            self._heartbeat_task.cancel()
        for client in self.clients.values():
            await client.queue.put(None)  # Signal disconnect
        self.clients.clear()

    async def _heartbeat_loop(self):
        while True:
            await asyncio.sleep(self.heartbeat_seconds)
            # time.time() gives a wall-clock timestamp; the event loop clock is monotonic
            await self.broadcast("heartbeat", {"timestamp": time.time()})

    async def connect(self, request: Request, user_id: Optional[str] = None) -> StreamingResponse:
        client_id = str(uuid.uuid4())
        queue: asyncio.Queue = asyncio.Queue()

        client = SSEClient(id=client_id, user_id=user_id, queue=queue)
        self.clients[client_id] = client

        async def event_generator() -> AsyncGenerator[str, None]:
            # Send connection event
            yield self._format_event("connected", {"clientId": client_id})

            try:
                while True:
                    event = await queue.get()
                    if event is None:  # Disconnect signal
                        break
                    yield event
            finally:
                # Runs on normal exit and when the client goes away
                self.clients.pop(client_id, None)

        return StreamingResponse(
            event_generator(),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",
            },
        )

    def _format_event(self, event_type: str, data: dict) -> str:
        self.event_id += 1
        return f"event: {event_type}\nid: {self.event_id}\ndata: {json.dumps(data)}\n\n"

    async def send(self, client_id: str, event_type: str, data: dict) -> bool:
        client = self.clients.get(client_id)
        if not client:
            return False
        await client.queue.put(self._format_event(event_type, data))
        return True

    async def send_to_user(self, user_id: str, event_type: str, data: dict) -> int:
        sent = 0
        for client in self.clients.values():
            if client.user_id == user_id:
                await client.queue.put(self._format_event(event_type, data))
                sent += 1
        return sent

    async def broadcast(self, event_type: str, data: dict) -> int:
        # Format once so every client receives the same event ID
        event = self._format_event(event_type, data)
        for client in self.clients.values():
            await client.queue.put(event)
        return len(self.clients)

```

FastAPI Routes

```python

# sse_routes.py
from fastapi import APIRouter, Request, Depends

from .sse_server import SSEServer

router = APIRouter()
sse = SSEServer()


@router.on_event("startup")
async def startup():
    await sse.start()


@router.on_event("shutdown")
async def shutdown():
    await sse.stop()


# get_current_user_id is assumed to be an auth dependency defined elsewhere
@router.get("/events")
async def events(request: Request, user_id: str = Depends(get_current_user_id)):
    return await sse.connect(request, user_id)


@router.post("/notify/{user_id}")
async def notify(user_id: str, event_type: str, message: str):
    sent = await sse.send_to_user(user_id, "notification", {
        "type": event_type,
        "message": message,
    })
    return {"sent": sent}

```

Frontend Client

```typescript

// sse-client.ts
interface SSEClientOptions {
  url: string;
  onMessage?: (event: MessageEvent) => void;
  onError?: (error: Event) => void;
  onOpen?: () => void;
  reconnectDelay?: number;
  maxRetries?: number;
}

class SSEClient {
  private eventSource: EventSource | null = null;
  private retries = 0;
  private handlers = new Map<string, Set<(data: any) => void>>();
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;

  constructor(private options: SSEClientOptions) {}

  connect(): void {
    if (this.eventSource) {
      this.eventSource.close();
    }

    this.eventSource = new EventSource(this.options.url);

    this.eventSource.onopen = () => {
      this.retries = 0;
      this.options.onOpen?.();
    };

    this.eventSource.onerror = (error) => {
      this.options.onError?.(error);
      // Close first so the browser's built-in retry does not race the
      // manual backoff below
      this.eventSource?.close();
      this.eventSource = null;
      this.handleReconnect();
    };

    // Events without an "event:" field arrive here
    this.eventSource.onmessage = (event) => {
      this.options.onMessage?.(event);
    };

    // Register typed event handlers
    for (const [type, handlers] of this.handlers) {
      this.eventSource.addEventListener(type, (event: MessageEvent) => {
        const data = JSON.parse(event.data);
        handlers.forEach(handler => handler(data));
      });
    }
  }

  on<T>(eventType: string, handler: (data: T) => void): () => void {
    if (!this.handlers.has(eventType)) {
      this.handlers.set(eventType, new Set());

      // Add listener if already connected
      if (this.eventSource) {
        this.eventSource.addEventListener(eventType, (event: MessageEvent) => {
          const data = JSON.parse(event.data);
          this.handlers.get(eventType)?.forEach(h => h(data));
        });
      }
    }

    this.handlers.get(eventType)!.add(handler);

    // Return unsubscribe function
    return () => {
      this.handlers.get(eventType)?.delete(handler);
    };
  }

  private handleReconnect(): void {
    const maxRetries = this.options.maxRetries ?? 10;
    const delay = this.options.reconnectDelay ?? 1000;

    if (this.retries >= maxRetries) {
      console.error('SSE: Max retries reached');
      return;
    }

    this.retries++;
    // Exponential backoff: delay, 2x delay, 4x delay, ...
    const backoff = delay * Math.pow(2, this.retries - 1);

    console.log(`SSE: Reconnecting in ${backoff}ms (attempt ${this.retries})`);
    this.reconnectTimer = setTimeout(() => this.connect(), backoff);
  }

  disconnect(): void {
    // Cancel any pending reconnect so a closed client stays closed
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
    }
  }
}

// Usage
const sse = new SSEClient({ url: '/api/events' });

sse.on<{ type: string; message: string }>('notification', (data) => {
  showNotification(data.type, data.message);
});

sse.on<{ taskId: string; progress: number }>('progress', (data) => {
  updateProgressBar(data.taskId, data.progress);
});

sse.connect();

```
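
With the defaults above (1000 ms base, 10 retries), plain doubling reaches a wait of 512 seconds on the last attempt, and every client that lost the same server retries at the same instants. A common refinement is to cap the delay and add jitter. The helper below is a sketch of that idea. `backoffDelay`, its 30-second cap, and the injectable `random` parameter are assumptions for illustration, not part of the skill.

```typescript
// Hypothetical helper: exponential backoff with a ceiling and "equal jitter".
function backoffDelay(
  attempt: number, // 1 for the first retry
  baseMs = 1000,
  maxMs = 30000,
  random: () => number = Math.random // injectable so the result is testable
): number {
  // Double the delay on each attempt, but never exceed the cap.
  const capped = Math.min(maxMs, baseMs * 2 ** (attempt - 1));
  // Keep half of the delay fixed and randomize the other half.
  const half = capped / 2;
  return Math.floor(half + random() * half);
}
```

In `handleReconnect`, the `backoff` constant could be computed as `backoffDelay(this.retries, delay)` instead of the bare power of two.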

React Hook

```typescript

// useSSE.ts
import { useEffect, useRef, useCallback, useState } from 'react';

function useSSE(url: string) {
  const eventSourceRef = useRef<EventSource | null>(null);
  const handlersRef = useRef<Map<string, (data: any) => void>>(new Map());

  useEffect(() => {
    const eventSource = new EventSource(url);
    eventSourceRef.current = eventSource;

    eventSource.onerror = () => {
      // Browser handles reconnection automatically
      console.log('SSE connection error, reconnecting...');
    };

    return () => {
      eventSource.close();
    };
  }, [url]);

  const subscribe = useCallback(<T,>(
    eventType: string,
    handler: (data: T) => void
  ) => {
    const eventSource = eventSourceRef.current;
    if (!eventSource) return () => {};

    const wrappedHandler = (event: MessageEvent) => {
      handler(JSON.parse(event.data));
    };

    eventSource.addEventListener(eventType, wrappedHandler);

    // Return unsubscribe function
    return () => {
      eventSource.removeEventListener(eventType, wrappedHandler);
    };
  }, []);

  return { subscribe };
}

// Usage in component
interface NotificationData {
  type: string;
  message: string;
}

function Dashboard() {
  const { subscribe } = useSSE('/api/events');
  const [notifications, setNotifications] = useState<NotificationData[]>([]);

  useEffect(() => {
    return subscribe<NotificationData>('notification', (data) => {
      setNotifications(prev => [...prev, data]);
    });
  }, [subscribe]);

  // Illustrative markup; render the notifications however the app needs
  return (
    <ul>
      {notifications.map((n, i) => (
        <li key={i}>{n.message}</li>
      ))}
    </ul>
  );
}

```

Best Practices

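  1. Send heartbeats: Periodic events keep idle connections from being closed by proxies and load balancers
  2. Use event IDs: On reconnect the browser sends the last ID it received in the Last-Event-ID header, so the server can replay what was missed
  3. Disable buffering: Set X-Accel-Buffering: no so nginx forwards each event immediately
  4. Handle reconnection: EventSource reconnects automatically; if you add manual retries, use backoff
  5. Clean up on disconnect: Remove clients from memory when the request closes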
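
Heartbeats are also useful on the client: if nothing arrives for noticeably longer than the heartbeat interval, the connection is probably dead even though no error has fired. A minimal watchdog sketch follows. `HeartbeatWatchdog` and the injectable `now` clock are hypothetical names introduced for illustration.

```typescript
// Hypothetical watchdog: reports a connection as stale when no event
// (heartbeats included) has arrived within the timeout.
class HeartbeatWatchdog {
  private lastSeen: number;
  private timeoutMs: number;
  private now: () => number;

  constructor(timeoutMs: number, now: () => number = Date.now) {
    this.timeoutMs = timeoutMs;
    this.now = now;
    this.lastSeen = now();
  }

  // Call from every event handler, including the 'heartbeat' one.
  touch(): void {
    this.lastSeen = this.now();
  }

  // True once more than timeoutMs has passed since the last event.
  isStale(): boolean {
    return this.now() - this.lastSeen > this.timeoutMs;
  }
}
```

With the 30-second server heartbeat used in this skill, a timeout somewhat above that interval (45 seconds, say) leaves room for network delay. A periodic timer can call `isStale()` and trigger a reconnect when it returns true.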

Common Mistakes

  • Not disabling proxy buffering (events arrive in batches)
  • Not sending heartbeats (connections time out)
  • Not handling client disconnects (memory leak)
  • Using SSE for bidirectional communication (use WebSocket)
  • Not using event IDs (lose events on reconnect)
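
EventSource tracks event IDs for you, but a client that reads the stream another way (for example with `fetch`, since EventSource only issues GET requests) has to parse frames and remember the last ID itself. The parser below is a simplified sketch. `parseSseChunk` and `ParsedEvent` are hypothetical names, and the function assumes the input contains only complete events; a real client would also buffer partial chunks.

```typescript
// Hypothetical, simplified parser for complete text/event-stream frames.
interface ParsedEvent {
  event: string; // "message" when the frame has no "event:" field
  id?: string;
  data: string;
}

function parseSseChunk(text: string): ParsedEvent[] {
  const events: ParsedEvent[] = [];
  // Frames are separated by a blank line.
  for (const block of text.split(/\r?\n\r?\n/)) {
    let event = 'message';
    let id: string | undefined;
    const data: string[] = [];
    for (const line of block.split(/\r?\n/)) {
      if (line === '' || line.startsWith(':')) continue; // blank or comment
      const colon = line.indexOf(':');
      const field = colon === -1 ? line : line.slice(0, colon);
      let value = colon === -1 ? '' : line.slice(colon + 1);
      if (value.startsWith(' ')) value = value.slice(1); // strip one space
      if (field === 'event') event = value;
      else if (field === 'id') id = value;
      else if (field === 'data') data.push(value);
    }
    // Frames without data (such as comment-only keep-alives) are skipped.
    if (data.length > 0) events.push({ event, id, data: data.join('\n') });
  }
  return events;
}
```

Storing the `id` of the last parsed event and sending it as the `Last-Event-ID` request header on the next connection gives such a client the same resume behavior EventSource provides.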
