Skip to content

Commit 5bf055b

Browse files
committed
fix(store): scope flag deletion by flagSetId to support per-selector sync
When an upstream provider sends per-selector stream messages (each with a flagSetId in metadata), store.Update now only deletes stale flags within the same flagSetId(s), preserving flags from other selector. Signed-off-by: Eren Atas <eren_atas@hotmail.com>
1 parent d9b5aa2 commit 5bf055b

File tree

3 files changed

+182
-8
lines changed

3 files changed

+182
-8
lines changed

core/pkg/store/query.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,17 @@ func (s *Selector) ToMetadata() model.Metadata {
136136
}
137137

138138
func (s *Selector) ToLogString() string {
139-
if s != nil && len(s.indexMap) == 1 {
140-
for k, v := range s.indexMap {
141-
return fmt.Sprintf("'%s=%s'", k, v)
142-
}
139+
if s == nil || len(s.indexMap) == 0 {
140+
return "<none>"
143141
}
144-
return "<none>"
142+
keys := make([]string, 0, len(s.indexMap))
143+
for k := range s.indexMap {
144+
keys = append(keys, k)
145+
}
146+
sort.Strings(keys)
147+
parts := make([]string, 0, len(keys))
148+
for _, k := range keys {
149+
parts = append(parts, fmt.Sprintf("%s=%s", k, s.indexMap[k]))
150+
}
151+
return "'" + strings.Join(parts, ",") + "'"
145152
}

core/pkg/store/store.go

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -254,9 +254,41 @@ func (s *Store) Update(
254254
txn := s.db.Txn(true)
255255
defer txn.Abort()
256256

257-
// get all flags for the source we are updating
258-
selector := NewSelector(sourceIndex + "=" + source)
259-
oldFlags, _, _ := s.GetAll(context.Background(), &selector)
257+
// When metadata carries a flagSetId, scope deletion to only the flagSetIds touched by
258+
// this update (the metadata-level one plus any flag-level overrides). This allows
259+
// per-flagSetId updates (e.g., from per-project stream messages) to accumulate in
260+
// the store without deleting flags from unrelated flagSetIds.
261+
// When metadata has no flagSetId, fall back to source-scoped deletion (original behavior).
262+
// Note: we keep the fsi != "" guard because empty-string flagSetIds are not normalized
263+
// to nilFlagSetId at storage time, but NewSelector DOES normalize them, creating a
264+
// mismatch that would cause queries to miss the actual flags. Source-scoped fallback
265+
// is the correct behavior for empty-string flagSetIds.
266+
var oldFlags []model.Flag
267+
if fsi, ok := metadata["flagSetId"].(string); ok && fsi != "" {
268+
seenFlagSetIds := map[string]struct{}{fsi: {}}
269+
for id := range newFlags {
270+
seenFlagSetIds[id.flagSetId] = struct{}{}
271+
}
272+
for fsi := range seenFlagSetIds {
273+
sel := NewSelector(flagSetIdIndex + "=" + fsi).WithIndex(sourceIndex, source)
274+
indexId, constraints := sel.ToQuery()
275+
it, err := txn.Get(flagsTable, indexId, constraints...)
276+
if err != nil {
277+
s.logger.Error(fmt.Sprintf("unable to query flags for flagSetId %s: %v", fsi, err))
278+
continue
279+
}
280+
oldFlags = append(oldFlags, s.collect(it)...)
281+
}
282+
} else {
283+
sel := NewSelector(sourceIndex + "=" + source)
284+
indexId, constraints := sel.ToQuery()
285+
it, err := txn.Get(flagsTable, indexId, constraints...)
286+
if err != nil {
287+
s.logger.Error(fmt.Sprintf("unable to query flags for source %s: %v", source, err))
288+
} else {
289+
oldFlags = s.collect(it)
290+
}
291+
}
260292

261293
for _, oldFlag := range oldFlags {
262294
if _, ok := newFlags[flagIdentifier{flagSetId: oldFlag.FlagSetId, key: oldFlag.Key}]; !ok {

core/pkg/store/store_test.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,141 @@ func TestWatch(t *testing.T) {
618618
}
619619
}
620620

621+
func TestUpdateFlagSetIdScoping(t *testing.T) {
622+
t.Parallel()
623+
624+
const src = "src1"
625+
sources := []string{src}
626+
627+
type updateStep struct {
628+
flags []model.Flag
629+
metadata model.Metadata
630+
}
631+
632+
tests := []struct {
633+
name string
634+
updates []updateStep
635+
wantPresent []string // "flagSetId/key" entries expected in the store
636+
wantAbsent []string // "flagSetId/key" entries expected to be gone
637+
}{
638+
{
639+
name: "per-flagSetId update preserves flags from other flagSetIds",
640+
updates: []updateStep{
641+
{flags: []model.Flag{{Key: "flagA1"}, {Key: "flagA2"}}, metadata: model.Metadata{"flagSetId": "A"}},
642+
{flags: []model.Flag{{Key: "flagB1"}}, metadata: model.Metadata{"flagSetId": "B"}},
643+
{flags: []model.Flag{{Key: "flagA1"}}, metadata: model.Metadata{"flagSetId": "A"}},
644+
},
645+
wantPresent: []string{"A/flagA1", "B/flagB1"},
646+
wantAbsent: []string{"A/flagA2"},
647+
},
648+
{
649+
name: "out-of-scope flag-level override persists when not in batch",
650+
updates: []updateStep{
651+
{flags: []model.Flag{{Key: "kept"}, {Key: "override", Metadata: model.Metadata{"flagSetId": "Y"}}}, metadata: model.Metadata{"flagSetId": "X"}},
652+
{flags: []model.Flag{{Key: "kept"}}, metadata: model.Metadata{"flagSetId": "X"}},
653+
},
654+
wantPresent: []string{"X/kept", "Y/override"},
655+
},
656+
{
657+
name: "stale flag deleted when its flagSetId is in scope",
658+
updates: []updateStep{
659+
{flags: []model.Flag{{Key: "inX"}, {Key: "inY", Metadata: model.Metadata{"flagSetId": "Y"}}}, metadata: model.Metadata{"flagSetId": "X"}},
660+
{flags: []model.Flag{{Key: "inY", Metadata: model.Metadata{"flagSetId": "Y"}}}, metadata: model.Metadata{"flagSetId": "X"}},
661+
},
662+
wantPresent: []string{"Y/inY"},
663+
wantAbsent: []string{"X/inX"},
664+
},
665+
{
666+
name: "no flagSetId in metadata falls back to source-scoped deletion",
667+
updates: []updateStep{
668+
{flags: []model.Flag{{Key: "flagA"}}, metadata: model.Metadata{"flagSetId": "A"}},
669+
{flags: []model.Flag{{Key: "flagB"}}, metadata: model.Metadata{"flagSetId": "B"}},
670+
{flags: []model.Flag{}, metadata: nil},
671+
},
672+
wantAbsent: []string{"A/flagA", "B/flagB"},
673+
},
674+
{
675+
name: "empty update with flagSetId clears only that set",
676+
updates: []updateStep{
677+
{flags: []model.Flag{{Key: "flagA"}}, metadata: model.Metadata{"flagSetId": "A"}},
678+
{flags: []model.Flag{{Key: "flagB"}}, metadata: model.Metadata{"flagSetId": "B"}},
679+
{flags: []model.Flag{}, metadata: model.Metadata{"flagSetId": "A"}},
680+
},
681+
wantPresent: []string{"B/flagB"},
682+
wantAbsent: []string{"A/flagA"},
683+
},
684+
}
685+
686+
for _, tt := range tests {
687+
tt := tt
688+
t.Run(tt.name, func(t *testing.T) {
689+
t.Parallel()
690+
s, err := NewStore(logger.NewLogger(nil, false), sources)
691+
require.NoError(t, err)
692+
693+
for _, step := range tt.updates {
694+
s.Update(src, step.flags, step.metadata)
695+
}
696+
697+
allFlags, _, _ := s.GetAll(context.Background(), nil)
698+
flagKeys := make(map[string]struct{}, len(allFlags))
699+
for _, f := range allFlags {
700+
flagKeys[f.FlagSetId+"/"+f.Key] = struct{}{}
701+
}
702+
703+
for _, key := range tt.wantPresent {
704+
assert.Contains(t, flagKeys, key)
705+
}
706+
for _, key := range tt.wantAbsent {
707+
assert.NotContains(t, flagKeys, key)
708+
}
709+
})
710+
}
711+
}
712+
713+
func TestToLogStringCompound(t *testing.T) {
714+
t.Parallel()
715+
716+
tests := []struct {
717+
name string
718+
selector *Selector
719+
want string
720+
}{
721+
{
722+
name: "nil selector",
723+
selector: nil,
724+
want: "<none>",
725+
},
726+
{
727+
name: "empty selector",
728+
selector: &Selector{indexMap: map[string]string{}},
729+
want: "<none>",
730+
},
731+
{
732+
name: "single key",
733+
selector: &Selector{indexMap: map[string]string{"source": "mySource"}},
734+
want: "'source=mySource'",
735+
},
736+
{
737+
name: "compound selector",
738+
selector: &Selector{indexMap: map[string]string{"flagSetId": "abc", "source": "mySource"}},
739+
want: "'flagSetId=abc,source=mySource'",
740+
},
741+
{
742+
name: "three keys sorted",
743+
selector: &Selector{indexMap: map[string]string{"source": "s", "key": "k", "flagSetId": "f"}},
744+
want: "'flagSetId=f,key=k,source=s'",
745+
},
746+
}
747+
748+
for _, tt := range tests {
749+
t.Run(tt.name, func(t *testing.T) {
750+
got := tt.selector.ToLogString()
751+
assert.Equal(t, tt.want, got)
752+
})
753+
}
754+
}
755+
621756
func TestQueryMetadata(t *testing.T) {
622757

623758
sourceA := "sourceA"

0 commit comments

Comments
 (0)