distributed-events-advanced
Skill from thapaliyabikendra/ai-artifacts
Enables advanced distributed event patterns in ABP microservices with idempotent handlers, cross-tenant events, and saga implementations.
Installation
git clone https://github.com/thapaliyabikendra/ai-artifacts.git
npx github:thapaliyabikendra/ai-artifacts install .
curl -sSL https://raw.githubusercontent.com/thapaliyabikendra/ai-artifacts/main/scripts/install.sh | bash -s -- /path/to/project
npx github:thapaliyabikendra/ai-artifacts update
npx github:thapaliyabikendra/ai-artifacts uninstall
Skill Details
"Advanced distributed event patterns for ABP microservices including idempotent handlers, cross-tenant events, event sourcing lite, and saga patterns. Use when: (1) implementing event handlers across services, (2) ensuring idempotent event processing, (3) cross-tenant event handling, (4) designing event-driven architectures."
Overview
# Distributed Events Advanced Patterns
Master advanced distributed event patterns for building resilient, scalable ABP microservices architectures.
When to Use This Skill
- Implementing event handlers that process events from other microservices
- Building idempotent event processing to handle duplicates
- Cross-tenant event synchronization
- Designing event-driven communication patterns
- Implementing saga/choreography patterns
- Troubleshooting event delivery issues
Core Concepts
Event Transfer Objects (ETOs)
ETOs are the payload of distributed events. Define them in a shared library accessible by both publisher and subscriber.
```csharp
// Shared/Etos/PatientCreatedEto.cs
using System;
using Volo.Abp.EventBus;

[EventName("patient.created")] // Optional: explicit event name
public class PatientCreatedEto
{
    public Guid Id { get; set; }
    public Guid? TenantId { get; set; }
    public string Name { get; set; }
    public string Email { get; set; }
    public DateTime CreatedAt { get; set; }

    // Include correlation ID for tracing
    public string CorrelationId { get; set; }
}
```
ETO Best Practices:
- Include `TenantId` for multi-tenant scenarios
- Include `CorrelationId` for distributed tracing
- Use primitive types only (no navigation properties)
- Version ETOs carefully (add properties, don't remove)
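The versioning rule can be illustrated with plain `System.Text.Json`, independent of ABP. The sketch below assumes JSON serialization and two hypothetical versions of the same ETO (`PatientCreatedEtoV1` and `PatientCreatedEtoV2` are names invented for this example). It shows why adding a property is safe in both directions: an old payload leaves the new property at its default, and an old subscriber ignores the property it does not know.

```csharp
using System;
using System.Text.Json;

// Hypothetical "old" shape of the ETO, as an older publisher would send it.
public class PatientCreatedEtoV1
{
    public Guid Id { get; set; }
    public string Name { get; set; }
}

// Hypothetical "new" shape: a property was added, nothing was removed or renamed.
public class PatientCreatedEtoV2
{
    public Guid Id { get; set; }
    public string Name { get; set; }
    public string Email { get; set; } // added later; null when the publisher is old
}

public static class EtoVersioningDemo
{
    // Old publisher -> new subscriber: the missing property stays at its default.
    public static PatientCreatedEtoV2 ReadOldPayloadWithNewType(PatientCreatedEtoV1 old)
    {
        var json = JsonSerializer.Serialize(old);
        return JsonSerializer.Deserialize<PatientCreatedEtoV2>(json);
    }

    // New publisher -> old subscriber: the unknown property is ignored by default.
    public static PatientCreatedEtoV1 ReadNewPayloadWithOldType(PatientCreatedEtoV2 current)
    {
        var json = JsonSerializer.Serialize(current);
        return JsonSerializer.Deserialize<PatientCreatedEtoV1>(json);
    }
}
```

Removing or renaming a property breaks this symmetry: subscribers that still read the old name would silently receive a default value.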
Event Handler Patterns
1. Basic Event Handler
```csharp
public class PatientCreatedEventHandler :
    IDistributedEventHandler<PatientCreatedEto>,
    ITransientDependency
{
    private readonly ILogger<PatientCreatedEventHandler> _logger;

    public PatientCreatedEventHandler(
        ILogger<PatientCreatedEventHandler> logger)
    {
        _logger = logger;
    }

    public async Task HandleEventAsync(PatientCreatedEto eventData)
    {
        _logger.LogInformation(
            "Processing PatientCreated event: {PatientId}",
            eventData.Id);

        // Handle the event (ProcessPatientAsync is application-specific and not shown)
        await ProcessPatientAsync(eventData);
    }
}
```
2. Idempotent Event Handler
Handle duplicate events gracefully using idempotency keys.
```csharp
public class PatientSyncEventHandler :
    IDistributedEventHandler<PatientCreatedEto>,
    ITransientDependency
{
    private readonly IRepository<Patient, Guid> _repository;
    private readonly IRepository<ProcessedEvent, Guid> _processedEventRepository;
    private readonly IGuidGenerator _guidGenerator;
    private readonly ILogger<PatientSyncEventHandler> _logger;

    public PatientSyncEventHandler(
        IRepository<Patient, Guid> repository,
        IRepository<ProcessedEvent, Guid> processedEventRepository,
        IGuidGenerator guidGenerator,
        ILogger<PatientSyncEventHandler> logger)
    {
        _repository = repository;
        _processedEventRepository = processedEventRepository;
        _guidGenerator = guidGenerator;
        _logger = logger;
    }

    public async Task HandleEventAsync(PatientCreatedEto eto)
    {
        // Idempotency check keyed on the event type and source entity ID.
        // A unique index on EventKey is still needed to close the race between
        // this check and the insert below when duplicates arrive concurrently.
        var eventKey = $"PatientCreated:{eto.Id}";

        if (await _processedEventRepository.AnyAsync(x => x.EventKey == eventKey))
        {
            _logger.LogInformation(
                "Event already processed, skipping: {EventKey}", eventKey);
            return;
        }

        try
        {
            // Check if entity already exists
            var existing = await _repository.FirstOrDefaultAsync(
                x => x.ExternalId == eto.Id);

            if (existing != null)
            {
                _logger.LogInformation(
                    "Patient already exists, updating: {PatientId}", eto.Id);
                existing.SetName(eto.Name);
                await _repository.UpdateAsync(existing);
            }
            else
            {
                var patient = new Patient(
                    _guidGenerator.Create(),
                    eto.Name,
                    eto.Email)
                {
                    ExternalId = eto.Id
                };
                await _repository.InsertAsync(patient);
            }

            // Record processed event
            await _processedEventRepository.InsertAsync(new ProcessedEvent
            {
                EventKey = eventKey,
                ProcessedAt = DateTime.UtcNow
            });

            _logger.LogInformation(
                "Successfully processed PatientCreated: {PatientId}", eto.Id);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "Failed to process PatientCreated: {PatientId}", eto.Id);
            throw; // Re-throw to trigger retry
        }
    }
}

// Entity for tracking processed events
public class ProcessedEvent : Entity<Guid>
{
    public string EventKey { get; set; }
    public DateTime ProcessedAt { get; set; }
}
```
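A check followed by an insert leaves a small window in which two concurrent deliveries of the same event can both pass the check. One way to close it is to claim the key atomically before doing any work. The sketch below is a framework-free illustration, not ABP code: `ProcessedEventStore` and `IdempotentProcessor` are hypothetical names, and the `ConcurrentDictionary` stands in for a database table with a unique index on the event key.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// In-memory stand-in for a table with a unique index on EventKey (an assumption
// of this sketch; a real service would rely on the database constraint instead).
public class ProcessedEventStore
{
    private readonly ConcurrentDictionary<string, DateTime> _keys = new();

    // Returns true only for the first caller that claims the key.
    public bool TryClaim(string eventKey) => _keys.TryAdd(eventKey, DateTime.UtcNow);

    // Releases a claim so a failed attempt can be retried later.
    public void Release(string eventKey) => _keys.TryRemove(eventKey, out _);
}

public class IdempotentProcessor
{
    private readonly ProcessedEventStore _store = new();
    private int _sideEffects;

    // Number of times the work actually ran to completion.
    public int SideEffects => _sideEffects;

    // Runs the work at most once per event key, even under concurrent delivery.
    public async Task<bool> HandleAsync(string eventKey, Func<Task> work)
    {
        if (!_store.TryClaim(eventKey))
        {
            return false; // duplicate delivery, skipped
        }

        try
        {
            await work();
            Interlocked.Increment(ref _sideEffects);
            return true;
        }
        catch
        {
            _store.Release(eventKey); // let a redelivery try again
            throw;
        }
    }
}
```

With a real database, the equivalent of `TryClaim` is inserting the `ProcessedEvent` row first, inside the same transaction as the business change, and treating a unique-constraint violation as "already processed".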
3. Cross-Tenant Event Handler
Handle events that need to operate across tenant boundaries.
```csharp
// "Entity" below stands for your own multi-tenant aggregate type.
public class EntitySyncEventHandler :
    IDistributedEventHandler<EntityUpdatedEto>,
    ITransientDependency
{
    private readonly IRepository<Entity, Guid> _repository;
    private readonly IDataFilter _dataFilter;
    private readonly ICurrentTenant _currentTenant;
    private readonly IGuidGenerator _guidGenerator;
    private readonly ILogger<EntitySyncEventHandler> _logger;
    // Constructor injection omitted for brevity

    public async Task HandleEventAsync(EntityUpdatedEto eto)
    {
        _logger.LogInformation(
            "Processing cross-tenant sync: {EntityId}, TargetTenant: {TenantId}",
            eto.Id, eto.TenantId);

        // Pick ONE of the two options; running both would sync the entity twice.

        // Option 1: Disable tenant filter completely
        // using (_dataFilter.Disable<IMultiTenant>())
        // {
        //     await SyncEntityAsync(eto);
        // }

        // Option 2: Switch to specific tenant context
        using (_currentTenant.Change(eto.TenantId))
        {
            await SyncEntityAsync(eto);
        }
    }

    private async Task SyncEntityAsync(EntityUpdatedEto eto)
    {
        var existing = await _repository.FirstOrDefaultAsync(
            x => x.ExternalId == eto.ExternalId);

        if (existing != null)
        {
            // Update existing
            existing.Update(eto.Name, eto.Value);
            await _repository.UpdateAsync(existing);
        }
        else
        {
            // Create new
            var entity = new Entity(
                _guidGenerator.Create(),
                eto.Name,
                eto.Value)
            {
                TenantId = eto.TenantId,
                ExternalId = eto.ExternalId
            };
            await _repository.InsertAsync(entity);
        }
    }
}
```
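The `using` blocks above rely on a scope object that restores the previous state when disposed. The sketch below shows that mechanism in isolation. `AmbientTenant` is a hypothetical, simplified stand-in written for this example and is not ABP's actual `ICurrentTenant` implementation.

```csharp
using System;
using System.Threading;

// Simplified stand-in for an ambient "current tenant"; not ABP's implementation.
public class AmbientTenant
{
    private readonly AsyncLocal<Guid?> _current = new();

    public Guid? Id => _current.Value;

    // Switches the tenant and returns a handle that restores the previous one.
    public IDisposable Change(Guid? tenantId)
    {
        var previous = _current.Value;
        _current.Value = tenantId;
        return new RestoreOnDispose(() => _current.Value = previous);
    }

    private sealed class RestoreOnDispose : IDisposable
    {
        private readonly Action _restore;
        private bool _disposed;

        public RestoreOnDispose(Action restore) => _restore = restore;

        public void Dispose()
        {
            if (_disposed) return; // restoring twice would clobber a newer scope
            _disposed = true;
            _restore();
        }
    }
}
```

Because each scope remembers the value it replaced, nested `Change` calls unwind in order, which is what makes it safe to switch tenants inside a handler that was itself invoked under another tenant.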
4. Handler with Business Logic Delegation
Separate handler concerns from business logic for testability.
```csharp
// Event Handler - thin, handles infrastructure concerns
public class LicensePlateAllocatedEventHandler :
    IDistributedEventHandler<LicensePlateAllocatedEto>,
    ITransientDependency
{
    private readonly ILicensePlateEventService _eventService;
    private readonly ILogger<LicensePlateAllocatedEventHandler> _logger;

    public LicensePlateAllocatedEventHandler(
        ILicensePlateEventService eventService,
        ILogger<LicensePlateAllocatedEventHandler> logger)
    {
        _eventService = eventService;
        _logger = logger;
    }

    public async Task HandleEventAsync(LicensePlateAllocatedEto eto)
    {
        try
        {
            _logger.LogInformation(
                "[{Handler}] HandleEventAsync - Started - LicensePlateId: {Id}",
                nameof(LicensePlateAllocatedEventHandler), eto.LicensePlateId);

            await _eventService.ProcessAllocationAsync(eto);

            _logger.LogInformation(
                "[{Handler}] HandleEventAsync - Completed - LicensePlateId: {Id}",
                nameof(LicensePlateAllocatedEventHandler), eto.LicensePlateId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "[{Handler}] HandleEventAsync - Failed - LicensePlateId: {Id}",
                nameof(LicensePlateAllocatedEventHandler), eto.LicensePlateId);
            // Rethrow the original exception: no user sees this error, and wrapping it
            // in UserFriendlyException would discard the type and stack trace.
            throw;
        }
    }
}

// Service - contains business logic, easily testable
public interface ILicensePlateEventService
{
    Task ProcessAllocationAsync(LicensePlateAllocatedEto eto);
}

public class LicensePlateEventService : ApplicationService, ILicensePlateEventService
{
    private readonly IRepository<LicensePlate, Guid> _repository;

    public LicensePlateEventService(IRepository<LicensePlate, Guid> repository)
    {
        _repository = repository;
    }

    public async Task ProcessAllocationAsync(LicensePlateAllocatedEto eto)
    {
        var licensePlate = await _repository.GetAsync(eto.LicensePlateId);
        licensePlate.MarkAsAllocated(eto.AllocatedTo, eto.AllocatedAt);
        await _repository.UpdateAsync(licensePlate);
    }
}
```
Publishing Events
1. From Application Service
```csharp
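public class PatientAppService : ApplicationService, IPatientAppService
{
    private readonly IRepository<Patient, Guid> _patientRepository;
    private readonly IDistributedEventBus _eventBus;
    private readonly ICorrelationIdProvider _correlationIdProvider; // Volo.Abp.Tracing
    // Constructor injection omitted for brevity

    // Method and DTO names are reconstructed; adjust them to your contracts.
    public async Task<PatientDto> CreateAsync(CreatePatientDto input)
    {
        var patient = new Patient(GuidGenerator.Create(), input.Name, input.Email);
        await _patientRepository.InsertAsync(patient);

        // Publish event after successful creation
        await _eventBus.PublishAsync(new PatientCreatedEto
        {
            Id = patient.Id,
            TenantId = CurrentTenant.Id,
            Name = patient.Name,
            Email = patient.Email,
            CreatedAt = patient.CreationTime,
            CorrelationId = _correlationIdProvider.Get()
        });

        return ObjectMapper.Map<Patient, PatientDto>(patient);
    }
}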
```
2. From Domain Entity (Aggregate Root)
```csharp
public class Patient : FullAuditedAggregateRoot<Guid>
{
    public string Name { get; private set; }
    public string Email { get; private set; }
    public bool IsActive { get; private set; }

    public void Activate()
    {
        if (IsActive)
        {
            throw new BusinessException("Patient is already active");
        }

        IsActive = true;

        // Domain event - published when UoW completes
        AddDistributedEvent(new PatientActivatedEto
        {
            Id = Id,
            Name = Name,
            Email = Email,
            ActivatedAt = DateTime.UtcNow
        });
    }
}
```
3. Outbox Pattern (Transactional Events)
Ensure events are published atomically with database changes.
```csharp
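// Configure in module
public override void ConfigureServices(ServiceConfigurationContext context)
{
    Configure<AbpDistributedEventBusOptions>(options =>
    {
        // Enable outbox: events are stored in the same database transaction as the
        // data changes. MyProjectDbContext is a placeholder for your own DbContext,
        // which must implement IHasEventOutbox.
        options.Outboxes.Configure(config =>
        {
            config.UseDbContext<MyProjectDbContext>();
        });
    });
}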
```
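To make the mechanism behind that setting concrete, here is a minimal in-memory sketch of the outbox idea. It is not how ABP implements it; `InMemoryDatabase`, `OutboxMessage`, and `OutboxRelay` are hypothetical names for this example. The business row and the outbox row are committed together, and a separate relay publishes pending rows afterwards.

```csharp
using System;
using System.Collections.Generic;

public record OutboxMessage(Guid Id, string EventName, string Payload)
{
    public bool Published { get; set; }
}

// Stand-in for one database: business rows and outbox rows are committed together.
public class InMemoryDatabase
{
    public List<string> Patients { get; } = new();
    public List<OutboxMessage> Outbox { get; } = new();

    // Applies both writes or neither, mimicking a single transaction.
    public void Commit(string patientName, OutboxMessage message, bool simulateFailure = false)
    {
        if (simulateFailure)
        {
            throw new InvalidOperationException("Transaction rolled back");
        }

        Patients.Add(patientName);
        Outbox.Add(message);
    }
}

// Background relay: reads unpublished rows, sends them, then marks them as sent.
public class OutboxRelay
{
    private readonly InMemoryDatabase _db;
    private readonly Action<OutboxMessage> _send;

    public OutboxRelay(InMemoryDatabase db, Action<OutboxMessage> send)
    {
        _db = db;
        _send = send;
    }

    // Returns how many messages were sent in this pass.
    public int PublishPending()
    {
        var sent = 0;
        foreach (var message in _db.Outbox)
        {
            if (message.Published) continue;

            _send(message);           // a crash after this line causes a resend later
            message.Published = true;
            sent++;
        }
        return sent;
    }
}
```

The relay can send a message and crash before marking it, so delivery is at-least-once. That is the reason subscribers still need idempotent handlers even when the publisher uses an outbox.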
Advanced Patterns
1. Event Retry with Exponential Backoff
```csharp
public class ResilientEventHandler :
    IDistributedEventHandler<ImportantEventEto>,
    ITransientDependency
{
    private const int MaxRetries = 3;
    private readonly ILogger<ResilientEventHandler> _logger;

    public ResilientEventHandler(ILogger<ResilientEventHandler> logger)
    {
        _logger = logger;
    }

    public async Task HandleEventAsync(ImportantEventEto eto)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                // ProcessEventAsync is application-specific and not shown
                await ProcessEventAsync(eto);
                return; // Success
            }
            catch (Exception ex) when (IsTransientError(ex) && attempt < MaxRetries)
            {
                var delay = TimeSpan.FromSeconds(Math.Pow(2, attempt));
                _logger.LogWarning(ex,
                    "Retry {RetryCount}/{MaxRetries} after {Delay}s for event {EventId}",
                    attempt, MaxRetries, delay.TotalSeconds, eto.Id);
                await Task.Delay(delay);
            }
            catch (Exception ex) when (IsTransientError(ex))
            {
                _logger.LogError(ex,
                    "Failed to process event after {MaxRetries} attempts: {EventId}",
                    MaxRetries, eto.Id);
                throw; // Preserves the original stack trace
            }
        }
    }

    private bool IsTransientError(Exception ex) =>
        ex is DbUpdateConcurrencyException ||
        ex is TimeoutException ||
        ex is HttpRequestException;
}
```
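The delay schedule itself can be isolated into a small pure function, which also makes it easy to add a cap and jitter. The sketch below is one possible formulation, not part of ABP: the `Backoff` class and its parameters are hypothetical. Note that it starts at `baseDelay` for the first retry, whereas the handler above starts at 2 seconds.

```csharp
using System;

public static class Backoff
{
    // Delay before retry number `attempt` (1-based):
    // baseDelay * 2^(attempt - 1), capped at maxDelay.
    public static TimeSpan Exponential(int attempt, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        if (attempt < 1) throw new ArgumentOutOfRangeException(nameof(attempt));

        var factor = Math.Pow(2, attempt - 1);
        var ms = Math.Min(baseDelay.TotalMilliseconds * factor, maxDelay.TotalMilliseconds);
        return TimeSpan.FromMilliseconds(ms);
    }

    // "Full jitter": a random delay between zero and the capped exponential delay,
    // so that many consumers failing together do not retry in lockstep.
    public static TimeSpan WithFullJitter(int attempt, TimeSpan baseDelay, TimeSpan maxDelay, Random random)
    {
        var ceiling = Exponential(attempt, baseDelay, maxDelay);
        return TimeSpan.FromMilliseconds(random.NextDouble() * ceiling.TotalMilliseconds);
    }
}
```

With a base of one second and a cap of thirty, the uncapped schedule runs 1s, 2s, 4s, 8s, 16s and then stays at 30s. Keeping the schedule separate from the handler lets it be unit tested without waiting on real delays.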
2. Saga/Choreography Pattern
Coordinate multi-step processes across services.
```csharp
// Step 1: Order Service publishes OrderCreated
public class OrderAppService : ApplicationService
{
    private readonly IRepository<Order, Guid> _orderRepository;
    private readonly IDistributedEventBus _eventBus;
    // Constructor injection omitted for brevity

    // Method and DTO names are reconstructed; adjust them to your contracts.
    public async Task<OrderDto> CreateAsync(CreateOrderDto input)
    {
        var order = new Order(GuidGenerator.Create(), input.CustomerId);
        await _orderRepository.InsertAsync(order);

        await _eventBus.PublishAsync(new OrderCreatedEto
        {
            OrderId = order.Id,
            CustomerId = input.CustomerId,
            Items = input.Items
        });

        return ObjectMapper.Map<Order, OrderDto>(order);
    }
}

// Step 2: Inventory Service handles OrderCreated
public class OrderCreatedHandler :
    IDistributedEventHandler<OrderCreatedEto>,
    ITransientDependency
{
    private readonly IDistributedEventBus _eventBus;
    // Constructor injection omitted for brevity

    public async Task HandleEventAsync(OrderCreatedEto eto)
    {
        try
        {
            await ReserveInventoryAsync(eto.Items);

            // Publish success event
            await _eventBus.PublishAsync(new InventoryReservedEto
            {
                OrderId = eto.OrderId,
                ReservedAt = DateTime.UtcNow
            });
        }
        catch (InsufficientInventoryException ex)
        {
            // Publish failure event for compensation
            await _eventBus.PublishAsync(new InventoryReservationFailedEto
            {
                OrderId = eto.OrderId,
                Reason = ex.Message
            });
        }
    }
}

// Step 3: Order Service handles compensation
public class InventoryReservationFailedHandler :
    IDistributedEventHandler<InventoryReservationFailedEto>,
    ITransientDependency
{
    private readonly IRepository<Order, Guid> _orderRepository;
    private readonly IDistributedEventBus _eventBus;
    // Constructor injection omitted for brevity

    public async Task HandleEventAsync(InventoryReservationFailedEto eto)
    {
        var order = await _orderRepository.GetAsync(eto.OrderId);
        order.Cancel($"Inventory reservation failed: {eto.Reason}");
        await _orderRepository.UpdateAsync(order);

        // Notify customer
        await _eventBus.PublishAsync(new OrderCancelledEto
        {
            OrderId = eto.OrderId,
            Reason = eto.Reason
        });
    }
}
```
3. Event Aggregation
Batch multiple events for efficiency.
```csharp
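public class BatchEventHandler :
    IDistributedEventHandler<ItemUpdatedEto>,
    ITransientDependency
{
    private const int BatchSize = 100;

    // Shared across handler instances; every access is guarded by _lock because
    // List<T> is not safe for concurrent writes.
    // Caution: buffered events live only in memory and are lost on restart.
    private static readonly Dictionary<Guid, List<ItemUpdatedEto>> _batches = new();
    private static readonly SemaphoreSlim _lock = new(1, 1);

    public async Task HandleEventAsync(ItemUpdatedEto eto)
    {
        var batchKey = eto.TenantId ?? Guid.Empty;
        List<ItemUpdatedEto> ready = null;

        await _lock.WaitAsync();
        try
        {
            if (!_batches.TryGetValue(batchKey, out var list))
            {
                list = new List<ItemUpdatedEto>();
                _batches[batchKey] = list;
            }

            list.Add(eto);

            // Detach the batch when the threshold is reached
            if (list.Count >= BatchSize)
            {
                _batches.Remove(batchKey);
                ready = list;
            }
        }
        finally
        {
            _lock.Release();
        }

        // Process outside the lock so other events are not blocked
        if (ready != null)
        {
            await ProcessBatchAsync(ready);
        }
    }
}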
```
Error Handling Patterns
Dead Letter Queue Handling
```csharp
// DeadLetterEvent, FailedEvent and INotificationService are application-defined
// types, not part of ABP.
public class DeadLetterEventHandler :
    IDistributedEventHandler<DeadLetterEvent>,
    ITransientDependency
{
    private readonly IRepository<FailedEvent, Guid> _failedEventRepository;
    private readonly INotificationService _notificationService;
    private readonly ILogger<DeadLetterEventHandler> _logger;
    // Constructor injection omitted for brevity

    public async Task HandleEventAsync(DeadLetterEvent eto)
    {
        _logger.LogWarning(
            "Event moved to dead letter queue: {EventType}, Error: {Error}",
            eto.OriginalEventType, eto.ErrorMessage);

        await _failedEventRepository.InsertAsync(new FailedEvent
        {
            EventType = eto.OriginalEventType,
            EventData = eto.OriginalEventData,
            ErrorMessage = eto.ErrorMessage,
            FailedAt = DateTime.UtcNow,
            RetryCount = eto.RetryCount
        });

        // Notify operations team
        await _notificationService.SendAlertAsync(
            "Event Processing Failed",
            $"Event {eto.OriginalEventType} failed after {eto.RetryCount} retries");
    }
}
```
Testing Event Handlers
```csharp
// These tests exercise the idempotent PatientSyncEventHandler shown earlier.
public class PatientSyncEventHandlerTests : ApplicationTestBase
{
    private readonly PatientSyncEventHandler _handler;
    private readonly IRepository<Patient, Guid> _repository;

    public PatientSyncEventHandlerTests()
    {
        // Resolve from the test container (assumes an ABP integrated test base)
        _handler = GetRequiredService<PatientSyncEventHandler>();
        _repository = GetRequiredService<IRepository<Patient, Guid>>();
    }

    [Fact]
    public async Task HandleEventAsync_CreatesPatient_WhenNotExists()
    {
        // Arrange
        var eto = new PatientCreatedEto
        {
            Id = Guid.NewGuid(),
            Name = "John Doe",
            Email = "john@example.com"
        };

        // Act
        await _handler.HandleEventAsync(eto);

        // Assert
        var patient = await _repository.FirstOrDefaultAsync(
            x => x.ExternalId == eto.Id);
        patient.ShouldNotBeNull();
        patient.Name.ShouldBe("John Doe");
    }

    [Fact]
    public async Task HandleEventAsync_UpdatesPatient_WhenExists()
    {
        // Arrange
        var existingId = Guid.NewGuid();
        await _repository.InsertAsync(new Patient(
            Guid.NewGuid(), "Old Name", "old@example.com")
        {
            ExternalId = existingId
        });

        var eto = new PatientCreatedEto
        {
            Id = existingId,
            Name = "New Name",
            Email = "new@example.com"
        };

        // Act
        await _handler.HandleEventAsync(eto);

        // Assert
        var patient = await _repository.FirstOrDefaultAsync(
            x => x.ExternalId == existingId);
        patient.ShouldNotBeNull();
        patient.Name.ShouldBe("New Name");
    }

    [Fact]
    public async Task HandleEventAsync_IsIdempotent()
    {
        // Arrange
        var eto = new PatientCreatedEto
        {
            Id = Guid.NewGuid(),
            Name = "John Doe",
            Email = "john@example.com"
        };

        // Act - process same event twice
        await _handler.HandleEventAsync(eto);
        await _handler.HandleEventAsync(eto);

        // Assert - only one patient created
        var count = await _repository.CountAsync(x => x.ExternalId == eto.Id);
        count.ShouldBe(1);
    }
}
```
Configuration
RabbitMQ Configuration
```csharp
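public override void PreConfigureServices(ServiceConfigurationContext context)
{
    // Retry failed handlers up to 3 times
    PreConfigure<AbpEventBusOptions>(options =>
    {
        options.UseRetryStrategy(retryStrategyOptions =>
        {
            retryStrategyOptions.MaxRetryAttempts = 3;
        });
    });
}

public override void ConfigureServices(ServiceConfigurationContext context)
{
    Configure<AbpRabbitMqEventBusOptions>(options =>
    {
        options.ClientName = "MyService";
        options.ExchangeName = "MyApp";
    });

    Configure<AbpDistributedEventBusOptions>(options =>
    {
        // MyProjectDbContext is a placeholder for your own DbContext, which must
        // implement IHasEventOutbox and IHasEventInbox.
        options.Outboxes.Configure(config => config.UseDbContext<MyProjectDbContext>());
        options.Inboxes.Configure(config => config.UseDbContext<MyProjectDbContext>());
    });
}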
```
References
- [Event Handler Templates](references/event-handler-templates.md)
- [Saga Pattern Examples](references/saga-patterns.md)
External Resources
- ABP Distributed Events: https://docs.abp.io/en/abp/latest/Distributed-Event-Bus
- RabbitMQ Integration: https://docs.abp.io/en/abp/latest/Distributed-Event-Bus-RabbitMQ-Integration