🎯

saga-orchestration

🎯Skill

from rmyndharis/antigravity-skills

VibeIndex|
What it does

Orchestrates distributed transactions across multiple services, managing complex workflows with automatic compensation and failure handling.

πŸ“¦

Part of

rmyndharis/antigravity-skills(289 items)

saga-orchestration

Installation

npm runRun npm script
npm run build:catalog
npxRun with npx
npx @rmyndharis/antigravity-skills search <query>
npxRun with npx
npx @rmyndharis/antigravity-skills search kubernetes
npxRun with npx
npx @rmyndharis/antigravity-skills list
npxRun with npx
npx @rmyndharis/antigravity-skills install <skill-name>

+ 15 more commands

πŸ“– Extracted from docs: rmyndharis/antigravity-skills
10Installs
-
AddedFeb 4, 2026

Skill Details

SKILL.md

Implement saga patterns for distributed transactions and cross-aggregate workflows. Use when coordinating multi-step business processes, handling compensating transactions, or managing long-running workflows.

Overview

# Saga Orchestration

Patterns for managing distributed transactions and long-running business processes.

Do not use this skill when

  • The task is unrelated to saga orchestration
  • You need a different domain or tool outside this scope

Instructions

  • Clarify goals, constraints, and required inputs.
  • Apply relevant best practices and validate outcomes.
  • Provide actionable steps and verification.
  • If detailed examples are required, open resources/implementation-playbook.md.

Use this skill when

  • Coordinating multi-service transactions
  • Implementing compensating transactions
  • Managing long-running business workflows
  • Handling failures in distributed systems
  • Building order fulfillment processes
  • Implementing approval workflows

Core Concepts

1. Saga Types

```

Choreography Orchestration

β”Œβ”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”

β”‚Svc A│─►│Svc B│─►│Svc Cβ”‚ β”‚ Orchestratorβ”‚

β””β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜

β”‚ β”‚ β”‚ β”‚

β–Ό β–Ό β–Ό β”Œβ”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”

Event Event Event β–Ό β–Ό β–Ό

β”Œβ”€β”€β”€β”€β”β”Œβ”€β”€β”€β”€β”β”Œβ”€β”€β”€β”€β”

β”‚Svc1β”‚β”‚Svc2β”‚β”‚Svc3β”‚

β””β”€β”€β”€β”€β”˜β””β”€β”€β”€β”€β”˜β””β”€β”€β”€β”€β”˜

```

2. Saga Execution States

| State | Description |

| ---------------- | ------------------------------ |

| Started | Saga initiated |

| Pending | Waiting for step completion |

| Compensating | Rolling back due to failure |

| Completed | All steps succeeded |

| Failed | Saga failed after compensation |

Templates

Template 1: Saga Orchestrator Base

