From b6052b096b7a4364d84ac6386db9527c91dc9dd0 Mon Sep 17 00:00:00 2001 From: Zach Smith Date: Fri, 13 Feb 2026 11:19:28 -0800 Subject: [PATCH 1/2] feat: support crd isolation and discovery per control plane --- cmd/apiserver/app/config.go | 137 ++++++++- .../bootstrap/crd_runtime_manager.go | 285 ++++++++++++++++++ pkg/multicluster/handler.go | 7 +- test/smoke/crd_per_cluster_test.go | 216 +++++++++++++ 4 files changed, 641 insertions(+), 4 deletions(-) create mode 100644 pkg/multicluster/bootstrap/crd_runtime_manager.go create mode 100644 test/smoke/crd_per_cluster_test.go diff --git a/cmd/apiserver/app/config.go b/cmd/apiserver/app/config.go index 97a1457..8a7ac99 100644 --- a/cmd/apiserver/app/config.go +++ b/cmd/apiserver/app/config.go @@ -18,12 +18,15 @@ package app import ( "net/http" + "strings" + "time" apiextensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/admission" namespaceplugin "k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle" + genericfilters "k8s.io/apiserver/pkg/endpoints/filters" "k8s.io/apiserver/pkg/registry/generic" "k8s.io/apiserver/pkg/server" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -121,6 +124,7 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) { } clientPool := mc.NewClientPool(genericConfig.LoopbackClientConfig, mcOpts.PathPrefix, mcOpts.ControlPlaneSegment) informerPool := mc.NewInformerPoolFromClientPool(clientPool, 0, genericConfig.DrainedNotify()) + var crdRuntimeMgr *mcbootstrap.CRDRuntimeManager systemNamespaceBootstrapper := mcbootstrap.NewSystemNamespaceBootstrapper(mcbootstrap.SystemNamespaceOptions{ ClientForCluster: clientPool.KubeClientForCluster, Namespaces: opts.SystemNamespaces, @@ -138,7 +142,33 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) { }) genericConfig.BuildHandlerChainFunc = func(h http.Handler, conf *server.Config) http.Handler { ex := mc.PathExtractor{PathPrefix: mcOpts.PathPrefix, ControlPlaneSegment: mcOpts.ControlPlaneSegment} - return mc.WithClusterRouting(server.DefaultBuildHandlerChain(h, conf), ex, mcOpts) + base := server.DefaultBuildHandlerChain(h, conf) + dispatch := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + cid, _, _ := mc.FromContext(r.Context()) + if cid != "" && cid != mcOpts.DefaultCluster && crdRuntimeMgr != nil { + if group, version, ok := apisGroupVersionFromPath(r.URL.Path); ok { + served, err := crdRuntimeMgr.ServesGroupVersion(cid, group, version, genericConfig.DrainedNotify()) + if err != nil { + klog.Errorf("mc.crdRuntime lookup failed at kube cluster=%s path=%s err=%v", cid, r.URL.Path, err) + http.Error(w, "cluster CRD runtime unavailable", http.StatusServiceUnavailable) + return + } + if !served { + base.ServeHTTP(w, r) + return + } + if h, err := crdRuntimeMgr.Runtime(cid, genericConfig.DrainedNotify()); err == nil && h != nil { + genericfilters.WithRequestInfo(h, conf.RequestInfoResolver).ServeHTTP(w, r) + return + } + klog.Errorf("mc.crdRuntime unresolved at kube cluster=%s path=%s", cid, r.URL.Path) + http.Error(w, "cluster CRD runtime unavailable", http.StatusServiceUnavailable) + return + } + } + base.ServeHTTP(w, r) + }) + return mc.WithClusterRouting(dispatch, ex, mcOpts) } authManager := mcauth.NewManager(wait.ContextForChannel(genericConfig.DrainedNotify()), mcauth.Options{ @@ -280,10 +310,63 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) { if apiExtensions.GenericConfig.RESTOptionsGetter != nil { apiExtensions.GenericConfig.RESTOptionsGetter = decorateRESTOptionsGetter("apiextensions", apiExtensions.GenericConfig.RESTOptionsGetter, mcOpts) } + crdRuntimeMgr = mcbootstrap.NewCRDRuntimeManager(mcbootstrap.CRDRuntimeManagerOptions{ + BaseLoopbackClientConfig: apiExtensions.GenericConfig.LoopbackClientConfig, + PathPrefix: mcOpts.PathPrefix, + ControlPlaneSegment: mcOpts.ControlPlaneSegment, + DefaultCluster: mcOpts.DefaultCluster, + CRDRESTOptionsGetter: apiExtensions.ExtraConfig.CRDRESTOptionsGetter, + Admission: apiExtensions.GenericConfig.AdmissionControl, + ServiceResolver: apiExtensions.ExtraConfig.ServiceResolver, + AuthResolverWrapper: apiExtensions.ExtraConfig.AuthResolverWrapper, + MasterCount: apiExtensions.ExtraConfig.MasterCount, + Authorizer: apiExtensions.GenericConfig.Authorization.Authorizer, + RequestTimeout: apiExtensions.GenericConfig.RequestTimeout, + MinRequestTimeout: time.Duration(apiExtensions.GenericConfig.MinRequestTimeout) * time.Second, + MaxRequestBodyBytes: apiExtensions.GenericConfig.MaxRequestBodyBytes, + }) + prevOnClusterSelected := mcOpts.OnClusterSelected + mcOpts.OnClusterSelected = func(clusterID string) { + if prevOnClusterSelected != nil { + prevOnClusterSelected(clusterID) + } + if clusterID == "" || clusterID == mcOpts.DefaultCluster { + return + } + _, _ = crdRuntimeMgr.Runtime(clusterID, genericConfig.DrainedNotify()) + } // Ensure CRDs are also routed through the multicluster handler apiExtensions.GenericConfig.BuildHandlerChainFunc = func(h http.Handler, conf *server.Config) http.Handler { ex := mc.PathExtractor{PathPrefix: mcOpts.PathPrefix, ControlPlaneSegment: mcOpts.ControlPlaneSegment} - return mc.WithClusterRouting(server.DefaultBuildHandlerChain(h, conf), ex, mcOpts) + base := server.DefaultBuildHandlerChain(h, conf) + dispatch := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + cid, _, _ := mc.FromContext(r.Context()) + if cid != "" && cid != mcOpts.DefaultCluster { + if group, version, ok := apisGroupVersionFromPath(r.URL.Path); ok { + served, err := crdRuntimeMgr.ServesGroupVersion(cid, group, version, genericConfig.DrainedNotify()) + if err != nil { + klog.Errorf("mc.crdRuntime lookup failed at apiextensions cluster=%s path=%s err=%v", cid, r.URL.Path, err) + http.Error(w, "cluster CRD runtime unavailable", http.StatusServiceUnavailable) + return + } + if !served { + base.ServeHTTP(w, r) + return + } + if h, err := crdRuntimeMgr.Runtime(cid, genericConfig.DrainedNotify()); err == nil && h != nil { + // Ensure RequestInfo is computed from the normalized /apis path + // before entering the cluster-scoped CRD runtime handler. + genericfilters.WithRequestInfo(h, conf.RequestInfoResolver).ServeHTTP(w, r) + return + } + klog.Errorf("mc.crdRuntime unresolved cluster=%s path=%s", cid, r.URL.Path) + http.Error(w, "cluster CRD runtime unavailable", http.StatusServiceUnavailable) + return + } + } + base.ServeHTTP(w, r) + }) + return mc.WithClusterRouting(dispatch, ex, mcOpts) } // Install admission chain on apiextensions as well { @@ -309,7 +392,31 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) { // Ensure aggregator also receives multicluster routing aggregator.GenericConfig.BuildHandlerChainFunc = func(h http.Handler, conf *server.Config) http.Handler { ex := mc.PathExtractor{PathPrefix: mcOpts.PathPrefix, ControlPlaneSegment: mcOpts.ControlPlaneSegment} - return mc.WithClusterRouting(server.DefaultBuildHandlerChain(h, conf), ex, mcOpts) + base := server.DefaultBuildHandlerChain(h, conf) + dispatch := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + cid, _, _ := mc.FromContext(r.Context()) + if r.Method == http.MethodGet && cid != "" && cid != mcOpts.DefaultCluster && crdRuntimeMgr != nil { + if group, version, ok := exactAPIsGroupVersionDiscoveryPath(r.URL.Path); ok { + served, err := crdRuntimeMgr.ServesGroupVersion(cid, group, version, genericConfig.DrainedNotify()) + if err != nil { + klog.Errorf("mc.crdRuntime lookup failed at aggregator cluster=%s path=%s err=%v", cid, r.URL.Path, err) + http.Error(w, "cluster CRD runtime unavailable", http.StatusServiceUnavailable) + return + } + if served { + if h, err := crdRuntimeMgr.Runtime(cid, genericConfig.DrainedNotify()); err == nil && h != nil { + genericfilters.WithRequestInfo(h, conf.RequestInfoResolver).ServeHTTP(w, r) + return + } + klog.Errorf("mc.crdRuntime unresolved at aggregator cluster=%s path=%s", cid, r.URL.Path) + http.Error(w, "cluster CRD runtime unavailable", http.StatusServiceUnavailable) + return + } + } + } + base.ServeHTTP(w, r) + }) + return mc.WithClusterRouting(dispatch, ex, mcOpts) } // Install admission chain on aggregator { @@ -338,3 +445,27 @@ func decorateRESTOptionsGetter(server string, getter generic.RESTOptionsGetter, klog.Infof("mc.restOptionsGetter server=%s decorated=%t", server, true) return decorated } + +func apisGroupVersionFromPath(path string) (group, version string, ok bool) { + parts := strings.Split(strings.Trim(path, "/"), "/") + if len(parts) < 3 || parts[0] != "apis" || parts[1] == "" || parts[2] == "" { + return "", "", false + } + if parts[1] == "apiextensions.k8s.io" { + return "", "", false + } + return parts[1], parts[2], true +} + +func exactAPIsGroupVersionDiscoveryPath(path string) (group, version string, ok bool) { + group, version, ok = apisGroupVersionFromPath(path) + if !ok { + return "", "", false + } + parts := strings.Split(strings.Trim(path, "/"), "/") + if len(parts) != 3 { + return "", "", false + } + return group, version, true +} + diff --git a/pkg/multicluster/bootstrap/crd_runtime_manager.go b/pkg/multicluster/bootstrap/crd_runtime_manager.go new file mode 100644 index 0000000..1ac6c64 --- /dev/null +++ b/pkg/multicluster/bootstrap/crd_runtime_manager.go @@ -0,0 +1,285 @@ +package bootstrap + +import ( + "context" + "fmt" + "net/http" + "sync" + "time" + + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apiextensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver" + apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apiserver/pkg/admission" + "k8s.io/apiserver/pkg/authorization/authorizer" + genericregistry "k8s.io/apiserver/pkg/registry/generic" + "k8s.io/apiserver/pkg/util/webhook" + "k8s.io/client-go/rest" + "k8s.io/component-base/metrics" + "k8s.io/component-base/metrics/legacyregistry" + "k8s.io/kube-openapi/pkg/validation/spec" + "golang.org/x/sync/singleflight" + + mc "github.com/kplane-dev/apiserver/pkg/multicluster" +) + +const ( + servesLookupTimeout = 2 * time.Second + servesCacheTTL = 5 * time.Second + watchRetryBackoff = 500 * time.Millisecond +) + +var ( + crdRuntimeMetricsOnce = sync.Once{} + crdRuntimeCreateTotal = metrics.NewCounterVec(&metrics.CounterOpts{Name: "mc_crd_runtime_create_total", Help: "Per-cluster CRD runtime creations."}, []string{"status"}) + crdServesLookupTotal = metrics.NewCounterVec(&metrics.CounterOpts{Name: "mc_crd_serves_lookup_total", Help: "CRD serves-group-version lookups."}, []string{"result"}) + crdServesCacheHit = metrics.NewCounterVec(&metrics.CounterOpts{Name: "mc_crd_serves_cache_hit_total", Help: "CRD serves cache hits."}, []string{"result"}) + crdServesCacheMiss = metrics.NewCounterVec(&metrics.CounterOpts{Name: "mc_crd_serves_cache_miss_total", Help: "CRD serves cache misses."}, []string{"result"}) + crdServesLookupLat = metrics.NewHistogramVec(&metrics.HistogramOpts{Name: "mc_crd_serves_lookup_latency_seconds", Help: "CRD serves lookup latency.", Buckets: metrics.ExponentialBuckets(0.001, 2, 12)}, []string{"result"}) +) + +type CRDRuntimeManagerOptions struct { + BaseLoopbackClientConfig *rest.Config + PathPrefix, ControlPlaneSegment, DefaultCluster string + Delegate http.Handler + CRDRESTOptionsGetter genericregistry.RESTOptionsGetter + Admission admission.Interface + ServiceResolver webhook.ServiceResolver + AuthResolverWrapper webhook.AuthenticationInfoResolverWrapper + MasterCount int + Authorizer authorizer.Authorizer + RequestTimeout, MinRequestTimeout time.Duration + StaticOpenAPISpec map[string]*spec.Schema + MaxRequestBodyBytes int64 +} + +type servesCacheEntry struct { + served bool + exp time.Time +} + +type clusterState struct { + rt *apiextensionsapiserver.ClusterScopedCRDRuntime + cs *apiextensionsclient.Clientset +} + +type CRDRuntimeManager struct { + opts CRDRuntimeManagerOptions + + mu sync.Mutex + runtimes map[string]*apiextensionsapiserver.ClusterScopedCRDRuntime + clients map[string]*apiextensionsclient.Clientset + started map[string]bool + + cache map[string]servesCacheEntry + clusterKeys map[string]map[string]struct{} + createSF singleflight.Group +} + +func NewCRDRuntimeManager(opts CRDRuntimeManagerOptions) *CRDRuntimeManager { + crdRuntimeMetricsOnce.Do(func() { + legacyregistry.MustRegister(crdRuntimeCreateTotal, crdServesLookupTotal, crdServesCacheHit, crdServesCacheMiss, crdServesLookupLat) + }) + if opts.BaseLoopbackClientConfig != nil { + opts.BaseLoopbackClientConfig = rest.CopyConfig(opts.BaseLoopbackClientConfig) + } + return &CRDRuntimeManager{ + opts: opts, + runtimes: map[string]*apiextensionsapiserver.ClusterScopedCRDRuntime{}, + clients: map[string]*apiextensionsclient.Clientset{}, + started: map[string]bool{}, + cache: map[string]servesCacheEntry{}, + clusterKeys: map[string]map[string]struct{}{}, + } +} + +func (m *CRDRuntimeManager) Runtime(clusterID string, stopCh <-chan struct{}) (http.Handler, error) { + if m == nil || clusterID == "" || clusterID == m.opts.DefaultCluster || m.opts.BaseLoopbackClientConfig == nil { + return nil, nil + } + rt, _, err := m.ensureClusterState(clusterID, stopCh) + if err != nil || rt == nil { + return nil, err + } + rt.Start(stopCh) + return rt.Handler(), nil +} + +func (m *CRDRuntimeManager) ServesGroupVersion(clusterID, group, version string, stopCh <-chan struct{}) (bool, error) { + if m == nil || clusterID == "" || clusterID == m.opts.DefaultCluster || group == "" || version == "" { + return false, nil + } + start := time.Now() + key := clusterID + "\x00" + group + "\x00" + version + if served, ok := m.getCache(key); ok { + r := result(served) + crdServesCacheHit.WithLabelValues(r).Inc() + crdServesLookupTotal.WithLabelValues(r).Inc() + crdServesLookupLat.WithLabelValues(r).Observe(time.Since(start).Seconds()) + return served, nil + } + crdServesCacheMiss.WithLabelValues("miss").Inc() + _, cs, err := m.ensureClusterState(clusterID, stopCh) + if err != nil { + crdServesLookupTotal.WithLabelValues("error").Inc() + crdServesLookupLat.WithLabelValues("error").Observe(time.Since(start).Seconds()) + return false, err + } + ctx, cancel := context.WithTimeout(context.Background(), servesLookupTimeout) + defer cancel() + list, err := cs.ApiextensionsV1().CustomResourceDefinitions().List(ctx, metav1.ListOptions{}) + if err != nil { + crdServesLookupTotal.WithLabelValues("error").Inc() + crdServesLookupLat.WithLabelValues("error").Observe(time.Since(start).Seconds()) + return false, err + } + served := false + for i := range list.Items { + crd := &list.Items[i] + if crd.Spec.Group != group || !isCRDEstablished(crd) { + continue + } + for _, v := range crd.Spec.Versions { + if v.Name == version && v.Served { + served = true + break + } + } + if served { + break + } + } + m.setCache(clusterID, key, served) + r := result(served) + crdServesLookupTotal.WithLabelValues(r).Inc() + crdServesLookupLat.WithLabelValues(r).Observe(time.Since(start).Seconds()) + return served, nil +} + +func (m *CRDRuntimeManager) ensureClusterState(clusterID string, stopCh <-chan struct{}) (*apiextensionsapiserver.ClusterScopedCRDRuntime, *apiextensionsclient.Clientset, error) { + m.mu.Lock() + if rt, ok := m.runtimes[clusterID]; ok { + cs := m.clients[clusterID] + m.mu.Unlock() + return rt, cs, nil + } + m.mu.Unlock() + v, err, _ := m.createSF.Do(clusterID, func() (any, error) { + m.mu.Lock() + if rt, ok := m.runtimes[clusterID]; ok { + cs := m.clients[clusterID] + m.mu.Unlock() + return clusterState{rt: rt, cs: cs}, nil + } + m.mu.Unlock() + cfg := rest.CopyConfig(m.opts.BaseLoopbackClientConfig) + host, err := mc.ClusterHost(cfg.Host, mc.Options{PathPrefix: m.opts.PathPrefix, ControlPlaneSegment: m.opts.ControlPlaneSegment}, clusterID) + if err != nil { + crdRuntimeCreateTotal.WithLabelValues("error").Inc() + return nil, fmt.Errorf("build cluster host: %w", err) + } + cfg.Host = host + rt, err := apiextensionsapiserver.NewClusterScopedCRDRuntime(apiextensionsapiserver.ClusterScopedCRDConfig{ + LoopbackClientConfig: cfg, Delegate: m.opts.Delegate, CRDRESTOptionsGetter: m.opts.CRDRESTOptionsGetter, + Admission: m.opts.Admission, ServiceResolver: m.opts.ServiceResolver, AuthResolverWrapper: m.opts.AuthResolverWrapper, + MasterCount: m.opts.MasterCount, Authorizer: m.opts.Authorizer, RequestTimeout: m.opts.RequestTimeout, MinRequestTimeout: m.opts.MinRequestTimeout, + StaticOpenAPISpec: m.opts.StaticOpenAPISpec, MaxRequestBodyBytes: m.opts.MaxRequestBodyBytes, + }) + if err != nil { + crdRuntimeCreateTotal.WithLabelValues("error").Inc() + return nil, err + } + cs, err := apiextensionsclient.NewForConfig(rest.CopyConfig(cfg)) + if err != nil { + crdRuntimeCreateTotal.WithLabelValues("error").Inc() + return nil, err + } + rt.Start(stopCh) + m.mu.Lock() + m.runtimes[clusterID] = rt + m.clients[clusterID] = cs + if !m.started[clusterID] { + m.started[clusterID] = true + go m.watchUpdates(clusterID, cs, stopCh) + } + m.mu.Unlock() + crdRuntimeCreateTotal.WithLabelValues("success").Inc() + return clusterState{rt: rt, cs: cs}, nil + }) + if err != nil { + return nil, nil, err + } + s := v.(clusterState) + return s.rt, s.cs, nil +} + +func (m *CRDRuntimeManager) watchUpdates(clusterID string, cs *apiextensionsclient.Clientset, stopCh <-chan struct{}) { + for { + select { case <-stopCh: return; default: } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + w, err := cs.ApiextensionsV1().CustomResourceDefinitions().Watch(ctx, metav1.ListOptions{AllowWatchBookmarks: true, ResourceVersion: "0"}) + if err != nil { + cancel() + select { case <-stopCh: return; case <-time.After(watchRetryBackoff): continue } + } + m.invalidateCluster(clusterID) + closed := false + for !closed { + select { + case <-stopCh: + w.Stop(); cancel(); return + case _, ok := <-w.ResultChan(): + if !ok { closed = true; break } + m.invalidateCluster(clusterID) + } + } + w.Stop(); cancel() + select { case <-stopCh: return; case <-time.After(watchRetryBackoff): } + } +} + +func (m *CRDRuntimeManager) getCache(key string) (bool, bool) { + now := time.Now() + m.mu.Lock(); defer m.mu.Unlock() + v, ok := m.cache[key] + if !ok || now.After(v.exp) { + delete(m.cache, key) + return false, false + } + return v.served, true +} + +func (m *CRDRuntimeManager) setCache(clusterID, key string, served bool) { + m.mu.Lock(); defer m.mu.Unlock() + m.cache[key] = servesCacheEntry{served: served, exp: time.Now().Add(servesCacheTTL)} + if m.clusterKeys[clusterID] == nil { + m.clusterKeys[clusterID] = map[string]struct{}{} + } + m.clusterKeys[clusterID][key] = struct{}{} +} + +func (m *CRDRuntimeManager) invalidateCluster(clusterID string) { + m.mu.Lock(); defer m.mu.Unlock() + for k := range m.clusterKeys[clusterID] { + delete(m.cache, k) + } + delete(m.clusterKeys, clusterID) +} + +func isCRDEstablished(crd *apiextensionsv1.CustomResourceDefinition) bool { + for _, c := range crd.Status.Conditions { + if c.Type == apiextensionsv1.Established && c.Status == apiextensionsv1.ConditionTrue { + return true + } + } + return false +} + +func result(served bool) string { + if served { + return "served" + } + return "not_served" +} + diff --git a/pkg/multicluster/handler.go b/pkg/multicluster/handler.go index 7a38525..564dc23 100644 --- a/pkg/multicluster/handler.go +++ b/pkg/multicluster/handler.go @@ -17,7 +17,12 @@ func WithClusterRouting(next http.Handler, ex Extractor, o Options) http.Handler return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { cid, all, _ := ex.Extract(r.Context(), r) if cid == "" { - cid = o.DefaultCluster + if existingCID, existingAll, ok := FromContext(r.Context()); ok && existingCID != "" { + cid = existingCID + all = existingAll + } else { + cid = o.DefaultCluster + } } if o.OnClusterSelected != nil && cid != "" { o.OnClusterSelected(cid) diff --git a/test/smoke/crd_per_cluster_test.go b/test/smoke/crd_per_cluster_test.go new file mode 100644 index 0000000..88dff61 --- /dev/null +++ b/test/smoke/crd_per_cluster_test.go @@ -0,0 +1,216 @@ +package smoke + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +func TestCRDEstablishesInVirtualCluster(t *testing.T) { + etcd := os.Getenv("ETCD_ENDPOINTS") + s := startAPIServer(t, etcd) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + + clusterID := "c-" + randSuffix(3) + group := fmt.Sprintf("testing-%s.kplane.dev", randSuffix(3)) + plural := "testwidgets" + crdName := plural + "." + group + + crdClient := apixClientForCluster(t, s, clusterID) + kubeClient := kubeClientForCluster(t, s, clusterID) + createTestCRD(ctx, t, crdClient, crdName, group, plural) + waitForCRDEstablished(ctx, t, crdClient, clusterID, crdName) + waitForResourcePresence(t, kubeClient, clusterID, group+"/v1", plural, true) +} + +func TestCRDDefinitionsIsolatedAcrossClusters(t *testing.T) { + etcd := os.Getenv("ETCD_ENDPOINTS") + s := startAPIServer(t, etcd) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + + clusterA := "c-" + randSuffix(3) + clusterB := "c-" + randSuffix(3) + group := fmt.Sprintf("isolated-%s.kplane.dev", randSuffix(3)) + plural := "testwidgets" + crdName := plural + "." + group + + crdClientA := apixClientForCluster(t, s, clusterA) + crdClientB := apixClientForCluster(t, s, clusterB) + kubeClientA := kubeClientForCluster(t, s, clusterA) + kubeClientB := kubeClientForCluster(t, s, clusterB) + + createTestCRD(ctx, t, crdClientA, crdName, group, plural) + waitForCRDEstablished(ctx, t, crdClientA, clusterA, crdName) + waitForResourcePresence(t, kubeClientA, clusterA, group+"/v1", plural, true) + waitForResourcePresence(t, kubeClientB, clusterB, group+"/v1", plural, false) + + createTestCRD(ctx, t, crdClientB, crdName, group, plural) + waitForCRDEstablished(ctx, t, crdClientB, clusterB, crdName) + waitForResourcePresence(t, kubeClientB, clusterB, group+"/v1", plural, true) + + crdA, err := crdClientA.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, crdName, metav1.GetOptions{}) + if err != nil { + t.Fatalf("cluster=%s get CRD %s: %v", clusterA, crdName, err) + } + crdB, err := crdClientB.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, crdName, metav1.GetOptions{}) + if err != nil { + t.Fatalf("cluster=%s get CRD %s: %v", clusterB, crdName, err) + } + if crdA.UID == crdB.UID { + t.Fatalf("expected per-cluster CRD objects; got same UID %q", crdA.UID) + } +} + +func TestCRDDiscoveryUpdateInADoesNotAffectB(t *testing.T) { + etcd := os.Getenv("ETCD_ENDPOINTS") + s := startAPIServer(t, etcd) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + + clusterA := "c-" + randSuffix(3) + clusterB := "c-" + randSuffix(3) + group := fmt.Sprintf("update-%s.kplane.dev", randSuffix(3)) + plural := "testwidgets" + crdName := plural + "." + group + + crdClientA := apixClientForCluster(t, s, clusterA) + crdClientB := apixClientForCluster(t, s, clusterB) + kubeClientA := kubeClientForCluster(t, s, clusterA) + kubeClientB := kubeClientForCluster(t, s, clusterB) + + createTestCRD(ctx, t, crdClientA, crdName, group, plural) + createTestCRD(ctx, t, crdClientB, crdName, group, plural) + waitForCRDEstablished(ctx, t, crdClientA, clusterA, crdName) + waitForCRDEstablished(ctx, t, crdClientB, clusterB, crdName) + waitForResourcePresence(t, kubeClientA, clusterA, group+"/v1", plural, true) + waitForResourcePresence(t, kubeClientB, clusterB, group+"/v1", plural, true) + + updateCRDAddVersion(ctx, t, crdClientA, crdName) + waitForResourcePresence(t, kubeClientA, clusterA, group+"/v2", plural, true) + waitForResourcePresence(t, kubeClientB, clusterB, group+"/v2", plural, false) +} + +func apixClientForCluster(t *testing.T, s *testAPIServer, clusterID string) *apiextensionsclient.Clientset { + t.Helper() + cfg := kubeConfigForCluster(s.clusterURL(clusterID)) + cs, err := apiextensionsclient.NewForConfig(rest.CopyConfig(cfg)) + if err != nil { + t.Fatalf("cluster=%s build apiextensions client: %v", clusterID, err) + } + return cs +} + +func createTestCRD(ctx context.Context, t *testing.T, cs *apiextensionsclient.Clientset, crdName, group, plural string) { + t.Helper() + _, err := cs.ApiextensionsV1().CustomResourceDefinitions().Create(ctx, &apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: crdName}, + Spec: apiextensionsv1.CustomResourceDefinitionSpec{ + Group: group, + Names: apiextensionsv1.CustomResourceDefinitionNames{ + Plural: plural, + Singular: "testwidget", + Kind: "TestWidget", + ListKind: "TestWidgetList", + }, + Scope: apiextensionsv1.NamespaceScoped, + Versions: []apiextensionsv1.CustomResourceDefinitionVersion{ + { + Name: "v1", + Served: true, + Storage: true, + Schema: &apiextensionsv1.CustomResourceValidation{ + OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{Type: "object"}, + }, + }, + }, + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("create CRD %s: %v", crdName, err) + } +} + +func waitForCRDEstablished(ctx context.Context, t *testing.T, cs *apiextensionsclient.Clientset, clusterID, crdName string) { + t.Helper() + deadline := time.Now().Add(60 * time.Second) + var lastErr error + for time.Now().Before(deadline) { + crd, err := cs.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, crdName, metav1.GetOptions{}) + if err == nil { + for _, c := range crd.Status.Conditions { + if c.Type == apiextensionsv1.Established && c.Status == apiextensionsv1.ConditionTrue { + return + } + } + lastErr = fmt.Errorf("conditions=%v", crd.Status.Conditions) + } else { + lastErr = err + } + time.Sleep(500 * time.Millisecond) + } + t.Fatalf("cluster=%s CRD %s not Established: %v", clusterID, crdName, lastErr) +} + +func updateCRDAddVersion(ctx context.Context, t *testing.T, cs *apiextensionsclient.Clientset, crdName string) { + t.Helper() + crd, err := cs.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, crdName, metav1.GetOptions{}) + if err != nil { + t.Fatalf("get CRD %s before update: %v", crdName, err) + } + crd.Spec.Versions = append(crd.Spec.Versions, apiextensionsv1.CustomResourceDefinitionVersion{ + Name: "v2", + Served: true, + Storage: false, + Schema: &apiextensionsv1.CustomResourceValidation{ + OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{Type: "object"}, + }, + }) + if _, err := cs.ApiextensionsV1().CustomResourceDefinitions().Update(ctx, crd, metav1.UpdateOptions{}); err != nil { + t.Fatalf("update CRD %s add v2: %v", crdName, err) + } +} + +func waitForResourcePresence(t *testing.T, cs kubernetes.Interface, clusterID, gv, resource string, want bool) { + t.Helper() + deadline := time.Now().Add(60 * time.Second) + var lastErr error + for time.Now().Before(deadline) { + rl, err := cs.Discovery().ServerResourcesForGroupVersion(gv) + if err != nil { + if !want && apierrors.IsNotFound(err) { + return + } + lastErr = err + time.Sleep(500 * time.Millisecond) + continue + } + found := false + for _, r := range rl.APIResources { + if r.Name == resource { + found = true + break + } + } + if found == want { + return + } + lastErr = fmt.Errorf("resource %q in %s presence=%t want=%t", resource, gv, found, want) + time.Sleep(500 * time.Millisecond) + } + t.Fatalf("cluster=%s resource discovery mismatch for %s %s: %v", clusterID, gv, resource, lastErr) +} + From fecda1061043b63795daa36e96c59f78674fe3b7 Mon Sep 17 00:00:00 2001 From: Zach Smith Date: Fri, 13 Feb 2026 11:45:17 -0800 Subject: [PATCH 2/2] fix: dont rely on vendor patch --- cmd/apiserver/app/config.go | 18 +- ...ager.go => crd_runtime_manager_wrapped.go} | 198 +++++++++++------- 2 files changed, 127 insertions(+), 89 deletions(-) rename pkg/multicluster/bootstrap/{crd_runtime_manager.go => crd_runtime_manager_wrapped.go} (62%) diff --git a/cmd/apiserver/app/config.go b/cmd/apiserver/app/config.go index 8a7ac99..b0d2a9c 100644 --- a/cmd/apiserver/app/config.go +++ b/cmd/apiserver/app/config.go @@ -19,7 +19,6 @@ package app import ( "net/http" "strings" - "time" apiextensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver" "k8s.io/apimachinery/pkg/runtime" @@ -311,19 +310,10 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) { apiExtensions.GenericConfig.RESTOptionsGetter = decorateRESTOptionsGetter("apiextensions", apiExtensions.GenericConfig.RESTOptionsGetter, mcOpts) } crdRuntimeMgr = mcbootstrap.NewCRDRuntimeManager(mcbootstrap.CRDRuntimeManagerOptions{ - BaseLoopbackClientConfig: apiExtensions.GenericConfig.LoopbackClientConfig, - PathPrefix: mcOpts.PathPrefix, - ControlPlaneSegment: mcOpts.ControlPlaneSegment, - DefaultCluster: mcOpts.DefaultCluster, - CRDRESTOptionsGetter: apiExtensions.ExtraConfig.CRDRESTOptionsGetter, - Admission: apiExtensions.GenericConfig.AdmissionControl, - ServiceResolver: apiExtensions.ExtraConfig.ServiceResolver, - AuthResolverWrapper: apiExtensions.ExtraConfig.AuthResolverWrapper, - MasterCount: apiExtensions.ExtraConfig.MasterCount, - Authorizer: apiExtensions.GenericConfig.Authorization.Authorizer, - RequestTimeout: apiExtensions.GenericConfig.RequestTimeout, - MinRequestTimeout: time.Duration(apiExtensions.GenericConfig.MinRequestTimeout) * time.Second, - MaxRequestBodyBytes: apiExtensions.GenericConfig.MaxRequestBodyBytes, + BaseAPIExtensionsConfig: apiExtensions, + PathPrefix: mcOpts.PathPrefix, + ControlPlaneSegment: mcOpts.ControlPlaneSegment, + DefaultCluster: mcOpts.DefaultCluster, }) prevOnClusterSelected := mcOpts.OnClusterSelected mcOpts.OnClusterSelected = func(clusterID string) { diff --git a/pkg/multicluster/bootstrap/crd_runtime_manager.go b/pkg/multicluster/bootstrap/crd_runtime_manager_wrapped.go similarity index 62% rename from pkg/multicluster/bootstrap/crd_runtime_manager.go rename to pkg/multicluster/bootstrap/crd_runtime_manager_wrapped.go index 1ac6c64..eea6e23 100644 --- a/pkg/multicluster/bootstrap/crd_runtime_manager.go +++ b/pkg/multicluster/bootstrap/crd_runtime_manager_wrapped.go @@ -7,19 +7,15 @@ import ( "sync" "time" + "golang.org/x/sync/singleflight" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver" apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apiserver/pkg/admission" - "k8s.io/apiserver/pkg/authorization/authorizer" - genericregistry "k8s.io/apiserver/pkg/registry/generic" - "k8s.io/apiserver/pkg/util/webhook" + "k8s.io/apiserver/pkg/server" "k8s.io/client-go/rest" "k8s.io/component-base/metrics" "k8s.io/component-base/metrics/legacyregistry" - "k8s.io/kube-openapi/pkg/validation/spec" - "golang.org/x/sync/singleflight" mc "github.com/kplane-dev/apiserver/pkg/multicluster" ) @@ -40,18 +36,11 @@ var ( ) type CRDRuntimeManagerOptions struct { - BaseLoopbackClientConfig *rest.Config - PathPrefix, ControlPlaneSegment, DefaultCluster string - Delegate http.Handler - CRDRESTOptionsGetter genericregistry.RESTOptionsGetter - Admission admission.Interface - ServiceResolver webhook.ServiceResolver - AuthResolverWrapper webhook.AuthenticationInfoResolverWrapper - MasterCount int - Authorizer authorizer.Authorizer - RequestTimeout, MinRequestTimeout time.Duration - StaticOpenAPISpec map[string]*spec.Schema - MaxRequestBodyBytes int64 + BaseAPIExtensionsConfig *apiextensionsapiserver.Config + PathPrefix string + ControlPlaneSegment string + DefaultCluster string + Delegate http.Handler } type servesCacheEntry struct { @@ -59,51 +48,52 @@ type servesCacheEntry struct { exp time.Time } +type runtimeEntry struct { + handler http.Handler + server *server.GenericAPIServer + cancel context.CancelFunc +} + type clusterState struct { - rt *apiextensionsapiserver.ClusterScopedCRDRuntime - cs *apiextensionsclient.Clientset + r runtimeEntry + c *apiextensionsclient.Clientset } type CRDRuntimeManager struct { opts CRDRuntimeManagerOptions - mu sync.Mutex - runtimes map[string]*apiextensionsapiserver.ClusterScopedCRDRuntime - clients map[string]*apiextensionsclient.Clientset - started map[string]bool - - cache map[string]servesCacheEntry + mu sync.Mutex + runtimes map[string]runtimeEntry + clients map[string]*apiextensionsclient.Clientset + started map[string]bool + cache map[string]servesCacheEntry clusterKeys map[string]map[string]struct{} - createSF singleflight.Group + createSF singleflight.Group } func NewCRDRuntimeManager(opts CRDRuntimeManagerOptions) *CRDRuntimeManager { crdRuntimeMetricsOnce.Do(func() { legacyregistry.MustRegister(crdRuntimeCreateTotal, crdServesLookupTotal, crdServesCacheHit, crdServesCacheMiss, crdServesLookupLat) }) - if opts.BaseLoopbackClientConfig != nil { - opts.BaseLoopbackClientConfig = rest.CopyConfig(opts.BaseLoopbackClientConfig) - } return &CRDRuntimeManager{ - opts: opts, - runtimes: map[string]*apiextensionsapiserver.ClusterScopedCRDRuntime{}, - clients: map[string]*apiextensionsclient.Clientset{}, - started: map[string]bool{}, - cache: map[string]servesCacheEntry{}, + opts: opts, + runtimes: map[string]runtimeEntry{}, + clients: map[string]*apiextensionsclient.Clientset{}, + started: map[string]bool{}, + cache: map[string]servesCacheEntry{}, clusterKeys: map[string]map[string]struct{}{}, } } func (m *CRDRuntimeManager) Runtime(clusterID string, stopCh <-chan struct{}) (http.Handler, error) { - if m == nil || clusterID == "" || clusterID == m.opts.DefaultCluster || m.opts.BaseLoopbackClientConfig == nil { + if m == nil || clusterID == "" || clusterID == m.opts.DefaultCluster || m.opts.BaseAPIExtensionsConfig == nil { return nil, nil } - rt, _, err := m.ensureClusterState(clusterID, stopCh) - if err != nil || rt == nil { + state, err := m.ensureClusterState(clusterID, stopCh) + if err != nil { return nil, err } - rt.Start(stopCh) - return rt.Handler(), nil + return state.r.handler, nil } func (m *CRDRuntimeManager) ServesGroupVersion(clusterID, group, version string, stopCh <-chan struct{}) (bool, error) { @@ -120,7 +110,8 @@ func (m *CRDRuntimeManager) ServesGroupVersion(clusterID, group, version string, return served, nil } crdServesCacheMiss.WithLabelValues("miss").Inc() - _, cs, err := m.ensureClusterState(clusterID, stopCh) + + state, err := m.ensureClusterState(clusterID, stopCh) if err != nil { crdServesLookupTotal.WithLabelValues("error").Inc() crdServesLookupLat.WithLabelValues("error").Observe(time.Since(start).Seconds()) @@ -128,12 +119,13 @@ func (m *CRDRuntimeManager) ServesGroupVersion(clusterID, group, version string, } ctx, cancel := context.WithTimeout(context.Background(), servesLookupTimeout) defer cancel() - list, err := cs.ApiextensionsV1().CustomResourceDefinitions().List(ctx, metav1.ListOptions{}) + list, err := state.c.ApiextensionsV1().CustomResourceDefinitions().List(ctx, metav1.ListOptions{}) if err != nil { crdServesLookupTotal.WithLabelValues("error").Inc() crdServesLookupLat.WithLabelValues("error").Observe(time.Since(start).Seconds()) return false, err } + served := false for i := range list.Items { crd := &list.Items[i] @@ -157,91 +149,146 @@ func (m *CRDRuntimeManager) ServesGroupVersion(clusterID, group, version string, return served, nil } -func (m *CRDRuntimeManager) ensureClusterState(clusterID string, stopCh <-chan struct{}) (*apiextensionsapiserver.ClusterScopedCRDRuntime, *apiextensionsclient.Clientset, error) { +func (m *CRDRuntimeManager) ensureClusterState(clusterID string, stopCh <-chan struct{}) (clusterState, error) { m.mu.Lock() - if rt, ok := m.runtimes[clusterID]; ok { - cs := m.clients[clusterID] + if r, ok := m.runtimes[clusterID]; ok { + c := m.clients[clusterID] m.mu.Unlock() - return rt, cs, nil + return clusterState{r: r, c: c}, nil } m.mu.Unlock() + v, err, _ := m.createSF.Do(clusterID, func() (any, error) { m.mu.Lock() - if rt, ok := m.runtimes[clusterID]; ok { - cs := m.clients[clusterID] + if r, ok := m.runtimes[clusterID]; ok { + c := m.clients[clusterID] m.mu.Unlock() - return clusterState{rt: rt, cs: cs}, nil + return clusterState{r: r, c: c}, nil } m.mu.Unlock() - cfg := rest.CopyConfig(m.opts.BaseLoopbackClientConfig) - host, err := mc.ClusterHost(cfg.Host, mc.Options{PathPrefix: m.opts.PathPrefix, ControlPlaneSegment: m.opts.ControlPlaneSegment}, clusterID) + + if m.opts.BaseAPIExtensionsConfig == nil || m.opts.BaseAPIExtensionsConfig.GenericConfig == nil { + crdRuntimeCreateTotal.WithLabelValues("error").Inc() + return nil, fmt.Errorf("base apiextensions config is required") + } + baseGeneric := *m.opts.BaseAPIExtensionsConfig.GenericConfig + loopback := rest.CopyConfig(baseGeneric.LoopbackClientConfig) + host, err := mc.ClusterHost(loopback.Host, mc.Options{ + PathPrefix: m.opts.PathPrefix, + ControlPlaneSegment: m.opts.ControlPlaneSegment, + }, clusterID) if err != nil { crdRuntimeCreateTotal.WithLabelValues("error").Inc() return nil, fmt.Errorf("build cluster host: %w", err) } - cfg.Host = host - rt, err := apiextensionsapiserver.NewClusterScopedCRDRuntime(apiextensionsapiserver.ClusterScopedCRDConfig{ - LoopbackClientConfig: cfg, Delegate: m.opts.Delegate, CRDRESTOptionsGetter: m.opts.CRDRESTOptionsGetter, - Admission: m.opts.Admission, ServiceResolver: m.opts.ServiceResolver, AuthResolverWrapper: m.opts.AuthResolverWrapper, - MasterCount: m.opts.MasterCount, Authorizer: m.opts.Authorizer, RequestTimeout: m.opts.RequestTimeout, MinRequestTimeout: m.opts.MinRequestTimeout, - StaticOpenAPISpec: m.opts.StaticOpenAPISpec, MaxRequestBodyBytes: m.opts.MaxRequestBodyBytes, - }) + loopback.Host = host + baseGeneric.LoopbackClientConfig = loopback + + baseCfg := *m.opts.BaseAPIExtensionsConfig + baseCfg.GenericConfig = &baseGeneric + completed := baseCfg.Complete() + delegate := m.opts.Delegate + if delegate == nil { + delegate = http.NotFoundHandler() + } + crdServer, err := completed.New(server.NewEmptyDelegateWithCustomHandler(delegate)) if err != nil { crdRuntimeCreateTotal.WithLabelValues("error").Inc() return nil, err } - cs, err := apiextensionsclient.NewForConfig(rest.CopyConfig(cfg)) + runCtx, cancel := context.WithCancel(context.Background()) + go crdServer.GenericAPIServer.RunPostStartHooks(runCtx) + + cs, err := apiextensionsclient.NewForConfig(rest.CopyConfig(loopback)) if err != nil { + cancel() + crdServer.GenericAPIServer.Destroy() crdRuntimeCreateTotal.WithLabelValues("error").Inc() return nil, err } - rt.Start(stopCh) + entry := runtimeEntry{ + handler: crdServer.GenericAPIServer.Handler.NonGoRestfulMux, + server: crdServer.GenericAPIServer, + cancel: cancel, + } + if stopCh != nil { + go func() { + <-stopCh + cancel() + crdServer.GenericAPIServer.Destroy() + }() + } + m.mu.Lock() - m.runtimes[clusterID] = rt + m.runtimes[clusterID] = entry m.clients[clusterID] = cs if !m.started[clusterID] { m.started[clusterID] = true go m.watchUpdates(clusterID, cs, stopCh) } m.mu.Unlock() + crdRuntimeCreateTotal.WithLabelValues("success").Inc() - return clusterState{rt: rt, cs: cs}, nil + return clusterState{r: entry, c: cs}, nil }) if err != nil { - return nil, nil, err + return clusterState{}, err + } + state, ok := v.(clusterState) + if !ok { + return clusterState{}, fmt.Errorf("unexpected cluster state type %T", v) } - s := v.(clusterState) - return s.rt, s.cs, nil + return state, nil } func (m *CRDRuntimeManager) watchUpdates(clusterID string, cs *apiextensionsclient.Clientset, stopCh <-chan struct{}) { for { - select { case <-stopCh: return; default: } + select { + case <-stopCh: + return + default: + } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) w, err := cs.ApiextensionsV1().CustomResourceDefinitions().Watch(ctx, metav1.ListOptions{AllowWatchBookmarks: true, ResourceVersion: "0"}) if err != nil { cancel() - select { case <-stopCh: return; case <-time.After(watchRetryBackoff): continue } + select { + case <-stopCh: + return + case <-time.After(watchRetryBackoff): + continue + } } m.invalidateCluster(clusterID) closed := false for !closed { select { case <-stopCh: - w.Stop(); cancel(); return + w.Stop() + cancel() + return case _, ok := <-w.ResultChan(): - if !ok { closed = true; break } + if !ok { + closed = true + break + } m.invalidateCluster(clusterID) } } - w.Stop(); cancel() - select { case <-stopCh: return; case <-time.After(watchRetryBackoff): } + w.Stop() + cancel() + select { + case <-stopCh: + return + case <-time.After(watchRetryBackoff): + } } } func (m *CRDRuntimeManager) getCache(key string) (bool, bool) { now := time.Now() - m.mu.Lock(); defer m.mu.Unlock() + m.mu.Lock() + defer m.mu.Unlock() v, ok := m.cache[key] if !ok || now.After(v.exp) { delete(m.cache, key) @@ -251,7 +298,8 @@ func (m *CRDRuntimeManager) getCache(key string) (bool, bool) { } func (m *CRDRuntimeManager) setCache(clusterID, key string, served bool) { - m.mu.Lock(); defer m.mu.Unlock() + m.mu.Lock() + defer m.mu.Unlock() m.cache[key] = servesCacheEntry{served: served, exp: time.Now().Add(servesCacheTTL)} if m.clusterKeys[clusterID] == nil { m.clusterKeys[clusterID] = map[string]struct{}{} @@ -260,7 +308,8 @@ func (m *CRDRuntimeManager) setCache(clusterID, key string, served bool) { } func (m *CRDRuntimeManager) invalidateCluster(clusterID string) { - m.mu.Lock(); defer m.mu.Unlock() + m.mu.Lock() + defer m.mu.Unlock() for k := range m.clusterKeys[clusterID] { delete(m.cache, k) } @@ -282,4 +331,3 @@ func result(served bool) string { } return "not_served" } -