Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
51fbf19
feat(persistent-subs): add PersistentSubscriptionIndexService and wir…
alexeyzimarev Apr 14, 2026
7cf11bb
feat(persistent-subs): add gRPC filter hijack for index subscriptions
alexeyzimarev Apr 14, 2026
f0951a7
test(persistent-subs): add integration tests for index persistent sub…
alexeyzimarev Apr 14, 2026
99a0608
fix(persistent-subs): add else branches to translating envelopes
alexeyzimarev Apr 14, 2026
2720841
fix(persistent-subs): add type-check before cast in index read callback
alexeyzimarev Apr 14, 2026
26c7409
fix(persistent-subs): call Delete() on index deletion to clean up str…
alexeyzimarev Apr 14, 2026
43bb41b
fix(persistent-subs): add _started lifecycle guard to index service
alexeyzimarev Apr 14, 2026
d0fe754
fix(persistent-subs): call continueWith on save-path errors
alexeyzimarev Apr 14, 2026
5426088
fix(persistent-subs): remove dead LoadConfiguration/HandleLoadCompleted
alexeyzimarev Apr 14, 2026
9ad4a96
fix(persistent-subs): remove $index- prefix to fix SecondaryIndexDele…
alexeyzimarev Apr 14, 2026
9a1deab
fix(persistent-subs): fix stale-config forwarding and save clobbering
alexeyzimarev Apr 14, 2026
46407ef
fix(persistent-subs): detect ambiguous group names in forwarding
alexeyzimarev Apr 14, 2026
13ca4cf
refactor(persistent-subs): replace boolean tristate with EventSourceK…
alexeyzimarev Apr 14, 2026
30848c4
refactor(persistent-subs): seal PersistentSubscriptionIndexService an…
alexeyzimarev Apr 14, 2026
a687d09
fix(persistent-subs): use Warning log level for config save timeout
alexeyzimarev Apr 14, 2026
58000d7
test(persistent-subs): add end-to-end integration tests proving event…
alexeyzimarev Apr 14, 2026
513f448
fix(persistent-subs): throw on multi-prefix filter with index name
alexeyzimarev Apr 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/KurrentDB.Core/ClusterVNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,28 @@ GossipAdvertiseInfo GetGossipAdvertiseInfo() {
perSubscrBus.Subscribe<SubscriptionMessage.PersistentSubscriptionTimerTick>(persistentSubscription);
perSubscrBus.Subscribe<SubscriptionMessage.PersistentSubscriptionsRestart>(persistentSubscription);

// PERSISTENT SUBSCRIPTIONS TO INDEX
_mainBus.Subscribe<StorageMessage.SecondaryIndexCommitted>(perSubscrQueue);
_mainBus.Subscribe<StorageMessage.SecondaryIndexDeleted>(perSubscrQueue);
_mainBus.Subscribe<ClientMessage.CreatePersistentSubscriptionToIndex>(perSubscrQueue);
_mainBus.Subscribe<ClientMessage.UpdatePersistentSubscriptionToIndex>(perSubscrQueue);
_mainBus.Subscribe<ClientMessage.DeletePersistentSubscriptionToIndex>(perSubscrQueue);
_mainBus.Subscribe<ClientMessage.ConnectToPersistentSubscriptionToIndex>(perSubscrQueue);

var persistentSubscriptionIndex = new PersistentSubscriptionIndexService(
perSubscrQueue, psubDispatcher, _mainQueue, consumerStrategyRegistry, secondaryIndexReaders);
perSubscrBus.Subscribe<SubscriptionMessage.PersistentSubscriptionIndexEntriesLoaded>(persistentSubscriptionIndex);
perSubscrBus.Subscribe<SubscriptionMessage.PersistentSubscriptionIndexEntryChanged>(persistentSubscription);
perSubscrBus.Subscribe<ClientMessage.CreatePersistentSubscriptionToIndex>(persistentSubscriptionIndex);
perSubscrBus.Subscribe<ClientMessage.UpdatePersistentSubscriptionToIndex>(persistentSubscriptionIndex);
perSubscrBus.Subscribe<ClientMessage.DeletePersistentSubscriptionToIndex>(persistentSubscriptionIndex);
perSubscrBus.Subscribe<ClientMessage.ConnectToPersistentSubscriptionToIndex>(persistentSubscriptionIndex);
perSubscrBus.Subscribe<StorageMessage.SecondaryIndexCommitted>(persistentSubscriptionIndex);
perSubscrBus.Subscribe<StorageMessage.SecondaryIndexDeleted>(persistentSubscriptionIndex);
perSubscrBus.Subscribe<SubscriptionMessage.PersistentSubscriptionTimerTick>(persistentSubscriptionIndex);
perSubscrBus.Subscribe<SystemMessage.BecomeShuttingDown>(persistentSubscriptionIndex);
perSubscrBus.Subscribe<SystemMessage.StateChangeMessage>(persistentSubscriptionIndex);

// STORAGE SCAVENGER
var scavengerDispatcher = new IODispatcher(_mainQueue, _mainQueue);
_mainBus.Subscribe<ClientMessage.ReadStreamEventsBackwardCompleted>(scavengerDispatcher.BackwardReader);
Expand Down
188 changes: 188 additions & 0 deletions src/KurrentDB.Core/Messages/ClientMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1521,6 +1521,194 @@ public enum DeletePersistentSubscriptionToAllResult {
}
}

