🎯

worker-orchestration

🎯Skill

from dadbodgeoff/drift

VibeIndex|
What it does

Orchestrates background workers with intelligent scheduling, dependency management, and automatic failure handling.

πŸ“¦

Part of

dadbodgeoff/drift(69 items)

worker-orchestration

Installation

npm installInstall npm package
npm install -g driftdetect
npm installInstall npm package
npm install -g driftdetect@latest
npm installInstall npm package
npm install -g driftdetect-mcp
Claude Desktop ConfigurationAdd this to your claude_desktop_config.json
{ "mcpServers": { "drift": { "command": "driftdetect-mcp" } } ...
πŸ“– Extracted from docs: dadbodgeoff/drift
4Installs
-
AddedFeb 4, 2026

Skill Details

SKILL.md

Manage concurrent background workers with scheduling, dependencies, health monitoring, and automatic disabling of failing workers.

Overview

# Worker Orchestration

Coordinate multiple background workers.

When to Use This Skill

  • Multiple background jobs need coordination
  • Jobs have dependencies on each other
  • Need to prevent conflicting concurrent execution
  • Want automatic disabling of failing workers

Core Concepts

  1. Scheduling - Run workers on intervals
  2. Dependencies - Worker A must complete before B starts
  3. Blocking - Workers that can't run concurrently
  4. Auto-disable - Stop workers after consecutive failures

TypeScript Implementation

Types

```typescript

// types.ts

export enum WorkerExecutionMode {

SCHEDULED = 'scheduled',

TRIGGERED = 'triggered',

CONTINUOUS = 'continuous',

}

export enum JobPriority {

LOW = 0,

NORMAL = 1,

HIGH = 2,

CRITICAL = 3,

}

export interface WorkerConfig {

name: string;

executionMode: WorkerExecutionMode;

intervalSeconds: number;

timeoutSeconds: number;

maxRetries: number;

priority: JobPriority;

maxConsecutiveFailures: number;

dependsOn: string[];

blocks: string[];

// Runtime state

isEnabled: boolean;

isRunning: boolean;

consecutiveFailures: number;

lastRun?: Date;

lastSuccess?: Date;

lastError?: string;

}

export type WorkerFn = (config: WorkerConfig) => Promise;

```

Orchestrator

```typescript

// orchestrator.ts

import { WorkerConfig, WorkerFn, WorkerExecutionMode, JobPriority } from './types';

interface OrchestratorConfig {

tickIntervalMs: number;

maxConcurrentWorkers: number;

}

export class WorkerOrchestrator {

private workers = new Map();

private workerFns = new Map();

private running = new Set();

private tickInterval: NodeJS.Timeout | null = null;

private state: 'stopped' | 'running' | 'stopping' = 'stopped';

constructor(private config: OrchestratorConfig) {}

registerWorker(

name: string,

fn: WorkerFn,

options: Partial = {}

): void {

this.workers.set(name, {

name,

executionMode: options.executionMode || WorkerExecutionMode.SCHEDULED,

intervalSeconds: options.intervalSeconds || 300,

timeoutSeconds: options.timeoutSeconds || 60,

maxRetries: options.maxRetries || 3,

priority: options.priority || JobPriority.NORMAL,

maxConsecutiveFailures: options.maxConsecutiveFailures || 5,

dependsOn: options.dependsOn || [],

blocks: options.blocks || [],

isEnabled: true,

isRunning: false,

consecutiveFailures: 0,

});

this.workerFns.set(name, fn);

}

async start(): Promise {

if (this.state !== 'stopped') return;

this.state = 'running';

this.tickInterval = setInterval(

() => this.tick(),

this.config.tickIntervalMs

);

console.log([Orchestrator] Started with ${this.workers.size} workers);

}

async stop(): Promise {

if (this.state === 'stopped') return;

this.state = 'stopping';

if (this.tickInterval) clearInterval(this.tickInterval);

// Wait for running workers

const maxWait = 30000;

const start = Date.now();

while (this.running.size > 0 && Date.now() - start < maxWait) {

await this.sleep(100);

}

this.state = 'stopped';

console.log('[Orchestrator] Stopped');

}

private async tick(): Promise {

if (this.state !== 'running') return;

// Sort by priority (higher first)

const sortedWorkers = Array.from(this.workers.entries())

.sort((a, b) => b[1].priority - a[1].priority);

for (const [name, config] of sortedWorkers) {

if (!config.isEnabled || config.isRunning) continue;

if (this.running.size >= this.config.maxConcurrentWorkers) break;

if (this.shouldRun(config)) {

this.executeWorker(name, config);

}

}

}

private shouldRun(config: WorkerConfig): boolean {

// Check dependencies - must not be running

for (const dep of config.dependsOn) {

if (this.workers.get(dep)?.isRunning) return false;

}

// Check blockers - must not be running

for (const blocker of config.blocks) {

if (this.workers.get(blocker)?.isRunning) return false;

}

// Check schedule

if (!config.lastRun) return true;

const elapsed = (Date.now() - config.lastRun.getTime()) / 1000;

return elapsed >= config.intervalSeconds;

}

private async executeWorker(name: string, config: WorkerConfig): Promise {

const fn = this.workerFns.get(name);

if (!fn) return;

config.isRunning = true;

this.running.add(name);

console.log([Orchestrator] Starting ${name});

try {

// Execute with timeout

await Promise.race([

fn(config),

this.sleep(config.timeoutSeconds * 1000).then(() => {

throw new Error('Worker timeout');

}),

]);

config.lastRun = new Date();

config.lastSuccess = new Date();

config.consecutiveFailures = 0;

console.log([Orchestrator] Completed ${name});

} catch (error) {

config.lastRun = new Date();

config.lastError = error instanceof Error ? error.message : String(error);

config.consecutiveFailures++;

console.error([Orchestrator] Failed ${name}:, config.lastError);

// Auto-disable after too many failures

if (config.consecutiveFailures >= config.maxConsecutiveFailures) {

config.isEnabled = false;

console.log([Orchestrator] Disabled ${name} after ${config.consecutiveFailures} failures);

}

} finally {

config.isRunning = false;

this.running.delete(name);

}

}

async triggerWorker(name: string): Promise {

const config = this.workers.get(name);

if (!config || config.isRunning) return false;

await this.executeWorker(name, config);

return true;

}

enableWorker(name: string): void {

const config = this.workers.get(name);

if (config) {

config.isEnabled = true;

config.consecutiveFailures = 0;

}

}

disableWorker(name: string): void {

const config = this.workers.get(name);

if (config) {

config.isEnabled = false;

}

}

getStatus() {

return {

state: this.state,

workers: this.workers.size,

running: this.running.size,

workerStates: Object.fromEntries(

Array.from(this.workers.entries()).map(([name, config]) => [

name,

{

enabled: config.isEnabled,

running: config.isRunning,

failures: config.consecutiveFailures,

lastRun: config.lastRun,

lastError: config.lastError,

},

])

),

};

}

private sleep(ms: number): Promise {

return new Promise(resolve => setTimeout(resolve, ms));

}

}

```

Python Implementation

```python

# orchestrator.py

import asyncio

from dataclasses import dataclass, field

from datetime import datetime

from enum import Enum

from typing import Dict, Set, Callable, Awaitable, Optional, List

class JobPriority(int, Enum):

LOW = 0

NORMAL = 1

HIGH = 2

CRITICAL = 3

@dataclass

class WorkerConfig:

name: str

interval_seconds: int = 300

timeout_seconds: int = 60

max_consecutive_failures: int = 5

priority: JobPriority = JobPriority.NORMAL

depends_on: List[str] = field(default_factory=list)

blocks: List[str] = field(default_factory=list)

# Runtime state

is_enabled: bool = True

is_running: bool = False

consecutive_failures: int = 0

last_run: Optional[datetime] = None

last_error: Optional[str] = None

WorkerFn = Callable[[WorkerConfig], Awaitable[None]]

class WorkerOrchestrator:

def __init__(self, tick_interval: float = 5.0, max_concurrent: int = 5):

self._tick_interval = tick_interval

self._max_concurrent = max_concurrent

self._workers: Dict[str, WorkerConfig] = {}

self._worker_fns: Dict[str, WorkerFn] = {}

self._running: Set[str] = set()

self._state = "stopped"

self._task: Optional[asyncio.Task] = None

def register_worker(

self,

name: str,

fn: WorkerFn,

**options

):

self._workers[name] = WorkerConfig(name=name, **options)

self._worker_fns[name] = fn

async def start(self):

if self._state != "stopped":

return

self._state = "running"

self._task = asyncio.create_task(self._tick_loop())

print(f"[Orchestrator] Started with {len(self._workers)} workers")

async def stop(self):

if self._state == "stopped":

return

self._state = "stopping"

if self._task:

self._task.cancel()

# Wait for running workers

timeout = 30.0

start = datetime.now()

while self._running and (datetime.now() - start).total_seconds() < timeout:

await asyncio.sleep(0.1)

self._state = "stopped"

print("[Orchestrator] Stopped")

async def _tick_loop(self):

while self._state == "running":

await self._tick()

await asyncio.sleep(self._tick_interval)

async def _tick(self):

# Sort by priority

sorted_workers = sorted(

self._workers.items(),

key=lambda x: x[1].priority,

reverse=True

)

for name, config in sorted_workers:

if not config.is_enabled or config.is_running:

continue

if len(self._running) >= self._max_concurrent:

break

if self._should_run(config):

asyncio.create_task(self._execute_worker(name, config))

def _should_run(self, config: WorkerConfig) -> bool:

# Check dependencies

for dep in config.depends_on:

if self._workers.get(dep, WorkerConfig(name="")).is_running:

return False

# Check blockers

for blocker in config.blocks:

if self._workers.get(blocker, WorkerConfig(name="")).is_running:

return False

# Check schedule

if not config.last_run:

return True

elapsed = (datetime.now() - config.last_run).total_seconds()

return elapsed >= config.interval_seconds

async def _execute_worker(self, name: str, config: WorkerConfig):

fn = self._worker_fns.get(name)

if not fn:

return

config.is_running = True

self._running.add(name)

try:

await asyncio.wait_for(

fn(config),

timeout=config.timeout_seconds

)

config.last_run = datetime.now()

config.consecutive_failures = 0

except Exception as e:

config.last_run = datetime.now()

config.last_error = str(e)

config.consecutive_failures += 1

if config.consecutive_failures >= config.max_consecutive_failures:

config.is_enabled = False

print(f"[Orchestrator] Disabled {name}")

finally:

config.is_running = False

self._running.discard(name)

def get_status(self) -> Dict:

return {

"state": self._state,

"workers": len(self._workers),

"running": len(self._running),

"worker_states": {

name: {

"enabled": c.is_enabled,

"running": c.is_running,

"failures": c.consecutive_failures,

}

for name, c in self._workers.items()

},

}

```

Usage Examples

```typescript

const orchestrator = new WorkerOrchestrator({

tickIntervalMs: 5000,

maxConcurrentWorkers: 5,

});

// High-priority data fetcher

orchestrator.registerWorker('fetch-data', fetchDataWorker, {

intervalSeconds: 60,

timeoutSeconds: 30,

priority: JobPriority.HIGH,

});

// Depends on fetch-data completing first

orchestrator.registerWorker('process-data', processDataWorker, {

intervalSeconds: 120,

dependsOn: ['fetch-data'],

});

// Can't run while process-data is running

orchestrator.registerWorker('cleanup', cleanupWorker, {

intervalSeconds: 3600,

priority: JobPriority.LOW,

blocks: ['process-data'],

});

// Start

await orchestrator.start();

// Graceful shutdown

process.on('SIGTERM', () => orchestrator.stop());

```

Best Practices

  1. Use dependencies - For sequential workflows
  2. Use blocks - For mutually exclusive jobs
  3. Set timeouts - Prevent hung workers
  4. Auto-disable - Stop failing workers automatically
  5. Monitor status - Expose metrics endpoint

Common Mistakes

  • No timeout on workers (hung forever)
  • Missing dependencies (race conditions)
  • Too many concurrent workers (resource exhaustion)
  • Not handling graceful shutdown
  • No visibility into worker state

Related Skills

  • [Background Jobs](../background-jobs/)
  • [Graceful Shutdown](../graceful-shutdown/)
  • [Leader Election](../leader-election/)

More from this repository10

🎯
feature-flags🎯Skill

Enables controlled feature rollouts, A/B testing, and selective feature access through configurable flags for gradual deployment and user targeting.

🎯
design-tokens🎯Skill

Generates a comprehensive, type-safe design token system with WCAG AA color compliance and multi-framework support for consistent visual design.

🎯
file-uploads🎯Skill

Securely validates, scans, and processes file uploads with multi-stage checks, malware detection, and race condition prevention.

🎯
ai-coaching🎯Skill

Guides users through articulating creative intent by extracting structured parameters and detecting conversation readiness.

🎯
environment-config🎯Skill

Validates and centralizes environment variables with type safety, fail-fast startup checks, and multi-environment support.

🎯
community-feed🎯Skill

Generates efficient social feed with cursor pagination, trending algorithms, and engagement tracking for infinite scroll experiences.

🎯
cloud-storage🎯Skill

Enables secure, multi-tenant cloud file storage with signed URLs, direct uploads, and visibility control for user-uploaded assets.

🎯
email-service🎯Skill

Simplifies email sending, templating, and tracking with robust SMTP integration and support for multiple email providers and transactional workflows.

🎯
error-sanitization🎯Skill

Sanitizes error messages by logging full details server-side while exposing only generic, safe messages to prevent sensitive information leakage.

🎯
batch-processing🎯Skill

Optimizes database operations by collecting and batching independent records, improving throughput by 30-40% with built-in fallback processing.