From 44f0a5b2473f61b7ce3268b12f00cfd01f48f2fe Mon Sep 17 00:00:00 2001 From: Zach Smith Date: Fri, 13 Feb 2026 15:14:31 -0800 Subject: [PATCH] fix: route all crd traffic through mc aggregator --- cmd/apiserver/app/config.go | 15 +----- test/smoke/crd_per_cluster_test.go | 87 ++++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+), 13 deletions(-) diff --git a/cmd/apiserver/app/config.go b/cmd/apiserver/app/config.go index b0d2a9c..0e4959a 100644 --- a/cmd/apiserver/app/config.go +++ b/cmd/apiserver/app/config.go @@ -385,8 +385,8 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) { base := server.DefaultBuildHandlerChain(h, conf) dispatch := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { cid, _, _ := mc.FromContext(r.Context()) - if r.Method == http.MethodGet && cid != "" && cid != mcOpts.DefaultCluster && crdRuntimeMgr != nil { - if group, version, ok := exactAPIsGroupVersionDiscoveryPath(r.URL.Path); ok { + if cid != "" && cid != mcOpts.DefaultCluster && crdRuntimeMgr != nil { + if group, version, ok := apisGroupVersionFromPath(r.URL.Path); ok { served, err := crdRuntimeMgr.ServesGroupVersion(cid, group, version, genericConfig.DrainedNotify()) if err != nil { klog.Errorf("mc.crdRuntime lookup failed at aggregator cluster=%s path=%s err=%v", cid, r.URL.Path, err) @@ -447,15 +447,4 @@ func apisGroupVersionFromPath(path string) (group, version string, ok bool) { return parts[1], parts[2], true } -func exactAPIsGroupVersionDiscoveryPath(path string) (group, version string, ok bool) { - group, version, ok = apisGroupVersionFromPath(path) - if !ok { - return "", "", false - } - parts := strings.Split(strings.Trim(path, "/"), "/") - if len(parts) != 3 { - return "", "", false - } - return group, version, true -} diff --git a/test/smoke/crd_per_cluster_test.go b/test/smoke/crd_per_cluster_test.go index 88dff61..308c816 100644 --- a/test/smoke/crd_per_cluster_test.go +++ b/test/smoke/crd_per_cluster_test.go @@ -4,13 +4,17 @@ import ( "context" "fmt" "os" + "strings" "testing" "time" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ) @@ -104,6 +108,64 @@ func TestCRDDiscoveryUpdateInADoesNotAffectB(t *testing.T) { waitForResourcePresence(t, kubeClientB, clusterB, group+"/v2", plural, false) } +func TestCRDResourceCRUDPerClusterIsolation(t *testing.T) { + etcd := os.Getenv("ETCD_ENDPOINTS") + s := startAPIServer(t, etcd) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + + clusterA := "c-" + randSuffix(3) + clusterB := "c-" + randSuffix(3) + group := fmt.Sprintf("crud-%s.kplane.dev", randSuffix(3)) + plural := "testwidgets" + crdName := plural + "." + group + gvr := schema.GroupVersionResource{Group: group, Version: "v1", Resource: plural} + namespace := "default" + + crdClientA := apixClientForCluster(t, s, clusterA) + crdClientB := apixClientForCluster(t, s, clusterB) + kubeClientA := kubeClientForCluster(t, s, clusterA) + kubeClientB := kubeClientForCluster(t, s, clusterB) + dynA := dynamicClientForCluster(t, s, clusterA) + dynB := dynamicClientForCluster(t, s, clusterB) + + if err := waitForNamespace(ctx, kubeClientA, namespace); err != nil { + t.Fatalf("cluster=%s wait namespace %s: %v", clusterA, namespace, err) + } + if err := waitForNamespace(ctx, kubeClientB, namespace); err != nil { + t.Fatalf("cluster=%s wait namespace %s: %v", clusterB, namespace, err) + } + + createTestCRD(ctx, t, crdClientA, crdName, group, plural) + waitForCRDEstablished(ctx, t, crdClientA, clusterA, crdName) + waitForResourcePresence(t, kubeClientA, clusterA, group+"/v1", plural, true) + + _, err := dynA.Resource(gvr).Namespace(namespace).Create(ctx, testWidget(group, "w-a"), metav1.CreateOptions{}) + if err != nil { + t.Fatalf("cluster=%s create CR instance: %v", clusterA, err) + } + if _, err := dynA.Resource(gvr).Namespace(namespace).Get(ctx, "w-a", metav1.GetOptions{}); err != nil { + t.Fatalf("cluster=%s get CR instance: %v", clusterA, err) + } + + _, err = dynB.Resource(gvr).Namespace(namespace).Create(ctx, testWidget(group, "w-b"), metav1.CreateOptions{}) + if err == nil { + t.Fatalf("cluster=%s expected create CR instance to fail before CRD install", clusterB) + } + if !apierrors.IsNotFound(err) && !strings.Contains(strings.ToLower(err.Error()), "not found") { + t.Fatalf("cluster=%s expected not found before CRD install, got: %v", clusterB, err) + } + + createTestCRD(ctx, t, crdClientB, crdName, group, plural) + waitForCRDEstablished(ctx, t, crdClientB, clusterB, crdName) + waitForResourcePresence(t, kubeClientB, clusterB, group+"/v1", plural, true) + + if _, err := dynB.Resource(gvr).Namespace(namespace).Create(ctx, testWidget(group, "w-b"), metav1.CreateOptions{}); err != nil { + t.Fatalf("cluster=%s create CR instance after CRD install: %v", clusterB, err) + } +} + func apixClientForCluster(t *testing.T, s *testAPIServer, clusterID string) *apiextensionsclient.Clientset { t.Helper() cfg := kubeConfigForCluster(s.clusterURL(clusterID)) @@ -114,6 +176,31 @@ func apixClientForCluster(t *testing.T, s *testAPIServer, clusterID string) *api return cs } +func dynamicClientForCluster(t *testing.T, s *testAPIServer, clusterID string) dynamic.Interface { + t.Helper() + cfg := kubeConfigForCluster(s.clusterURL(clusterID)) + c, err := dynamic.NewForConfig(rest.CopyConfig(cfg)) + if err != nil { + t.Fatalf("cluster=%s build dynamic client: %v", clusterID, err) + } + return c +} + +func testWidget(group, name string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]any{ + "apiVersion": group + "/v1", + "kind": "TestWidget", + "metadata": map[string]any{ + "name": name, + }, + "spec": map[string]any{ + "foo": "bar", + }, + }, + } +} + func createTestCRD(ctx context.Context, t *testing.T, cs *apiextensionsclient.Clientset, crdName, group, plural string) { t.Helper() _, err := cs.ApiextensionsV1().CustomResourceDefinitions().Create(ctx, &apiextensionsv1.CustomResourceDefinition{