// Index persistent subscriptions

[DerivedMessage(CoreMessage.Client)]
public partial class ConnectToPersistentSubscriptionToIndex : ReadRequestMessage {
public readonly Guid ConnectionId;
public readonly string ConnectionName;
public readonly string GroupName;
public readonly string IndexName;
public readonly int AllowedInFlightMessages;
public readonly string From;

public ConnectToPersistentSubscriptionToIndex(Guid internalCorrId, Guid correlationId, IEnvelope envelope,
Guid connectionId, string connectionName, string groupName, string indexName,
int allowedInFlightMessages, string from, ClaimsPrincipal user, DateTime? expires = null)
: base(internalCorrId, correlationId, envelope, user, expires) {
GroupName = Ensure.NotNullOrEmpty(groupName);
IndexName = Ensure.NotNullOrEmpty(indexName);
ConnectionId = Ensure.NotEmptyGuid(connectionId);
ConnectionName = connectionName;
AllowedInFlightMessages = Ensure.Nonnegative(allowedInFlightMessages);
From = from;
}
}

[DerivedMessage(CoreMessage.Client)]
public partial class CreatePersistentSubscriptionToIndex : ReadRequestMessage {
public readonly TFPos StartFrom;
public readonly int MessageTimeoutMilliseconds;
public readonly bool RecordStatistics;

public readonly bool ResolveLinkTos;
public readonly int MaxRetryCount;
public readonly int BufferSize;
public readonly int LiveBufferSize;
public readonly int ReadBatchSize;

public readonly string GroupName;
public readonly string IndexName;
public readonly int MaxSubscriberCount;
public readonly string NamedConsumerStrategy;
public readonly int MaxCheckPointCount;
public readonly int MinCheckPointCount;
public readonly int CheckPointAfterMilliseconds;

public CreatePersistentSubscriptionToIndex(Guid internalCorrId, Guid correlationId, IEnvelope envelope,
string groupName, string indexName, bool resolveLinkTos, TFPos startFrom,
int messageTimeoutMilliseconds, bool recordStatistics, int maxRetryCount, int bufferSize,
int liveBufferSize, int readBatchSize,
int checkPointAfterMilliseconds, int minCheckPointCount, int maxCheckPointCount,
int maxSubscriberCount, string namedConsumerStrategy, ClaimsPrincipal user, DateTime? expires = null)
: base(internalCorrId, correlationId, envelope, user, expires) {
ResolveLinkTos = resolveLinkTos;
GroupName = groupName;
IndexName = indexName;
StartFrom = startFrom;
MessageTimeoutMilliseconds = messageTimeoutMilliseconds;
RecordStatistics = recordStatistics;
MaxRetryCount = maxRetryCount;
BufferSize = bufferSize;
LiveBufferSize = liveBufferSize;
ReadBatchSize = readBatchSize;
MaxCheckPointCount = maxCheckPointCount;
MinCheckPointCount = minCheckPointCount;
CheckPointAfterMilliseconds = checkPointAfterMilliseconds;
MaxSubscriberCount = maxSubscriberCount;
NamedConsumerStrategy = namedConsumerStrategy;
}
Comment on lines +1568 to +1590
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

1. ...toindex ctor lacks validation 📘 Rule violation ≡ Correctness

The new Create/Update/DeletePersistentSubscriptionToIndex messages assign required fields like
groupName/indexName without Ensure.NotNullOrEmpty(...), allowing null/empty values to flow
into the service and fail later in less obvious ways. This violates the requirement to fail
explicitly rather than allowing silent invalid inputs for mandatory parameters.
Agent Prompt
## Issue description
`Create/Update/DeletePersistentSubscriptionToIndex` constructors do not validate required inputs (e.g., `groupName`, `indexName`, `namedConsumerStrategy`), allowing null/empty values to propagate and fail later.

## Issue Context
Other `ClientMessage` request types (e.g., `ConnectToPersistentSubscriptionToIndex`) use `Ensure.NotNullOrEmpty(...)` / `Ensure.Nonnegative(...)` to enforce invariants early.

## Fix Focus Areas
- src/KurrentDB.Core/Messages/ClientMessage.cs[1568-1590]
- src/KurrentDB.Core/Messages/ClientMessage.cs[1634-1656]
- src/KurrentDB.Core/Messages/ClientMessage.cs[1684-1689]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

}

