Skip to content
81 changes: 45 additions & 36 deletions core/pkg/sync/kubernetes/kubernetes_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ import (

"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/sync"
"github.com/open-feature/open-feature-operator/apis/core/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"

"github.com/open-feature/open-feature-operator/apis/core/v1beta1"
)

var (
Expand All @@ -27,6 +27,8 @@ var (
featureFlagResource = v1beta1.GroupVersion.WithResource("featureflags")
)

const invalidAPIVersionMsg = "invalid api version %s, expected %s"

type SyncOption func(s *Sync)

type Sync struct {
Expand Down Expand Up @@ -99,22 +101,24 @@ func (k *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {

dataSync <- sync.DataSync{FlagData: fetch, Source: k.URI}

notifies := make(chan INotify)
// Buffer ensures notifier can publish the initial Ready event even if the watcher
// goroutine has not started reading yet.
notifies := make(chan INotify, 1)

var wg msync.WaitGroup

// Start K8s resource notifier
// Start notifier watcher
wg.Add(1)
go func() {
defer wg.Done()
k.notify(ctx, notifies)
k.watcher(ctx, notifies, dataSync)
}()

// Start notifier watcher
// Start K8s resource notifier
wg.Add(1)
go func() {
defer wg.Done()
k.watcher(ctx, notifies, dataSync)
k.notify(ctx, notifies)
}()

wg.Wait()
Expand Down Expand Up @@ -165,13 +169,13 @@ func (k *Sync) fetch(ctx context.Context) (string, error) {
}

if exist {
configuration, err := toFFCfg(item)
configuration, err := asUnstructured(item)
if err != nil {
return "", err
}

k.logger.Debug(fmt.Sprintf("resource %s served from the informer cache", k.URI))
return marshallFeatureFlagSpec(configuration)
return marshalFlagSpec(configuration)
}

// fallback to API access - this is an informer cache miss. Could happen at the startup where cache is not filled
Expand All @@ -186,11 +190,11 @@ func (k *Sync) fetch(ctx context.Context) (string, error) {

k.logger.Debug(fmt.Sprintf("resource %s served from API server", k.URI))

ff, err := toFFCfg(ffObj)
ff, err := asUnstructured(ffObj)
if err != nil {
return "", fmt.Errorf("unable to convert object %s/%s to FeatureFlag: %w", k.namespace, k.crdName, err)
}
return marshallFeatureFlagSpec(ff)
return marshalFlagSpec(ff)
}

func (k *Sync) notify(ctx context.Context, c chan<- INotify) {
Expand Down Expand Up @@ -233,16 +237,16 @@ func (k *Sync) notify(ctx context.Context, c chan<- INotify) {

// commonHandler emits the desired event if and only if handler receive an object matching apiVersion and resource name
func commonHandler(obj interface{}, object types.NamespacedName, emitEvent DefaultEventType, c chan<- INotify) error {
ffObj, err := toFFCfg(obj)
u, err := asUnstructured(obj)
if err != nil {
return err
}

if ffObj.APIVersion != apiVersion {
return fmt.Errorf("invalid api version %s, expected %s", ffObj.APIVersion, apiVersion)
if u.GetAPIVersion() != apiVersion {
return fmt.Errorf(invalidAPIVersionMsg, u.GetAPIVersion(), apiVersion)
}

if ffObj.Name == object.Name {
if u.GetName() == object.Name {
c <- &Notifier{
Event: Event[DefaultEventType]{
EventType: emitEvent,
Expand All @@ -255,26 +259,25 @@ func commonHandler(obj interface{}, object types.NamespacedName, emitEvent Defau

// updateFuncHandler handles updates. Event is emitted if and only if resource name, apiVersion of old & new are equal
func updateFuncHandler(oldObj interface{}, newObj interface{}, object types.NamespacedName, c chan<- INotify) error {
ffOldObj, err := toFFCfg(oldObj)
ffOldObj, err := asUnstructured(oldObj)
if err != nil {
return err
}

if ffOldObj.APIVersion != apiVersion {
return fmt.Errorf("invalid api version %s, expected %s", ffOldObj.APIVersion, apiVersion)
if ffOldObj.GetAPIVersion() != apiVersion {
return fmt.Errorf(invalidAPIVersionMsg, ffOldObj.GetAPIVersion(), apiVersion)
}

ffNewObj, err := toFFCfg(newObj)
ffNewObj, err := asUnstructured(newObj)
if err != nil {
return err
}

if ffNewObj.APIVersion != apiVersion {
return fmt.Errorf("invalid api version %s, expected %s", ffNewObj.APIVersion, apiVersion)
if ffNewObj.GetAPIVersion() != apiVersion {
return fmt.Errorf(invalidAPIVersionMsg, ffNewObj.GetAPIVersion(), apiVersion)
}

if object.Name == ffNewObj.Name && ffOldObj.ResourceVersion != ffNewObj.ResourceVersion {
// Only update if there is an actual featureFlagSpec change
if object.Name == ffNewObj.GetName() && ffOldObj.GetResourceVersion() != ffNewObj.GetResourceVersion() {
c <- &Notifier{
Event: Event[DefaultEventType]{
EventType: DefaultEventTypeModify,
Expand All @@ -284,20 +287,18 @@ func updateFuncHandler(oldObj interface{}, newObj interface{}, object types.Name
return nil
}

// toFFCfg attempts to covert unstructured payload to configurations
func toFFCfg(object interface{}) (*v1beta1.FeatureFlag, error) {
u, ok := object.(*unstructured.Unstructured)
if !ok {
return nil, fmt.Errorf("provided value is not of type *unstructured.Unstructured")
func asUnstructured(object interface{}) (*unstructured.Unstructured, error) {
if u, ok := object.(*unstructured.Unstructured); ok {
return u, nil
}

var ffObj v1beta1.FeatureFlag
err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &ffObj)
if err != nil {
return nil, fmt.Errorf("unable to convert unstructured to v1beta1.FeatureFlag: %w", err)
if tombstone, ok := object.(cache.DeletedFinalStateUnknown); ok {
if u, ok := tombstone.Obj.(*unstructured.Unstructured); ok {
return u, nil
}
}

return &ffObj, nil
return nil, fmt.Errorf("provided value is not of type *unstructured.Unstructured")
}

// parseURI parse provided uri in the format of <namespace>/<crdName> to namespace, crdName. Results in an error
Expand All @@ -310,10 +311,18 @@ func parseURI(uri string) (string, string, error) {
return s[0], s[1], nil
}

func marshallFeatureFlagSpec(ff *v1beta1.FeatureFlag) (string, error) {
b, err := json.Marshal(ff.Spec.FlagSpec)
func marshalFlagSpec(ff *unstructured.Unstructured) (string, error) {
flagSpec, found, err := unstructured.NestedMap(ff.Object, "spec", "flagSpec")
if err != nil {
return "", fmt.Errorf("failed to parse spec.flagSpec: %w", err)
}
if !found {
return "", fmt.Errorf("missing spec.flagSpec")
}

b, err := json.Marshal(flagSpec)
if err != nil {
return "", fmt.Errorf("failed to marshall FlagSpec: %s", err.Error())
return "", fmt.Errorf("failed to marshal spec.flagSpec: %w", err)
}
return string(b), nil
}
71 changes: 48 additions & 23 deletions core/pkg/sync/kubernetes/kubernetes_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"reflect"
"strings"
stdsync "sync"
"testing"
"time"

Expand Down Expand Up @@ -73,21 +74,29 @@ func Test_parseURI(t *testing.T) {
}
}

func Test_toFFCfg(t *testing.T) {
func TestAsUnstructured(t *testing.T) {
validFFCfg := v1beta1.FeatureFlag{
TypeMeta: Metadata,
}

tests := []struct {
name string
input interface{}
want *v1beta1.FeatureFlag
want *unstructured.Unstructured
wantErr bool
}{
{
name: "Simple success",
input: toUnstructured(t, validFFCfg),
want: &validFFCfg,
want: toUnstructured(t, validFFCfg),
wantErr: false,
},
{
name: "Tombstone unwraps",
input: cache.DeletedFinalStateUnknown{
Obj: toUnstructured(t, validFFCfg),
},
want: toUnstructured(t, validFFCfg),
wantErr: false,
},
{
Expand All @@ -102,15 +111,15 @@ func Test_toFFCfg(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := toFFCfg(tt.input)
got, err := asUnstructured(tt.input)

if (err != nil) != tt.wantErr {
t.Errorf("toFFCfg() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("asUnstructured() error = %v, wantErr %v", err, tt.wantErr)
return
}

if !reflect.DeepEqual(got, tt.want) {
t.Errorf("toFFCfg() got = %v, want %v", got, tt.want)
t.Errorf("asUnstructured() got = %v, want %v", got, tt.want)
}
})
}
Expand Down Expand Up @@ -657,23 +666,35 @@ func TestSync_ReSync(t *testing.T) {
t.Errorf("The Sync should not be ready")
}
dataChannel := make(chan sync.DataSync, tt.countMsg)
ctx, cancel := context.WithCancel(context.Background())
if tt.async {
var wg stdsync.WaitGroup
wg.Add(1)
go func() {
if err := tt.k.Sync(context.TODO(), dataChannel); err != nil {
t.Errorf("Unexpected error: %v", e)
}
if err := tt.k.ReSync(context.TODO(), dataChannel); err != nil {
t.Errorf("Unexpected error: %v", e)
defer wg.Done()
if err := tt.k.Sync(ctx, dataChannel); err != nil {
t.Errorf("Unexpected error: %v", err)
}
}()
i := tt.countMsg
for i > 0 {
d := <-dataChannel
if d.FlagData != payload {
t.Errorf("Expected %v, got %v", payload, d.FlagData)

if err := tt.k.ReSync(ctx, dataChannel); err != nil {
t.Errorf("Unexpected error: %v", err)
}


for i := tt.countMsg; i > 0; i-- {
select {
case d := <-dataChannel:
if d.FlagData != payload {
t.Errorf("Expected %v, got %v", payload, d.FlagData)
}
case <-time.After(time.Second):
t.Fatalf("timeout waiting for data")
}
i--
}

cancel()
wg.Wait()
} else {
if err := tt.k.Sync(context.TODO(), dataChannel); !strings.Contains(err.Error(), "not found") {
t.Errorf("Unexpected error: %v", err)
Expand Down Expand Up @@ -817,12 +838,16 @@ func getCFG(name, namespace string) map[string]interface{} {
"name": name,
"namespace": namespace,
},
"spec": map[string]interface{}{},
"spec": map[string]interface{}{
"flagSpec": map[string]interface{}{
"flags": nil,
},
},
}
}

// toUnstructured helper to convert an interface to unstructured.Unstructured
func toUnstructured(t *testing.T, obj interface{}) interface{} {
func toUnstructured(t *testing.T, obj interface{}) *unstructured.Unstructured {
bytes, err := json.Marshal(obj)
if err != nil {
t.Errorf("test setup faulure: %s", err.Error())
Expand Down Expand Up @@ -852,7 +877,7 @@ func (m *MockInformer) GetStore() cache.Store {
}

func TestMeasure(t *testing.T) {
res, err := marshallFeatureFlagSpec(&v1beta1.FeatureFlag{
res, err := marshalFlagSpec(toUnstructured(t, v1beta1.FeatureFlag{
Spec: v1beta1.FeatureFlagSpec{
FlagSpec: v1beta1.FlagSpec{
Flags: v1beta1.Flags{
Expand All @@ -864,8 +889,8 @@ func TestMeasure(t *testing.T) {
},
},
},
})
}))

require.Nil(t, err)
require.Equal(t, "{\"flags\":{\"flag\":{\"state\":\"\",\"variants\":null,\"defaultVariant\":\"kubernetes\"}}}", res)
require.NoError(t, err)
require.JSONEq(t, `{"flags":{"flag":{"state":"","variants":null,"defaultVariant":"kubernetes"}}}`, res)
}
Loading