```python

from abc import ABC, abstractmethod

from dataclasses import dataclass, field

from enum import Enum

from typing import List, Dict, Any, Optional

from datetime import datetime

import uuid

class SagaState(Enum):

STARTED = "started"

PENDING = "pending"

COMPENSATING = "compensating"

COMPLETED = "completed"

FAILED = "failed"

@dataclass

class SagaStep:

name: str

action: str

compensation: str

status: str = "pending"

result: Optional[Dict] = None

error: Optional[str] = None

executed_at: Optional[datetime] = None

compensated_at: Optional[datetime] = None

@dataclass

class Saga:

saga_id: str

saga_type: str

state: SagaState

data: Dict[str, Any]

steps: List[SagaStep]

current_step: int = 0

created_at: datetime = field(default_factory=datetime.utcnow)

updated_at: datetime = field(default_factory=datetime.utcnow)

class SagaOrchestrator(ABC):

"""Base class for saga orchestrators."""

def __init__(self, saga_store, event_publisher):

self.saga_store = saga_store

self.event_publisher = event_publisher

@abstractmethod

def define_steps(self, data: Dict) -> List[SagaStep]:

"""Define the saga steps."""

pass

@property

@abstractmethod

def saga_type(self) -> str:

"""Unique saga type identifier."""

pass

async def start(self, data: Dict) -> Saga:

"""Start a new saga."""

saga = Saga(

saga_id=str(uuid.uuid4()),

saga_type=self.saga_type,

state=SagaState.STARTED,

data=data,

steps=self.define_steps(data)

)

await self.saga_store.save(saga)

await self._execute_next_step(saga)

return saga

async def handle_step_completed(self, saga_id: str, step_name: str, result: Dict):

"""Handle successful step completion."""

saga = await self.saga_store.get(saga_id)

# Update step

for step in saga.steps:

if step.name == step_name:

step.status = "completed"

step.result = result

step.executed_at = datetime.utcnow()

break

saga.current_step += 1

saga.updated_at = datetime.utcnow()

# Check if saga is complete

if saga.current_step >= len(saga.steps):

saga.state = SagaState.COMPLETED

await self.saga_store.save(saga)

await self._on_saga_completed(saga)

else:

saga.state = SagaState.PENDING

await self.saga_store.save(saga)

await self._execute_next_step(saga)

async def handle_step_failed(self, saga_id: str, step_name: str, error: str):

"""Handle step failure - start compensation."""

saga = await self.saga_store.get(saga_id)

# Mark step as failed

for step in saga.steps:

if step.name == step_name:

step.status = "failed"

step.error = error

break

saga.state = SagaState.COMPENSATING

saga.updated_at = datetime.utcnow()

await self.saga_store.save(saga)

# Start compensation from current step backwards

await self._compensate(saga)

async def _execute_next_step(self, saga: Saga):

"""Execute the next step in the saga."""

if saga.current_step >= len(saga.steps):

return

step = saga.steps[saga.current_step]

step.status = "executing"

await self.saga_store.save(saga)

# Publish command to execute step

await self.event_publisher.publish(

step.action,

{

"saga_id": saga.saga_id,

"step_name": step.name,

**saga.data

}

)

async def _compensate(self, saga: Saga):

"""Execute compensation for completed steps."""

# Compensate in reverse order

for i in range(saga.current_step - 1, -1, -1):

step = saga.steps[i]

if step.status == "completed":

step.status = "compensating"

await self.saga_store.save(saga)

await self.event_publisher.publish(

step.compensation,

{

"saga_id": saga.saga_id,

"step_name": step.name,

"original_result": step.result,

**saga.data

}

)

async def handle_compensation_completed(self, saga_id: str, step_name: str):

"""Handle compensation completion."""

saga = await self.saga_store.get(saga_id)

for step in saga.steps:

if step.name == step_name:

step.status = "compensated"

step.compensated_at = datetime.utcnow()

break

# Check if all compensations complete

all_compensated = all(

s.status in ("compensated", "pending", "failed")

for s in saga.steps

)

if all_compensated:

saga.state = SagaState.FAILED

await self._on_saga_failed(saga)

await self.saga_store.save(saga)

async def _on_saga_completed(self, saga: Saga):

"""Called when saga completes successfully."""

await self.event_publisher.publish(

f"{self.saga_type}Completed",

{"saga_id": saga.saga_id, **saga.data}

)

async def _on_saga_failed(self, saga: Saga):

"""Called when saga fails after compensation."""

await self.event_publisher.publish(

f"{self.saga_type}Failed",

{"saga_id": saga.saga_id, "error": "Saga failed", **saga.data}

)

```

Template 2: Order Fulfillment Saga

```python

class OrderFulfillmentSaga(SagaOrchestrator):

"""Orchestrates order fulfillment across services."""

@property

def saga_type(self) -> str:

return "OrderFulfillment"

def define_steps(self, data: Dict) -> List[SagaStep]:

return [

SagaStep(

name="reserve_inventory",

action="InventoryService.ReserveItems",

compensation="InventoryService.ReleaseReservation"

),

SagaStep(

name="process_payment",

action="PaymentService.ProcessPayment",

compensation="PaymentService.RefundPayment"

),

SagaStep(

name="create_shipment",

action="ShippingService.CreateShipment",

compensation="ShippingService.CancelShipment"

),

SagaStep(

name="send_confirmation",

action="NotificationService.SendOrderConfirmation",

compensation="NotificationService.SendCancellationNotice"

)

]

# Usage

async def create_order(order_data: Dict):

saga = OrderFulfillmentSaga(saga_store, event_publisher)

return await saga.start({

"order_id": order_data["order_id"],

"customer_id": order_data["customer_id"],

"items": order_data["items"],

"payment_method": order_data["payment_method"],

"shipping_address": order_data["shipping_address"]

})

# Event handlers in each service

class InventoryService:

async def handle_reserve_items(self, command: Dict):

try:

# Reserve inventory

reservation = await self.reserve(

command["items"],

command["order_id"]

)

# Report success

await self.event_publisher.publish(

"SagaStepCompleted",

{

"saga_id": command["saga_id"],

"step_name": "reserve_inventory",

"result": {"reservation_id": reservation.id}

}

)

except InsufficientInventoryError as e:

await self.event_publisher.publish(

"SagaStepFailed",

{

"saga_id": command["saga_id"],

"step_name": "reserve_inventory",

"error": str(e)

}

)

async def handle_release_reservation(self, command: Dict):

# Compensating action

await self.release_reservation(

command["original_result"]["reservation_id"]

)

await self.event_publisher.publish(

"SagaCompensationCompleted",

{

"saga_id": command["saga_id"],

"step_name": "reserve_inventory"

}

)

```