[DerivedMessage(CoreMessage.Client)]
public partial class CreatePersistentSubscriptionToIndexCompleted : ReadResponseMessage {
public readonly Guid CorrelationId;
public readonly string Reason;
public readonly CreatePersistentSubscriptionToIndexResult Result;

public CreatePersistentSubscriptionToIndexCompleted(Guid correlationId, CreatePersistentSubscriptionToIndexResult result, string reason) {
CorrelationId = Ensure.NotEmptyGuid(correlationId);
Result = result;
Reason = reason;
}

public enum CreatePersistentSubscriptionToIndexResult {
Success = 0,
AlreadyExists = 1,
Fail = 2,
AccessDenied = 3
}
}

[DerivedMessage(CoreMessage.Client)]
public partial class UpdatePersistentSubscriptionToIndex : ReadRequestMessage {
public readonly TFPos StartFrom;
public readonly int MessageTimeoutMilliseconds;
public readonly bool RecordStatistics;

public readonly bool ResolveLinkTos;
public readonly int MaxRetryCount;
public readonly int BufferSize;
public readonly int LiveBufferSize;
public readonly int ReadBatchSize;

public readonly string GroupName;
public readonly string IndexName;
public readonly int MaxSubscriberCount;

public readonly int MaxCheckPointCount;
public readonly int MinCheckPointCount;
public readonly int CheckPointAfterMilliseconds;
public readonly string NamedConsumerStrategy;

public UpdatePersistentSubscriptionToIndex(Guid internalCorrId, Guid correlationId, IEnvelope envelope,
string groupName, string indexName, bool resolveLinkTos, TFPos startFrom,
int messageTimeoutMilliseconds, bool recordStatistics, int maxRetryCount, int bufferSize,
int liveBufferSize, int readBatchSize, int checkPointAfterMilliseconds, int minCheckPointCount,
int maxCheckPointCount, int maxSubscriberCount, string namedConsumerStrategy, ClaimsPrincipal user,
DateTime? expires = null)
: base(internalCorrId, correlationId, envelope, user, expires) {
ResolveLinkTos = resolveLinkTos;
GroupName = groupName;
IndexName = indexName;
StartFrom = startFrom;
MessageTimeoutMilliseconds = messageTimeoutMilliseconds;
RecordStatistics = recordStatistics;
MaxRetryCount = maxRetryCount;
BufferSize = bufferSize;
LiveBufferSize = liveBufferSize;
ReadBatchSize = readBatchSize;
MaxCheckPointCount = maxCheckPointCount;
MinCheckPointCount = minCheckPointCount;
CheckPointAfterMilliseconds = checkPointAfterMilliseconds;
MaxSubscriberCount = maxSubscriberCount;
NamedConsumerStrategy = namedConsumerStrategy;
}
}

