Skip to content

feat: add custom event handler helpers for batched events (SQS, SNS, etc.) #191

@j-d-ha

Description

@j-d-ha

Summary

Add custom event handler helpers to provide a cleaner abstraction layer for working with batched event types like SQS and SNS. These helpers will simplify event-driven Lambda development by providing:

  • Simple API for handling individual messages within batch events
  • Built-in support for batch item failure reporting (for services that support it)
  • Unified abstraction layer for both batch-aware and non-batch-aware event sources
  • Support for both sync and async handler methods
  • Cleaner, more testable code for event-driven workflows

Motivation

Currently, when working with batched events (SQS, SNS, Kinesis, DynamoDB Streams), developers need to:

  1. Manually iterate through records in the batch
  2. Implement their own error handling and partial batch failure logic
  3. Construct batch item failure responses manually
  4. Duplicate this boilerplate across multiple Lambda functions

This feature would provide a clean abstraction that handles these concerns automatically, similar to how the existing envelope system simplifies event deserialization.

Proposed API

Basic Handler Registration

var builder = LambdaApplication.CreateBuilder();
var lambda = builder.Build();

// SQS - with automatic batch item failure reporting
lambda.MapSqsHandler<MyMessage>(async (message, context, ct) => 
{
    await _processor.ProcessMessageAsync(message, ct);
    // Return true for success, false for failure
    // Failures are automatically added to batch item failures
    return true;
});

// SNS - with automatic batch item failure reporting
lambda.MapSnsHandler<MyNotification>(async (notification, record, ct) =>
{
    await _handler.HandleNotificationAsync(notification, ct);
    return true;
});

// Generic batch handler for other event types
lambda.MapBatchHandler<KinesisEvent, KinesisEventRecord>(async (record, ct) =>
{
    var data = Encoding.UTF8.GetString(record.Kinesis.Data);
    await _processor.ProcessAsync(data, ct);
    return true;
});

Advanced Usage with Context

// Access to full record metadata
lambda.MapSqsHandler<OrderEvent>((message, record, context, ct) =>
{
    _logger.LogInformation(
        "Processing order {OrderId} from queue, message ID: {MessageId}",
        message.OrderId,
        record.MessageId
    );
    
    await _orderService.ProcessOrderAsync(message, ct);
    return true;
});

// Batch-level control
lambda.MapSqsHandler<OrderEvent>(async (batch, ct) =>
{
    var results = new List<BatchItemResult>();
    
    foreach (var item in batch.Items)
    {
        try
        {
            await _orderService.ProcessOrderAsync(item.Message, ct);
            results.Add(BatchItemResult.Success(item.MessageId));
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to process message {MessageId}", item.MessageId);
            results.Add(BatchItemResult.Failure(item.MessageId));
        }
    }
    
    return results;
});

Configuration

builder.Services.ConfigureSqsHandler(options =>
{
    options.ReportBatchItemFailures = true; // Default: true
    options.StopOnFirstFailure = false;     // Default: false
    options.MaxConcurrency = 10;            // Process messages in parallel
});

Implementation Considerations

Batch Item Failure Reporting

For event sources that support batch item failures (SQS, Kinesis, DynamoDB Streams):

  • Automatically construct the batchItemFailures response array
  • Only include failed items (AWS re-drives based on this)
  • Handle the nuances of each event source's failure format

Error Handling Strategies

  1. Individual Handler Failures: Catch exceptions in individual message handlers, mark as failed, continue processing remaining messages
  2. Stop on First Failure: Optional mode to stop processing batch on first failure
  3. Batch-Level Failures: Allow handlers to throw to fail entire batch

Event Source Support

Initial Implementation:

  • SQS (MapSqsHandler<T>)
  • SNS (MapSnsHandler<T>)

Future Additions:

  • Kinesis Streams
  • DynamoDB Streams
  • EventBridge
  • S3 Events (batch of S3 event records)

Sync vs Async

Support both synchronous and asynchronous handler methods:

// Sync
lambda.MapSqsHandler<MyMessage>((message) => 
{
    _processor.ProcessSync(message);
    return true;
});

// Async
lambda.MapSqsHandler<MyMessage>(async (message, ct) => 
{
    await _processor.ProcessAsync(message, ct);
    return true;
});

Benefits

  1. Reduced Boilerplate: Eliminate repetitive batch processing and error handling code
  2. Better Testability: Test individual message handlers without batch processing logic
  3. Automatic Failure Handling: Built-in support for partial batch failures
  4. Type Safety: Strongly-typed message handling with compile-time checks
  5. Consistent Patterns: Unified API across different event sources
  6. Performance: Option for concurrent message processing within a batch
  7. Observability: Built-in integration with OpenTelemetry for per-message tracing

Technical Notes

  • Should integrate with existing envelope system (build on top of SqsEnvelope<T>, SnsEnvelope<T>)
  • Source generator support for optimal performance
  • Proper CancellationToken propagation (Lambda timeout handling)
  • Scoped DI container per message (optional, configurable)
  • Middleware support at batch and/or message level

Related

  • Existing envelope packages: AwsLambda.Host.Envelopes.Sqs, AwsLambda.Host.Envelopes.Sns
  • AWS Lambda batch item failure documentation
  • Similar patterns in other frameworks (e.g., MassTransit, NServiceBus)

Acceptance Criteria

  • API design reviewed and approved
  • SQS handler helper implemented with batch item failure support
  • SNS handler helper implemented with batch item failure support
  • Support for both sync and async handlers
  • Comprehensive unit tests
  • Integration tests with sample events
  • Documentation and examples
  • OpenTelemetry integration
  • Source generator support (if applicable)

Metadata

Metadata

Assignees

No one assigned

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions