[Feature]: Asynchronous Processing Enhancements #153

@eneshoxha

Description

1. Full Async/Await Support

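using System;
using System.Threading;
using System.Threading.Tasks;

public interface IAsyncOperator
{
    Task ProcessAsync(object input, CancellationToken cancellationToken = default);
}

// Implement for all operators to enable true async pipelines
public class AsyncMapOperator<TInput, TOutput> : IAsyncOperator
{
    private readonly Func<TInput, Task<TOutput>> _asyncMapFunction;
    private readonly IAsyncOperator _nextOperator; // downstream operator; null at the end of a pipeline

    public AsyncMapOperator(Func<TInput, Task<TOutput>> asyncMapFunction, IAsyncOperator nextOperator = null)
    {
        _asyncMapFunction = asyncMapFunction ?? throw new ArgumentNullException(nameof(asyncMapFunction));
        _nextOperator = nextOperator;
    }

    public async Task ProcessAsync(object input, CancellationToken cancellationToken = default)
    {
        // Stop early if the pipeline has already been cancelled
        cancellationToken.ThrowIfCancellationRequested();
        var result = await _asyncMapFunction((TInput)input);
        if (_nextOperator != null)
            await _nextOperator.ProcessAsync(result, cancellationToken);
    }
}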

2. Backpressure Implementation

public class BackpressureConfig
{
    public int MaxQueueSize { get; set; } = 1000;
    public TimeSpan MaxWaitTime { get; set; } = TimeSpan.FromSeconds(5);
}

public interface IBackpressureOperator : IOperator
{
    BackpressureConfig BackpressureConfig { get; set; }
    int CurrentQueueSize { get; }
}
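
One way the two BackpressureConfig settings could be honored is a bounded buffer between operators. The following is only a sketch under assumptions: BoundedOperatorQueue and TryEnqueueAsync are hypothetical names that do not appear in this proposal, and the buffer is built on System.Threading.Channels, which the issue does not mention. MaxQueueSize is used as the channel capacity and MaxWaitTime as the longest a producer waits for free space.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Copied from the proposal so the sketch is self-contained.
public class BackpressureConfig
{
    public int MaxQueueSize { get; set; } = 1000;
    public TimeSpan MaxWaitTime { get; set; } = TimeSpan.FromSeconds(5);
}

// Hypothetical helper (not part of the proposal): a bounded buffer between two operators.
public sealed class BoundedOperatorQueue<T>
{
    private readonly Channel<T> _channel;
    private readonly BackpressureConfig _config;

    public BoundedOperatorQueue(BackpressureConfig config)
    {
        _config = config ?? throw new ArgumentNullException(nameof(config));
        _channel = Channel.CreateBounded<T>(new BoundedChannelOptions(config.MaxQueueSize)
        {
            // Producers wait asynchronously instead of dropping items when the buffer is full.
            FullMode = BoundedChannelFullMode.Wait
        });
    }

    // Mirrors IBackpressureOperator.CurrentQueueSize from the proposal.
    public int CurrentQueueSize => _channel.Reader.Count;

    // Returns true if the item was queued, false if the buffer stayed full for MaxWaitTime.
    public async Task<bool> TryEnqueueAsync(T item, CancellationToken cancellationToken = default)
    {
        using var timeout = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        timeout.CancelAfter(_config.MaxWaitTime);
        try
        {
            await _channel.Writer.WriteAsync(item, timeout.Token);
            return true;
        }
        catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
        {
            return false; // timed out waiting for space; the caller did not cancel
        }
    }

    // Consumers await the next item; taking one frees space for a waiting producer.
    public ValueTask<T> DequeueAsync(CancellationToken cancellationToken = default)
        => _channel.Reader.ReadAsync(cancellationToken);
}
```

With this shape a slow consumer causes TryEnqueueAsync to wait and eventually return false, so the upstream operator can decide whether to retry, drop, or fail. Whether a timeout should return false or throw is a design choice left open here.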

Most operators are synchronous, so heavy operations (e.g., database writes, HTTP calls) inside map or sink functions can block the processing thread. Microsoft's Task asynchronous programming (TAP) guide notes that asynchronous programming helps avoid performance bottlenecks and improves responsiveness. To support asynchronous pipelines:

Provide MapAsync, FilterAsync, FlatMapAsync, SinkAsync and SourceAsync variants that accept Func<T,Task> or Func<T,ValueTask> delegates and await them. These operators should return Task and use await internally; this allows the call chain to release the thread while awaiting I/O.
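
To make the shape of these variants concrete, here is a minimal sketch, assuming each stage is modeled as a plain delegate that wraps the next stage. AsyncStage and its method signatures are hypothetical stand-ins for illustration, not the library's existing operator API, and only the ValueTask-returning delegate form is shown for map and filter.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins for the proposed async operator variants.
public static class AsyncStage
{
    // MapAsync: awaits the user delegate, then awaits the downstream stage.
    public static Func<TIn, CancellationToken, Task> MapAsync<TIn, TOut>(
        Func<TIn, ValueTask<TOut>> map,
        Func<TOut, CancellationToken, Task> next)
        => async (input, ct) =>
        {
            ct.ThrowIfCancellationRequested();
            TOut result = await map(input);
            await next(result, ct);
        };

    // FilterAsync: forwards the item only when the awaited predicate returns true.
    public static Func<T, CancellationToken, Task> FilterAsync<T>(
        Func<T, ValueTask<bool>> predicate,
        Func<T, CancellationToken, Task> next)
        => async (input, ct) =>
        {
            ct.ThrowIfCancellationRequested();
            if (await predicate(input))
                await next(input, ct);
        };

    // SinkAsync: terminal stage that awaits the user's asynchronous write.
    public static Func<T, CancellationToken, Task> SinkAsync<T>(Func<T, Task> sink)
        => async (input, ct) =>
        {
            ct.ThrowIfCancellationRequested();
            await sink(input);
        };
}
```

A pipeline is then composed from the sink backwards, for example AsyncStage.FilterAsync<int>(predicate, AsyncStage.MapAsync<int, string>(map, AsyncStage.SinkAsync<string>(write))), and each call to the resulting delegate returns a Task that completes once the item has passed through every stage.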

Add backpressure support by letting operators return a Task that completes only when the next operator has finished processing. This can be achieved by chaining tasks and awaiting them sequentially.
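
The sequential-await idea can be sketched as follows. This assumes the IAsyncOperator interface proposed in this issue; SlowSink and SequentialSource are hypothetical names used only to show the effect. The source pulls the next item only after the Task for the previous one has completed, so at most one item is in flight at a time.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Interface as proposed in this issue.
public interface IAsyncOperator
{
    Task ProcessAsync(object input, CancellationToken cancellationToken = default);
}

// Hypothetical slow sink that records how many items were in flight at once.
public sealed class SlowSink : IAsyncOperator
{
    private int _inFlight;
    public int MaxInFlight { get; private set; }
    public List<object> Received { get; } = new List<object>();

    public async Task ProcessAsync(object input, CancellationToken cancellationToken = default)
    {
        int now = Interlocked.Increment(ref _inFlight);
        MaxInFlight = Math.Max(MaxInFlight, now);
        await Task.Delay(10, cancellationToken); // simulated I/O such as a database write
        Received.Add(input);
        Interlocked.Decrement(ref _inFlight);
    }
}

// Hypothetical source that applies backpressure by awaiting each item end to end.
public static class SequentialSource
{
    public static async Task RunAsync<T>(
        IEnumerable<T> items,
        IAsyncOperator first,
        CancellationToken cancellationToken = default)
    {
        foreach (var item in items)
        {
            cancellationToken.ThrowIfCancellationRequested();
            // The returned Task completes only when downstream processing has finished,
            // so the source cannot outrun the slowest operator.
            await first.ProcessAsync(item, cancellationToken);
        }
    }
}
```

The trade-off is throughput: strictly sequential awaiting never queues work, but it also never overlaps I/O for different items, which is where a bounded queue such as the one implied by BackpressureConfig.MaxQueueSize would come in.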

Labels

enhancement, feature