From 5bf055b05509ebe06cd8a630d48d8cea74c8754b Mon Sep 17 00:00:00 2001 From: Eren Atas Date: Wed, 1 Apr 2026 18:53:12 +0200 Subject: [PATCH 1/2] fix(store): scope flag deletion by flagSetId to support per-selector sync When an upstream provider sends per-selector stream messages (each with a flagSetId in metadata), store.Update now only deletes stale flags within the same flagSetId(s), preserving flags from other selector. Signed-off-by: Eren Atas --- core/pkg/store/query.go | 17 +++-- core/pkg/store/store.go | 38 +++++++++- core/pkg/store/store_test.go | 135 +++++++++++++++++++++++++++++++++++ 3 files changed, 182 insertions(+), 8 deletions(-) diff --git a/core/pkg/store/query.go b/core/pkg/store/query.go index f72a3a25e..478440a8b 100644 --- a/core/pkg/store/query.go +++ b/core/pkg/store/query.go @@ -136,10 +136,17 @@ func (s *Selector) ToMetadata() model.Metadata { } func (s *Selector) ToLogString() string { - if s != nil && len(s.indexMap) == 1 { - for k, v := range s.indexMap { - return fmt.Sprintf("'%s=%s'", k, v) - } + if s == nil || len(s.indexMap) == 0 { + return "" } - return "" + keys := make([]string, 0, len(s.indexMap)) + for k := range s.indexMap { + keys = append(keys, k) + } + sort.Strings(keys) + parts := make([]string, 0, len(keys)) + for _, k := range keys { + parts = append(parts, fmt.Sprintf("%s=%s", k, s.indexMap[k])) + } + return "'" + strings.Join(parts, ",") + "'" } diff --git a/core/pkg/store/store.go b/core/pkg/store/store.go index baad3892c..059a4ecdd 100644 --- a/core/pkg/store/store.go +++ b/core/pkg/store/store.go @@ -254,9 +254,41 @@ func (s *Store) Update( txn := s.db.Txn(true) defer txn.Abort() - // get all flags for the source we are updating - selector := NewSelector(sourceIndex + "=" + source) - oldFlags, _, _ := s.GetAll(context.Background(), &selector) + // When metadata carries a flagSetId, scope deletion to only the flagSetIds touched by + // this update (the metadata-level one plus any flag-level overrides). This allows + // per-flagSetId updates (e.g., from per-project stream messages) to accumulate in + // the store without deleting flags from unrelated flagSetIds. + // When metadata has no flagSetId, fall back to source-scoped deletion (original behavior). + // Note: we keep the fsi != "" guard because empty-string flagSetIds are not normalized + // to nilFlagSetId at storage time, but NewSelector DOES normalize them, creating a + // mismatch that would cause queries to miss the actual flags. Source-scoped fallback + // is the correct behavior for empty-string flagSetIds. + var oldFlags []model.Flag + if fsi, ok := metadata["flagSetId"].(string); ok && fsi != "" { + seenFlagSetIds := map[string]struct{}{fsi: {}} + for id := range newFlags { + seenFlagSetIds[id.flagSetId] = struct{}{} + } + for fsi := range seenFlagSetIds { + sel := NewSelector(flagSetIdIndex + "=" + fsi).WithIndex(sourceIndex, source) + indexId, constraints := sel.ToQuery() + it, err := txn.Get(flagsTable, indexId, constraints...) + if err != nil { + s.logger.Error(fmt.Sprintf("unable to query flags for flagSetId %s: %v", fsi, err)) + continue + } + oldFlags = append(oldFlags, s.collect(it)...) + } + } else { + sel := NewSelector(sourceIndex + "=" + source) + indexId, constraints := sel.ToQuery() + it, err := txn.Get(flagsTable, indexId, constraints...) + if err != nil { + s.logger.Error(fmt.Sprintf("unable to query flags for source %s: %v", source, err)) + } else { + oldFlags = s.collect(it) + } + } for _, oldFlag := range oldFlags { if _, ok := newFlags[flagIdentifier{flagSetId: oldFlag.FlagSetId, key: oldFlag.Key}]; !ok { diff --git a/core/pkg/store/store_test.go b/core/pkg/store/store_test.go index 677bc59d0..fb893429a 100644 --- a/core/pkg/store/store_test.go +++ b/core/pkg/store/store_test.go @@ -618,6 +618,141 @@ func TestWatch(t *testing.T) { } } +func TestUpdateFlagSetIdScoping(t *testing.T) { + t.Parallel() + + const src = "src1" + sources := []string{src} + + type updateStep struct { + flags []model.Flag + metadata model.Metadata + } + + tests := []struct { + name string + updates []updateStep + wantPresent []string // "flagSetId/key" entries expected in the store + wantAbsent []string // "flagSetId/key" entries expected to be gone + }{ + { + name: "per-flagSetId update preserves flags from other flagSetIds", + updates: []updateStep{ + {flags: []model.Flag{{Key: "flagA1"}, {Key: "flagA2"}}, metadata: model.Metadata{"flagSetId": "A"}}, + {flags: []model.Flag{{Key: "flagB1"}}, metadata: model.Metadata{"flagSetId": "B"}}, + {flags: []model.Flag{{Key: "flagA1"}}, metadata: model.Metadata{"flagSetId": "A"}}, + }, + wantPresent: []string{"A/flagA1", "B/flagB1"}, + wantAbsent: []string{"A/flagA2"}, + }, + { + name: "out-of-scope flag-level override persists when not in batch", + updates: []updateStep{ + {flags: []model.Flag{{Key: "kept"}, {Key: "override", Metadata: model.Metadata{"flagSetId": "Y"}}}, metadata: model.Metadata{"flagSetId": "X"}}, + {flags: []model.Flag{{Key: "kept"}}, metadata: model.Metadata{"flagSetId": "X"}}, + }, + wantPresent: []string{"X/kept", "Y/override"}, + }, + { + name: "stale flag deleted when its flagSetId is in scope", + updates: []updateStep{ + {flags: []model.Flag{{Key: "inX"}, {Key: "inY", Metadata: model.Metadata{"flagSetId": "Y"}}}, metadata: model.Metadata{"flagSetId": "X"}}, + {flags: []model.Flag{{Key: "inY", Metadata: model.Metadata{"flagSetId": "Y"}}}, metadata: model.Metadata{"flagSetId": "X"}}, + }, + wantPresent: []string{"Y/inY"}, + wantAbsent: []string{"X/inX"}, + }, + { + name: "no flagSetId in metadata falls back to source-scoped deletion", + updates: []updateStep{ + {flags: []model.Flag{{Key: "flagA"}}, metadata: model.Metadata{"flagSetId": "A"}}, + {flags: []model.Flag{{Key: "flagB"}}, metadata: model.Metadata{"flagSetId": "B"}}, + {flags: []model.Flag{}, metadata: nil}, + }, + wantAbsent: []string{"A/flagA", "B/flagB"}, + }, + { + name: "empty update with flagSetId clears only that set", + updates: []updateStep{ + {flags: []model.Flag{{Key: "flagA"}}, metadata: model.Metadata{"flagSetId": "A"}}, + {flags: []model.Flag{{Key: "flagB"}}, metadata: model.Metadata{"flagSetId": "B"}}, + {flags: []model.Flag{}, metadata: model.Metadata{"flagSetId": "A"}}, + }, + wantPresent: []string{"B/flagB"}, + wantAbsent: []string{"A/flagA"}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + s, err := NewStore(logger.NewLogger(nil, false), sources) + require.NoError(t, err) + + for _, step := range tt.updates { + s.Update(src, step.flags, step.metadata) + } + + allFlags, _, _ := s.GetAll(context.Background(), nil) + flagKeys := make(map[string]struct{}, len(allFlags)) + for _, f := range allFlags { + flagKeys[f.FlagSetId+"/"+f.Key] = struct{}{} + } + + for _, key := range tt.wantPresent { + assert.Contains(t, flagKeys, key) + } + for _, key := range tt.wantAbsent { + assert.NotContains(t, flagKeys, key) + } + }) + } +} + +func TestToLogStringCompound(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + selector *Selector + want string + }{ + { + name: "nil selector", + selector: nil, + want: "", + }, + { + name: "empty selector", + selector: &Selector{indexMap: map[string]string{}}, + want: "", + }, + { + name: "single key", + selector: &Selector{indexMap: map[string]string{"source": "mySource"}}, + want: "'source=mySource'", + }, + { + name: "compound selector", + selector: &Selector{indexMap: map[string]string{"flagSetId": "abc", "source": "mySource"}}, + want: "'flagSetId=abc,source=mySource'", + }, + { + name: "three keys sorted", + selector: &Selector{indexMap: map[string]string{"source": "s", "key": "k", "flagSetId": "f"}}, + want: "'flagSetId=f,key=k,source=s'", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := tt.selector.ToLogString() + assert.Equal(t, tt.want, got) + }) + } +} + func TestQueryMetadata(t *testing.T) { sourceA := "sourceA" From 88690180310f94956707310d0e0496e4e97a0b39 Mon Sep 17 00:00:00 2001 From: Eren Atas Date: Wed, 8 Apr 2026 15:11:21 +0200 Subject: [PATCH 2/2] add opt-in per-flagSetId incremental updates and add tests Signed-off-by: Eren Atas --- core/pkg/evaluator/fractional_test.go | 4 +- core/pkg/evaluator/json.go | 2 +- core/pkg/evaluator/semver_test.go | 2 +- core/pkg/evaluator/string_comparison_test.go | 4 +- core/pkg/store/store.go | 27 ++++---- core/pkg/store/store_test.go | 69 ++++++++++++------- core/pkg/sync/builder/syncbuilder.go | 13 ++-- core/pkg/sync/builder/syncbuilder_test.go | 18 +++++ core/pkg/sync/grpc/grpc_sync.go | 15 ++-- core/pkg/sync/grpc/grpc_sync_test.go | 32 +++++++++ core/pkg/sync/isync.go | 11 +++ .../flag-evaluation/connect_service_test.go | 2 +- .../service/flag-evaluation/eventing_test.go | 8 +-- flagd/pkg/service/flag-sync/handler_test.go | 4 +- .../service/flag-sync/sync_service_test.go | 2 +- flagd/pkg/service/flag-sync/util_test.go | 4 +- 16 files changed, 153 insertions(+), 64 deletions(-) diff --git a/core/pkg/evaluator/fractional_test.go b/core/pkg/evaluator/fractional_test.go index c1dfb9a38..34b3e16ee 100644 --- a/core/pkg/evaluator/fractional_test.go +++ b/core/pkg/evaluator/fractional_test.go @@ -409,7 +409,7 @@ func TestFractionalEvaluation(t *testing.T) { } je := NewJSON(log, s) - je.store.Update(source, tt.flags, model.Metadata{}) + je.store.Update(source, tt.flags, model.Metadata{}, false) value, variant, reason, _, err := resolve[string](ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant) @@ -537,7 +537,7 @@ func BenchmarkFractionalEvaluation(b *testing.B) { b.Fatalf("NewStore failed: %v", err) } je := NewJSON(log, s) - je.store.Update(source, tt.flags, model.Metadata{}) + je.store.Update(source, tt.flags, model.Metadata{}, false) for i := 0; i < b.N; i++ { value, variant, reason, _, err := resolve[string]( diff --git a/core/pkg/evaluator/json.go b/core/pkg/evaluator/json.go index 45a13175d..428ff35c8 100644 --- a/core/pkg/evaluator/json.go +++ b/core/pkg/evaluator/json.go @@ -137,7 +137,7 @@ func (je *JSON) SetState(payload sync.DataSync) error { return err } - je.store.Update(payload.Source, definition.Flags, definition.Metadata) + je.store.Update(payload.Source, definition.Flags, definition.Metadata, payload.IncrementalUpdates) return nil } diff --git a/core/pkg/evaluator/semver_test.go b/core/pkg/evaluator/semver_test.go index 52f59a913..0d25af9cc 100644 --- a/core/pkg/evaluator/semver_test.go +++ b/core/pkg/evaluator/semver_test.go @@ -798,7 +798,7 @@ func TestJSONEvaluator_semVerEvaluation(t *testing.T) { t.Fatalf("NewStore failed: %v", err) } je := NewJSON(log, s) - je.store.Update(source, tt.flags, model.Metadata{}) + je.store.Update(source, tt.flags, model.Metadata{}, false) value, variant, reason, _, err := resolve[string](ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant) diff --git a/core/pkg/evaluator/string_comparison_test.go b/core/pkg/evaluator/string_comparison_test.go index f22466f02..01905a446 100644 --- a/core/pkg/evaluator/string_comparison_test.go +++ b/core/pkg/evaluator/string_comparison_test.go @@ -157,7 +157,7 @@ func TestJSONEvaluator_startsWithEvaluation(t *testing.T) { t.Fatalf("NewStore failed: %v", err) } je := NewJSON(log, s) - je.store.Update(source, tt.flags, model.Metadata{}) + je.store.Update(source, tt.flags, model.Metadata{}, false) value, variant, reason, _, err := resolve[string](ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant) @@ -325,7 +325,7 @@ func TestJSONEvaluator_endsWithEvaluation(t *testing.T) { t.Fatalf("NewStore failed: %v", err) } je := NewJSON(log, s) - je.store.Update(source, tt.flags, model.Metadata{}) + je.store.Update(source, tt.flags, model.Metadata{}, false) value, variant, reason, _, err := resolve[string](ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant) diff --git a/core/pkg/store/store.go b/core/pkg/store/store.go index 059a4ecdd..0bf96dc7b 100644 --- a/core/pkg/store/store.go +++ b/core/pkg/store/store.go @@ -25,7 +25,7 @@ type IStore interface { Get(ctx context.Context, key string, selector *Selector) (model.Flag, model.Metadata, error) GetAll(ctx context.Context, selector *Selector) ([]model.Flag, model.Metadata, error) Watch(ctx context.Context, selector *Selector, watcher chan<- FlagQueryResult) - Update(source string, flags []model.Flag, metadata model.Metadata) + Update(source string, flags []model.Flag, metadata model.Metadata, incrementalUpdate bool) } var _ IStore = (*Store)(nil) @@ -214,10 +214,15 @@ type flagIdentifier struct { } // Update the flag state with the provided flags. +// When incrementalUpdate is true, deletion is scoped to only the flagSetIds present in +// this payload (from metadata and flag-level overrides), allowing flags from other +// flagSetIds to accumulate across updates. When false, all flags for the source are +// replaced (the default full-snapshot behavior). func (s *Store) Update( source string, flags []model.Flag, metadata model.Metadata, + incrementalUpdate bool, ) { if source == "" { panic("source cannot be empty") @@ -254,18 +259,16 @@ func (s *Store) Update( txn := s.db.Txn(true) defer txn.Abort() - // When metadata carries a flagSetId, scope deletion to only the flagSetIds touched by - // this update (the metadata-level one plus any flag-level overrides). This allows - // per-flagSetId updates (e.g., from per-project stream messages) to accumulate in - // the store without deleting flags from unrelated flagSetIds. - // When metadata has no flagSetId, fall back to source-scoped deletion (original behavior). - // Note: we keep the fsi != "" guard because empty-string flagSetIds are not normalized - // to nilFlagSetId at storage time, but NewSelector DOES normalize them, creating a - // mismatch that would cause queries to miss the actual flags. Source-scoped fallback - // is the correct behavior for empty-string flagSetIds. + // When incrementalUpdate is enabled, scope deletion to only the flagSetIds touched + // by this payload (metadata-level + flag-level overrides). This allows per-flagSetId + // updates (e.g., from per-project stream messages) to accumulate without deleting + // flags from unrelated flagSetIds. Otherwise, replace all flags for the source. var oldFlags []model.Flag - if fsi, ok := metadata["flagSetId"].(string); ok && fsi != "" { - seenFlagSetIds := map[string]struct{}{fsi: {}} + if incrementalUpdate { + seenFlagSetIds := make(map[string]struct{}) + if fsi, ok := metadata["flagSetId"].(string); ok && fsi != "" { + seenFlagSetIds[fsi] = struct{}{} + } for id := range newFlags { seenFlagSetIds[id.flagSetId] = struct{}{} } diff --git a/core/pkg/store/store_test.go b/core/pkg/store/store_test.go index fb893429a..d204240f4 100644 --- a/core/pkg/store/store_test.go +++ b/core/pkg/store/store_test.go @@ -80,7 +80,7 @@ func TestUpdateFlags(t *testing.T) { } s.Update(source1, []model.Flag{ {Key: "waka", DefaultVariant: "off"}, - }, nil) + }, nil, false) return s }, newFlags: []model.Flag{ @@ -100,7 +100,7 @@ func TestUpdateFlags(t *testing.T) { } s.Update(source1, []model.Flag{ {Key: "waka", DefaultVariant: "off"}, - }, nil) + }, nil, false) return s }, newFlags: []model.Flag{ @@ -119,7 +119,7 @@ func TestUpdateFlags(t *testing.T) { if err != nil { t.Fatalf("NewStore failed: %v", err) } - s.Update(source1, []model.Flag{}, model.Metadata{}) + s.Update(source1, []model.Flag{}, model.Metadata{}, false) return s }, setMetadata: model.Metadata{ @@ -142,7 +142,7 @@ func TestUpdateFlags(t *testing.T) { if err != nil { t.Fatalf("NewStore failed: %v", err) } - s.Update(source1, []model.Flag{}, model.Metadata{}) + s.Update(source1, []model.Flag{}, model.Metadata{}, false) return s }, @@ -168,7 +168,7 @@ func TestUpdateFlags(t *testing.T) { if err != nil { t.Fatalf("NewStore failed: %v", err) } - s.Update(source1, []model.Flag{}, model.Metadata{}) + s.Update(source1, []model.Flag{}, model.Metadata{}, false) return s }, setMetadata: model.Metadata{ @@ -195,7 +195,7 @@ func TestUpdateFlags(t *testing.T) { t.Run(tt.name, func(t *testing.T) { t.Parallel() store := tt.setup(t) - store.Update(tt.source, tt.newFlags, tt.setMetadata) + store.Update(tt.source, tt.newFlags, tt.setMetadata, false) gotFlags, _, _ := store.GetAll(context.Background(), nil) sort.Slice(tt.wantFlags, func(i, j int) bool { return tt.wantFlags[i].FlagSetId+"|"+tt.wantFlags[i].Key > tt.wantFlags[j].FlagSetId+"|"+tt.wantFlags[j].Key @@ -330,7 +330,7 @@ func TestGet(t *testing.T) { } for _, source := range s.order { - store.Update(source.Name, source.flags, nil) + store.Update(source.Name, source.flags, nil, false) } gotFlag, _, err := store.Get(context.Background(), tt.key, tt.selector) @@ -473,7 +473,7 @@ func TestGetAllNoWatcher(t *testing.T) { } for _, source := range s.order { - store.Update(source.Name, source.flags, nil) + store.Update(source.Name, source.flags, nil, false) } gotFlags, _, _ := store.GetAll(context.Background(), tt.selector) @@ -556,9 +556,9 @@ func TestWatch(t *testing.T) { } // setup initial flags - store.Update(sourceA, sourceAFlags, model.Metadata{}) - store.Update(sourceB, sourceBFlags, model.Metadata{}) - store.Update(sourceC, sourceCFlags, model.Metadata{}) + store.Update(sourceA, sourceAFlags, model.Metadata{}, false) + store.Update(sourceB, sourceBFlags, model.Metadata{}, false) + store.Update(sourceC, sourceCFlags, model.Metadata{}, false) watcher := make(chan FlagQueryResult, 1) time.Sleep(pauseTime) @@ -573,14 +573,14 @@ func TestWatch(t *testing.T) { // changing a flag default variant should trigger an update store.Update(sourceA, []model.Flag{ {Key: "flagA", DefaultVariant: "on"}, - }, model.Metadata{}) + }, model.Metadata{}, false) time.Sleep(pauseTime) // changing a flag default variant should trigger an update store.Update(sourceB, []model.Flag{ {Key: "flagB", DefaultVariant: "on", Metadata: model.Metadata{"flagSetId": myFlagSetId}}, - }, model.Metadata{}) + }, model.Metadata{}, false) time.Sleep(pauseTime) @@ -588,14 +588,14 @@ func TestWatch(t *testing.T) { // TODO: challenge this test and behaviour store.Update(sourceB, []model.Flag{ {Key: "flagB", DefaultVariant: "on"}, - }, model.Metadata{}) + }, model.Metadata{}, false) time.Sleep(pauseTime) // adding a flag set id should trigger an update store.Update(sourceB, []model.Flag{ {Key: "flagB", DefaultVariant: "on", Metadata: model.Metadata{"flagSetId": myFlagSetId}}, - }, model.Metadata{}) + }, model.Metadata{}, false) }() updates := 0 @@ -625,8 +625,9 @@ func TestUpdateFlagSetIdScoping(t *testing.T) { sources := []string{src} type updateStep struct { - flags []model.Flag - metadata model.Metadata + flags []model.Flag + metadata model.Metadata + incrementalUpdate *bool // nil: incremental merge; explicit false: full replace for the source } tests := []struct { @@ -664,13 +665,29 @@ func TestUpdateFlagSetIdScoping(t *testing.T) { }, { name: "no flagSetId in metadata falls back to source-scoped deletion", - updates: []updateStep{ - {flags: []model.Flag{{Key: "flagA"}}, metadata: model.Metadata{"flagSetId": "A"}}, - {flags: []model.Flag{{Key: "flagB"}}, metadata: model.Metadata{"flagSetId": "B"}}, - {flags: []model.Flag{}, metadata: nil}, - }, + updates: func() []updateStep { + fullSnapshot := false + return []updateStep{ + {flags: []model.Flag{{Key: "flagA"}}, metadata: model.Metadata{"flagSetId": "A"}}, + {flags: []model.Flag{{Key: "flagB"}}, metadata: model.Metadata{"flagSetId": "B"}}, + {flags: []model.Flag{}, metadata: nil, incrementalUpdate: &fullSnapshot}, + } + }(), wantAbsent: []string{"A/flagA", "B/flagB"}, }, + { + name: "incrementalUpdate=false with flagSetId still does full-source deletion", + updates: func() []updateStep { + fullSnapshot := false + return []updateStep{ + {flags: []model.Flag{{Key: "flagA"}}, metadata: model.Metadata{"flagSetId": "A"}}, + {flags: []model.Flag{{Key: "flagB"}}, metadata: model.Metadata{"flagSetId": "B"}}, + {flags: []model.Flag{{Key: "flagA"}}, metadata: model.Metadata{"flagSetId": "A"}, incrementalUpdate: &fullSnapshot}, + } + }(), + wantPresent: []string{"A/flagA"}, + wantAbsent: []string{"B/flagB"}, + }, { name: "empty update with flagSetId clears only that set", updates: []updateStep{ @@ -691,7 +708,11 @@ func TestUpdateFlagSetIdScoping(t *testing.T) { require.NoError(t, err) for _, step := range tt.updates { - s.Update(src, step.flags, step.metadata) + inc := true + if step.incrementalUpdate != nil { + inc = *step.incrementalUpdate + } + s.Update(src, step.flags, step.metadata, inc) } allFlags, _, _ := s.GetAll(context.Background(), nil) @@ -770,7 +791,7 @@ func TestQueryMetadata(t *testing.T) { } // setup initial flags - store.Update(sourceA, sourceAFlags, model.Metadata{}) + store.Update(sourceA, sourceAFlags, model.Metadata{}, false) // #1708 Until we decide on the Selector syntax, only a single key=value pair is supported // these tests should then also cover more complex selectors diff --git a/core/pkg/sync/builder/syncbuilder.go b/core/pkg/sync/builder/syncbuilder.go index 947b9fec3..abbc9bbef 100644 --- a/core/pkg/sync/builder/syncbuilder.go +++ b/core/pkg/sync/builder/syncbuilder.go @@ -191,12 +191,13 @@ func (sb *SyncBuilder) newGRPC(config sync.SourceConfig, logger *logger.Logger) zap.String("component", "sync"), zap.String("sync", "grpc"), ), - CredentialBuilder: &credentials.CredentialBuilder{}, - CertPath: config.CertPath, - ProviderID: config.ProviderID, - Secure: config.TLS, - Selector: config.Selector, - MaxMsgSize: config.MaxMsgSize, + CredentialBuilder: &credentials.CredentialBuilder{}, + CertPath: config.CertPath, + ProviderID: config.ProviderID, + Secure: config.TLS, + Selector: config.Selector, + MaxMsgSize: config.MaxMsgSize, + IncrementalUpdates: config.IncrementalUpdates, } } diff --git a/core/pkg/sync/builder/syncbuilder_test.go b/core/pkg/sync/builder/syncbuilder_test.go index ed2d81023..5835a679c 100644 --- a/core/pkg/sync/builder/syncbuilder_test.go +++ b/core/pkg/sync/builder/syncbuilder_test.go @@ -277,6 +277,24 @@ func Test_SyncsFromFromConfig(t *testing.T) { } } +func Test_GrpcIncrementalUpdates(t *testing.T) { + lg := logger.NewLogger(nil, false) + sb := NewSyncBuilder() + + syncs, err := sb.SyncsFromConfig([]sync.SourceConfig{ + { + URI: "grpc://host:port", + Provider: syncProviderGrpc, + IncrementalUpdates: true, + }, + }, lg) + require.NoError(t, err) + require.Len(t, syncs, 1) + grpcSync, ok := syncs[0].(*grpc.Sync) + require.True(t, ok) + require.True(t, grpcSync.IncrementalUpdates, "IncrementalUpdates should be propagated from SourceConfig to grpc.Sync") +} + func Test_GcsConfig(t *testing.T) { lg := logger.NewLogger(nil, false) defaultInterval := uint32(5) diff --git a/core/pkg/sync/grpc/grpc_sync.go b/core/pkg/sync/grpc/grpc_sync.go index 53dca4105..ad6bb75bb 100644 --- a/core/pkg/sync/grpc/grpc_sync.go +++ b/core/pkg/sync/grpc/grpc_sync.go @@ -53,6 +53,7 @@ type Sync struct { Selector string URI string MaxMsgSize int + IncrementalUpdates bool client FlagSyncServiceClient ready bool @@ -104,8 +105,9 @@ func (g *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error return err } dataSync <- sync.DataSync{ - FlagData: res.GetFlagConfiguration(), - Source: g.URI, + FlagData: res.GetFlagConfiguration(), + Source: g.URI, + IncrementalUpdates: g.IncrementalUpdates, } return nil } @@ -199,10 +201,11 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient, } dataSync <- sync.DataSync{ - FlagData: data.FlagConfiguration, - SyncContext: data.SyncContext, - Source: g.URI, - Selector: g.Selector, + FlagData: data.FlagConfiguration, + SyncContext: data.SyncContext, + Source: g.URI, + Selector: g.Selector, + IncrementalUpdates: g.IncrementalUpdates, } g.Logger.Debug("received full configuration payload") diff --git a/core/pkg/sync/grpc/grpc_sync_test.go b/core/pkg/sync/grpc/grpc_sync_test.go index 75bd20619..2d320630a 100644 --- a/core/pkg/sync/grpc/grpc_sync_test.go +++ b/core/pkg/sync/grpc/grpc_sync_test.go @@ -192,6 +192,38 @@ func Test_ReSyncTests(t *testing.T) { } } +func Test_IncrementalUpdatesPropagatesToDataSync(t *testing.T) { + const target = "localBufCon" + + bufCon := bufconn.Listen(5) + bufServer := bufferedServer{ + listener: bufCon, + fetchAllFlagsResponse: &v1.FetchAllFlagsResponse{FlagConfiguration: "flags"}, + } + go serve(&bufServer) + + dial, err := grpc.Dial(target, + grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { + return bufCon.Dial() + }), + grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + + grpcSync := Sync{ + URI: target, + Logger: logger.NewLogger(nil, false), + IncrementalUpdates: true, + client: syncv1grpc.NewFlagSyncServiceClient(dial), + } + + syncChan := make(chan sync.DataSync, 1) + err = grpcSync.ReSync(context.Background(), syncChan) + require.NoError(t, err) + + out := <-syncChan + require.True(t, out.IncrementalUpdates, "IncrementalUpdates should be propagated from Sync to DataSync via ReSync") +} + func Test_StreamListener(t *testing.T) { const target = "localBufCon" diff --git a/core/pkg/sync/isync.go b/core/pkg/sync/isync.go index b58ce2f7d..977724c56 100644 --- a/core/pkg/sync/isync.go +++ b/core/pkg/sync/isync.go @@ -32,6 +32,11 @@ type DataSync struct { SyncContext *structpb.Struct Source string Selector string + + // When true, the store scopes deletion to only the flagSetIds present in + // this payload rather than wiping all flags for the source. This must be + // explicitly opted-in per source via SourceConfig.IncrementalUpdates. + IncrementalUpdates bool } // SourceConfig is configuration option for flagd. This maps to startup parameter sources @@ -48,6 +53,12 @@ type SourceConfig struct { MaxMsgSize int `json:"maxMsgSize,omitempty"` TimeoutS int `json:"timeoutS,omitempty"` + // IncrementalUpdates opts this source into per-flagSetId scoped deletion. + // When false (default), each update replaces all flags for the source. + // When true, only flags matching the flagSetIds in the payload are replaced, + // allowing flags from other flagSetIds to accumulate across updates. + IncrementalUpdates bool `json:"incrementalUpdates,omitempty"` + OAuth *OAuthCredentialHandler `json:"oauth,omitempty"` } diff --git a/flagd/pkg/service/flag-evaluation/connect_service_test.go b/flagd/pkg/service/flag-evaluation/connect_service_test.go index 9405bb562..74f111dba 100644 --- a/flagd/pkg/service/flag-evaluation/connect_service_test.go +++ b/flagd/pkg/service/flag-evaluation/connect_service_test.go @@ -250,7 +250,7 @@ func TestConnectServiceWatcher(t *testing.T) { Key: "flag1", DefaultVariant: "off", }, - }, model.Metadata{}) + }, model.Metadata{}, false) // notification type ofType := iservice.ConfigurationChange diff --git a/flagd/pkg/service/flag-evaluation/eventing_test.go b/flagd/pkg/service/flag-evaluation/eventing_test.go index 0fcb06b29..d2870187a 100644 --- a/flagd/pkg/service/flag-evaluation/eventing_test.go +++ b/flagd/pkg/service/flag-evaluation/eventing_test.go @@ -84,12 +84,12 @@ func TestNotificationCompatibleWithStructpb(t *testing.T) { // first update sets up oldFlags. s.Update(sources[0], []model.Flag{ {Key: "flag1", DefaultVariant: "off"}, - }, model.Metadata{}) + }, model.Metadata{}, false) // second update triggers a ConfigurationChange with a real diff. s.Update(sources[0], []model.Flag{ {Key: "flag1", DefaultVariant: "on"}, - }, model.Metadata{}) + }, model.Metadata{}, false) select { case n := <-notifyChan: @@ -115,7 +115,7 @@ func TestNoNotificationWhenFlagsUnchanged(t *testing.T) { // first update creates flag1 — this produces a notification (create). s.Update(sources[0], []model.Flag{ {Key: "flag1", DefaultVariant: "off"}, - }, model.Metadata{}) + }, model.Metadata{}, false) // drain the first notification (flag creation). select { @@ -127,7 +127,7 @@ func TestNoNotificationWhenFlagsUnchanged(t *testing.T) { // second update with the same flags — should not produce a notification. s.Update(sources[0], []model.Flag{ {Key: "flag1", DefaultVariant: "off"}, - }, model.Metadata{}) + }, model.Metadata{}, false) select { case n := <-notifyChan: diff --git a/flagd/pkg/service/flag-sync/handler_test.go b/flagd/pkg/service/flag-sync/handler_test.go index 85af91eb8..7621be845 100644 --- a/flagd/pkg/service/flag-sync/handler_test.go +++ b/flagd/pkg/service/flag-sync/handler_test.go @@ -203,8 +203,8 @@ func TestSyncHandler_SelectorLocationPrecedence(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { flagStore, err := store.NewStore(logger.NewLogger(nil, false), []string{}) - flagStore.Update("header-source", headerFlags, nil) - flagStore.Update("body-source", bodyFlags, nil) + flagStore.Update("header-source", headerFlags, nil, false) + flagStore.Update("body-source", bodyFlags, nil, false) require.NoError(t, err) handler := syncHandler{ diff --git a/flagd/pkg/service/flag-sync/sync_service_test.go b/flagd/pkg/service/flag-sync/sync_service_test.go index efffb786e..9db2b007e 100644 --- a/flagd/pkg/service/flag-sync/sync_service_test.go +++ b/flagd/pkg/service/flag-sync/sync_service_test.go @@ -111,7 +111,7 @@ func TestSyncServiceEndToEnd(t *testing.T) { flagStore.Update(testSource1, testSource1Flags, model.Metadata{ "keyDuped": "value", "keyA": "valueA", - }) + }, false) select { case <-dataReceived: diff --git a/flagd/pkg/service/flag-sync/util_test.go b/flagd/pkg/service/flag-sync/util_test.go index d50199d2a..db4ac56e8 100644 --- a/flagd/pkg/service/flag-sync/util_test.go +++ b/flagd/pkg/service/flag-sync/util_test.go @@ -50,12 +50,12 @@ func getSimpleFlagStore(t testing.TB) (store.IStore, []string) { flagStore.Update(testSource1, testSource1Flags, model.Metadata{ "keyDuped": "value", "keyA": "valueA", - }) + }, false) flagStore.Update(testSource2, testSource2Flags, model.Metadata{ "keyDuped": "value", "keyB": "valueB", - }) + }, false) return flagStore, sources }