sse-resilience

Skill from dadbodgeoff/drift

What it does

Manages resilient Server-Sent Events (SSE) streams using Redis for tracking, heartbeating, and automatic recovery of orphaned streams.

Part of dadbodgeoff/drift (69 items)

Installation

Install the npm packages:

npm install -g driftdetect
npm install -g driftdetect@latest
npm install -g driftdetect-mcp

Claude Desktop Configuration: add this to your claude_desktop_config.json

{ "mcpServers": { "drift": { "command": "driftdetect-mcp" } } ...
Extracted from docs: dadbodgeoff/drift

Skill Details

SKILL.md

Redis-backed SSE stream management with stream registry, heartbeat monitoring, completion store for terminal events, and automatic orphan cleanup via background guardian process.

Overview

# SSE Stream Resilience

Redis-backed stream management with heartbeat monitoring and completion recovery.

## When to Use This Skill

  • SSE streams can fail silently (client disconnects mid-stream)
  • Completion events get lost and users never see results
  • Need visibility into stream health
  • Want to prevent resource leaks from abandoned streams

## Core Concepts

The solution provides:

  • Stream registry (track all active streams in Redis)
  • Heartbeat monitoring (detect orphaned streams)
  • Completion store (persist terminal events for recovery)
  • Stream guardian (background cleanup process)

```
Client ←→ SSE Endpoint ←→ Stream Registry (Redis)
                                  ↓
                         Completion Store (Redis)
                                  ↓
                        Stream Guardian (Background)
```

## Implementation

TypeScript

```typescript
// lib/sse/types.ts

export enum StreamState {
  ACTIVE = 'active',
  COMPLETED = 'completed',
  FAILED = 'failed',
  ORPHANED = 'orphaned',
}

export interface StreamMetadata {
  streamId: string;
  streamType: string;
  userId: string;
  startedAt: Date;
  lastHeartbeat: Date;
  state: StreamState;
  metadata: Record<string, unknown>; // arbitrary JSON-serializable extras
}

export interface CompletionData {
  streamId: string;
  terminalEventType: string;
  terminalEventData: Record<string, unknown>;
  completedAt: Date;
}

// lib/sse/stream-registry.ts
import Redis from 'ioredis'; // Redis client assumed to be ioredis; swap for your client of choice

const STREAM_KEY_PREFIX = 'sse:stream:';
const ACTIVE_STREAMS_KEY = 'sse:active';
const STREAM_TTL = 3600; // 1 hour max lifetime
const STALE_THRESHOLD = 30; // 30 seconds without a heartbeat = stale

// Tracks every active stream as a Redis hash, plus a sorted set scored by last heartbeat.
export class StreamRegistry {
  constructor(private redis: Redis) {}

  async register(metadata: StreamMetadata): Promise<boolean> {
    const streamKey = `${STREAM_KEY_PREFIX}${metadata.streamId}`;
    if (await this.redis.exists(streamKey)) {
      return false;
    }
    const pipeline = this.redis.pipeline();
    pipeline.hset(streamKey, {
      streamId: metadata.streamId,
      streamType: metadata.streamType,
      userId: metadata.userId,
      startedAt: metadata.startedAt.toISOString(),
      lastHeartbeat: metadata.lastHeartbeat.toISOString(),
      state: metadata.state,
      metadata: JSON.stringify(metadata.metadata),
    });
    pipeline.expire(streamKey, STREAM_TTL);
    pipeline.zadd(ACTIVE_STREAMS_KEY, metadata.lastHeartbeat.getTime(), metadata.streamId);
    await pipeline.exec();
    return true;
  }

  async heartbeat(streamId: string): Promise<boolean> {
    const streamKey = `${STREAM_KEY_PREFIX}${streamId}`;
    const now = new Date();
    if (!(await this.redis.exists(streamKey))) {
      return false;
    }
    const pipeline = this.redis.pipeline();
    pipeline.hset(streamKey, 'lastHeartbeat', now.toISOString());
    pipeline.zadd(ACTIVE_STREAMS_KEY, now.getTime(), streamId);
    await pipeline.exec();
    return true;
  }

  async unregister(streamId: string): Promise<boolean> {
    const streamKey = `${STREAM_KEY_PREFIX}${streamId}`;
    const userId = await this.redis.hget(streamKey, 'userId');
    if (!userId) return false;
    const pipeline = this.redis.pipeline();
    pipeline.del(streamKey);
    pipeline.zrem(ACTIVE_STREAMS_KEY, streamId);
    await pipeline.exec();
    return true;
  }

  // Referenced by getStaleStreams; rebuilds StreamMetadata from the Redis hash.
  async getStream(streamId: string): Promise<StreamMetadata | null> {
    const data = await this.redis.hgetall(`${STREAM_KEY_PREFIX}${streamId}`);
    if (!data.streamId) return null;
    return {
      streamId: data.streamId,
      streamType: data.streamType,
      userId: data.userId,
      startedAt: new Date(data.startedAt),
      lastHeartbeat: new Date(data.lastHeartbeat),
      state: data.state as StreamState,
      metadata: JSON.parse(data.metadata || '{}'),
    };
  }

  async getStaleStreams(thresholdSeconds = STALE_THRESHOLD): Promise<StreamMetadata[]> {
    const cutoff = Date.now() - thresholdSeconds * 1000;
    const staleIds = await this.redis.zrangebyscore(ACTIVE_STREAMS_KEY, 0, cutoff);
    const streams: StreamMetadata[] = [];
    for (const streamId of staleIds) {
      const stream = await this.getStream(streamId);
      if (stream && stream.state === StreamState.ACTIVE) {
        streams.push(stream);
      }
    }
    return streams;
  }

  async updateState(streamId: string, state: StreamState): Promise<boolean> {
    const streamKey = `${STREAM_KEY_PREFIX}${streamId}`;
    if (!(await this.redis.exists(streamKey))) return false;
    await this.redis.hset(streamKey, 'state', state);
    return true;
  }
}

// lib/sse/completion-store.ts

const COMPLETION_KEY_PREFIX = 'sse:completion:';
const COMPLETION_TTL = 300; // 5 minutes for recovery window

// Persists terminal events so a reconnecting client can still receive them.
export class CompletionStore {
  constructor(private redis: Redis) {}

  async storeCompletion(data: CompletionData): Promise<void> {
    const key = `${COMPLETION_KEY_PREFIX}${data.streamId}`;
    await this.redis.hset(key, {
      streamId: data.streamId,
      terminalEventType: data.terminalEventType,
      terminalEventData: JSON.stringify(data.terminalEventData),
      completedAt: data.completedAt.toISOString(),
    });
    await this.redis.expire(key, COMPLETION_TTL);
  }

  async getCompletion(streamId: string): Promise<CompletionData | null> {
    const key = `${COMPLETION_KEY_PREFIX}${streamId}`;
    const data = await this.redis.hgetall(key);
    if (!data.streamId) return null;
    return {
      streamId: data.streamId,
      terminalEventType: data.terminalEventType,
      terminalEventData: JSON.parse(data.terminalEventData || '{}'),
      completedAt: new Date(data.completedAt),
    };
  }
}

// lib/sse/stream-guardian.ts

// Background process that periodically marks streams with stale heartbeats as orphaned.
export class StreamGuardian {
  private intervalId: NodeJS.Timeout | null = null;

  constructor(
    private registry: StreamRegistry,
    private completionStore: CompletionStore,
    private checkIntervalMs = 30000
  ) {}

  start(): void {
    if (this.intervalId) return;
    this.intervalId = setInterval(() => this.runCheck(), this.checkIntervalMs);
  }

  stop(): void {
    if (this.intervalId) {
      clearInterval(this.intervalId);
      this.intervalId = null;
    }
  }

  private async runCheck(): Promise<void> {
    try {
      const staleStreams = await this.registry.getStaleStreams();
      for (const stream of staleStreams) {
        await this.handleOrphanedStream(stream);
      }
    } catch (err) {
      console.error('Stream Guardian error:', err);
    }
  }

  private async handleOrphanedStream(stream: StreamMetadata): Promise<void> {
    console.log(`Handling orphaned stream: ${stream.streamId}`);
    // The completion store is available here if orphan handling also needs to persist a terminal event.
    await this.registry.updateState(stream.streamId, StreamState.ORPHANED);
  }
}
```
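
These pieces need to be wired together once per process: create a Redis client, share a single registry and completion store, and start the guardian at application boot. Here is a minimal sketch, assuming ioredis; the `lib/sse/index.ts` file name and the `REDIS_URL` environment variable are illustrative choices, not part of the original skill.

```typescript
// lib/sse/index.ts — hypothetical wiring module (not part of the original skill)
import Redis from 'ioredis';
import { StreamRegistry } from './stream-registry';
import { CompletionStore } from './completion-store';
import { StreamGuardian } from './stream-guardian';

// One Redis connection shared by the registry and the completion store.
const redis = new Redis(process.env.REDIS_URL ?? 'redis://localhost:6379');

export const registry = new StreamRegistry(redis);
export const completionStore = new CompletionStore(redis);

// Guardian scans for stale heartbeats every 30 seconds (the default)
// and marks them ORPHANED; start it once at boot.
export const guardian = new StreamGuardian(registry, completionStore);
guardian.start();
```

Keeping these as module-level singletons means the SSE endpoint, the recovery route, and any background workers all talk to the same registry and completion store.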

### SSE Endpoint

```typescript
// app/api/stream/[streamId]/route.ts
import { registry, completionStore } from '@/lib/sse'; // shared instances from the wiring above; path alias assumed
import { StreamState } from '@/lib/sse/types';

export async function GET(req: Request, { params }: { params: { streamId: string } }) {
  const userId = req.headers.get('x-user-id')!; // identification assumed to be handled upstream
  const streamId = params.streamId;

  // Check for existing completion (recovery)
  const existingCompletion = await completionStore.getCompletion(streamId);
  if (existingCompletion) {
    return new Response(
      `data: ${JSON.stringify({
        type: existingCompletion.terminalEventType,
        data: existingCompletion.terminalEventData,
      })}\n\n`,
      { headers: { 'Content-Type': 'text/event-stream' } }
    );
  }

  // Register new stream
  await registry.register({
    streamId,
    streamType: 'generation',
    userId,
    startedAt: new Date(),
    lastHeartbeat: new Date(),
    state: StreamState.ACTIVE,
    metadata: {},
  });

  const encoder = new TextEncoder();
  let heartbeatInterval: NodeJS.Timeout;

  const stream = new ReadableStream({
    start(controller) {
      controller.enqueue(
        encoder.encode(`data: ${JSON.stringify({ type: 'connected', streamId })}\n\n`)
      );

      // Heartbeat every 15 seconds: refresh the registry and send an SSE comment to keep the connection warm.
      heartbeatInterval = setInterval(async () => {
        try {
          await registry.heartbeat(streamId);
          controller.enqueue(encoder.encode(': heartbeat\n\n'));
        } catch {}
      }, 15000);
    },
    cancel() {
      // Client disconnected cleanly: stop heartbeating and release the registry entry.
      clearInterval(heartbeatInterval);
      registry.unregister(streamId);
    },
  });

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'X-Stream-Id': streamId,
    },
  });
}
```
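
The client hook in the next section first calls a `/recover` route before opening the SSE connection, but the skill does not show that route. Below is a minimal sketch of what it could look like, reusing the shared instances from the wiring sketch above; the route path and response shape are assumptions inferred from the client code, not the skill's canonical implementation.

```typescript
// app/api/stream/[streamId]/recover/route.ts — recovery endpoint sketch (assumed, not from the original skill)
import { completionStore, registry } from '@/lib/sse'; // path alias assumed

export async function GET(
  _req: Request,
  { params }: { params: { streamId: string } }
) {
  const completion = await completionStore.getCompletion(params.streamId);

  if (completion) {
    // Terminal event was persisted within the 5-minute recovery window.
    return Response.json({
      status: 'completed',
      terminalEventType: completion.terminalEventType,
      terminalEventData: completion.terminalEventData,
    });
  }

  // No completion yet; report whether the stream is still registered and in what state.
  const stream = await registry.getStream(params.streamId);
  return Response.json({ status: stream ? stream.state : 'unknown' });
}
```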

### Client-Side Recovery

```typescript
import { useEffect, useRef, useState } from 'react';

export function useResilientSSE(streamId: string) {
  const [status, setStatus] = useState<'connecting' | 'connected' | 'completed' | 'error'>('connecting');
  const [data, setData] = useState<unknown>(null);
  const reconnectAttempts = useRef(0);

  useEffect(() => {
    let eventSource: EventSource | null = null;

    const connect = async () => {
      // First, check for an existing completion (missed terminal event).
      try {
        const recovery = await fetch(`/api/stream/${streamId}/recover`);
        const result = await recovery.json();
        if (result.status === 'completed') {
          setStatus('completed');
          setData(result.terminalEventData);
          return;
        }
      } catch {}

      // Connect to SSE
      eventSource = new EventSource(`/api/stream/${streamId}`);

      eventSource.onmessage = (event) => {
        const parsed = JSON.parse(event.data);
        if (parsed.type === 'completed' || parsed.type === 'failed') {
          setStatus('completed');
          setData(parsed.data);
          eventSource?.close();
        } else {
          setStatus('connected');
          setData(parsed);
        }
      };

      eventSource.onerror = () => {
        eventSource?.close();
        if (reconnectAttempts.current < 3) {
          reconnectAttempts.current++;
          // Exponential backoff: 1s, 2s, 4s
          setTimeout(connect, 1000 * 2 ** (reconnectAttempts.current - 1));
        } else {
          setStatus('error');
        }
      };
    };

    connect();
    return () => eventSource?.close();
  }, [streamId]);

  return { status, data };
}
```
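
For completeness, a minimal usage sketch: a component that renders stream status and falls back to the recovered payload. The component name, prop, and import path are illustrative, not part of the original skill.

```tsx
// components/StreamStatus.tsx — illustrative usage (names and path assumed)
import { useResilientSSE } from '@/hooks/useResilientSSE';

export function StreamStatus({ streamId }: { streamId: string }) {
  const { status, data } = useResilientSSE(streamId);

  if (status === 'error') return <p>Stream failed after 3 reconnect attempts.</p>;
  if (status === 'completed') return <pre>{JSON.stringify(data, null, 2)}</pre>;

  // 'connecting' or 'connected': show live progress as events arrive.
  return <p>Streaming… {data ? JSON.stringify(data) : 'waiting for events'}</p>;
}
```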

## Best Practices

  1. Heartbeat every 15 seconds - Keeps the stream alive and detects orphans
  2. Store completions for recovery - 5-minute window for client reconnection (see the sketch after this list)
  3. Background guardian process - Cleans up orphaned streams automatically
  4. Client-side reconnection - Retry with exponential backoff
  5. Check for completion on connect - Recover missed terminal events
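
Best practice 2 has a producer side the skill itself doesn't show: whatever process finishes the work must write the terminal event into the completion store and update the registry. A minimal sketch, assuming the shared instances from the wiring sketch; `finishStream` and `runGenerationJob` are hypothetical names for illustration.

```typescript
// Worker-side completion handling — illustrative sketch, not from the original skill.
import { completionStore, registry } from '@/lib/sse'; // shared instances, path alias assumed
import { StreamState } from '@/lib/sse/types';

// `runGenerationJob` is a hypothetical stand-in for the actual work being streamed.
async function finishStream(
  streamId: string,
  runGenerationJob: () => Promise<Record<string, unknown>>
) {
  try {
    const result = await runGenerationJob();

    // Persist the terminal event so a reconnecting client can recover it
    // within the 5-minute COMPLETION_TTL window.
    await completionStore.storeCompletion({
      streamId,
      terminalEventType: 'completed',
      terminalEventData: result,
      completedAt: new Date(),
    });
    await registry.updateState(streamId, StreamState.COMPLETED);
  } catch (err) {
    await completionStore.storeCompletion({
      streamId,
      terminalEventType: 'failed',
      terminalEventData: { message: (err as Error).message },
      completedAt: new Date(),
    });
    await registry.updateState(streamId, StreamState.FAILED);
  }
}
```

Whether the terminal event is also pushed over the live SSE connection (for example via Redis pub/sub) is left to the surrounding system; the completion store guarantees recovery even if that live push is missed.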

## Common Mistakes

  • No heartbeat mechanism (can't detect orphaned streams)
  • Not storing completion data (lost terminal events)
  • Missing recovery endpoint (clients can't recover)
  • No background cleanup (resource leaks)
  • Forgetting to unregister on clean disconnect

## Related Patterns

  • websocket-management - WebSocket alternative
  • graceful-shutdown - Drain streams on shutdown
  • checkpoint-resume - Track stream progress

More from this repository (10)

  • feature-flags - Enables controlled feature rollouts, A/B testing, and selective feature access through configurable flags for gradual deployment and user targeting.
  • design-tokens - Generates a comprehensive, type-safe design token system with WCAG AA color compliance and multi-framework support for consistent visual design.
  • file-uploads - Securely validates, scans, and processes file uploads with multi-stage checks, malware detection, and race condition prevention.
  • ai-coaching - Guides users through articulating creative intent by extracting structured parameters and detecting conversation readiness.
  • environment-config - Validates and centralizes environment variables with type safety, fail-fast startup checks, and multi-environment support.
  • community-feed - Generates efficient social feeds with cursor pagination, trending algorithms, and engagement tracking for infinite scroll experiences.
  • cloud-storage - Enables secure, multi-tenant cloud file storage with signed URLs, direct uploads, and visibility control for user-uploaded assets.
  • email-service - Simplifies email sending, templating, and tracking with robust SMTP integration and support for multiple email providers and transactional workflows.
  • error-sanitization - Sanitizes error messages by logging full details server-side while exposing only generic, safe messages to prevent sensitive information leakage.
  • batch-processing - Optimizes database operations by collecting and batching independent records, improving throughput by 30-40% with built-in fallback processing.