🎯

job-state-machine

🎯Skill

from dadbodgeoff/drift

VibeIndex|
What it does

Manages async job processing with validated state transitions, progress tracking, and automatic asset linking across terminal states.

πŸ“¦

Part of

dadbodgeoff/drift(69 items)

job-state-machine

Installation

npm installInstall npm package
npm install -g driftdetect
npm installInstall npm package
npm install -g driftdetect@latest
npm installInstall npm package
npm install -g driftdetect-mcp
Claude Desktop ConfigurationAdd this to your claude_desktop_config.json
{ "mcpServers": { "drift": { "command": "driftdetect-mcp" } } ...
πŸ“– Extracted from docs: dadbodgeoff/drift
4Installs
-
AddedFeb 4, 2026

Skill Details

SKILL.md

Async job processing with validated state transitions, progress tracking, and asset linking. Ensure jobs always reach terminal states with proper error handling.

Overview

# Job State Machine

Validated state transitions for async jobs.

When to Use This Skill

  • Processing async jobs (image generation, exports, etc.)
  • Need progress tracking for long operations
  • Want to link created assets to jobs
  • Need reliable state management

Core Concepts

  1. Defined states - QUEUED β†’ PROCESSING β†’ COMPLETED/FAILED/PARTIAL
  2. Valid transitions - Only allowed state changes
  3. Terminal states - Jobs must reach an end state
  4. Asset linking - Track outputs created by jobs

State Machine

```

QUEUED

β”‚

β–Ό

PROCESSING

/ | \

β–Ό β–Ό β–Ό

COMPLETED PARTIAL FAILED

(terminal states)

```

TypeScript Implementation

Types

```typescript

// types.ts

export enum JobStatus {

QUEUED = 'queued',

PROCESSING = 'processing',

COMPLETED = 'completed',

FAILED = 'failed',

PARTIAL = 'partial',

}

export const VALID_TRANSITIONS: Record = {

[JobStatus.QUEUED]: [JobStatus.PROCESSING],

[JobStatus.PROCESSING]: [JobStatus.COMPLETED, JobStatus.PARTIAL, JobStatus.FAILED],

[JobStatus.COMPLETED]: [],

[JobStatus.FAILED]: [],

[JobStatus.PARTIAL]: [],

};

export function isTerminalState(status: JobStatus): boolean {

return VALID_TRANSITIONS[status].length === 0;

}

export interface Job {

id: string;

userId: string;

jobType: string;

status: JobStatus;

progress: number;

errorMessage?: string;

parameters?: Record;

result?: Record;

createdAt: Date;

updatedAt: Date;

completedAt?: Date;

}

export interface Asset {

id: string;

jobId: string;

userId: string;

assetType: string;

url: string;

storagePath: string;

fileSize: number;

createdAt: Date;

}

```

Job Service

```typescript

// job-service.ts

import { Job, JobStatus, Asset, VALID_TRANSITIONS, isTerminalState } from './types';

export class InvalidStateTransitionError extends Error {

constructor(public currentStatus: JobStatus, public targetStatus: JobStatus) {

super(Cannot transition from '${currentStatus}' to '${targetStatus}');

this.name = 'InvalidStateTransitionError';

}

}

export class JobService {

constructor(private db: Database) {}

async createJob(

userId: string,

jobType: string,

parameters?: Record

): Promise {

const job: Job = {

id: crypto.randomUUID(),

userId,

jobType,

status: JobStatus.QUEUED,

progress: 0,

parameters,

createdAt: new Date(),

updatedAt: new Date(),

};

await this.db.jobs.insert(job);

return job;

}

async getJob(jobId: string, userId: string): Promise {

const job = await this.db.jobs.findOne({ id: jobId });

if (!job) throw new Error('Job not found');

if (job.userId !== userId) throw new Error('Unauthorized');

return job;

}

async transitionStatus(

jobId: string,

targetStatus: JobStatus,

options: {

progress?: number;

errorMessage?: string;

result?: Record;

} = {}

): Promise {

const job = await this.db.jobs.findOne({ id: jobId });

if (!job) throw new Error('Job not found');

// Allow same-state for progress updates

if (job.status !== targetStatus) {

if (!VALID_TRANSITIONS[job.status].includes(targetStatus)) {

throw new InvalidStateTransitionError(job.status, targetStatus);

}

}

const updates: Partial = {

status: targetStatus,

progress: options.progress ?? job.progress,

updatedAt: new Date(),

};

if (options.errorMessage !== undefined) {

updates.errorMessage = options.errorMessage;

}

if (options.result !== undefined) {

updates.result = options.result;

}

if (isTerminalState(targetStatus)) {

updates.completedAt = new Date();

}

await this.db.jobs.update({ id: jobId }, updates);

return { ...job, ...updates };

}

async updateProgress(jobId: string, progress: number): Promise {

const job = await this.db.jobs.findOne({ id: jobId });

if (!job) throw new Error('Job not found');

if (job.status !== JobStatus.PROCESSING) {

console.warn(Ignoring progress update for job ${jobId} in ${job.status} state);

return job;

}

return this.transitionStatus(jobId, JobStatus.PROCESSING, { progress });

}

async markCompleted(jobId: string, result?: Record): Promise {

return this.transitionStatus(jobId, JobStatus.COMPLETED, { progress: 100, result });

}

async markFailed(jobId: string, errorMessage: string): Promise {

return this.transitionStatus(jobId, JobStatus.FAILED, { errorMessage });

}

async markPartial(

jobId: string,

result?: Record,

errorMessage?: string

): Promise {

return this.transitionStatus(jobId, JobStatus.PARTIAL, {

progress: 100,

result,

errorMessage,

});

}

async createAsset(

jobId: string,

userId: string,

assetType: string,

url: string,

storagePath: string,

fileSize: number

): Promise {

const asset: Asset = {

id: crypto.randomUUID(),

jobId,

userId,

assetType,

url,

storagePath,

fileSize,

createdAt: new Date(),

};

await this.db.assets.insert(asset);

return asset;

}

async getJobAssets(jobId: string, userId: string): Promise {

await this.getJob(jobId, userId); // Verify ownership

return this.db.assets.find({ jobId });

}

}

```

Python Implementation

```python

# job_service.py

from enum import Enum

from dataclasses import dataclass

from datetime import datetime

from typing import Optional, Dict, Any, List

from uuid import uuid4

class JobStatus(str, Enum):

QUEUED = "queued"

PROCESSING = "processing"

COMPLETED = "completed"

FAILED = "failed"

PARTIAL = "partial"

VALID_TRANSITIONS = {

JobStatus.QUEUED: [JobStatus.PROCESSING],

JobStatus.PROCESSING: [JobStatus.COMPLETED, JobStatus.PARTIAL, JobStatus.FAILED],

JobStatus.COMPLETED: [],

JobStatus.FAILED: [],

JobStatus.PARTIAL: [],

}

def is_terminal_state(status: JobStatus) -> bool:

return len(VALID_TRANSITIONS.get(status, [])) == 0

@dataclass

class Job:

id: str

user_id: str

job_type: str

status: JobStatus

progress: int

created_at: datetime

updated_at: datetime

error_message: Optional[str] = None

parameters: Optional[Dict[str, Any]] = None

result: Optional[Dict[str, Any]] = None

completed_at: Optional[datetime] = None

class InvalidStateTransitionError(Exception):

def __init__(self, current: JobStatus, target: JobStatus):

self.current = current

self.target = target

super().__init__(f"Cannot transition from '{current}' to '{target}'")

class JobService:

def __init__(self, db):

self.db = db

async def create_job(

self,

user_id: str,

job_type: str,

parameters: Optional[Dict[str, Any]] = None,

) -> Job:

now = datetime.utcnow()

job = Job(

id=str(uuid4()),

user_id=user_id,

job_type=job_type,

status=JobStatus.QUEUED,

progress=0,

parameters=parameters,

created_at=now,

updated_at=now,

)

await self.db.jobs.insert(job)

return job

async def transition_status(

self,

job_id: str,

target_status: JobStatus,

progress: Optional[int] = None,

error_message: Optional[str] = None,

result: Optional[Dict[str, Any]] = None,

) -> Job:

job = await self.db.jobs.find_one(id=job_id)

if not job:

raise ValueError("Job not found")

if job.status != target_status:

if target_status not in VALID_TRANSITIONS.get(job.status, []):

raise InvalidStateTransitionError(job.status, target_status)

job.status = target_status

job.updated_at = datetime.utcnow()

if progress is not None:

job.progress = progress

if error_message is not None:

job.error_message = error_message

if result is not None:

job.result = result

if is_terminal_state(target_status):

job.completed_at = datetime.utcnow()

await self.db.jobs.update(job)

return job

async def mark_completed(self, job_id: str, result: Optional[Dict] = None) -> Job:

return await self.transition_status(

job_id, JobStatus.COMPLETED, progress=100, result=result

)

async def mark_failed(self, job_id: str, error_message: str) -> Job:

return await self.transition_status(

job_id, JobStatus.FAILED, error_message=error_message

)

```

Worker Integration

```typescript

async function processJob(jobId: string) {

const jobService = getJobService();

try {

// Transition to PROCESSING

await jobService.transitionStatus(jobId, JobStatus.PROCESSING, { progress: 0 });

const job = await jobService.getJobInternal(jobId);

// Process with progress updates

await jobService.updateProgress(jobId, 25);

const result1 = await doStep1(job.parameters);

await jobService.updateProgress(jobId, 50);

const result2 = await doStep2(result1);

await jobService.updateProgress(jobId, 75);

const assetUrl = await uploadResult(result2);

// Create asset

await jobService.createAsset(

jobId,

job.userId,

job.jobType,

assetUrl,

${job.userId}/${jobId}/result.png,

result2.length

);

// Mark completed

await jobService.markCompleted(jobId, { assetCount: 1 });

} catch (error) {

console.error(Job ${jobId} failed:, error);

await jobService.markFailed(jobId, error.message);

}

}

```

Database Schema

```sql

CREATE TABLE jobs (

id UUID PRIMARY KEY,

user_id UUID NOT NULL REFERENCES users(id),

job_type VARCHAR(50) NOT NULL,

status VARCHAR(20) NOT NULL DEFAULT 'queued',

progress INTEGER NOT NULL DEFAULT 0,

error_message TEXT,

parameters JSONB,

result JSONB,

created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

completed_at TIMESTAMPTZ,

CONSTRAINT valid_status CHECK (status IN ('queued', 'processing', 'completed', 'failed', 'partial')),

CONSTRAINT valid_progress CHECK (progress >= 0 AND progress <= 100)

);

CREATE INDEX idx_jobs_user_status ON jobs(user_id, status);

CREATE TABLE assets (

id UUID PRIMARY KEY,

job_id UUID NOT NULL REFERENCES jobs(id) ON DELETE CASCADE,

user_id UUID NOT NULL REFERENCES users(id),

asset_type VARCHAR(50) NOT NULL,

url TEXT NOT NULL,

storage_path TEXT NOT NULL,

file_size INTEGER NOT NULL,

created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()

);

CREATE INDEX idx_assets_job_id ON assets(job_id);

```

Best Practices

  1. Validate all transitions - Never skip state machine validation
  2. Always reach terminal - Jobs must complete, fail, or partial
  3. Track progress - Provide feedback for long operations
  4. Link assets - Maintain relationship for cleanup
  5. Log transitions - Essential for debugging

Common Mistakes

  • Allowing invalid state transitions
  • Jobs stuck in non-terminal states
  • Not tracking progress for long jobs
  • Orphaned assets without job links
  • Missing error messages on failure

Related Skills

  • [Background Jobs](../background-jobs/)
  • [Dead Letter Queue](../dead-letter-queue/)
  • [Graceful Shutdown](../graceful-shutdown/)

More from this repository10

🎯
feature-flags🎯Skill

Enables controlled feature rollouts, A/B testing, and selective feature access through configurable flags for gradual deployment and user targeting.

🎯
design-tokens🎯Skill

Generates a comprehensive, type-safe design token system with WCAG AA color compliance and multi-framework support for consistent visual design.

🎯
file-uploads🎯Skill

Securely validates, scans, and processes file uploads with multi-stage checks, malware detection, and race condition prevention.

🎯
ai-coaching🎯Skill

Guides users through articulating creative intent by extracting structured parameters and detecting conversation readiness.

🎯
environment-config🎯Skill

Validates and centralizes environment variables with type safety, fail-fast startup checks, and multi-environment support.

🎯
community-feed🎯Skill

Generates efficient social feed with cursor pagination, trending algorithms, and engagement tracking for infinite scroll experiences.

🎯
cloud-storage🎯Skill

Enables secure, multi-tenant cloud file storage with signed URLs, direct uploads, and visibility control for user-uploaded assets.

🎯
email-service🎯Skill

Simplifies email sending, templating, and tracking with robust SMTP integration and support for multiple email providers and transactional workflows.

🎯
error-sanitization🎯Skill

Sanitizes error messages by logging full details server-side while exposing only generic, safe messages to prevent sensitive information leakage.

🎯
batch-processing🎯Skill

Optimizes database operations by collecting and batching independent records, improving throughput by 30-40% with built-in fallback processing.