diff --git a/internal/envoygateway/config/config.go b/internal/envoygateway/config/config.go index fb0b8915d5..bddabf33b7 100644 --- a/internal/envoygateway/config/config.go +++ b/internal/envoygateway/config/config.go @@ -12,8 +12,8 @@ const ( EnvoyGatewayNamespace = "envoy-gateway-system" // EnvoyGatewayServiceName is the name of the Envoy Gateway service. EnvoyGatewayServiceName = "envoy-gateway" - // EnvoyConfigMapName is the name of the Envoy ConfigMap. - EnvoyConfigMapName = "envoy" + // EnvoyConfigMapPrefix is the prefix applied to the Envoy ConfigMap. + EnvoyConfigMapPrefix = "envoy" // EnvoyServicePrefix is the prefix applied to the Envoy Service. EnvoyServicePrefix = "envoy" // EnvoyDeploymentPrefix is the prefix applied to the Envoy Deployment. diff --git a/internal/gatewayapi/runner/runner.go b/internal/gatewayapi/runner/runner.go index d6182dd9e7..8f6942562d 100644 --- a/internal/gatewayapi/runner/runner.go +++ b/internal/gatewayapi/runner/runner.go @@ -89,6 +89,12 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) { yamlInfraIR, _ := yaml.Marshal(&result.InfraIR) r.Logger.WithValues("output", "infra-ir").Info(string(yamlInfraIR)) + var curKeys, newKeys []string + // Get current IR keys + for key := range r.InfraIR.LoadAll() { + curKeys = append(curKeys, key) + } + // Publish the IRs. // Also validate the ir before sending it. for key, val := range result.InfraIR { @@ -96,6 +102,7 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) { r.Logger.Error(err, "unable to validate infra ir, skipped sending it") } else { r.InfraIR.Store(key, val) + newKeys = append(newKeys, key) } } for key, val := range result.XdsIR { @@ -106,6 +113,14 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) { } } + // Delete keys + // There is a 1:1 mapping between infra and xds IR keys + delKeys := getIRKeysToDelete(curKeys, newKeys) + for _, key := range delKeys { + r.InfraIR.Delete(key) + r.XdsIR.Delete(key) + } + // Update Status for _, gateway := range result.Gateways { key := utils.NamespacedName(gateway) @@ -119,3 +134,28 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) { } r.Logger.Info("shutting down") } + +// getIRKeysToDelete returns the list of IR keys to delete +// based on the difference between the current keys and the +// new keys parameters passed to the function. +func getIRKeysToDelete(curKeys, newKeys []string) []string { + var delKeys []string + remaining := make(map[string]bool) + + // Add all current keys to the remaining map + for _, key := range curKeys { + remaining[key] = true + } + + // Delete newKeys from the remaining map + // to get keys that need to be deleted + for _, key := range newKeys { + delete(remaining, key) + } + + for key := range remaining { + delKeys = append(delKeys, key) + } + + return delKeys +} diff --git a/internal/gatewayapi/runner/runner_test.go b/internal/gatewayapi/runner/runner_test.go index 0389bc78f5..51cea2658b 100644 --- a/internal/gatewayapi/runner/runner_test.go +++ b/internal/gatewayapi/runner/runner_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/types" @@ -52,3 +53,49 @@ func TestRunner(t *testing.T) { }, time.Second*1, time.Millisecond*20) } + +func TestGetIRKeysToDelete(t *testing.T) { + testCases := []struct { + name string + curKeys []string + newKeys []string + delKeys []string + }{ + { + name: "empty", + curKeys: []string{}, + newKeys: []string{}, + delKeys: []string{}, + }, + {name: "no new keys", + curKeys: []string{"one", "two"}, + newKeys: []string{}, + delKeys: []string{"one", "two"}, + }, + { + name: "no cur keys", + curKeys: []string{}, + newKeys: []string{"one", "two"}, + delKeys: []string{}, + }, + { + name: "equal", + curKeys: []string{"one", "two"}, + newKeys: []string{"two", "one"}, + delKeys: []string{}, + }, + { + name: "mix", + curKeys: []string{"one", "two"}, + newKeys: []string{"two", "three"}, + delKeys: []string{"one"}, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + assert.ElementsMatch(t, tc.delKeys, getIRKeysToDelete(tc.curKeys, tc.newKeys)) + }) + } +} diff --git a/internal/infrastructure/kubernetes/configmap.go b/internal/infrastructure/kubernetes/configmap.go index 657d11bc9e..6361134e5d 100644 --- a/internal/infrastructure/kubernetes/configmap.go +++ b/internal/infrastructure/kubernetes/configmap.go @@ -11,6 +11,7 @@ import ( "k8s.io/apimachinery/pkg/types" "github.com/envoyproxy/gateway/internal/envoygateway/config" + "github.com/envoyproxy/gateway/internal/ir" ) const ( @@ -38,9 +39,9 @@ var ( ) // expectedConfigMap returns the expected ConfigMap based on the provided infra. -func (i *Infra) expectedConfigMap() *corev1.ConfigMap { +func (i *Infra) expectedConfigMap(infra *ir.Infra) *corev1.ConfigMap { ns := i.Namespace - name := config.EnvoyConfigMapName + name := expectedConfigMapName(infra.Proxy.Name) return &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ @@ -56,13 +57,13 @@ func (i *Infra) expectedConfigMap() *corev1.ConfigMap { // createOrUpdateConfigMap creates a ConfigMap in the Kube api server based on the provided // infra, if it doesn't exist and updates it if it does. -func (i *Infra) createOrUpdateConfigMap(ctx context.Context) (*corev1.ConfigMap, error) { - cm := i.expectedConfigMap() +func (i *Infra) createOrUpdateConfigMap(ctx context.Context, infra *ir.Infra) (*corev1.ConfigMap, error) { + cm := i.expectedConfigMap(infra) current := &corev1.ConfigMap{} key := types.NamespacedName{ Namespace: i.Namespace, - Name: config.EnvoyConfigMapName, + Name: expectedConfigMapName(infra.Proxy.Name), } if err := i.Client.Get(ctx, key, current); err != nil { @@ -85,11 +86,11 @@ func (i *Infra) createOrUpdateConfigMap(ctx context.Context) (*corev1.ConfigMap, } // deleteConfigMap deletes the Envoy ConfigMap in the kube api server, if it exists. -func (i *Infra) deleteConfigMap(ctx context.Context) error { +func (i *Infra) deleteConfigMap(ctx context.Context, infra *ir.Infra) error { cm := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Namespace: i.Namespace, - Name: config.EnvoyConfigMapName, + Name: expectedConfigMapName(infra.Proxy.Name), }, } @@ -102,3 +103,7 @@ func (i *Infra) deleteConfigMap(ctx context.Context) error { return nil } + +func expectedConfigMapName(proxyName string) string { + return fmt.Sprintf("%s-%s", config.EnvoyConfigMapPrefix, proxyName) +} diff --git a/internal/infrastructure/kubernetes/configmap_test.go b/internal/infrastructure/kubernetes/configmap_test.go index bbf93b5aa5..7b8039dfc6 100644 --- a/internal/infrastructure/kubernetes/configmap_test.go +++ b/internal/infrastructure/kubernetes/configmap_test.go @@ -12,15 +12,18 @@ import ( "github.com/envoyproxy/gateway/internal/envoygateway" "github.com/envoyproxy/gateway/internal/envoygateway/config" + "github.com/envoyproxy/gateway/internal/ir" ) func TestExpectedConfigMap(t *testing.T) { // Setup the infra. cli := fakeclient.NewClientBuilder().WithScheme(envoygateway.GetScheme()).WithObjects().Build() kube := NewInfra(cli) - cm := kube.expectedConfigMap() + infra := ir.NewInfra() + infra.Proxy.Name = "test" + cm := kube.expectedConfigMap(infra) - require.Equal(t, "envoy", cm.Name) + require.Equal(t, "envoy-test", cm.Name) require.Equal(t, "envoy-gateway-system", cm.Namespace) require.Contains(t, cm.Data, sdsCAFilename) assert.Equal(t, sdsCAConfigMapData, cm.Data[sdsCAFilename]) @@ -30,6 +33,8 @@ func TestExpectedConfigMap(t *testing.T) { func TestCreateOrUpdateConfigMap(t *testing.T) { kube := NewInfra(nil) + infra := ir.NewInfra() + infra.Proxy.Name = "test" testCases := []struct { name string @@ -41,7 +46,7 @@ func TestCreateOrUpdateConfigMap(t *testing.T) { expect: &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Namespace: config.EnvoyGatewayNamespace, - Name: config.EnvoyConfigMapName, + Name: "envoy-test", }, Data: map[string]string{sdsCAFilename: sdsCAConfigMapData, sdsCertFilename: sdsCertConfigMapData}, }, @@ -51,14 +56,14 @@ func TestCreateOrUpdateConfigMap(t *testing.T) { current: &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Namespace: config.EnvoyGatewayNamespace, - Name: config.EnvoyConfigMapName, + Name: "envoy-test", }, Data: map[string]string{"foo": "bar"}, }, expect: &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Namespace: config.EnvoyGatewayNamespace, - Name: config.EnvoyConfigMapName, + Name: "envoy-test", }, Data: map[string]string{sdsCAFilename: sdsCAConfigMapData, sdsCertFilename: sdsCertConfigMapData}, }, @@ -73,7 +78,7 @@ func TestCreateOrUpdateConfigMap(t *testing.T) { } else { kube.Client = fakeclient.NewClientBuilder().WithScheme(envoygateway.GetScheme()).Build() } - cm, err := kube.createOrUpdateConfigMap(context.Background()) + cm, err := kube.createOrUpdateConfigMap(context.Background(), infra) require.NoError(t, err) require.Equal(t, tc.expect.Namespace, cm.Namespace) require.Equal(t, tc.expect.Name, cm.Name) @@ -83,6 +88,9 @@ func TestCreateOrUpdateConfigMap(t *testing.T) { } func TestDeleteConfigMap(t *testing.T) { + infra := ir.NewInfra() + infra.Proxy.Name = "test" + testCases := []struct { name string current *corev1.ConfigMap @@ -93,7 +101,7 @@ func TestDeleteConfigMap(t *testing.T) { current: &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Namespace: config.EnvoyGatewayNamespace, - Name: config.EnvoyConfigMapName, + Name: "envoy-test", }, }, expect: true, @@ -115,7 +123,7 @@ func TestDeleteConfigMap(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() kube := NewInfra(fakeclient.NewClientBuilder().WithScheme(envoygateway.GetScheme()).WithObjects(tc.current).Build()) - err := kube.deleteConfigMap(context.Background()) + err := kube.deleteConfigMap(context.Background(), infra) require.NoError(t, err) }) } diff --git a/internal/infrastructure/kubernetes/deployment.go b/internal/infrastructure/kubernetes/deployment.go index 6abf23f466..594ddfc595 100644 --- a/internal/infrastructure/kubernetes/deployment.go +++ b/internal/infrastructure/kubernetes/deployment.go @@ -149,7 +149,7 @@ func (i *Infra) expectedDeployment(infra *ir.Infra) (*appsv1.Deployment, error) VolumeSource: corev1.VolumeSource{ ConfigMap: &corev1.ConfigMapVolumeSource{ LocalObjectReference: corev1.LocalObjectReference{ - Name: config.EnvoyConfigMapName, + Name: expectedConfigMapName(infra.Proxy.Name), }, Items: []corev1.KeyToPath{ { diff --git a/internal/infrastructure/kubernetes/infra.go b/internal/infrastructure/kubernetes/infra.go index abd9ed951b..ef8fc0308d 100644 --- a/internal/infrastructure/kubernetes/infra.go +++ b/internal/infrastructure/kubernetes/infra.go @@ -96,7 +96,7 @@ func (i *Infra) CreateInfra(ctx context.Context, infra *ir.Infra) error { return err } - if _, err := i.createOrUpdateConfigMap(ctx); err != nil { + if _, err := i.createOrUpdateConfigMap(ctx, infra); err != nil { return err } @@ -125,7 +125,7 @@ func (i *Infra) DeleteInfra(ctx context.Context, infra *ir.Infra) error { return err } - if err := i.deleteConfigMap(ctx); err != nil { + if err := i.deleteConfigMap(ctx, infra); err != nil { return err } diff --git a/internal/infrastructure/runner/runner.go b/internal/infrastructure/runner/runner.go index aa831770e3..cc2c249b7d 100644 --- a/internal/infrastructure/runner/runner.go +++ b/internal/infrastructure/runner/runner.go @@ -41,21 +41,18 @@ func (r *Runner) Start(ctx context.Context) error { func (r *Runner) subscribeAndTranslate(ctx context.Context) { // Subscribe to resources - for range r.InfraIR.Subscribe(ctx) { + for snapshot := range r.InfraIR.Subscribe(ctx) { r.Logger.Info("received a notification") - for _, in := range r.InfraIR.LoadAll() { - switch { - case in == nil: - // The resource map is nil at startup. - r.Logger.Info("infra ir is nil, skipping") - continue - case in.Proxy == nil: - if err := r.mgr.DeleteInfra(ctx, in); err != nil { + for _, update := range snapshot.Updates { + val := update.Value + + if update.Delete { + if err := r.mgr.DeleteInfra(ctx, val); err != nil { r.Logger.Error(err, "failed to delete infra") } - default: + } else { // Manage the proxy infra. - if err := r.mgr.CreateInfra(ctx, in); err != nil { + if err := r.mgr.CreateInfra(ctx, val); err != nil { r.Logger.Error(err, "failed to create new infra") } } diff --git a/internal/xds/server/runner/runner.go b/internal/xds/server/runner/runner.go index 16d3c9647d..5c3a998897 100644 --- a/internal/xds/server/runner/runner.go +++ b/internal/xds/server/runner/runner.go @@ -112,16 +112,20 @@ func registerServer(srv controlplane_server_v3.Server, g *grpc.Server) { func (r *Runner) subscribeAndTranslate(ctx context.Context) { // Subscribe to resources - for range r.Xds.Subscribe(ctx) { + for snapshot := range r.Xds.Subscribe(ctx) { r.Logger.Info("received a notification") // Load all resources required for translation - for key, xds := range r.Xds.LoadAll() { - if xds == nil { - r.Logger.Info("xds is nil, skipping") - continue + for _, update := range snapshot.Updates { + key := update.Key + val := update.Value + + var err error + if update.Delete { + err = r.cache.GenerateNewSnapshot(key, nil) + } else { + // Update snapshot cache + err = r.cache.GenerateNewSnapshot(key, val.XdsResources) } - // Update snapshot cache - err := r.cache.GenerateNewSnapshot(key, xds.XdsResources) if err != nil { r.Logger.Error(err, "failed to generate a snapshot") } diff --git a/internal/xds/translator/runner/runner.go b/internal/xds/translator/runner/runner.go index 54a55664f1..daa8457087 100644 --- a/internal/xds/translator/runner/runner.go +++ b/internal/xds/translator/runner/runner.go @@ -36,20 +36,24 @@ func (r *Runner) Start(ctx context.Context) error { func (r *Runner) subscribeAndTranslate(ctx context.Context) { // Subscribe to resources - for range r.XdsIR.Subscribe(ctx) { + for snapshot := range r.XdsIR.Subscribe(ctx) { r.Logger.Info("received a notification") - for key, ir := range r.XdsIR.LoadAll() { - if ir == nil { - r.Logger.Info("xds ir is nil, skipping") - continue - } - // Translate to xds resources - result, err := translator.Translate(ir) - if err != nil { - r.Logger.Error(err, "failed to translate xds ir") + updates := snapshot.Updates + for _, update := range updates { + key := update.Key + val := update.Value + + if update.Delete { + r.Xds.Delete(key) } else { - // Publish - r.Xds.Store(key, result) + // Translate to xds resources + result, err := translator.Translate(val) + if err != nil { + r.Logger.Error(err, "failed to translate xds ir") + } else { + // Publish + r.Xds.Store(key, result) + } } } } diff --git a/internal/xds/translator/runner/runner_test.go b/internal/xds/translator/runner/runner_test.go index 0611d599e7..0cb8d659b9 100644 --- a/internal/xds/translator/runner/runner_test.go +++ b/internal/xds/translator/runner/runner_test.go @@ -65,19 +65,19 @@ func TestRunner(t *testing.T) { if out == nil { return false } + if out["test"] == nil { + return false + } // Ensure an xds listener is created return len(out["test"].XdsResources[resourcev3.ListenerType]) == 1 - }, time.Second*1, time.Millisecond*20) + }, time.Second*3, time.Millisecond*50) - // Update with an empty IR triggering a delete - xdsIR.Store("test", &ir.Xds{}) + // Delete the IR triggering an xds delete + xdsIR.Delete("test") require.Eventually(t, func() bool { out := xds.LoadAll() - if out == nil { - return false - } - // Ensure no xds listener exists - return len(out["test"].XdsResources[resourcev3.ListenerType]) == 0 - }, time.Second*1, time.Millisecond*20) + // Ensure that xds has no key, value pairs + return len(out) == 0 + }, time.Second*3, time.Millisecond*50) }