diff --git a/example-cl-mimicry.yaml b/example-cl-mimicry.yaml index d852800a9..385bdd51a 100644 --- a/example-cl-mimicry.yaml +++ b/example-cl-mimicry.yaml @@ -116,6 +116,34 @@ outputs: # # Configuration for events without sharding keys (Group D events) # noShardingKeyEvents: # enabled: true # Process all events without sharding keys +# +# randomSampling: +# # Random sampling configuration - provides a "second chance" for events +# # that were NOT captured by deterministic sharding. +# # +# # Pattern Format: +# # - For GroupC events (IWANT, IDONTWANT - no topic): Use exact event type +# # e.g., "LIBP2P_TRACE_RPC_META_CONTROL_IWANT" +# # - For GroupA/B events (with topic): Use "EVENT_TYPE*:topic_pattern" +# # e.g., "LIBP2P_TRACE_RPC_META_CONTROL_IHAVE*:.*attestation.*" +# # - For topic-only patterns: Use regex pattern +# # e.g., ".*beacon_block.*" +# # +# # Note: Random sampling only applies to events REJECTED by deterministic +# # sharding. Events already accepted are NOT sampled again (no duplicates). +# patterns: +# # 2% random sample of IWANT events +# "LIBP2P_TRACE_RPC_META_CONTROL_IWANT": +# chance: "2%" +# +# # 2% random sample of IHAVE attestation events +# "LIBP2P_TRACE_RPC_META_CONTROL_IHAVE*:.*attestation.*": +# chance: "2%" +# +# # 0.5% catch-all for any other rejected events +# ".*": +# chance: "0.5%" +# # events: # recvRpcEnabled: false # sendRpcEnabled: false diff --git a/pkg/clmimicry/config.go b/pkg/clmimicry/config.go index 0f2d47cd6..73fd81635 100644 --- a/pkg/clmimicry/config.go +++ b/pkg/clmimicry/config.go @@ -37,6 +37,10 @@ type Config struct { // Sharding is the configuration for event sharding Sharding ShardingConfig `yaml:"sharding"` + + // RandomSampling is the configuration for random sampling (second chance for events + // that were not captured by deterministic sharding) + RandomSampling RandomSamplingConfig `yaml:"randomSampling"` } func (c *Config) Validate() error { @@ -62,6 +66,10 @@ func (c *Config) Validate() error { return fmt.Errorf("invalid sharding config: %w", err) } + if err := c.validateRandomSampling(); err != nil { + return fmt.Errorf("invalid random sampling config: %w", err) + } + return nil } @@ -116,6 +124,27 @@ func (c *Config) validateSharding() error { return nil } +// validateRandomSampling validates the random sampling configuration +func (c *Config) validateRandomSampling() error { + // If random sampling is not configured, that's fine - it's optional + if c.RandomSampling.Patterns == nil { + c.RandomSampling.Patterns = make(map[string]*RandomSamplingPatternConfig) + + return nil + } + + // Compile and validate patterns + if err := c.RandomSampling.compilePatterns(); err != nil { + return fmt.Errorf("failed to compile random sampling patterns: %w", err) + } + + if err := c.RandomSampling.validate(); err != nil { + return err + } + + return nil +} + // ApplyOverrides applies any overrides to the config. func (c *Config) ApplyOverrides(o *Override, log logrus.FieldLogger) error { if o == nil { diff --git a/pkg/clmimicry/event_libp2p_test.go b/pkg/clmimicry/event_libp2p_test.go index 04480a915..cb75dbd68 100644 --- a/pkg/clmimicry/event_libp2p_test.go +++ b/pkg/clmimicry/event_libp2p_test.go @@ -2784,7 +2784,8 @@ func createTestMimicry(t *testing.T, config *Config, sink output.Sink) *testMimi if config.Sharding.Topics != nil || config.Sharding.NoShardingKeyEvents != nil { // Use sharding from config var err error - sharder, err = NewUnifiedSharder(&config.Sharding, true) + + sharder, err = NewUnifiedSharder(&config.Sharding, nil, true) if err != nil { t.Fatalf("Failed to create sharder: %v", err) } diff --git a/pkg/clmimicry/gossipsub_data_column_sidecar_test.go b/pkg/clmimicry/gossipsub_data_column_sidecar_test.go index 1c900446c..00b3c79c4 100644 --- a/pkg/clmimicry/gossipsub_data_column_sidecar_test.go +++ b/pkg/clmimicry/gossipsub_data_column_sidecar_test.go @@ -312,7 +312,8 @@ func createTestMimicryWithWallclock(t *testing.T, config *Config, sink output.Si var sharder *UnifiedSharder if config.Sharding.Topics != nil || config.Sharding.NoShardingKeyEvents != nil { var err error - sharder, err = NewUnifiedSharder(&config.Sharding, true) + + sharder, err = NewUnifiedSharder(&config.Sharding, nil, true) if err != nil { t.Fatalf("Failed to create sharder: %v", err) } diff --git a/pkg/clmimicry/random_sampling.go b/pkg/clmimicry/random_sampling.go new file mode 100644 index 000000000..a7ce61955 --- /dev/null +++ b/pkg/clmimicry/random_sampling.go @@ -0,0 +1,272 @@ +package clmimicry + +import ( + "fmt" + "regexp" + "strconv" + "strings" + + "github.com/ethpandaops/xatu/pkg/proto/xatu" +) + +// RandomSamplingConfig represents random sampling configuration for events +// that were not captured by deterministic sharding. +type RandomSamplingConfig struct { + // Patterns maps event/topic patterns to their sampling chances. + // Key format: "EVENT_TYPE*:topic_pattern" or "EVENT_TYPE" (for GroupC events without topics) + Patterns map[string]*RandomSamplingPatternConfig `yaml:"patterns"` + + // compiledPatterns holds compiled regex patterns for performance + compiledPatterns map[string]*CompiledRandomPattern +} + +// RandomSamplingPatternConfig defines chance-based sampling for a pattern. +type RandomSamplingPatternConfig struct { + // Chance is the probability as a percentage string (e.g., "2%", "0.5%") + Chance string `yaml:"chance"` + + // parsedChance is the parsed float64 value (0.0 to 1.0) + parsedChance float64 +} + +// CompiledRandomPattern holds a compiled pattern with its config. +type CompiledRandomPattern struct { + // Pattern is the compiled regex for topic matching (nil for GroupC patterns) + Pattern *regexp.Regexp + + // Config is the associated sampling configuration + Config *RandomSamplingPatternConfig + + // EventTypeConstraint specifies which event types this pattern applies to. + // Empty string means it applies to all events. + // Can be an exact event name (e.g., "LIBP2P_TRACE_RPC_META_CONTROL_IWANT") + // or a wildcard pattern (e.g., "LIBP2P_TRACE_RPC_META_*") + EventTypeConstraint string + + // IsGroupCPattern is true if this is an event-type-only pattern (no topic regex) + IsGroupCPattern bool +} + +// parseChancePercentage parses strings like "2%", "0.5%", "100%" to a float64 in range 0.0-1.0. +func parseChancePercentage(chance string) (float64, error) { + chance = strings.TrimSpace(chance) + + if !strings.HasSuffix(chance, "%") { + return 0, fmt.Errorf("chance must end with '%%', got '%s'", chance) + } + + valueStr := strings.TrimSuffix(chance, "%") + + value, err := strconv.ParseFloat(valueStr, 64) + if err != nil { + return 0, fmt.Errorf("invalid percentage value '%s': %w", valueStr, err) + } + + if value < 0 || value > 100 { + return 0, fmt.Errorf("percentage must be between 0 and 100, got %.2f", value) + } + + return value / 100.0, nil +} + +// isValidEventTypeName checks if a string looks like an event type name. +// Event types start with uppercase letters and contain underscores. +func isValidEventTypeName(s string) bool { + if s == "" { + return false + } + + // Must start with uppercase letter + if s[0] < 'A' || s[0] > 'Z' { + return false + } + + // Must contain underscore + return strings.Contains(s, "_") +} + +// compilePatterns compiles all regex patterns in the configuration. +func (c *RandomSamplingConfig) compilePatterns() error { + if c.Patterns == nil { + c.Patterns = make(map[string]*RandomSamplingPatternConfig) + + return nil + } + + c.compiledPatterns = make(map[string]*CompiledRandomPattern, len(c.Patterns)) + + for pattern, config := range c.Patterns { + // Parse the chance percentage + parsedChance, err := parseChancePercentage(config.Chance) + if err != nil { + return fmt.Errorf("invalid chance '%s' for pattern '%s': %w", + config.Chance, pattern, err) + } + + config.parsedChance = parsedChance + + // Check if this is a GroupC event-type-only pattern (no colon separator). + // GroupC patterns don't have a colon (e.g., "LIBP2P_TRACE_RPC_META_CONTROL_IWANT") + if !strings.Contains(pattern, ":") && isValidEventTypeName(pattern) { + c.compiledPatterns[pattern] = &CompiledRandomPattern{ + Pattern: nil, // No regex needed for event-type-only match + Config: config, + EventTypeConstraint: pattern, + IsGroupCPattern: true, + } + + continue + } + + // Parse EVENT_TYPE:topic_pattern format (same as existing sharding) + var ( + eventTypeConstraint string + topicPattern string + ) + + if colonIdx := strings.Index(pattern, ":"); colonIdx != -1 { + potentialEventType := pattern[:colonIdx] + // Check if it looks like an event type (starts with uppercase letter and contains underscore) + if potentialEventType != "" && + potentialEventType[0] >= 'A' && potentialEventType[0] <= 'Z' && + strings.Contains(potentialEventType, "_") { + // Extract event type constraint and topic pattern + eventTypeConstraint = potentialEventType + topicPattern = pattern[colonIdx+1:] + } else { + // Colon is part of the regex pattern, not an event type separator + eventTypeConstraint = "" + topicPattern = pattern + } + } else { + // No event type prefix, pattern applies to all events + eventTypeConstraint = "" + topicPattern = pattern + } + + // Compile the topic pattern as regex + compiled, err := regexp.Compile(topicPattern) + if err != nil { + return fmt.Errorf("invalid regex pattern '%s': %w", topicPattern, err) + } + + c.compiledPatterns[pattern] = &CompiledRandomPattern{ + Pattern: compiled, + Config: config, + EventTypeConstraint: eventTypeConstraint, + IsGroupCPattern: false, + } + } + + return nil +} + +// validate validates the random sampling configuration. +func (c *RandomSamplingConfig) validate() error { + for pattern, config := range c.Patterns { + if config.Chance == "" { + return fmt.Errorf("chance is required for pattern '%s'", pattern) + } + + if config.parsedChance < 0 || config.parsedChance > 1.0 { + return fmt.Errorf("invalid parsed chance %.4f for pattern '%s'", + config.parsedChance, pattern) + } + } + + return nil +} + +// findMatchingPattern finds the matching random sampling pattern for a given topic and event type. +// Returns nil if no pattern matches. +func (c *RandomSamplingConfig) findMatchingPattern( + topic string, + eventType xatu.Event_Name, +) *RandomSamplingPatternConfig { + if len(c.compiledPatterns) == 0 { + return nil + } + + eventTypeName := eventType.String() + + // For GroupC events (no topic), check event-type-only patterns first + if topic == "" { + // Try exact match on event type name + if compiled, exists := c.compiledPatterns[eventTypeName]; exists && compiled.IsGroupCPattern { + return compiled.Config + } + + // Check wildcard event type patterns for GroupC + for _, compiled := range c.compiledPatterns { + if !compiled.IsGroupCPattern { + continue + } + + // Check wildcard match (e.g., "LIBP2P_TRACE_RPC_META_*") + if strings.HasSuffix(compiled.EventTypeConstraint, "*") { + prefix := strings.TrimSuffix(compiled.EventTypeConstraint, "*") + if strings.HasPrefix(eventTypeName, prefix) { + return compiled.Config + } + } + } + } + + // Find the best matching pattern (select highest chance when multiple match) + var bestMatch *RandomSamplingPatternConfig + + var bestChance float64 + + for _, compiled := range c.compiledPatterns { + if compiled.IsGroupCPattern { + continue // Already handled above for GroupC + } + + // Check topic pattern match + if compiled.Pattern != nil && !compiled.Pattern.MatchString(topic) { + continue + } + + // Check event type constraint + if compiled.EventTypeConstraint != "" { + // Check for wildcard match (e.g., "LIBP2P_TRACE_RPC_META_*") + if strings.HasSuffix(compiled.EventTypeConstraint, "*") { + prefix := strings.TrimSuffix(compiled.EventTypeConstraint, "*") + if !strings.HasPrefix(eventTypeName, prefix) { + continue + } + } else if eventTypeName != compiled.EventTypeConstraint { + // Exact match required + continue + } + } + + // Select pattern with highest chance (consistent with existing sharding behavior) + if bestMatch == nil || compiled.Config.parsedChance > bestChance { + bestMatch = compiled.Config + bestChance = compiled.Config.parsedChance + } + } + + return bestMatch +} + +// GetParsedChance returns the parsed chance value (0.0 to 1.0). +func (c *RandomSamplingPatternConfig) GetParsedChance() float64 { + return c.parsedChance +} + +// LogSummary returns a human-readable summary of the random sampling configuration. +func (c *RandomSamplingConfig) LogSummary() string { + if len(c.Patterns) == 0 { + return "Random sampling disabled (no patterns configured)" + } + + summary := fmt.Sprintf("Random sampling enabled with %d patterns:", len(c.Patterns)) + + for pattern, config := range c.Patterns { + summary += fmt.Sprintf("\n - Pattern '%s': %s chance", pattern, config.Chance) + } + + return summary +} diff --git a/pkg/clmimicry/random_sampling_test.go b/pkg/clmimicry/random_sampling_test.go new file mode 100644 index 000000000..953adb385 --- /dev/null +++ b/pkg/clmimicry/random_sampling_test.go @@ -0,0 +1,679 @@ +package clmimicry + +import ( + "testing" + + "github.com/ethpandaops/xatu/pkg/proto/xatu" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const testMsgIDBase = "test-msg-id" + +func TestParseChancePercentage(t *testing.T) { + tests := []struct { + name string + input string + expected float64 + wantError bool + }{ + { + name: "2 percent", + input: "2%", + expected: 0.02, + wantError: false, + }, + { + name: "0.5 percent", + input: "0.5%", + expected: 0.005, + wantError: false, + }, + { + name: "100 percent", + input: "100%", + expected: 1.0, + wantError: false, + }, + { + name: "0 percent", + input: "0%", + expected: 0.0, + wantError: false, + }, + { + name: "50 percent", + input: "50%", + expected: 0.5, + wantError: false, + }, + { + name: "2.5 percent", + input: "2.5%", + expected: 0.025, + wantError: false, + }, + { + name: "with spaces", + input: " 10% ", + expected: 0.1, + wantError: false, + }, + { + name: "missing percent sign", + input: "2", + expected: 0, + wantError: true, + }, + { + name: "invalid value", + input: "invalid%", + expected: 0, + wantError: true, + }, + { + name: "over 100 percent", + input: "101%", + expected: 0, + wantError: true, + }, + { + name: "negative percent", + input: "-5%", + expected: 0, + wantError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := parseChancePercentage(tt.input) + if tt.wantError { + require.Error(t, err) + } else { + require.NoError(t, err) + assert.InDelta(t, tt.expected, result, 0.0001) + } + }) + } +} + +func TestIsValidEventTypeName(t *testing.T) { + tests := []struct { + name string + input string + expected bool + }{ + { + name: "valid event type", + input: "LIBP2P_TRACE_RPC_META_CONTROL_IWANT", + expected: true, + }, + { + name: "valid with wildcard", + input: "LIBP2P_TRACE_*", + expected: true, + }, + { + name: "empty string", + input: "", + expected: false, + }, + { + name: "lowercase", + input: "libp2p_trace", + expected: false, + }, + { + name: "no underscore", + input: "LIBP2PTRACE", + expected: false, + }, + { + name: "regex pattern", + input: ".*beacon.*", + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := isValidEventTypeName(tt.input) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestRandomSamplingConfigCompilePatterns(t *testing.T) { + tests := []struct { + name string + config RandomSamplingConfig + wantError bool + }{ + { + name: "valid GroupC pattern", + config: RandomSamplingConfig{ + Patterns: map[string]*RandomSamplingPatternConfig{ + "LIBP2P_TRACE_RPC_META_CONTROL_IWANT": {Chance: "2%"}, + }, + }, + wantError: false, + }, + { + name: "valid GroupA/B pattern with topic", + config: RandomSamplingConfig{ + Patterns: map[string]*RandomSamplingPatternConfig{ + "LIBP2P_TRACE_RPC_META_CONTROL_IHAVE*:.*attestation.*": {Chance: "5%"}, + }, + }, + wantError: false, + }, + { + name: "valid topic-only pattern", + config: RandomSamplingConfig{ + Patterns: map[string]*RandomSamplingPatternConfig{ + ".*beacon_block.*": {Chance: "10%"}, + }, + }, + wantError: false, + }, + { + name: "multiple patterns", + config: RandomSamplingConfig{ + Patterns: map[string]*RandomSamplingPatternConfig{ + "LIBP2P_TRACE_RPC_META_CONTROL_IWANT": {Chance: "2%"}, + "LIBP2P_TRACE_RPC_META_CONTROL_IHAVE*:.*attestation.*": {Chance: "5%"}, + ".*": {Chance: "0.5%"}, + }, + }, + wantError: false, + }, + { + name: "invalid chance format", + config: RandomSamplingConfig{ + Patterns: map[string]*RandomSamplingPatternConfig{ + "LIBP2P_TRACE_RPC_META_CONTROL_IWANT": {Chance: "invalid"}, + }, + }, + wantError: true, + }, + { + name: "invalid regex pattern", + config: RandomSamplingConfig{ + Patterns: map[string]*RandomSamplingPatternConfig{ + "LIBP2P_TRACE_*:[invalid(": {Chance: "2%"}, + }, + }, + wantError: true, + }, + { + name: "nil patterns", + config: RandomSamplingConfig{}, + wantError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.config.compilePatterns() + if tt.wantError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestRandomSamplingFindMatchingPattern(t *testing.T) { + config := RandomSamplingConfig{ + Patterns: map[string]*RandomSamplingPatternConfig{ + "LIBP2P_TRACE_RPC_META_CONTROL_IWANT": {Chance: "2%"}, + "LIBP2P_TRACE_RPC_META_CONTROL_IHAVE*:.*attestation.*": {Chance: "5%"}, + ".*beacon_block.*": {Chance: "10%"}, + }, + } + + err := config.compilePatterns() + require.NoError(t, err) + + tests := []struct { + name string + topic string + eventType xatu.Event_Name + expectMatch bool + expectedChance float64 + }{ + { + name: "GroupC IWANT match - no topic", + topic: "", + eventType: xatu.Event_LIBP2P_TRACE_RPC_META_CONTROL_IWANT, + expectMatch: true, + expectedChance: 0.02, + }, + { + name: "GroupA IHAVE match with attestation topic", + topic: "/eth2/mainnet/beacon_attestation/1/ssz_snappy", + eventType: xatu.Event_LIBP2P_TRACE_RPC_META_CONTROL_IHAVE, + expectMatch: true, + expectedChance: 0.05, + }, + { + name: "Topic pattern match for beacon block", + topic: "/eth2/mainnet/beacon_block/ssz_snappy", + eventType: xatu.Event_LIBP2P_TRACE_GOSSIPSUB_BEACON_BLOCK, + expectMatch: true, + expectedChance: 0.10, + }, + { + name: "No match for unrelated event type and topic", + topic: "/eth2/mainnet/sync_committee/1/ssz_snappy", + eventType: xatu.Event_LIBP2P_TRACE_GOSSIPSUB_BEACON_ATTESTATION, + expectMatch: false, + }, + { + name: "GroupC IDONTWANT - no match", + topic: "", + eventType: xatu.Event_LIBP2P_TRACE_RPC_META_CONTROL_IDONTWANT, + expectMatch: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := config.findMatchingPattern(tt.topic, tt.eventType) + if tt.expectMatch { + require.NotNil(t, result) + assert.InDelta(t, tt.expectedChance, result.parsedChance, 0.0001) + } else { + assert.Nil(t, result) + } + }) + } +} + +func TestRandomSamplingNoDuplicates(t *testing.T) { + // Test that when deterministic sharding accepts an event, + // random sampling is NOT invoked (no duplicates) + shardingConfig := &ShardingConfig{ + Topics: map[string]*TopicShardingConfig{ + ".*": { + TotalShards: 1, + ActiveShards: []uint64{0}, // 100% deterministic sampling + }, + }, + } + + err := shardingConfig.compilePatterns() + require.NoError(t, err) + + randomConfig := &RandomSamplingConfig{ + Patterns: map[string]*RandomSamplingPatternConfig{ + ".*": {Chance: "100%"}, // 100% random sampling + }, + } + + err = randomConfig.compilePatterns() + require.NoError(t, err) + + sharder, err := NewUnifiedSharder(shardingConfig, randomConfig, true) + require.NoError(t, err) + + // Should be accepted by deterministic sharding + // The reason should NOT contain "random" + shouldProcess, reason := sharder.ShouldProcess( + xatu.Event_LIBP2P_TRACE_GOSSIPSUB_BEACON_BLOCK, + "test-msg-id", + "/eth2/mainnet/beacon_block/ssz_snappy", + ) + + assert.True(t, shouldProcess) + assert.NotContains(t, reason, "random") + assert.Contains(t, reason, "group_a") +} + +func TestRandomSamplingSecondChance(t *testing.T) { + // Test that when deterministic sharding rejects an event, + // random sampling is tried + // + // We configure deterministic sharding to only accept shard 511, + // and use a message ID that we know hashes to a different shard. + shardingConfig := &ShardingConfig{ + Topics: map[string]*TopicShardingConfig{ + ".*": { + TotalShards: 512, + ActiveShards: []uint64{511}, // Only shard 511 is active + }, + }, + } + + err := shardingConfig.compilePatterns() + require.NoError(t, err) + + randomConfig := &RandomSamplingConfig{ + Patterns: map[string]*RandomSamplingPatternConfig{ + ".*": {Chance: "100%"}, // 100% random sampling + }, + } + + err = randomConfig.compilePatterns() + require.NoError(t, err) + + sharder, err := NewUnifiedSharder(shardingConfig, randomConfig, true) + require.NoError(t, err) + + // Find a message ID that doesn't hash to shard 511 + testMsgID := testMsgIDBase + for GetShard(testMsgID, 512) == 511 { + testMsgID = testMsgID + "x" + } + + // Deterministic should reject (msgID doesn't hash to shard 511) + // Random sampling at 100% should accept + shouldProcess, reason := sharder.ShouldProcess( + xatu.Event_LIBP2P_TRACE_GOSSIPSUB_BEACON_BLOCK, + testMsgID, + "/eth2/mainnet/beacon_block/ssz_snappy", + ) + + assert.True(t, shouldProcess) + assert.Contains(t, reason, "random_sampled") +} + +func TestRandomSamplingGroupC(t *testing.T) { + // Test GroupC events (IWANT) work with random sampling + shardingConfig := &ShardingConfig{ + Topics: map[string]*TopicShardingConfig{ + "LIBP2P_TRACE_RPC_META_CONTROL_IWANT": { + TotalShards: 512, + ActiveShards: []uint64{511}, // Only shard 511 is active + }, + }, + } + + err := shardingConfig.compilePatterns() + require.NoError(t, err) + + randomConfig := &RandomSamplingConfig{ + Patterns: map[string]*RandomSamplingPatternConfig{ + "LIBP2P_TRACE_RPC_META_CONTROL_IWANT": {Chance: "100%"}, + }, + } + + err = randomConfig.compilePatterns() + require.NoError(t, err) + + sharder, err := NewUnifiedSharder(shardingConfig, randomConfig, true) + require.NoError(t, err) + + // Find a message ID that doesn't hash to shard 511 + testMsgID := testMsgIDBase + for GetShard(testMsgID, 512) == 511 { + testMsgID = testMsgID + "x" + } + + // GroupC event with no topic - should be accepted by random sampling + shouldProcess, reason := sharder.ShouldProcess( + xatu.Event_LIBP2P_TRACE_RPC_META_CONTROL_IWANT, + testMsgID, + "", // GroupC events have no topic + ) + + assert.True(t, shouldProcess) + assert.Contains(t, reason, "random_sampled") +} + +func TestRandomSamplingDisabled(t *testing.T) { + // Test that when random sampling config is nil, it doesn't affect deterministic + shardingConfig := &ShardingConfig{ + Topics: map[string]*TopicShardingConfig{ + ".*": { + TotalShards: 1, + ActiveShards: []uint64{0}, // 100% deterministic + }, + }, + } + + err := shardingConfig.compilePatterns() + require.NoError(t, err) + + // No random sampling config + sharder, err := NewUnifiedSharder(shardingConfig, nil, true) + require.NoError(t, err) + + assert.False(t, sharder.randomSamplingEnabled) + + shouldProcess, reason := sharder.ShouldProcess( + xatu.Event_LIBP2P_TRACE_GOSSIPSUB_BEACON_BLOCK, + "test-msg-id", + "/eth2/mainnet/beacon_block/ssz_snappy", + ) + + assert.True(t, shouldProcess) + assert.NotContains(t, reason, "random") +} + +func TestRandomSamplingDistribution(t *testing.T) { + // Statistical test to verify approximate random sampling rate + shardingConfig := &ShardingConfig{ + Topics: map[string]*TopicShardingConfig{ + ".*": { + TotalShards: 512, + ActiveShards: []uint64{511}, // Only shard 511 is active + }, + }, + } + + err := shardingConfig.compilePatterns() + require.NoError(t, err) + + randomConfig := &RandomSamplingConfig{ + Patterns: map[string]*RandomSamplingPatternConfig{ + ".*": {Chance: "10%"}, // 10% random sampling + }, + } + + err = randomConfig.compilePatterns() + require.NoError(t, err) + + sharder, err := NewUnifiedSharder(shardingConfig, randomConfig, true) + require.NoError(t, err) + + // Find a message ID that doesn't hash to shard 511 + testMsgID := testMsgIDBase + for GetShard(testMsgID, 512) == 511 { + testMsgID = testMsgID + "x" + } + + // Run many trials + const trials = 10000 + + accepted := 0 + + for i := 0; i < trials; i++ { + shouldProcess, _ := sharder.ShouldProcess( + xatu.Event_LIBP2P_TRACE_GOSSIPSUB_BEACON_BLOCK, + testMsgID, // Same msgID that deterministic always rejects + "/eth2/mainnet/beacon_block/ssz_snappy", + ) + if shouldProcess { + accepted++ + } + } + + // With 10% sampling, we expect ~1000 accepted out of 10000 + // Allow for statistical variance (8% to 12%) + acceptedRate := float64(accepted) / float64(trials) + assert.InDelta(t, 0.10, acceptedRate, 0.02, "Expected ~10%% acceptance rate, got %.2f%%", acceptedRate*100) +} + +func TestRandomSamplingWildcardEventType(t *testing.T) { + // Test that wildcard event type patterns match multiple event types + config := RandomSamplingConfig{ + Patterns: map[string]*RandomSamplingPatternConfig{ + "LIBP2P_TRACE_RPC_META_*:.*attestation.*": {Chance: "100%"}, + }, + } + + err := config.compilePatterns() + require.NoError(t, err) + + topic := "/eth2/mainnet/beacon_attestation/1/ssz_snappy" + + // Should match IHAVE + result := config.findMatchingPattern(topic, xatu.Event_LIBP2P_TRACE_RPC_META_CONTROL_IHAVE) + require.NotNil(t, result, "IHAVE should match wildcard pattern") + + // Should match IWANT (even though it's GroupC, the wildcard should still match the event type) + result = config.findMatchingPattern(topic, xatu.Event_LIBP2P_TRACE_RPC_META_CONTROL_IWANT) + require.NotNil(t, result, "IWANT should match wildcard pattern") + + // Should NOT match GOSSIPSUB events + result = config.findMatchingPattern(topic, xatu.Event_LIBP2P_TRACE_GOSSIPSUB_BEACON_ATTESTATION) + assert.Nil(t, result, "GOSSIPSUB should not match RPC_META wildcard") +} + +func TestRandomSamplingZeroPercent(t *testing.T) { + // Test that 0% chance never accepts + shardingConfig := &ShardingConfig{ + Topics: map[string]*TopicShardingConfig{ + ".*": { + TotalShards: 512, + ActiveShards: []uint64{511}, // Only shard 511 is active + }, + }, + } + + err := shardingConfig.compilePatterns() + require.NoError(t, err) + + randomConfig := &RandomSamplingConfig{ + Patterns: map[string]*RandomSamplingPatternConfig{ + ".*": {Chance: "0%"}, // 0% should never accept + }, + } + + err = randomConfig.compilePatterns() + require.NoError(t, err) + + sharder, err := NewUnifiedSharder(shardingConfig, randomConfig, true) + require.NoError(t, err) + + // Find a message ID that doesn't hash to shard 511 + testMsgID := testMsgIDBase + for GetShard(testMsgID, 512) == 511 { + testMsgID = testMsgID + "x" + } + + // Run many trials - none should be accepted + for i := 0; i < 1000; i++ { + shouldProcess, _ := sharder.ShouldProcess( + xatu.Event_LIBP2P_TRACE_GOSSIPSUB_BEACON_BLOCK, + testMsgID, + "/eth2/mainnet/beacon_block/ssz_snappy", + ) + assert.False(t, shouldProcess, "0%% chance should never accept") + } +} + +func TestRandomSamplingGroupB(t *testing.T) { + // Test that topic-only patterns work for random sampling + shardingConfig := &ShardingConfig{ + Topics: map[string]*TopicShardingConfig{ + ".*": { + TotalShards: 512, + ActiveShards: []uint64{511}, // Only shard 511 is active + }, + }, + } + + err := shardingConfig.compilePatterns() + require.NoError(t, err) + + randomConfig := &RandomSamplingConfig{ + Patterns: map[string]*RandomSamplingPatternConfig{ + ".*beacon_block.*": {Chance: "100%"}, // Topic-only pattern + }, + } + + err = randomConfig.compilePatterns() + require.NoError(t, err) + + sharder, err := NewUnifiedSharder(shardingConfig, randomConfig, true) + require.NoError(t, err) + + // Find a message ID that doesn't hash to shard 511 + testMsgID := testMsgIDBase + for GetShard(testMsgID, 512) == 511 { + testMsgID = testMsgID + "x" + } + + // Topic-only pattern should match based on topic + shouldProcess, reason := sharder.ShouldProcess( + xatu.Event_LIBP2P_TRACE_GOSSIPSUB_BEACON_BLOCK, + testMsgID, + "/eth2/mainnet/beacon_block/ssz_snappy", + ) + + assert.True(t, shouldProcess) + assert.Contains(t, reason, "random_sampled") +} + +func TestRandomSamplingEmptyPatterns(t *testing.T) { + // Test that empty patterns map (not nil) disables random sampling + randomConfig := &RandomSamplingConfig{ + Patterns: map[string]*RandomSamplingPatternConfig{}, // Empty, not nil + } + + err := randomConfig.compilePatterns() + require.NoError(t, err) + + shardingConfig := &ShardingConfig{ + Topics: map[string]*TopicShardingConfig{ + ".*": { + TotalShards: 512, + ActiveShards: []uint64{511}, + }, + }, + } + + err = shardingConfig.compilePatterns() + require.NoError(t, err) + + sharder, err := NewUnifiedSharder(shardingConfig, randomConfig, true) + require.NoError(t, err) + + assert.False(t, sharder.randomSamplingEnabled, "Empty patterns should disable random sampling") +} + +func TestRandomSamplingLogSummary(t *testing.T) { + tests := []struct { + name string + config RandomSamplingConfig + contains []string + }{ + { + name: "empty config", + config: RandomSamplingConfig{}, + contains: []string{"disabled"}, + }, + { + name: "with patterns", + config: RandomSamplingConfig{ + Patterns: map[string]*RandomSamplingPatternConfig{ + "LIBP2P_TRACE_RPC_META_CONTROL_IWANT": {Chance: "2%"}, + }, + }, + contains: []string{"enabled", "1 patterns", "2%"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + summary := tt.config.LogSummary() + for _, substr := range tt.contains { + assert.Contains(t, summary, substr) + } + }) + } +} diff --git a/pkg/clmimicry/sharding.go b/pkg/clmimicry/sharding.go index d0532493f..b38cac030 100644 --- a/pkg/clmimicry/sharding.go +++ b/pkg/clmimicry/sharding.go @@ -2,6 +2,7 @@ package clmimicry import ( "fmt" + "math/rand/v2" "regexp" "sort" "strconv" @@ -18,9 +19,11 @@ const ( // UnifiedSharder provides a single sharding decision point for all events type UnifiedSharder struct { - config *ShardingConfig - eventCategorizer *EventCategorizer - enabled bool + config *ShardingConfig + randomSamplingConfig *RandomSamplingConfig + eventCategorizer *EventCategorizer + enabled bool + randomSamplingEnabled bool } // ShardingConfig represents the sharding configuration @@ -61,7 +64,11 @@ type CompiledPattern struct { } // NewUnifiedSharder creates a new unified sharder -func NewUnifiedSharder(config *ShardingConfig, enabled bool) (*UnifiedSharder, error) { +func NewUnifiedSharder( + config *ShardingConfig, + randomConfig *RandomSamplingConfig, + enabled bool, +) (*UnifiedSharder, error) { if config == nil { config = &ShardingConfig{ Topics: make(map[string]*TopicShardingConfig), @@ -81,14 +88,21 @@ func NewUnifiedSharder(config *ShardingConfig, enabled bool) (*UnifiedSharder, e return nil, fmt.Errorf("invalid sharding config: %w", err) } + // Check if random sampling is enabled + randomSamplingEnabled := randomConfig != nil && len(randomConfig.Patterns) > 0 + return &UnifiedSharder{ - config: config, - eventCategorizer: NewEventCategorizer(), - enabled: enabled, + config: config, + randomSamplingConfig: randomConfig, + eventCategorizer: NewEventCategorizer(), + enabled: enabled, + randomSamplingEnabled: randomSamplingEnabled, }, nil } -// ShouldProcess determines if an event should be processed based on sharding rules +// ShouldProcess determines if an event should be processed based on sharding rules. +// It first tries deterministic sharding, and if that rejects the event, it tries +// random sampling as a "second chance" mechanism. // //nolint:gocritic // named returns unnecessary. func (s *UnifiedSharder) ShouldProcess(eventType xatu.Event_Name, msgID, topic string) (bool, string) { @@ -96,6 +110,33 @@ func (s *UnifiedSharder) ShouldProcess(eventType xatu.Event_Name, msgID, topic s return true, "sharding_disabled" } + // Try deterministic sharding first + deterministicResult, deterministicReason := s.deterministicProcess(eventType, msgID, topic) + + // If deterministic sharding accepted, return immediately (no duplicates) + if deterministicResult { + return true, deterministicReason + } + + // Try random sampling as "second chance" if deterministic rejected + if s.randomSamplingEnabled { + randomResult, randomReason := s.randomSamplingProcess(eventType, topic) + if randomResult { + return true, randomReason + } + } + + // Neither deterministic nor random sampling accepted + return false, deterministicReason +} + +// deterministicProcess contains the deterministic sharding logic based on SipHash. +// +//nolint:gocritic // named returns unnecessary. +func (s *UnifiedSharder) deterministicProcess( + eventType xatu.Event_Name, + msgID, topic string, +) (bool, string) { eventInfo, exists := s.eventCategorizer.GetEventInfo(eventType) if !exists { // Unknown event type - default to process @@ -173,6 +214,33 @@ func (s *UnifiedSharder) ShouldProcess(eventType xatu.Event_Name, msgID, topic s } } +// randomSamplingProcess checks if random sampling should accept this event. +// Returns true if the event should be processed based on random chance. +// +//nolint:gocritic // named returns unnecessary. +func (s *UnifiedSharder) randomSamplingProcess( + eventType xatu.Event_Name, + topic string, +) (bool, string) { + if s.randomSamplingConfig == nil { + return false, "random_sampling_not_configured" + } + + // Find matching random sampling pattern + config := s.randomSamplingConfig.findMatchingPattern(topic, eventType) + if config == nil { + return false, "random_no_pattern_match" + } + + // Roll the dice - rand.Float64() is thread-safe in math/rand/v2 + //nolint:gosec // G404: crypto strength not needed for sampling + if rand.Float64() < config.parsedChance { + return true, fmt.Sprintf("random_sampled_%.2f_pct", config.parsedChance*100) + } + + return false, fmt.Sprintf("random_not_sampled_%.2f_pct", config.parsedChance*100) +} + // ShouldProcessBatch determines which events in a batch should be processed // This is used for RPC meta events where we have multiple events to evaluate func (s *UnifiedSharder) ShouldProcessBatch(eventType xatu.Event_Name, events []ShardableEvent) []bool { diff --git a/pkg/clmimicry/sharding_test.go b/pkg/clmimicry/sharding_test.go index 2ceb4f238..0f7fccd7b 100644 --- a/pkg/clmimicry/sharding_test.go +++ b/pkg/clmimicry/sharding_test.go @@ -40,7 +40,7 @@ func TestConfigurableTotalShards(t *testing.T) { assert.InDelta(t, 0.0039, config.Topics["large_shards"].GetSamplingRate(), 0.001) // Test that sharding works with different totalShards - _, err = NewUnifiedSharder(config, true) + _, err = NewUnifiedSharder(config, nil, true) require.NoError(t, err) // Test that the same key produces different shards with different totalShards @@ -251,7 +251,7 @@ func TestUnifiedSharder(t *testing.T) { require.NoError(t, err) // Create sharder - sharder, err := NewUnifiedSharder(tt.config, tt.name != "Sharding disabled") + sharder, err := NewUnifiedSharder(tt.config, nil, tt.name != "Sharding disabled") require.NoError(t, err) // Test sharding decision @@ -284,7 +284,7 @@ func TestShardDistribution(t *testing.T) { err := config.compilePatterns() require.NoError(t, err) - sharder, err := NewUnifiedSharder(config, true) + sharder, err := NewUnifiedSharder(config, nil, true) require.NoError(t, err) // Test shard distribution using chi-squared test @@ -533,7 +533,7 @@ func TestBatchProcessing(t *testing.T) { err := config.compilePatterns() require.NoError(t, err) - sharder, err := NewUnifiedSharder(config, true) + sharder, err := NewUnifiedSharder(config, nil, true) require.NoError(t, err) // Create batch of events @@ -688,7 +688,7 @@ func TestEventTypeAwareSharding(t *testing.T) { }, } - sharder, err := NewUnifiedSharder(config, true) + sharder, err := NewUnifiedSharder(config, nil, true) require.NoError(t, err) topic := "/eth2/12345678/beacon_attestation_1/ssz_snappy" @@ -809,7 +809,7 @@ func TestBackwardCompatibility(t *testing.T) { }, } - sharder, err := NewUnifiedSharder(config, true) + sharder, err := NewUnifiedSharder(config, nil, true) require.NoError(t, err) // Test that patterns without event constraints match all event types @@ -844,7 +844,7 @@ func TestGroupCEventTypeOnlyPatterns(t *testing.T) { }, } - sharder, err := NewUnifiedSharder(config, true) + sharder, err := NewUnifiedSharder(config, nil, true) require.NoError(t, err) // Test with multiple message IDs to verify 100% sampling @@ -886,7 +886,7 @@ func TestGroupCEventTypeOnlyPatterns(t *testing.T) { }, } - iwantSharder, err := NewUnifiedSharder(iwantConfig, true) + iwantSharder, err := NewUnifiedSharder(iwantConfig, nil, true) require.NoError(t, err) iwantProcessed := 0