[Feature]: Change notifications / reactive state #157

@eneshoxha

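To tie Cortex.States and Cortex.Streams together more tightly, we are adding observable change feeds: a state store publishes each add, update, and removal as a stream that other components can subscribe to.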

1. Change model

public enum StateChangeKind
{
    Added,
    Updated,
    Removed
}

public readonly record struct StateChange<TKey, TValue>(
    TKey Key,
    TValue? OldValue,
    TValue? NewValue,
    StateChangeKind Kind);

2. Observable store

public interface IObservableDataStore<TKey, TValue> : IDataStore<TKey, TValue>
{
    IObservable<StateChange<TKey, TValue>> Changes { get; }
}

InMemoryStateStore implementation:

  • Wrap Put / Remove to publish StateChange on a Subject<StateChange<…>>.
  • Create the subject lazily, on first subscription, so a store with no subscribers pays no publishing overhead.
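
The two points above could look roughly like the sketch below. It is not the Cortex implementation: `SketchStateStore`, `ChangeFeed`, `Subscription`, and `CallbackObserver` are hypothetical names, the store does not implement `IDataStore` (its members are not shown in this issue), and a small hand-rolled observer list stands in for `Subject<T>` so the example needs only the BCL. The feed is created the first time `Changes` is read, which approximates "first subscribed".

```csharp
#nullable enable
using System;
using System.Collections.Generic;

public enum StateChangeKind { Added, Updated, Removed }

public readonly record struct StateChange<TKey, TValue>(
    TKey Key, TValue? OldValue, TValue? NewValue, StateChangeKind Kind);

// Hypothetical stand-in for InMemoryStateStore (IDataStore members omitted).
public sealed class SketchStateStore<TKey, TValue> where TKey : notnull
{
    private readonly object _gate = new();
    private readonly Dictionary<TKey, TValue> _items = new();
    private ChangeFeed? _feed; // stays null until Changes is first read

    public IObservable<StateChange<TKey, TValue>> Changes
    {
        get { lock (_gate) { return _feed ??= new ChangeFeed(); } }
    }

    public void Put(TKey key, TValue value)
    {
        StateChange<TKey, TValue> change;
        ChangeFeed? feed;
        lock (_gate)
        {
            bool existed = _items.TryGetValue(key, out var old);
            _items[key] = value;
            feed = _feed;
            if (feed is null) return; // nobody asked for changes: skip all publishing work
            change = new StateChange<TKey, TValue>(key, existed ? old : default, value,
                existed ? StateChangeKind.Updated : StateChangeKind.Added);
        }
        feed.Publish(change); // notify outside the lock so observers cannot deadlock the store
    }

    public bool Remove(TKey key)
    {
        StateChange<TKey, TValue> change;
        ChangeFeed? feed;
        lock (_gate)
        {
            if (!_items.Remove(key, out var old)) return false; // nothing removed, nothing published
            feed = _feed;
            if (feed is null) return true;
            change = new StateChange<TKey, TValue>(key, old, default, StateChangeKind.Removed);
        }
        feed.Publish(change);
        return true;
    }

    // Minimal replacement for Subject<T>: a list of observers plus a Publish method.
    private sealed class ChangeFeed : IObservable<StateChange<TKey, TValue>>
    {
        private readonly List<IObserver<StateChange<TKey, TValue>>> _observers = new();

        public IDisposable Subscribe(IObserver<StateChange<TKey, TValue>> observer)
        {
            lock (_observers) _observers.Add(observer);
            return new Subscription(() => { lock (_observers) _observers.Remove(observer); });
        }

        public void Publish(StateChange<TKey, TValue> change)
        {
            IObserver<StateChange<TKey, TValue>>[] snapshot;
            lock (_observers) snapshot = _observers.ToArray();
            foreach (var observer in snapshot) observer.OnNext(change);
        }
    }

    private sealed class Subscription : IDisposable
    {
        private Action? _dispose;
        public Subscription(Action dispose) => _dispose = dispose;
        public void Dispose() { var d = _dispose; _dispose = null; d?.Invoke(); }
    }
}

// Hypothetical helper so callers can subscribe with a delegate.
public sealed class CallbackObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    public CallbackObserver(Action<T> onNext) => _onNext = onNext;
    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

One trade-off in this sketch: because notifications are sent after the lock is released, concurrent writers may have their changes delivered in a different order than they were applied. A real implementation would need to decide whether ordered delivery matters more than keeping observer callbacks out of the store's lock.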

Usage examples:

  • Streams operators can subscribe to changes and emit derived events.
  • ProjectionTables can incrementally update cached projections instead of recomputing on GetAll().
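
As a rough illustration of the incremental-projection idea, the sketch below keeps a running total up to date from individual changes instead of rescanning the store. `RunningTotalProjection` is a hypothetical class, not part of ProjectionTables, and the `string` key and `decimal` value types are assumptions chosen only for the example.

```csharp
#nullable enable
using System;

public enum StateChangeKind { Added, Updated, Removed }

public readonly record struct StateChange<TKey, TValue>(
    TKey Key, TValue? OldValue, TValue? NewValue, StateChangeKind Kind);

// Hypothetical projection: tracks the sum and count of all values in a store.
public sealed class RunningTotalProjection : IObserver<StateChange<string, decimal>>
{
    public decimal Total { get; private set; }
    public int Count { get; private set; }

    public void OnNext(StateChange<string, decimal> change)
    {
        switch (change.Kind)
        {
            case StateChangeKind.Added:
                Total += change.NewValue;
                Count++;
                break;
            case StateChangeKind.Updated:
                // Apply only the difference; no need to re-read other entries.
                Total += change.NewValue - change.OldValue;
                break;
            case StateChangeKind.Removed:
                Total -= change.OldValue;
                Count--;
                break;
        }
    }

    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

Each change costs constant work here, whereas recomputing from GetAll() scales with the number of entries. Because `TValue` is unconstrained, `TValue?` for a value type such as `decimal` is plain `decimal`, so the "missing" side of an Added or Removed change arrives as `default` rather than `null`.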

Async-friendly variant:

  • Expose the feed through a Channel<StateChange<…>>, or add IAsyncEnumerable<StateChange<…>> ChangesAsync(CancellationToken) where supported, so consumers can await changes instead of handling callbacks.
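
One possible shape for the async variant is an adapter that subscribes to the observable feed and forwards each change into a channel. The sketch below assumes this bridging approach; `ChannelChangeFeed` is a hypothetical name, and the unbounded, single-reader channel is an illustrative choice rather than a proposed default.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;

public enum StateChangeKind { Added, Updated, Removed }

public readonly record struct StateChange<TKey, TValue>(
    TKey Key, TValue? OldValue, TValue? NewValue, StateChangeKind Kind);

// Hypothetical adapter: an observer that buffers changes in a channel
// and exposes them as an async stream.
public sealed class ChannelChangeFeed<TKey, TValue> : IObserver<StateChange<TKey, TValue>>
{
    private readonly Channel<StateChange<TKey, TValue>> _channel =
        Channel.CreateUnbounded<StateChange<TKey, TValue>>(
            new UnboundedChannelOptions { SingleReader = true });

    // Writes to an unbounded channel succeed until the channel is completed.
    public void OnNext(StateChange<TKey, TValue> change) => _channel.Writer.TryWrite(change);

    public void OnError(Exception error) => _channel.Writer.TryComplete(error);

    public void OnCompleted() => _channel.Writer.TryComplete();

    public IAsyncEnumerable<StateChange<TKey, TValue>> ChangesAsync(
        CancellationToken cancellationToken = default)
        => _channel.Reader.ReadAllAsync(cancellationToken);
}
```

A consumer would subscribe the adapter to `Changes` and then read with `await foreach (var change in feed.ChangesAsync(token))`. An unbounded channel never blocks the writing store but can grow without limit if the reader falls behind; a bounded channel with an explicit full-mode policy would be the alternative to weigh.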

Labels: enhancement, feature