Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions cmd/apiserver/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
genericfilters "k8s.io/apiserver/pkg/endpoints/filters"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/server"
serverfilters "k8s.io/apiserver/pkg/server/filters"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/webhook"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -157,7 +158,10 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
return
}
if h, err := crdRuntimeMgr.Runtime(cid, genericConfig.DrainedNotify()); err == nil && h != nil {
genericfilters.WithRequestInfo(h, conf.RequestInfoResolver).ServeHTTP(w, r)
h = genericfilters.WithRequestInfo(h, conf.RequestInfoResolver)
h = genericfilters.WithAuditInit(h)
h = serverfilters.WithPanicRecovery(h, conf.RequestInfoResolver)
h.ServeHTTP(w, r)
return
}
klog.Errorf("mc.crdRuntime unresolved at kube cluster=%s path=%s", cid, r.URL.Path)
Expand Down Expand Up @@ -346,7 +350,10 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
if h, err := crdRuntimeMgr.Runtime(cid, genericConfig.DrainedNotify()); err == nil && h != nil {
// Ensure RequestInfo is computed from the normalized /apis path
// before entering the cluster-scoped CRD runtime handler.
genericfilters.WithRequestInfo(h, conf.RequestInfoResolver).ServeHTTP(w, r)
h = genericfilters.WithRequestInfo(h, conf.RequestInfoResolver)
h = genericfilters.WithAuditInit(h)
h = serverfilters.WithPanicRecovery(h, conf.RequestInfoResolver)
h.ServeHTTP(w, r)
return
}
klog.Errorf("mc.crdRuntime unresolved cluster=%s path=%s", cid, r.URL.Path)
Expand Down Expand Up @@ -395,7 +402,10 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
}
if served {
if h, err := crdRuntimeMgr.Runtime(cid, genericConfig.DrainedNotify()); err == nil && h != nil {
genericfilters.WithRequestInfo(h, conf.RequestInfoResolver).ServeHTTP(w, r)
h = genericfilters.WithRequestInfo(h, conf.RequestInfoResolver)
h = genericfilters.WithAuditInit(h)
h = serverfilters.WithPanicRecovery(h, conf.RequestInfoResolver)
h.ServeHTTP(w, r)
return
}
klog.Errorf("mc.crdRuntime unresolved at aggregator cluster=%s path=%s", cid, r.URL.Path)
Expand Down
99 changes: 99 additions & 0 deletions test/smoke/crd_per_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package smoke

import (
"context"
"encoding/json"
"fmt"
"os"
"strings"
Expand All @@ -14,6 +15,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -166,6 +168,46 @@ func TestCRDResourceCRUDPerClusterIsolation(t *testing.T) {
}
}

func TestCRDStatusSubresourcePatchPerCluster(t *testing.T) {
etcd := os.Getenv("ETCD_ENDPOINTS")
s := startAPIServer(t, etcd)

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()

clusterID := "c-" + randSuffix(3)
group := fmt.Sprintf("status-%s.kplane.dev", randSuffix(3))
plural := "controlplaneregistrations"
crdName := plural + "." + group
gvr := schema.GroupVersionResource{Group: group, Version: "v1", Resource: plural}
namespace := "default"

crdClient := apixClientForCluster(t, s, clusterID)
kubeClient := kubeClientForCluster(t, s, clusterID)
dyn := dynamicClientForCluster(t, s, clusterID)

if err := waitForNamespace(ctx, kubeClient, namespace); err != nil {
t.Fatalf("cluster=%s wait namespace %s: %v", clusterID, namespace, err)
}

createTestCRDWithStatusSubresource(ctx, t, crdClient, crdName, group, plural, "ControlPlaneRegistration", "ControlPlaneRegistrationList")
waitForCRDEstablished(ctx, t, crdClient, clusterID, crdName)
waitForResourcePresence(t, kubeClient, clusterID, group+"/v1", plural, true)

if _, err := dyn.Resource(gvr).Namespace(namespace).Create(ctx, testCRWithStatus(group, "test-registration"), metav1.CreateOptions{}); err != nil {
t.Fatalf("cluster=%s create CR with status: %v", clusterID, err)
}

patchBody, _ := json.Marshal(map[string]any{
"status": map[string]any{
"phase": "Ready",
},
})
if _, err := dyn.Resource(gvr).Namespace(namespace).Patch(ctx, "test-registration", types.MergePatchType, patchBody, metav1.PatchOptions{}, "status"); err != nil {
t.Fatalf("cluster=%s patch status subresource failed: %v", clusterID, err)
}
}

func apixClientForCluster(t *testing.T, s *testAPIServer, clusterID string) *apiextensionsclient.Clientset {
t.Helper()
cfg := kubeConfigForCluster(s.clusterURL(clusterID))
Expand Down Expand Up @@ -201,6 +243,24 @@ func testWidget(group, name string) *unstructured.Unstructured {
}
}

func testCRWithStatus(group, name string) *unstructured.Unstructured {
return &unstructured.Unstructured{
Object: map[string]any{
"apiVersion": group + "/v1",
"kind": "ControlPlaneRegistration",
"metadata": map[string]any{
"name": name,
},
"spec": map[string]any{
"foo": "bar",
},
"status": map[string]any{
"phase": "Pending",
},
},
}
}

func createTestCRD(ctx context.Context, t *testing.T, cs *apiextensionsclient.Clientset, crdName, group, plural string) {
t.Helper()
_, err := cs.ApiextensionsV1().CustomResourceDefinitions().Create(ctx, &apiextensionsv1.CustomResourceDefinition{
Expand Down Expand Up @@ -231,6 +291,45 @@ func createTestCRD(ctx context.Context, t *testing.T, cs *apiextensionsclient.Cl
}
}

func createTestCRDWithStatusSubresource(ctx context.Context, t *testing.T, cs *apiextensionsclient.Clientset, crdName, group, plural, kind, listKind string) {
t.Helper()
_, err := cs.ApiextensionsV1().CustomResourceDefinitions().Create(ctx, &apiextensionsv1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{Name: crdName},
Spec: apiextensionsv1.CustomResourceDefinitionSpec{
Group: group,
Names: apiextensionsv1.CustomResourceDefinitionNames{
Plural: plural,
Singular: strings.TrimSuffix(plural, "s"),
Kind: kind,
ListKind: listKind,
},
Scope: apiextensionsv1.NamespaceScoped,
Versions: []apiextensionsv1.CustomResourceDefinitionVersion{
{
Name: "v1",
Served: true,
Storage: true,
Schema: &apiextensionsv1.CustomResourceValidation{
OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{
Type: "object",
Properties: map[string]apiextensionsv1.JSONSchemaProps{
"spec": {Type: "object"},
"status": {Type: "object"},
},
},
},
Subresources: &apiextensionsv1.CustomResourceSubresources{
Status: &apiextensionsv1.CustomResourceSubresourceStatus{},
},
},
},
},
}, metav1.CreateOptions{})
if err != nil {
t.Fatalf("create CRD with status %s: %v", crdName, err)
}
}

func waitForCRDEstablished(ctx context.Context, t *testing.T, cs *apiextensionsclient.Clientset, clusterID, crdName string) {
t.Helper()
deadline := time.Now().Add(60 * time.Second)
Expand Down