[DerivedMessage(CoreMessage.Client)]
public partial class UpdatePersistentSubscriptionToIndexCompleted : ReadResponseMessage {
public readonly Guid CorrelationId;
public readonly string Reason;
public readonly UpdatePersistentSubscriptionToIndexResult Result;

public UpdatePersistentSubscriptionToIndexCompleted(Guid correlationId, UpdatePersistentSubscriptionToIndexResult result, string reason) {
CorrelationId = Ensure.NotEmptyGuid(correlationId);
Result = result;
Reason = reason;
}

public enum UpdatePersistentSubscriptionToIndexResult {
Success = 0,
DoesNotExist = 1,
Fail = 2,
AccessDenied = 3
}
}

[DerivedMessage(CoreMessage.Client)]
public partial class DeletePersistentSubscriptionToIndex : ReadRequestMessage {
public readonly string GroupName;
public readonly string IndexName;

public DeletePersistentSubscriptionToIndex(Guid internalCorrId, Guid correlationId, IEnvelope envelope,
string indexName, string groupName, ClaimsPrincipal user, DateTime? expires = null)
: base(internalCorrId, correlationId, envelope, user, expires) {
GroupName = groupName;
IndexName = indexName;
}
}

[DerivedMessage(CoreMessage.Client)]
public partial class DeletePersistentSubscriptionToIndexCompleted : ReadResponseMessage {
public readonly Guid CorrelationId;
public readonly string Reason;
public readonly DeletePersistentSubscriptionToIndexResult Result;

public DeletePersistentSubscriptionToIndexCompleted(Guid correlationId, DeletePersistentSubscriptionToIndexResult result, string reason) {
CorrelationId = Ensure.NotEmptyGuid(correlationId);
Result = result;
Reason = reason;
}

public enum DeletePersistentSubscriptionToIndexResult {
Success = 0,
DoesNotExist = 1,
Fail = 2,
AccessDenied = 3
}
}

[DerivedMessage(CoreMessage.Client)]
public partial class PersistentSubscriptionAckEvents : ReadRequestMessage {
public readonly string SubscriptionId;
Expand Down
26 changes: 26 additions & 0 deletions src/KurrentDB.Core/Messages/SubscriptionMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).

using System;
using System.Collections.Generic;
using KurrentDB.Core.Messaging;
using KurrentDB.Core.Services;
using KurrentDB.Core.Services.PersistentSubscription;

namespace KurrentDB.Core.Messages;

