[DB-2019] Add persistent subscriptions support for secondary indexes#5580
[DB-2019] Add persistent subscriptions support for secondary indexes#5580alexeyzimarev wants to merge 17 commits intomasterfrom
Conversation
Review Summary by QodoAdd persistent subscriptions support for secondary indexes
WalkthroughsDescription• Adds persistent subscriptions support for secondary indexes through a new dedicated PersistentSubscriptionIndexService • Implements index-targeted subscription operations (Create/Update/Delete/Connect) with automatic cleanup on index deletion • Extends gRPC layer to detect index-prefix filters and route Create requests to the new index service • Reuses existing persistent subscription infrastructure (state machine, checkpoints, parking, ack/nack, consumer strategies) • Adds new client messages (ConnectToPersistentSubscriptionToIndex, CreatePersistentSubscriptionToIndex, UpdatePersistentSubscriptionToIndex, DeletePersistentSubscriptionToIndex) mirroring …ToAll structure • Implements PersistentSubscriptionIndexEventSource for TFPos-based event reading from indexes • Routes Delete/Update/Connect operations from main service to index service via forwarding logic • Extends PersistentSubscriptionStreamReader to support reading from index event sources • Includes comprehensive integration tests (4 test cases) and unit tests (19 tests total) • Provides design specification and detailed implementation plan documentation • Wire API remains unchanged; clients subscribe by targeting $all with index-name prefix filter Diagramflowchart LR
Client["Client Request"]
GrpcCreate["gRPC Create Handler"]
FilterDetect["FilterRouting Detection"]
MainService["PersistentSubscriptionService"]
IndexService["PersistentSubscriptionIndexService"]
EventSource["PersistentSubscriptionIndexEventSource"]
Reader["PersistentSubscriptionStreamReader"]
IndexBus["Index Events Bus"]
Client -->|Create with filter| GrpcCreate
GrpcCreate -->|Detect index prefix| FilterDetect
FilterDetect -->|Index found| IndexService
FilterDetect -->|No index| MainService
IndexService -->|Create event source| EventSource
EventSource -->|Read from index| Reader
Reader -->|Query index| IndexBus
IndexService -->|On SecondaryIndexDeleted| IndexService
File Changes1. src/KurrentDB.Core/Services/PersistentSubscription/PersistentSubscriptionIndexService.cs
|
Code Review by Qodo
|
PR Review SummaryCritical Issues (3 found)
Important Issues (7 found)
Suggestions (6 found)
Strengths
Recommended ActionFix before merge (Critical + top Important):
Address soon (remaining Important): Consider later (Suggestions): 🤖 Generated with Claude Code |
| public CreatePersistentSubscriptionToIndex(Guid internalCorrId, Guid correlationId, IEnvelope envelope, | ||
| string groupName, string indexName, bool resolveLinkTos, TFPos startFrom, | ||
| int messageTimeoutMilliseconds, bool recordStatistics, int maxRetryCount, int bufferSize, | ||
| int liveBufferSize, int readBatchSize, | ||
| int checkPointAfterMilliseconds, int minCheckPointCount, int maxCheckPointCount, | ||
| int maxSubscriberCount, string namedConsumerStrategy, ClaimsPrincipal user, DateTime? expires = null) | ||
| : base(internalCorrId, correlationId, envelope, user, expires) { | ||
| ResolveLinkTos = resolveLinkTos; | ||
| GroupName = groupName; | ||
| IndexName = indexName; | ||
| StartFrom = startFrom; | ||
| MessageTimeoutMilliseconds = messageTimeoutMilliseconds; | ||
| RecordStatistics = recordStatistics; | ||
| MaxRetryCount = maxRetryCount; | ||
| BufferSize = bufferSize; | ||
| LiveBufferSize = liveBufferSize; | ||
| ReadBatchSize = readBatchSize; | ||
| MaxCheckPointCount = maxCheckPointCount; | ||
| MinCheckPointCount = minCheckPointCount; | ||
| CheckPointAfterMilliseconds = checkPointAfterMilliseconds; | ||
| MaxSubscriberCount = maxSubscriberCount; | ||
| NamedConsumerStrategy = namedConsumerStrategy; | ||
| } |
There was a problem hiding this comment.
1. ...toindex ctor lacks validation 📘 Rule violation ≡ Correctness
The new Create/Update/DeletePersistentSubscriptionToIndex messages assign required fields like groupName/indexName without Ensure.NotNullOrEmpty(...), allowing null/empty values to flow into the service and fail later in less obvious ways. This violates the requirement to fail explicitly rather than allowing silent invalid inputs for mandatory parameters.
Agent Prompt
## Issue description
`Create/Update/DeletePersistentSubscriptionToIndex` constructors do not validate required inputs (e.g., `groupName`, `indexName`, `namedConsumerStrategy`), allowing null/empty values to propagate and fail later.
## Issue Context
Other `ClientMessage` request types (e.g., `ConnectToPersistentSubscriptionToIndex`) use `Ensure.NotNullOrEmpty(...)` / `Ensure.Nonnegative(...)` to enforce invariants early.
## Fix Focus Areas
- src/KurrentDB.Core/Messages/ClientMessage.cs[1568-1590]
- src/KurrentDB.Core/Messages/ClientMessage.cs[1634-1656]
- src/KurrentDB.Core/Messages/ClientMessage.cs[1684-1689]
ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools
| } else if (eventSource.FromIndex) { | ||
| if (_mainQueue is null) | ||
| throw new InvalidOperationException("MainQueue publisher is required for index reads."); | ||
|
|
||
| var correlationId = Guid.NewGuid(); | ||
| var handler = new ResponseHandler(onEventsFound, onEventsSkipped, onError, skipFirstEvent); | ||
| _mainQueue.Publish(new ClientMessage.ReadIndexEventsForward( | ||
| internalCorrId: correlationId, | ||
| correlationId: correlationId, | ||
| envelope: new CallbackEnvelope(msg => handler.FetchIndexCompleted((ClientMessage.ReadIndexEventsForwardCompleted)msg)), | ||
| indexName: eventSource.IndexName, | ||
| commitPosition: startPosition.TFPosition.Commit, | ||
| preparePosition: startPosition.TFPosition.Prepare, | ||
| excludeStart: false, | ||
| maxCount: Math.Min(countToLoad, actualBatchSize), | ||
| requireLeader: false, | ||
| validationTfLastCommitPosition: null, | ||
| user: SystemAccounts.System, | ||
| replyOnExpired: false, | ||
| pool: null)); |
There was a problem hiding this comment.
2. Hardcoded requireleader: false 📘 Rule violation ⛨ Security
The new index read path publishes ReadIndexEventsForward with requireLeader: false and `user: SystemAccounts.System` hardcoded, rather than taking explicit caller-provided leadership and authorization context. This violates the requirement to pass requireLeader and ClaimsPrincipal explicitly to avoid incorrect privilege/behavior defaults.
Agent Prompt
## Issue description
The index read request hardcodes `requireLeader: false` and `user: SystemAccounts.System` instead of accepting explicit values from the caller.
## Issue Context
Compliance requires callers to explicitly supply leadership gating and authorization context to prevent accidental privilege/behavior changes.
## Fix Focus Areas
- src/KurrentDB.Core/Services/PersistentSubscription/PersistentSubscriptionStreamReader.cs[99-118]
ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools
| private void SaveConfiguration(Action continueWith) { | ||
| // Re-read the full config, merge our index entries, then write it back. | ||
| // This avoids clobbering entries that belong to the main service. | ||
| _ioDispatcher.ReadBackward(SystemStreams.PersistentSubscriptionConfig, -1, 1, false, | ||
| SystemAccounts.System, | ||
| x => HandleSaveReadCompleted(continueWith, x), | ||
| expires: ClientMessage.ReadRequestMessage.NeverExpires); | ||
| } | ||
|
|
||
| private void HandleSaveReadCompleted(Action continueWith, | ||
| ClientMessage.ReadStreamEventsBackwardCompleted completed) { | ||
| PersistentSubscriptionConfig fullConfig; | ||
| switch (completed.Result) { | ||
| case ReadStreamResult.Success: | ||
| try { | ||
| fullConfig = PersistentSubscriptionConfig.FromSerializedForm(completed.Events[0].Event.Data); | ||
| } catch (Exception ex) { | ||
| Log.Error(ex, "Error reading config for merge during save."); | ||
| return; | ||
| } | ||
| break; | ||
| case ReadStreamResult.NoStream: | ||
| fullConfig = new PersistentSubscriptionConfig { Version = "2" }; | ||
| break; | ||
| default: | ||
| Log.Error("Unexpected result {result} reading config for merge during save.", completed.Result); | ||
| return; | ||
| } | ||
|
|
||
| // Remove all index entries from full config and replace with ours | ||
| fullConfig.Entries.RemoveAll(e => e.IndexName != null); | ||
| fullConfig.Entries.AddRange(_config.Entries); | ||
| fullConfig.Updated = _config.Updated; | ||
| fullConfig.UpdatedBy = _config.UpdatedBy; | ||
|
|
||
| WriteMergedConfig(fullConfig, continueWith); |
There was a problem hiding this comment.
3. Index config clobbered 🐞 Bug ≡ Correctness
PersistentSubscriptionIndexService.SaveConfiguration deletes all IndexName!=null entries from $persistentSubscriptionConfig and replaces them with its local _config.Entries, but bootstrap never populates _config with the forwarded entries, so the first write can silently drop other index subscriptions.
Agent Prompt
## Issue description
`PersistentSubscriptionIndexService` overwrites all index persistent subscription config entries on every save by removing all `IndexName != null` entries and re-adding only its local `_config.Entries`. Since bootstrap (`PersistentSubscriptionIndexEntriesLoaded`) does not populate `_config.Entries`, this can delete existing index subscriptions from `$persistentSubscriptionConfig`.
## Issue Context
This breaks persistence for index persistent subscriptions across restarts and can cause silent loss of unrelated index subscription configs when any create/update/delete triggers a save.
## Fix Focus Areas
- src/KurrentDB.Core/Services/PersistentSubscription/PersistentSubscriptionIndexService.cs[69-120]
- src/KurrentDB.Core/Services/PersistentSubscription/PersistentSubscriptionIndexService.cs[565-600]
- src/KurrentDB.Core/Services/PersistentSubscription/PersistentSubscriptionIndexService.cs[513-533]
## What to change
- Ensure `_config.Entries` is initialized to the complete set of index entries (at minimum: set `_config.Entries = message.IndexEntries.ToList()` during bootstrap after validation, or call `LoadConfiguration` on leader start).
- Ensure create/update/delete mutate that same `_config.Entries` set, so the merge/write cannot drop other index entries.
- Consider guarding concurrent saves (serialize save operations) to avoid interleaving read/merge/write sequences.
ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools
| public void FetchIndexCompleted(ClientMessage.ReadIndexEventsForwardCompleted msg) { | ||
| switch (msg.Result) { | ||
| case ReadIndexResult.Success: | ||
| _onFetchCompleted( | ||
| _skipFirstEvent ? msg.Events.Skip(1).ToArray() : (IReadOnlyList<ResolvedEvent>)msg.Events, | ||
| new PersistentSubscriptionAllStreamPosition(msg.CurrentPos.CommitPosition, msg.CurrentPos.PreparePosition), | ||
| msg.IsEndOfStream); |
There was a problem hiding this comment.
6. Index reads never advance 🐞 Bug ≡ Correctness
PersistentSubscriptionStreamReader uses ReadIndexEventsForwardCompleted.CurrentPos as the next checkpoint for index reads, but CurrentPos is the request start position; this can cause repeated reads from the same position (duplicates/infinite catch-up).
Agent Prompt
## Issue description
For index reads, `PersistentSubscriptionStreamReader.ResponseHandler.FetchIndexCompleted` checkpoints using `msg.CurrentPos`, which is the *request start* position for index reads. This prevents the persistent subscription catch-up loop from advancing.
## Issue Context
Index read responses don’t expose `NextPos`; the correct next position must be derived from the last returned event’s `EventPosition` (same as `Enumerator.ReadIndex`).
## Fix Focus Areas
- src/KurrentDB.Core/Services/PersistentSubscription/PersistentSubscriptionStreamReader.cs[99-118]
- src/KurrentDB.Core/Services/PersistentSubscription/PersistentSubscriptionStreamReader.cs[210-227]
- src/KurrentDB.Core/Services/Transport/Enumerators/Enumerator.ReadIndex.cs[166-173]
## What to change
- In `FetchIndexCompleted(ReadIndexEventsForwardCompleted msg)`:
- If `msg.Events.Count > 0`, compute next TFPos from `msg.Events[^1].EventPosition` and use that for the returned `PersistentSubscriptionAllStreamPosition`.
- If `msg.Events.Count == 0`, keep the checkpoint at the existing start position.
- Consider passing `excludeStart: skipFirstEvent` (or equivalently derive next pos and set excludeStart=true on subsequent reads) to prevent duplicates.
- Add a regression test that a sequence of index reads advances the position and does not re-read the same page.
ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools
There was a problem hiding this comment.
Pull request overview
This PR adds persistent subscription support for secondary indexes by introducing a dedicated PersistentSubscriptionIndexService that reuses the existing persistent subscription machinery, with gRPC Create routing to the new index-specific ClientMessage types and forwarding from the existing service for operations that don’t carry filter info.
Changes:
- Added
PersistentSubscriptionIndexService+PersistentSubscriptionIndexEventSourceand wiring inClusterVNodeto handle index lifecycle (committed/deleted) and subscription CRUD/connect. - Extended the persistent subscription event-source abstraction and stream reader to support index reads via
ReadIndexEventsForward. - Added gRPC Create-time routing helper (
FilterRouting) and integration test coverage for index persistent subscription create/delete scenarios.
Reviewed changes
Copilot reviewed 18 out of 18 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| src/KurrentDB.SecondaryIndexing.Tests/IntegrationTests/PersistentSubscriptionTests.cs | Adds integration tests for create/delete/duplicate/unknown-index behaviors. |
| src/KurrentDB.SecondaryIndexing.Tests/Fixtures/SecondaryIndexingFixture.cs | Adds fixture helpers for publishing/awaiting index persistent subscription create/delete messages. |
| src/KurrentDB.Core/Services/Transport/Grpc/PersistentSubscriptions.Create.cs | Routes Create($all + index-prefix filter) to CreatePersistentSubscriptionToIndex and handles its completion. |
| src/KurrentDB.Core/Services/Transport/Grpc/FilterRouting.cs | Adds shared helper for detecting index-targeted stream-identifier prefix filters. |
| src/KurrentDB.Core/Services/PersistentSubscription/PersistentSubscriptionToIndexParamsBuilder.cs | Adds a params builder for index persistent subscriptions. |
| src/KurrentDB.Core/Services/PersistentSubscription/PersistentSubscriptionStreamReader.cs | Adds FromIndex branch using ReadIndexEventsForward. |
| src/KurrentDB.Core/Services/PersistentSubscription/PersistentSubscriptionIndexEventSource.cs | Implements IPersistentSubscriptionEventSource for index reads/checkpointing. |
| src/KurrentDB.Core/Services/PersistentSubscription/PersistentSubscriptionIndexService.cs | New service handling index persistent subscription lifecycle + index committed/deleted messages. |
| src/KurrentDB.Core/Services/PersistentSubscription/PersistentSubscriptionService.cs | Forwards Update/Delete/Connect($all) to index service when group belongs to an index entry; forwards index entries on bootstrap. |
| src/KurrentDB.Core/Services/PersistentSubscription/PersistentSubscriptionConfig.cs | Adds PersistentSubscriptionEntry.IndexName to persist index subscriptions. |
| src/KurrentDB.Core/Services/PersistentSubscription/IPersistentSubscriptionEventSource.cs | Extends the abstraction with FromIndex and IndexName. |
| src/KurrentDB.Core/Services/PersistentSubscription/PersistentSubscriptionAllStreamEventSource.cs | Implements new interface members for $all event source. |
| src/KurrentDB.Core/Services/PersistentSubscription/PersistentSubscriptionSingleStreamEventSource.cs | Implements new interface members for single-stream event source. |
| src/KurrentDB.Core/Messages/SubscriptionMessage.cs | Adds PersistentSubscriptionIndexEntriesLoaded for bootstrapping the index service. |
| src/KurrentDB.Core/Messages/ClientMessage.cs | Adds new …ToIndex message types for create/update/delete/connect and their completions. |
| src/KurrentDB.Core/ClusterVNode.cs | Wires index persistent subscriptions onto the per-subscription bus and main bus forwarding. |
| docs/superpowers/specs/2026-04-13-persistent-subscriptions-secondary-indexes-design.md | Adds design/spec documentation. |
| docs/superpowers/plans/2026-04-13-persistent-subscriptions-secondary-indexes.md | Adds implementation plan documentation. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| indexName: eventSource.IndexName, | ||
| commitPosition: startPosition.TFPosition.Commit, | ||
| preparePosition: startPosition.TFPosition.Prepare, | ||
| excludeStart: false, |
There was a problem hiding this comment.
Index reads always publish ReadIndexEventsForward with excludeStart=false, but the code later uses CurrentPos as the next checkpoint. Since ReadIndexEventsForwardCompleted.CurrentPos is the requested position (see ClientMessage.IndexReads), this will cause the reader to re-read the same events repeatedly. Consider advancing based on the last returned event position and setting excludeStart=true for subsequent pages / when skipFirstEvent is in play (similar to the index enumerators).
| excludeStart: false, | |
| excludeStart: true, |
| public void FetchIndexCompleted(ClientMessage.ReadIndexEventsForwardCompleted msg) { | ||
| switch (msg.Result) { | ||
| case ReadIndexResult.Success: | ||
| _onFetchCompleted( | ||
| _skipFirstEvent ? msg.Events.Skip(1).ToArray() : (IReadOnlyList<ResolvedEvent>)msg.Events, | ||
| new PersistentSubscriptionAllStreamPosition(msg.CurrentPos.CommitPosition, msg.CurrentPos.PreparePosition), |
There was a problem hiding this comment.
On successful index reads, the next position passed to _onFetchCompleted is built from msg.CurrentPos. For ReadIndexEventsForwardCompleted this is the request start position, not the position after the last returned event, so the subscription will not progress. Compute the next position from the last event’s EventPosition/OriginalPosition (and handle the empty-events case) so checkpointing and subsequent reads advance correctly.
| public void FetchIndexCompleted(ClientMessage.ReadIndexEventsForwardCompleted msg) { | |
| switch (msg.Result) { | |
| case ReadIndexResult.Success: | |
| _onFetchCompleted( | |
| _skipFirstEvent ? msg.Events.Skip(1).ToArray() : (IReadOnlyList<ResolvedEvent>)msg.Events, | |
| new PersistentSubscriptionAllStreamPosition(msg.CurrentPos.CommitPosition, msg.CurrentPos.PreparePosition), | |
| private static PersistentSubscriptionAllStreamPosition GetNextIndexPosition( | |
| ClientMessage.ReadIndexEventsForwardCompleted msg) { | |
| if (msg.Events.Count > 0) { | |
| var lastEvent = msg.Events[msg.Events.Count - 1]; | |
| if (lastEvent.EventPosition is { } eventPosition) { | |
| return new PersistentSubscriptionAllStreamPosition( | |
| eventPosition.CommitPosition, | |
| eventPosition.PreparePosition); | |
| } | |
| if (lastEvent.OriginalPosition is { } originalPosition) { | |
| return new PersistentSubscriptionAllStreamPosition( | |
| originalPosition.CommitPosition, | |
| originalPosition.PreparePosition); | |
| } | |
| } | |
| return new PersistentSubscriptionAllStreamPosition( | |
| msg.CurrentPos.CommitPosition, | |
| msg.CurrentPos.PreparePosition); | |
| } | |
| public void FetchIndexCompleted(ClientMessage.ReadIndexEventsForwardCompleted msg) { | |
| switch (msg.Result) { | |
| case ReadIndexResult.Success: | |
| _onFetchCompleted( | |
| _skipFirstEvent ? msg.Events.Skip(1).ToArray() : (IReadOnlyList<ResolvedEvent>)msg.Events, | |
| GetNextIndexPosition(msg), |
| _ioDispatcher.ConfigureStreamAndWriteEvents(SystemStreams.PersistentSubscriptionConfig, | ||
| ExpectedVersion.Any, streamMetadata, events, SystemAccounts.System, | ||
| x => HandleWriteCompleted(continueWith, x)); |
There was a problem hiding this comment.
WriteMergedConfig uses ExpectedVersion.Any when rewriting $persistentSubscriptionConfig. With two services now capable of writing this stream (PersistentSubscriptionService and PersistentSubscriptionIndexService), this can silently drop updates (last writer wins) even with the read/merge step. Consider using optimistic concurrency (read expected version + retry on WrongExpectedVersion) or funnel all config writes through a single service/queue to avoid lost updates.
| foreach (var entry in _config.Entries) { | ||
| if (entry.IndexName != null) { | ||
| indexEntries.Add(entry); | ||
| continue; | ||
| } | ||
|
|
There was a problem hiding this comment.
Index entries are collected into indexEntries but remain in _config.Entries. Since PersistentSubscriptionService.SaveConfiguration serializes and writes _config as-is, any later save from the main service can overwrite changes made by PersistentSubscriptionIndexService (stale index entries clobber merged config). Consider removing index entries from _config after partitioning (or ensuring SaveConfiguration excludes them) so the index service is the only writer of index entries.
| foreach (var entry in _config.Entries) { | |
| if (entry.IndexName != null) { | |
| indexEntries.Add(entry); | |
| continue; | |
| } | |
| var nonIndexEntries = new List<PersistentSubscriptionEntry>(); | |
| foreach (var entry in _config.Entries.ToList()) { | |
| if (entry.IndexName != null) { | |
| indexEntries.Add(entry); | |
| continue; | |
| } | |
| nonIndexEntries.Add(entry); | |
| } | |
| _config.Entries = nonIndexEntries; | |
| foreach (var entry in _config.Entries) { |
| }, | ||
| (error) => { | ||
| // Before reporting DoesNotExist, check if the group belongs to an index subscription. | ||
| var indexEntry = _config.Entries.FirstOrDefault(e => e.IndexName != null && e.Group == message.GroupName); |
There was a problem hiding this comment.
Forwarding to the index service uses FirstOrDefault(e => e.IndexName != null && e.Group == GroupName). If the same group name exists on multiple index subscriptions, this will route Update($all, group) to an arbitrary index. Consider enforcing group-name uniqueness across index subscriptions, or detecting multiple matches and failing with a clear error instead of picking the first entry.
| var indexEntry = _config.Entries.FirstOrDefault(e => e.IndexName != null && e.Group == message.GroupName); | |
| var matchingIndexEntries = _config.Entries | |
| .Where(e => e.IndexName != null && e.Group == message.GroupName) | |
| .Take(2) | |
| .ToArray(); | |
| if (matchingIndexEntries.Length > 1) { | |
| message.Envelope.ReplyWith(new ClientMessage.UpdatePersistentSubscriptionToAllCompleted( | |
| message.CorrelationId, | |
| ClientMessage.UpdatePersistentSubscriptionToAllCompleted.UpdatePersistentSubscriptionToAllResult.Fail, | |
| $"Multiple index subscriptions exist for group '{message.GroupName}'. Group names must be unique across index subscriptions.")); | |
| return; | |
| } | |
| var indexEntry = matchingIndexEntries.SingleOrDefault(); |
| var indexEntry = _config.Entries.FirstOrDefault(e => e.IndexName != null && e.Group == message.GroupName); | ||
| if (indexEntry != null) { |
There was a problem hiding this comment.
Forwarding to the index service uses FirstOrDefault(e => e.IndexName != null && e.Group == GroupName). If the same group name exists on multiple index subscriptions, this Delete($all, group) will delete an arbitrary one. Consider enforcing group-name uniqueness across index subscriptions, or detecting multiple matches and failing with a clear error instead of picking the first entry.
| var indexEntry = _config.Entries.FirstOrDefault(e => e.IndexName != null && e.Group == message.GroupName); | ||
| if (indexEntry != null) { |
There was a problem hiding this comment.
Forwarding to the index service uses FirstOrDefault(e => e.IndexName != null && e.Group == GroupName). If the same group name exists on multiple index subscriptions, Connect($all, group) will connect to an arbitrary index subscription. Consider enforcing group-name uniqueness across index subscriptions, or detecting multiple matches and failing with a clear error instead of picking the first entry.
| var indexEntry = _config.Entries.FirstOrDefault(e => e.IndexName != null && e.Group == message.GroupName); | |
| if (indexEntry != null) { | |
| var indexEntries = _config.Entries | |
| .Where(e => e.IndexName != null && e.Group == message.GroupName) | |
| .Take(2) | |
| .ToArray(); | |
| if (indexEntries.Length > 1) { | |
| Log.Error( | |
| "Ambiguous persistent subscription group '{groupName}' matched multiple index subscriptions while connecting to $all.", | |
| message.GroupName); | |
| message.Envelope.ReplyWith(new ClientMessage.SubscriptionDropped( | |
| message.CorrelationId, | |
| SubscriptionDropReason.NotFound)); | |
| return ValueTask.CompletedTask; | |
| } | |
| if (indexEntries.Length == 1) { | |
| var indexEntry = indexEntries[0]; |
| if (prefixes is not { Count: 1 }) | ||
| return false; | ||
|
|
||
| var candidate = prefixes[0]; | ||
| if (!SystemStreams.IsIndexStream(candidate)) |
There was a problem hiding this comment.
TryGetIndexName returns false when prefixes.Count != 1, even if one of the prefixes is an index stream id. This differs from Streams.Read.cs, which throws InvalidArgument when an index name is combined with other prefixes. Consider matching that behavior here (throw when an index prefix is present alongside others) so index-targeted requests don’t silently fall back to normal $all filtering.
|
|
||
| foreach (var (stream, subscriptions) in _subscriptionTopics) { | ||
| // The stream key is in the format "$index-{indexName}"; the regex matches against the raw index name. | ||
| if (!message.StreamIdRegex.IsMatch(stream)) |
There was a problem hiding this comment.
SecondaryIndexDeleted provides a regex that matches the actual index stream ids (e.g. "$idx-…"), but _subscriptionTopics keys are "$index-{indexName}". As written, IsMatch(stream) will never match and index subscriptions/config entries won’t be cleaned up on index deletion. Match against the raw index name (e.g. strip the "$index-" prefix or store topics keyed by message.IndexName) before applying the regex.
| if (!message.StreamIdRegex.IsMatch(stream)) | |
| const string indexStreamPrefix = "$index-"; | |
| var indexName = stream.StartsWith(indexStreamPrefix, StringComparison.Ordinal) | |
| ? stream[indexStreamPrefix.Length..] | |
| : stream; | |
| if (!message.StreamIdRegex.IsMatch(indexName)) |
…e to ClusterVNode New service handles create/update/delete/connect for index subscriptions, reacts to SecondaryIndexCommitted for live events and SecondaryIndexDeleted for cleanup. Existing service partitions config entries and forwards index entries to the new service on bootstrap. Includes prerequisite plumbing: ClientMessage types for index persistent subscriptions, IPersistentSubscriptionEventSource.FromIndex/IndexName, PersistentSubscriptionIndexEventSource, PersistentSubscriptionToIndexParamsBuilder, IndexName field on PersistentSubscriptionEntry, and reader FromIndex branch. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Create detects index-prefix filter at gRPC layer and publishes ToIndex messages directly. Delete, Update, and Connect use forwarding from the existing PersistentSubscriptionService when the group belongs to an index subscription (detected via config entries with IndexName). Adds FilterRouting static helper for extracting index names from stream-identifier prefix filters. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…scriptions Tests create/delete lifecycle and unknown-index rejection using the existing SecondaryIndexingFixture. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The forwarding envelopes for Update/Delete had an if with no else — if a NotHandled or unexpected message arrived, the client would hang forever with no reply. Now replies with Fail on unexpected messages. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The CallbackEnvelope for ReadIndexEventsForward used a hard cast with no type check. A NotHandled message would crash with InvalidCastException. Now routes unexpected messages to the onError callback. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…eams SecondaryIndexDeleted handler called Shutdown() but not Delete(), leaving checkpoint and parked-message streams orphaned. The explicit Delete handler already called Delete() correctly. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Create/Connect/etc. could be processed before bootstrap completes or after shutdown. Now guards all handlers with a _started flag set after PersistentSubscriptionIndexEntriesLoaded and cleared on shutdown. Also clears subscription dictionaries on shutdown. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
HandleSaveReadCompleted returned silently on error paths without calling continueWith, causing the client to hang forever with no response. Now calls continueWith so the caller gets a reply. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
These methods were never called — the index service bootstraps via PersistentSubscriptionIndexEntriesLoaded from the main service, not by reading config directly. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ted regex mismatch IndexName from FilterRouting is already the full index stream name (e.g. $idx-all). Adding $index- created $index-$idx-all — a double prefix that broke SecondaryIndexDeleted regex matching (regex matches $idx-... but topic keys were $index-$idx-...) and produced wrong subscription IDs. Now uses the raw index name throughout. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Two issues fixed: 1. Main service's SaveConfiguration now excludes index entries from writes — prevents clobbering index service changes. Index entries are kept in memory for forwarding lookups. 2. Index service notifies main service via PersistentSubscriptionIndexEntryChanged when subscriptions are created or deleted, keeping the forwarding lookup current for subscriptions created after startup. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
If the same group name exists on multiple index subscriptions, forwarding Update/Delete/Connect via the $all API would silently pick the first match. Now detects multiple matches and fails with a clear error message directing the user to the index-specific API. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ind enum The FromStream/FromAll/FromIndex booleans on IPersistentSubscriptionEventSource had no exhaustiveness checking and grew linearly with new variants. Now uses EventSourceKind enum with a switch expression in the reader. The booleans remain as default interface implementations for backward compatibility. Also seals all three event source implementations. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…d ParamsBuilder Prevents unintended inheritance and enables JIT devirtualization. Event source classes were already sealed in the previous commit. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Per CLAUDE.md log level policy: recoverable errors and degraded states should use Warning, not Information. A timeout that triggers a retry is a recoverable error. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…s flow Replace CRUD-only tests with functional tests that prove the feature works end-to-end: - ReceivesEventsFromDefaultIndex: write events → wait for indexing → create subscription → connect → verify events arrive and ack - ReceivesLiveEventsAfterCatchUp: create subscription from end → connect → write events → verify live events arrive Also adds ConnectToPersistentSubscriptionToIndex fixture helper using ContinuationEnvelope + Channel pattern. Fixes _started guard to not block CRUD operations during async config load, and always publishes PersistentSubscriptionIndexEntriesLoaded even when empty. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
7788147 to
58000d7
Compare
FilterRouting.TryGetIndexName silently returned false when multiple prefixes included an index name, falling back to $all filtering. Now throws InvalidArgument matching the behavior of Streams.Read.cs. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
PersistentSubscriptionIndexService— a sibling service toPersistentSubscriptionServicethat handles persistent subscriptions targeting secondary indexes…ToIndexmessages; Delete/Update/Connect use forwarding from the existing servicePersistentSubscriptionstate machine, checkpoint reader/writer, parking, ack/nack, and consumer strategiesSecondaryIndexDeleted, active connections are dropped and the subscription config is removedKey design decisions
$allwith aStreamIdentifierfilter whose single prefix is an index name (same convention as catch-up subscriptions)PersistentSubscriptionIndexServiceis a sibling on the sameperSubscrBus, cleanly isolating index lifecycle from stream/all subscriptionsNew files
PersistentSubscriptionIndexService.cs— handles Create/Update/Delete/Connect, SecondaryIndexCommitted (live events), SecondaryIndexDeleted (cleanup), TimerTick, lifecyclePersistentSubscriptionIndexEventSource.cs—IPersistentSubscriptionEventSourcefor indexes (TFPos-based, no filter)PersistentSubscriptionToIndexParamsBuilder.cs— factory for index subscription paramsFilterRouting.cs— shared helper extracted fromStreams.Read.csfor detecting index-prefix filtersTest plan
PersistentSubscriptionIndexEventSourceTests(11 tests),FilterRoutingTests(8 tests)PersistentSubscriptionTests— create/delete lifecycle, unknown index rejection, duplicate detection, non-existent delete (4 tests)🤖 Generated with Claude Code