Types
```typescript
// types.ts
export enum WorkerExecutionMode {
SCHEDULED = 'scheduled',
TRIGGERED = 'triggered',
CONTINUOUS = 'continuous',
}
export enum JobPriority {
LOW = 0,
NORMAL = 1,
HIGH = 2,
CRITICAL = 3,
}
export interface WorkerConfig {
name: string;
executionMode: WorkerExecutionMode;
intervalSeconds: number;
timeoutSeconds: number;
maxRetries: number;
priority: JobPriority;
maxConsecutiveFailures: number;
dependsOn: string[];
blocks: string[];
// Runtime state
isEnabled: boolean;
isRunning: boolean;
consecutiveFailures: number;
lastRun?: Date;
lastSuccess?: Date;
lastError?: string;
}
export type WorkerFn = (config: WorkerConfig) => Promise;
```
Orchestrator
```typescript
// orchestrator.ts
import { WorkerConfig, WorkerFn, WorkerExecutionMode, JobPriority } from './types';
interface OrchestratorConfig {
tickIntervalMs: number;
maxConcurrentWorkers: number;
}
export class WorkerOrchestrator {
private workers = new Map();
private workerFns = new Map();
private running = new Set();
private tickInterval: NodeJS.Timeout | null = null;
private state: 'stopped' | 'running' | 'stopping' = 'stopped';
constructor(private config: OrchestratorConfig) {}
registerWorker(
name: string,
fn: WorkerFn,
options: Partial = {}
): void {
this.workers.set(name, {
name,
executionMode: options.executionMode || WorkerExecutionMode.SCHEDULED,
intervalSeconds: options.intervalSeconds || 300,
timeoutSeconds: options.timeoutSeconds || 60,
maxRetries: options.maxRetries || 3,
priority: options.priority || JobPriority.NORMAL,
maxConsecutiveFailures: options.maxConsecutiveFailures || 5,
dependsOn: options.dependsOn || [],
blocks: options.blocks || [],
isEnabled: true,
isRunning: false,
consecutiveFailures: 0,
});
this.workerFns.set(name, fn);
}
async start(): Promise {
if (this.state !== 'stopped') return;
this.state = 'running';
this.tickInterval = setInterval(
() => this.tick(),
this.config.tickIntervalMs
);
console.log([Orchestrator] Started with ${this.workers.size} workers);
}
async stop(): Promise {
if (this.state === 'stopped') return;
this.state = 'stopping';
if (this.tickInterval) clearInterval(this.tickInterval);
// Wait for running workers
const maxWait = 30000;
const start = Date.now();
while (this.running.size > 0 && Date.now() - start < maxWait) {
await this.sleep(100);
}
this.state = 'stopped';
console.log('[Orchestrator] Stopped');
}
private async tick(): Promise {
if (this.state !== 'running') return;
// Sort by priority (higher first)
const sortedWorkers = Array.from(this.workers.entries())
.sort((a, b) => b[1].priority - a[1].priority);
for (const [name, config] of sortedWorkers) {
if (!config.isEnabled || config.isRunning) continue;
if (this.running.size >= this.config.maxConcurrentWorkers) break;
if (this.shouldRun(config)) {
this.executeWorker(name, config);
}
}
}
private shouldRun(config: WorkerConfig): boolean {
// Check dependencies - must not be running
for (const dep of config.dependsOn) {
if (this.workers.get(dep)?.isRunning) return false;
}
// Check blockers - must not be running
for (const blocker of config.blocks) {
if (this.workers.get(blocker)?.isRunning) return false;
}
// Check schedule
if (!config.lastRun) return true;
const elapsed = (Date.now() - config.lastRun.getTime()) / 1000;
return elapsed >= config.intervalSeconds;
}
private async executeWorker(name: string, config: WorkerConfig): Promise {
const fn = this.workerFns.get(name);
if (!fn) return;
config.isRunning = true;
this.running.add(name);
console.log([Orchestrator] Starting ${name});
try {
// Execute with timeout
await Promise.race([
fn(config),
this.sleep(config.timeoutSeconds * 1000).then(() => {
throw new Error('Worker timeout');
}),
]);
config.lastRun = new Date();
config.lastSuccess = new Date();
config.consecutiveFailures = 0;
console.log([Orchestrator] Completed ${name});
} catch (error) {
config.lastRun = new Date();
config.lastError = error instanceof Error ? error.message : String(error);
config.consecutiveFailures++;
console.error([Orchestrator] Failed ${name}:, config.lastError);
// Auto-disable after too many failures
if (config.consecutiveFailures >= config.maxConsecutiveFailures) {
config.isEnabled = false;
console.log([Orchestrator] Disabled ${name} after ${config.consecutiveFailures} failures);
}
} finally {
config.isRunning = false;
this.running.delete(name);
}
}
async triggerWorker(name: string): Promise {
const config = this.workers.get(name);
if (!config || config.isRunning) return false;
await this.executeWorker(name, config);
return true;
}
enableWorker(name: string): void {
const config = this.workers.get(name);
if (config) {
config.isEnabled = true;
config.consecutiveFailures = 0;
}
}
disableWorker(name: string): void {
const config = this.workers.get(name);
if (config) {
config.isEnabled = false;
}
}
getStatus() {
return {
state: this.state,
workers: this.workers.size,
running: this.running.size,
workerStates: Object.fromEntries(
Array.from(this.workers.entries()).map(([name, config]) => [
name,
{
enabled: config.isEnabled,
running: config.isRunning,
failures: config.consecutiveFailures,
lastRun: config.lastRun,
lastError: config.lastError,
},
])
),
};
}
private sleep(ms: number): Promise {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
```