Expand Down Expand Up @@ -80,4 +82,28 @@ public partial class PersistentSubscriptionsStarted : Message {
[DerivedMessage(CoreMessage.Subscription)]
public partial class PersistentSubscriptionsStopped : Message {
}

[DerivedMessage(CoreMessage.Subscription)]
public partial class PersistentSubscriptionIndexEntriesLoaded : Message {
public readonly IReadOnlyList<PersistentSubscriptionEntry> IndexEntries;

public PersistentSubscriptionIndexEntriesLoaded(IReadOnlyList<PersistentSubscriptionEntry> indexEntries) {
IndexEntries = indexEntries;
}
}

/// <summary>
/// Notifies the main PersistentSubscriptionService that an index subscription entry
/// was created or deleted, so it can update its config for forwarding lookups.
/// </summary>
[DerivedMessage(CoreMessage.Subscription)]
public partial class PersistentSubscriptionIndexEntryChanged : Message {
public readonly PersistentSubscriptionEntry Entry;
public readonly bool IsDelete;

public PersistentSubscriptionIndexEntryChanged(PersistentSubscriptionEntry entry, bool isDelete) {
Entry = entry;
IsDelete = isDelete;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,19 @@

namespace KurrentDB.Core.Services.PersistentSubscription;

public enum EventSourceKind {
Stream,
All,
Index
}

public interface IPersistentSubscriptionEventSource {
bool FromStream { get; }
EventSourceKind Kind { get; }
bool FromStream => Kind == EventSourceKind.Stream;
bool FromAll => Kind == EventSourceKind.All;
bool FromIndex => Kind == EventSourceKind.Index;
string EventStreamId { get; }
bool FromAll { get; }
string IndexName { get; }
string ToString();
IPersistentSubscriptionStreamPosition StreamStartPosition { get; }
IPersistentSubscriptionStreamPosition GetStreamPositionFor(ResolvedEvent @event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@

namespace KurrentDB.Core.Services.PersistentSubscription;

public class PersistentSubscriptionAllStreamEventSource : IPersistentSubscriptionEventSource {
public bool FromStream => false;
public sealed class PersistentSubscriptionAllStreamEventSource : IPersistentSubscriptionEventSource {
public EventSourceKind Kind => EventSourceKind.All;
public string EventStreamId => throw new InvalidOperationException();
public bool FromAll => true;
public string IndexName => throw new InvalidOperationException();
public override string ToString() => SystemStreams.AllStream;
public IEventFilter EventFilter { get; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public BadConfigDataException(string message, Exception inner) : base(message, i
public class PersistentSubscriptionEntry {
public string Stream;
public string Group;
public string IndexName;
public EventFilter.EventFilterDto Filter;
public bool ResolveLinkTos;
public bool ExtraStatistics;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).

using System;
using System.Diagnostics;
using KurrentDB.Core.Data;
using KurrentDB.Core.Services.Storage.ReaderIndex;

namespace KurrentDB.Core.Services.PersistentSubscription;

public sealed class PersistentSubscriptionIndexEventSource : IPersistentSubscriptionEventSource {
public EventSourceKind Kind => EventSourceKind.Index;
public string EventStreamId => throw new InvalidOperationException();
public string IndexName { get; }
public IEventFilter EventFilter => null;

public PersistentSubscriptionIndexEventSource(string indexName) {
IndexName = indexName ?? throw new ArgumentNullException(nameof(indexName));
}

public override string ToString() => IndexName;

public IPersistentSubscriptionStreamPosition StreamStartPosition =>
new PersistentSubscriptionAllStreamPosition(0L, 0L);

public IPersistentSubscriptionStreamPosition GetStreamPositionFor(ResolvedEvent @event) {
if (@event.OriginalPosition.HasValue) {
return new PersistentSubscriptionAllStreamPosition(
@event.OriginalPosition.Value.CommitPosition,
@event.OriginalPosition.Value.PreparePosition);
}
throw new InvalidOperationException();
}

public IPersistentSubscriptionStreamPosition GetStreamPositionFor(string checkpoint) {
const string C = "C:";
const string P = "P:";
string[] tokens = checkpoint.Split("/");
Debug.Assert(tokens.Length == 2);
Debug.Assert(tokens[0].StartsWith(C));
Debug.Assert(tokens[1].StartsWith(P));
long commitPosition = long.Parse(tokens[0].Substring(C.Length));
long preparePosition = long.Parse(tokens[1].Substring(P.Length));
return new PersistentSubscriptionAllStreamPosition(commitPosition, preparePosition);
}
}
Loading
Loading