Template 3: Choreography-Based Saga

```python

from dataclasses import dataclass

from typing import Dict, Any

import asyncio

@dataclass

class SagaContext:

"""Passed through choreographed saga events."""

saga_id: str

step: int

data: Dict[str, Any]

completed_steps: list

class OrderChoreographySaga:

"""Choreography-based saga using events."""

def __init__(self, event_bus):

self.event_bus = event_bus

self._register_handlers()

def _register_handlers(self):

self.event_bus.subscribe("OrderCreated", self._on_order_created)

self.event_bus.subscribe("InventoryReserved", self._on_inventory_reserved)

self.event_bus.subscribe("PaymentProcessed", self._on_payment_processed)

self.event_bus.subscribe("ShipmentCreated", self._on_shipment_created)

# Compensation handlers

self.event_bus.subscribe("PaymentFailed", self._on_payment_failed)

self.event_bus.subscribe("ShipmentFailed", self._on_shipment_failed)

async def _on_order_created(self, event: Dict):

"""Step 1: Order created, reserve inventory."""

await self.event_bus.publish("ReserveInventory", {

"saga_id": event["order_id"],

"order_id": event["order_id"],

"items": event["items"]

})

async def _on_inventory_reserved(self, event: Dict):

"""Step 2: Inventory reserved, process payment."""

await self.event_bus.publish("ProcessPayment", {

"saga_id": event["saga_id"],

"order_id": event["order_id"],

"amount": event["total_amount"],

"reservation_id": event["reservation_id"]

})

async def _on_payment_processed(self, event: Dict):

"""Step 3: Payment done, create shipment."""

await self.event_bus.publish("CreateShipment", {

"saga_id": event["saga_id"],

"order_id": event["order_id"],

"payment_id": event["payment_id"]

})

async def _on_shipment_created(self, event: Dict):

"""Step 4: Complete - send confirmation."""

await self.event_bus.publish("OrderFulfilled", {

"saga_id": event["saga_id"],

"order_id": event["order_id"],

"tracking_number": event["tracking_number"]

})

# Compensation handlers

async def _on_payment_failed(self, event: Dict):

"""Payment failed - release inventory."""

await self.event_bus.publish("ReleaseInventory", {

"saga_id": event["saga_id"],

"reservation_id": event["reservation_id"]

})

await self.event_bus.publish("OrderFailed", {

"order_id": event["order_id"],

"reason": "Payment failed"

})

async def _on_shipment_failed(self, event: Dict):

"""Shipment failed - refund payment and release inventory."""

await self.event_bus.publish("RefundPayment", {

"saga_id": event["saga_id"],

"payment_id": event["payment_id"]

})

await self.event_bus.publish("ReleaseInventory", {

"saga_id": event["saga_id"],

"reservation_id": event["reservation_id"]

})

```

Template 4: Saga with Timeouts

```python

class TimeoutSagaOrchestrator(SagaOrchestrator):

"""Saga orchestrator with step timeouts."""

def __init__(self, saga_store, event_publisher, scheduler):

super().__init__(saga_store, event_publisher)

self.scheduler = scheduler

async def _execute_next_step(self, saga: Saga):

if saga.current_step >= len(saga.steps):

return

step = saga.steps[saga.current_step]

step.status = "executing"

step.timeout_at = datetime.utcnow() + timedelta(minutes=5)

await self.saga_store.save(saga)

# Schedule timeout check

await self.scheduler.schedule(

f"saga_timeout_{saga.saga_id}_{step.name}",

self._check_timeout,

{"saga_id": saga.saga_id, "step_name": step.name},

run_at=step.timeout_at

)

await self.event_publisher.publish(

step.action,

{"saga_id": saga.saga_id, "step_name": step.name, **saga.data}

)

async def _check_timeout(self, data: Dict):

"""Check if step has timed out."""

saga = await self.saga_store.get(data["saga_id"])

step = next(s for s in saga.steps if s.name == data["step_name"])

if step.status == "executing":

# Step timed out - fail it

await self.handle_step_failed(

data["saga_id"],

data["step_name"],

"Step timed out"

)

```

Best Practices

Do's

  • Make steps idempotent - Safe to retry
  • Design compensations carefully - They must work
  • Use correlation IDs - For tracing across services
  • Implement timeouts - Don't wait forever
  • Log everything - For debugging failures

Don'ts

  • Don't assume instant completion - Sagas take time
  • Don't skip compensation testing - Most critical part
  • Don't couple services - Use async messaging
  • Don't ignore partial failures - Handle gracefully

Resources

  • [Saga Pattern](https://microservices.io/patterns/data/saga.html)
  • [Designing Data-Intensive Applications](https://dataintensive.net/)