Skip to content

Commit b96d0b7

Browse files
committed
add opt-in per-flagSetId incremental updates and add tests
1 parent 5bf055b commit b96d0b7

File tree

16 files changed

+153
-64
lines changed

16 files changed

+153
-64
lines changed

core/pkg/evaluator/fractional_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ func TestFractionalEvaluation(t *testing.T) {
409409
}
410410

411411
je := NewJSON(log, s)
412-
je.store.Update(source, tt.flags, model.Metadata{})
412+
je.store.Update(source, tt.flags, model.Metadata{}, false)
413413

414414
value, variant, reason, _, err := resolve[string](ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant)
415415

@@ -537,7 +537,7 @@ func BenchmarkFractionalEvaluation(b *testing.B) {
537537
b.Fatalf("NewStore failed: %v", err)
538538
}
539539
je := NewJSON(log, s)
540-
je.store.Update(source, tt.flags, model.Metadata{})
540+
je.store.Update(source, tt.flags, model.Metadata{}, false)
541541

542542
for i := 0; i < b.N; i++ {
543543
value, variant, reason, _, err := resolve[string](

core/pkg/evaluator/json.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ func (je *JSON) SetState(payload sync.DataSync) error {
137137
return err
138138
}
139139

140-
je.store.Update(payload.Source, definition.Flags, definition.Metadata)
140+
je.store.Update(payload.Source, definition.Flags, definition.Metadata, payload.IncrementalUpdates)
141141

142142
return nil
143143
}

core/pkg/evaluator/semver_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -798,7 +798,7 @@ func TestJSONEvaluator_semVerEvaluation(t *testing.T) {
798798
t.Fatalf("NewStore failed: %v", err)
799799
}
800800
je := NewJSON(log, s)
801-
je.store.Update(source, tt.flags, model.Metadata{})
801+
je.store.Update(source, tt.flags, model.Metadata{}, false)
802802

803803
value, variant, reason, _, err := resolve[string](ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant)
804804

core/pkg/evaluator/string_comparison_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func TestJSONEvaluator_startsWithEvaluation(t *testing.T) {
157157
t.Fatalf("NewStore failed: %v", err)
158158
}
159159
je := NewJSON(log, s)
160-
je.store.Update(source, tt.flags, model.Metadata{})
160+
je.store.Update(source, tt.flags, model.Metadata{}, false)
161161

162162
value, variant, reason, _, err := resolve[string](ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant)
163163

@@ -325,7 +325,7 @@ func TestJSONEvaluator_endsWithEvaluation(t *testing.T) {
325325
t.Fatalf("NewStore failed: %v", err)
326326
}
327327
je := NewJSON(log, s)
328-
je.store.Update(source, tt.flags, model.Metadata{})
328+
je.store.Update(source, tt.flags, model.Metadata{}, false)
329329

330330
value, variant, reason, _, err := resolve[string](ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant)
331331

core/pkg/store/store.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ type IStore interface {
2525
Get(ctx context.Context, key string, selector *Selector) (model.Flag, model.Metadata, error)
2626
GetAll(ctx context.Context, selector *Selector) ([]model.Flag, model.Metadata, error)
2727
Watch(ctx context.Context, selector *Selector, watcher chan<- FlagQueryResult)
28-
Update(source string, flags []model.Flag, metadata model.Metadata)
28+
Update(source string, flags []model.Flag, metadata model.Metadata, incrementalUpdate bool)
2929
}
3030

3131
var _ IStore = (*Store)(nil)
@@ -214,10 +214,15 @@ type flagIdentifier struct {
214214
}
215215

216216
// Update the flag state with the provided flags.
217+
// When incrementalUpdate is true, deletion is scoped to only the flagSetIds present in
218+
// this payload (from metadata and flag-level overrides), allowing flags from other
219+
// flagSetIds to accumulate across updates. When false, all flags for the source are
220+
// replaced (the default full-snapshot behavior).
217221
func (s *Store) Update(
218222
source string,
219223
flags []model.Flag,
220224
metadata model.Metadata,
225+
incrementalUpdate bool,
221226
) {
222227
if source == "" {
223228
panic("source cannot be empty")
@@ -254,18 +259,16 @@ func (s *Store) Update(
254259
txn := s.db.Txn(true)
255260
defer txn.Abort()
256261

257-
// When metadata carries a flagSetId, scope deletion to only the flagSetIds touched by
258-
// this update (the metadata-level one plus any flag-level overrides). This allows
259-
// per-flagSetId updates (e.g., from per-project stream messages) to accumulate in
260-
// the store without deleting flags from unrelated flagSetIds.
261-
// When metadata has no flagSetId, fall back to source-scoped deletion (original behavior).
262-
// Note: we keep the fsi != "" guard because empty-string flagSetIds are not normalized
263-
// to nilFlagSetId at storage time, but NewSelector DOES normalize them, creating a
264-
// mismatch that would cause queries to miss the actual flags. Source-scoped fallback
265-
// is the correct behavior for empty-string flagSetIds.
262+
// When incrementalUpdate is enabled, scope deletion to only the flagSetIds touched
263+
// by this payload (metadata-level + flag-level overrides). This allows per-flagSetId
264+
// updates (e.g., from per-project stream messages) to accumulate without deleting
265+
// flags from unrelated flagSetIds. Otherwise, replace all flags for the source.
266266
var oldFlags []model.Flag
267-
if fsi, ok := metadata["flagSetId"].(string); ok && fsi != "" {
268-
seenFlagSetIds := map[string]struct{}{fsi: {}}
267+
if incrementalUpdate {
268+
seenFlagSetIds := make(map[string]struct{})
269+
if fsi, ok := metadata["flagSetId"].(string); ok && fsi != "" {
270+
seenFlagSetIds[fsi] = struct{}{}
271+
}
269272
for id := range newFlags {
270273
seenFlagSetIds[id.flagSetId] = struct{}{}
271274
}

core/pkg/store/store_test.go

Lines changed: 45 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func TestUpdateFlags(t *testing.T) {
8080
}
8181
s.Update(source1, []model.Flag{
8282
{Key: "waka", DefaultVariant: "off"},
83-
}, nil)
83+
}, nil, false)
8484
return s
8585
},
8686
newFlags: []model.Flag{
@@ -100,7 +100,7 @@ func TestUpdateFlags(t *testing.T) {
100100
}
101101
s.Update(source1, []model.Flag{
102102
{Key: "waka", DefaultVariant: "off"},
103-
}, nil)
103+
}, nil, false)
104104
return s
105105
},
106106
newFlags: []model.Flag{
@@ -119,7 +119,7 @@ func TestUpdateFlags(t *testing.T) {
119119
if err != nil {
120120
t.Fatalf("NewStore failed: %v", err)
121121
}
122-
s.Update(source1, []model.Flag{}, model.Metadata{})
122+
s.Update(source1, []model.Flag{}, model.Metadata{}, false)
123123
return s
124124
},
125125
setMetadata: model.Metadata{
@@ -142,7 +142,7 @@ func TestUpdateFlags(t *testing.T) {
142142
if err != nil {
143143
t.Fatalf("NewStore failed: %v", err)
144144
}
145-
s.Update(source1, []model.Flag{}, model.Metadata{})
145+
s.Update(source1, []model.Flag{}, model.Metadata{}, false)
146146
return s
147147

148148
},
@@ -168,7 +168,7 @@ func TestUpdateFlags(t *testing.T) {
168168
if err != nil {
169169
t.Fatalf("NewStore failed: %v", err)
170170
}
171-
s.Update(source1, []model.Flag{}, model.Metadata{})
171+
s.Update(source1, []model.Flag{}, model.Metadata{}, false)
172172
return s
173173
},
174174
setMetadata: model.Metadata{
@@ -195,7 +195,7 @@ func TestUpdateFlags(t *testing.T) {
195195
t.Run(tt.name, func(t *testing.T) {
196196
t.Parallel()
197197
store := tt.setup(t)
198-
store.Update(tt.source, tt.newFlags, tt.setMetadata)
198+
store.Update(tt.source, tt.newFlags, tt.setMetadata, false)
199199
gotFlags, _, _ := store.GetAll(context.Background(), nil)
200200
sort.Slice(tt.wantFlags, func(i, j int) bool {
201201
return tt.wantFlags[i].FlagSetId+"|"+tt.wantFlags[i].Key > tt.wantFlags[j].FlagSetId+"|"+tt.wantFlags[j].Key
@@ -330,7 +330,7 @@ func TestGet(t *testing.T) {
330330
}
331331

332332
for _, source := range s.order {
333-
store.Update(source.Name, source.flags, nil)
333+
store.Update(source.Name, source.flags, nil, false)
334334
}
335335
gotFlag, _, err := store.Get(context.Background(), tt.key, tt.selector)
336336

@@ -473,7 +473,7 @@ func TestGetAllNoWatcher(t *testing.T) {
473473
}
474474

475475
for _, source := range s.order {
476-
store.Update(source.Name, source.flags, nil)
476+
store.Update(source.Name, source.flags, nil, false)
477477
}
478478
gotFlags, _, _ := store.GetAll(context.Background(), tt.selector)
479479

@@ -556,9 +556,9 @@ func TestWatch(t *testing.T) {
556556
}
557557

558558
// setup initial flags
559-
store.Update(sourceA, sourceAFlags, model.Metadata{})
560-
store.Update(sourceB, sourceBFlags, model.Metadata{})
561-
store.Update(sourceC, sourceCFlags, model.Metadata{})
559+
store.Update(sourceA, sourceAFlags, model.Metadata{}, false)
560+
store.Update(sourceB, sourceBFlags, model.Metadata{}, false)
561+
store.Update(sourceC, sourceCFlags, model.Metadata{}, false)
562562
watcher := make(chan FlagQueryResult, 1)
563563
time.Sleep(pauseTime)
564564

@@ -573,29 +573,29 @@ func TestWatch(t *testing.T) {
573573
// changing a flag default variant should trigger an update
574574
store.Update(sourceA, []model.Flag{
575575
{Key: "flagA", DefaultVariant: "on"},
576-
}, model.Metadata{})
576+
}, model.Metadata{}, false)
577577

578578
time.Sleep(pauseTime)
579579

580580
// changing a flag default variant should trigger an update
581581
store.Update(sourceB, []model.Flag{
582582
{Key: "flagB", DefaultVariant: "on", Metadata: model.Metadata{"flagSetId": myFlagSetId}},
583-
}, model.Metadata{})
583+
}, model.Metadata{}, false)
584584

585585
time.Sleep(pauseTime)
586586

587587
// removing a flag set id should trigger an update (even for flag set id selectors; it should remove the flag from the set)
588588
// TODO: challenge this test and behaviour
589589
store.Update(sourceB, []model.Flag{
590590
{Key: "flagB", DefaultVariant: "on"},
591-
}, model.Metadata{})
591+
}, model.Metadata{}, false)
592592

593593
time.Sleep(pauseTime)
594594

595595
// adding a flag set id should trigger an update
596596
store.Update(sourceB, []model.Flag{
597597
{Key: "flagB", DefaultVariant: "on", Metadata: model.Metadata{"flagSetId": myFlagSetId}},
598-
}, model.Metadata{})
598+
}, model.Metadata{}, false)
599599
}()
600600

601601
updates := 0
@@ -625,8 +625,9 @@ func TestUpdateFlagSetIdScoping(t *testing.T) {
625625
sources := []string{src}
626626

627627
type updateStep struct {
628-
flags []model.Flag
629-
metadata model.Metadata
628+
flags []model.Flag
629+
metadata model.Metadata
630+
incrementalUpdate *bool // nil: incremental merge; explicit false: full replace for the source
630631
}
631632

632633
tests := []struct {
@@ -664,13 +665,29 @@ func TestUpdateFlagSetIdScoping(t *testing.T) {
664665
},
665666
{
666667
name: "no flagSetId in metadata falls back to source-scoped deletion",
667-
updates: []updateStep{
668-
{flags: []model.Flag{{Key: "flagA"}}, metadata: model.Metadata{"flagSetId": "A"}},
669-
{flags: []model.Flag{{Key: "flagB"}}, metadata: model.Metadata{"flagSetId": "B"}},
670-
{flags: []model.Flag{}, metadata: nil},
671-
},
668+
updates: func() []updateStep {
669+
fullSnapshot := false
670+
return []updateStep{
671+
{flags: []model.Flag{{Key: "flagA"}}, metadata: model.Metadata{"flagSetId": "A"}},
672+
{flags: []model.Flag{{Key: "flagB"}}, metadata: model.Metadata{"flagSetId": "B"}},
673+
{flags: []model.Flag{}, metadata: nil, incrementalUpdate: &fullSnapshot},
674+
}
675+
}(),
672676
wantAbsent: []string{"A/flagA", "B/flagB"},
673677
},
678+
{
679+
name: "incrementalUpdate=false with flagSetId still does full-source deletion",
680+
updates: func() []updateStep {
681+
fullSnapshot := false
682+
return []updateStep{
683+
{flags: []model.Flag{{Key: "flagA"}}, metadata: model.Metadata{"flagSetId": "A"}},
684+
{flags: []model.Flag{{Key: "flagB"}}, metadata: model.Metadata{"flagSetId": "B"}},
685+
{flags: []model.Flag{{Key: "flagA"}}, metadata: model.Metadata{"flagSetId": "A"}, incrementalUpdate: &fullSnapshot},
686+
}
687+
}(),
688+
wantPresent: []string{"A/flagA"},
689+
wantAbsent: []string{"B/flagB"},
690+
},
674691
{
675692
name: "empty update with flagSetId clears only that set",
676693
updates: []updateStep{
@@ -691,7 +708,11 @@ func TestUpdateFlagSetIdScoping(t *testing.T) {
691708
require.NoError(t, err)
692709

693710
for _, step := range tt.updates {
694-
s.Update(src, step.flags, step.metadata)
711+
inc := true
712+
if step.incrementalUpdate != nil {
713+
inc = *step.incrementalUpdate
714+
}
715+
s.Update(src, step.flags, step.metadata, inc)
695716
}
696717

697718
allFlags, _, _ := s.GetAll(context.Background(), nil)
@@ -770,7 +791,7 @@ func TestQueryMetadata(t *testing.T) {
770791
}
771792

772793
// setup initial flags
773-
store.Update(sourceA, sourceAFlags, model.Metadata{})
794+
store.Update(sourceA, sourceAFlags, model.Metadata{}, false)
774795

775796
// #1708 Until we decide on the Selector syntax, only a single key=value pair is supported
776797
// these tests should then also cover more complex selectors

core/pkg/sync/builder/syncbuilder.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -191,12 +191,13 @@ func (sb *SyncBuilder) newGRPC(config sync.SourceConfig, logger *logger.Logger)
191191
zap.String("component", "sync"),
192192
zap.String("sync", "grpc"),
193193
),
194-
CredentialBuilder: &credentials.CredentialBuilder{},
195-
CertPath: config.CertPath,
196-
ProviderID: config.ProviderID,
197-
Secure: config.TLS,
198-
Selector: config.Selector,
199-
MaxMsgSize: config.MaxMsgSize,
194+
CredentialBuilder: &credentials.CredentialBuilder{},
195+
CertPath: config.CertPath,
196+
ProviderID: config.ProviderID,
197+
Secure: config.TLS,
198+
Selector: config.Selector,
199+
MaxMsgSize: config.MaxMsgSize,
200+
IncrementalUpdates: config.IncrementalUpdates,
200201
}
201202
}
202203

core/pkg/sync/builder/syncbuilder_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,24 @@ func Test_SyncsFromFromConfig(t *testing.T) {
277277
}
278278
}
279279

280+
func Test_GrpcIncrementalUpdates(t *testing.T) {
281+
lg := logger.NewLogger(nil, false)
282+
sb := NewSyncBuilder()
283+
284+
syncs, err := sb.SyncsFromConfig([]sync.SourceConfig{
285+
{
286+
URI: "grpc://host:port",
287+
Provider: syncProviderGrpc,
288+
IncrementalUpdates: true,
289+
},
290+
}, lg)
291+
require.NoError(t, err)
292+
require.Len(t, syncs, 1)
293+
grpcSync, ok := syncs[0].(*grpc.Sync)
294+
require.True(t, ok)
295+
require.True(t, grpcSync.IncrementalUpdates, "IncrementalUpdates should be propagated from SourceConfig to grpc.Sync")
296+
}
297+
280298
func Test_GcsConfig(t *testing.T) {
281299
lg := logger.NewLogger(nil, false)
282300
defaultInterval := uint32(5)

core/pkg/sync/grpc/grpc_sync.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ type Sync struct {
5353
Selector string
5454
URI string
5555
MaxMsgSize int
56+
IncrementalUpdates bool
5657

5758
client FlagSyncServiceClient
5859
ready bool
@@ -104,8 +105,9 @@ func (g *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error
104105
return err
105106
}
106107
dataSync <- sync.DataSync{
107-
FlagData: res.GetFlagConfiguration(),
108-
Source: g.URI,
108+
FlagData: res.GetFlagConfiguration(),
109+
Source: g.URI,
110+
IncrementalUpdates: g.IncrementalUpdates,
109111
}
110112
return nil
111113
}
@@ -199,10 +201,11 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient,
199201
}
200202

201203
dataSync <- sync.DataSync{
202-
FlagData: data.FlagConfiguration,
203-
SyncContext: data.SyncContext,
204-
Source: g.URI,
205-
Selector: g.Selector,
204+
FlagData: data.FlagConfiguration,
205+
SyncContext: data.SyncContext,
206+
Source: g.URI,
207+
Selector: g.Selector,
208+
IncrementalUpdates: g.IncrementalUpdates,
206209
}
207210

208211
g.Logger.Debug("received full configuration payload")

0 commit comments

Comments
 (0)