diff --git a/core/pkg/sync/kubernetes/kubernetes_sync.go b/core/pkg/sync/kubernetes/kubernetes_sync.go index 21f08420a..88fee6f32 100644 --- a/core/pkg/sync/kubernetes/kubernetes_sync.go +++ b/core/pkg/sync/kubernetes/kubernetes_sync.go @@ -10,15 +10,15 @@ import ( "github.com/open-feature/flagd/core/pkg/logger" "github.com/open-feature/flagd/core/pkg/sync" - "github.com/open-feature/open-feature-operator/apis/core/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" + + "github.com/open-feature/open-feature-operator/apis/core/v1beta1" ) var ( @@ -27,6 +27,8 @@ var ( featureFlagResource = v1beta1.GroupVersion.WithResource("featureflags") ) +const invalidAPIVersionMsg = "invalid api version %s, expected %s" + type SyncOption func(s *Sync) type Sync struct { @@ -99,22 +101,24 @@ func (k *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { dataSync <- sync.DataSync{FlagData: fetch, Source: k.URI} - notifies := make(chan INotify) + // Buffer ensures notifier can publish the initial Ready event even if the watcher + // goroutine has not started reading yet. + notifies := make(chan INotify, 1) var wg msync.WaitGroup - // Start K8s resource notifier + // Start notifier watcher wg.Add(1) go func() { defer wg.Done() - k.notify(ctx, notifies) + k.watcher(ctx, notifies, dataSync) }() - // Start notifier watcher + // Start K8s resource notifier wg.Add(1) go func() { defer wg.Done() - k.watcher(ctx, notifies, dataSync) + k.notify(ctx, notifies) }() wg.Wait() @@ -165,13 +169,13 @@ func (k *Sync) fetch(ctx context.Context) (string, error) { } if exist { - configuration, err := toFFCfg(item) + configuration, err := asUnstructured(item) if err != nil { return "", err } k.logger.Debug(fmt.Sprintf("resource %s served from the informer cache", k.URI)) - return marshallFeatureFlagSpec(configuration) + return marshalFlagSpec(configuration) } // fallback to API access - this is an informer cache miss. Could happen at the startup where cache is not filled @@ -186,11 +190,11 @@ func (k *Sync) fetch(ctx context.Context) (string, error) { k.logger.Debug(fmt.Sprintf("resource %s served from API server", k.URI)) - ff, err := toFFCfg(ffObj) + ff, err := asUnstructured(ffObj) if err != nil { return "", fmt.Errorf("unable to convert object %s/%s to FeatureFlag: %w", k.namespace, k.crdName, err) } - return marshallFeatureFlagSpec(ff) + return marshalFlagSpec(ff) } func (k *Sync) notify(ctx context.Context, c chan<- INotify) { @@ -233,16 +237,16 @@ func (k *Sync) notify(ctx context.Context, c chan<- INotify) { // commonHandler emits the desired event if and only if handler receive an object matching apiVersion and resource name func commonHandler(obj interface{}, object types.NamespacedName, emitEvent DefaultEventType, c chan<- INotify) error { - ffObj, err := toFFCfg(obj) + u, err := asUnstructured(obj) if err != nil { return err } - if ffObj.APIVersion != apiVersion { - return fmt.Errorf("invalid api version %s, expected %s", ffObj.APIVersion, apiVersion) + if u.GetAPIVersion() != apiVersion { + return fmt.Errorf(invalidAPIVersionMsg, u.GetAPIVersion(), apiVersion) } - if ffObj.Name == object.Name { + if u.GetName() == object.Name { c <- &Notifier{ Event: Event[DefaultEventType]{ EventType: emitEvent, @@ -255,26 +259,25 @@ func commonHandler(obj interface{}, object types.NamespacedName, emitEvent Defau // updateFuncHandler handles updates. Event is emitted if and only if resource name, apiVersion of old & new are equal func updateFuncHandler(oldObj interface{}, newObj interface{}, object types.NamespacedName, c chan<- INotify) error { - ffOldObj, err := toFFCfg(oldObj) + ffOldObj, err := asUnstructured(oldObj) if err != nil { return err } - if ffOldObj.APIVersion != apiVersion { - return fmt.Errorf("invalid api version %s, expected %s", ffOldObj.APIVersion, apiVersion) + if ffOldObj.GetAPIVersion() != apiVersion { + return fmt.Errorf(invalidAPIVersionMsg, ffOldObj.GetAPIVersion(), apiVersion) } - ffNewObj, err := toFFCfg(newObj) + ffNewObj, err := asUnstructured(newObj) if err != nil { return err } - if ffNewObj.APIVersion != apiVersion { - return fmt.Errorf("invalid api version %s, expected %s", ffNewObj.APIVersion, apiVersion) + if ffNewObj.GetAPIVersion() != apiVersion { + return fmt.Errorf(invalidAPIVersionMsg, ffNewObj.GetAPIVersion(), apiVersion) } - if object.Name == ffNewObj.Name && ffOldObj.ResourceVersion != ffNewObj.ResourceVersion { - // Only update if there is an actual featureFlagSpec change + if object.Name == ffNewObj.GetName() && ffOldObj.GetResourceVersion() != ffNewObj.GetResourceVersion() { c <- &Notifier{ Event: Event[DefaultEventType]{ EventType: DefaultEventTypeModify, @@ -284,20 +287,18 @@ func updateFuncHandler(oldObj interface{}, newObj interface{}, object types.Name return nil } -// toFFCfg attempts to covert unstructured payload to configurations -func toFFCfg(object interface{}) (*v1beta1.FeatureFlag, error) { - u, ok := object.(*unstructured.Unstructured) - if !ok { - return nil, fmt.Errorf("provided value is not of type *unstructured.Unstructured") +func asUnstructured(object interface{}) (*unstructured.Unstructured, error) { + if u, ok := object.(*unstructured.Unstructured); ok { + return u, nil } - var ffObj v1beta1.FeatureFlag - err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &ffObj) - if err != nil { - return nil, fmt.Errorf("unable to convert unstructured to v1beta1.FeatureFlag: %w", err) + if tombstone, ok := object.(cache.DeletedFinalStateUnknown); ok { + if u, ok := tombstone.Obj.(*unstructured.Unstructured); ok { + return u, nil + } } - return &ffObj, nil + return nil, fmt.Errorf("provided value is not of type *unstructured.Unstructured") } // parseURI parse provided uri in the format of / to namespace, crdName. Results in an error @@ -310,10 +311,18 @@ func parseURI(uri string) (string, string, error) { return s[0], s[1], nil } -func marshallFeatureFlagSpec(ff *v1beta1.FeatureFlag) (string, error) { - b, err := json.Marshal(ff.Spec.FlagSpec) +func marshalFlagSpec(ff *unstructured.Unstructured) (string, error) { + flagSpec, found, err := unstructured.NestedMap(ff.Object, "spec", "flagSpec") + if err != nil { + return "", fmt.Errorf("failed to parse spec.flagSpec: %w", err) + } + if !found { + return "", fmt.Errorf("missing spec.flagSpec") + } + + b, err := json.Marshal(flagSpec) if err != nil { - return "", fmt.Errorf("failed to marshall FlagSpec: %s", err.Error()) + return "", fmt.Errorf("failed to marshal spec.flagSpec: %w", err) } return string(b), nil } diff --git a/core/pkg/sync/kubernetes/kubernetes_sync_test.go b/core/pkg/sync/kubernetes/kubernetes_sync_test.go index 51a553264..1ca8765c2 100644 --- a/core/pkg/sync/kubernetes/kubernetes_sync_test.go +++ b/core/pkg/sync/kubernetes/kubernetes_sync_test.go @@ -7,6 +7,7 @@ import ( "fmt" "reflect" "strings" + stdsync "sync" "testing" "time" @@ -73,7 +74,7 @@ func Test_parseURI(t *testing.T) { } } -func Test_toFFCfg(t *testing.T) { +func TestAsUnstructured(t *testing.T) { validFFCfg := v1beta1.FeatureFlag{ TypeMeta: Metadata, } @@ -81,13 +82,21 @@ func Test_toFFCfg(t *testing.T) { tests := []struct { name string input interface{} - want *v1beta1.FeatureFlag + want *unstructured.Unstructured wantErr bool }{ { name: "Simple success", input: toUnstructured(t, validFFCfg), - want: &validFFCfg, + want: toUnstructured(t, validFFCfg), + wantErr: false, + }, + { + name: "Tombstone unwraps", + input: cache.DeletedFinalStateUnknown{ + Obj: toUnstructured(t, validFFCfg), + }, + want: toUnstructured(t, validFFCfg), wantErr: false, }, { @@ -102,15 +111,15 @@ func Test_toFFCfg(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := toFFCfg(tt.input) + got, err := asUnstructured(tt.input) if (err != nil) != tt.wantErr { - t.Errorf("toFFCfg() error = %v, wantErr %v", err, tt.wantErr) + t.Errorf("asUnstructured() error = %v, wantErr %v", err, tt.wantErr) return } if !reflect.DeepEqual(got, tt.want) { - t.Errorf("toFFCfg() got = %v, want %v", got, tt.want) + t.Errorf("asUnstructured() got = %v, want %v", got, tt.want) } }) } @@ -657,23 +666,35 @@ func TestSync_ReSync(t *testing.T) { t.Errorf("The Sync should not be ready") } dataChannel := make(chan sync.DataSync, tt.countMsg) + ctx, cancel := context.WithCancel(context.Background()) if tt.async { + var wg stdsync.WaitGroup + wg.Add(1) go func() { - if err := tt.k.Sync(context.TODO(), dataChannel); err != nil { - t.Errorf("Unexpected error: %v", e) - } - if err := tt.k.ReSync(context.TODO(), dataChannel); err != nil { - t.Errorf("Unexpected error: %v", e) + defer wg.Done() + if err := tt.k.Sync(ctx, dataChannel); err != nil { + t.Errorf("Unexpected error: %v", err) } }() - i := tt.countMsg - for i > 0 { - d := <-dataChannel - if d.FlagData != payload { - t.Errorf("Expected %v, got %v", payload, d.FlagData) + + if err := tt.k.ReSync(ctx, dataChannel); err != nil { + t.Errorf("Unexpected error: %v", err) + } + + + for i := tt.countMsg; i > 0; i-- { + select { + case d := <-dataChannel: + if d.FlagData != payload { + t.Errorf("Expected %v, got %v", payload, d.FlagData) + } + case <-time.After(time.Second): + t.Fatalf("timeout waiting for data") } - i-- } + + cancel() + wg.Wait() } else { if err := tt.k.Sync(context.TODO(), dataChannel); !strings.Contains(err.Error(), "not found") { t.Errorf("Unexpected error: %v", err) @@ -817,12 +838,16 @@ func getCFG(name, namespace string) map[string]interface{} { "name": name, "namespace": namespace, }, - "spec": map[string]interface{}{}, + "spec": map[string]interface{}{ + "flagSpec": map[string]interface{}{ + "flags": nil, + }, + }, } } // toUnstructured helper to convert an interface to unstructured.Unstructured -func toUnstructured(t *testing.T, obj interface{}) interface{} { +func toUnstructured(t *testing.T, obj interface{}) *unstructured.Unstructured { bytes, err := json.Marshal(obj) if err != nil { t.Errorf("test setup faulure: %s", err.Error()) @@ -852,7 +877,7 @@ func (m *MockInformer) GetStore() cache.Store { } func TestMeasure(t *testing.T) { - res, err := marshallFeatureFlagSpec(&v1beta1.FeatureFlag{ + res, err := marshalFlagSpec(toUnstructured(t, v1beta1.FeatureFlag{ Spec: v1beta1.FeatureFlagSpec{ FlagSpec: v1beta1.FlagSpec{ Flags: v1beta1.Flags{ @@ -864,8 +889,8 @@ func TestMeasure(t *testing.T) { }, }, }, - }) + })) - require.Nil(t, err) - require.Equal(t, "{\"flags\":{\"flag\":{\"state\":\"\",\"variants\":null,\"defaultVariant\":\"kubernetes\"}}}", res) + require.NoError(t, err) + require.JSONEq(t, `{"flags":{"flag":{"state":"","variants":null,"defaultVariant":"kubernetes"}}}`, res) }