diff --git a/cmd/apiserver/app/config.go b/cmd/apiserver/app/config.go index 73dd574..5c2f474 100644 --- a/cmd/apiserver/app/config.go +++ b/cmd/apiserver/app/config.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/admission" namespaceplugin "k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle" + apirequest "k8s.io/apiserver/pkg/endpoints/request" genericfilters "k8s.io/apiserver/pkg/endpoints/filters" "k8s.io/apiserver/pkg/registry/generic" "k8s.io/apiserver/pkg/server" @@ -363,6 +364,7 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) { // Ensure RequestInfo is computed from the normalized /apis path // before entering the cluster-scoped CRD runtime handler. h = genericfilters.WithRequestInfo(h, conf.RequestInfoResolver) + h = withClusterCRDRequestInfoRewrite(h, clusterID) h = genericfilters.WithAuditInit(h) h = serverfilters.WithPanicRecovery(h, conf.RequestInfoResolver) h.ServeHTTP(w, r) @@ -461,6 +463,23 @@ func setCRDRequestResolvers( } } +func withClusterCRDRequestInfoRewrite(next http.Handler, clusterID string) http.Handler { + if next == nil || clusterID == "" { + return next + } + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ri, ok := apirequest.RequestInfoFrom(r.Context()) + if !ok || ri == nil || !ri.IsResourceRequest || ri.APIGroup == "" || ri.APIGroup == "apiextensions.k8s.io" { + next.ServeHTTP(w, r) + return + } + rewritten := *ri + rewritten.Resource = mcbootstrap.EncodeSharedCRDResourceName(clusterID, ri.Resource) + ctx := apirequest.WithRequestInfo(r.Context(), &rewritten) + next.ServeHTTP(w, r.WithContext(ctx)) + }) +} + func decorateRESTOptionsGetter(server string, getter generic.RESTOptionsGetter, opts mc.Options) generic.RESTOptionsGetter { if _, ok := getter.(mc.RESTOptionsDecorator); ok { klog.Infof("mc.restOptionsGetter server=%s alreadyDecorated=true", server) diff --git a/pkg/multicluster/bootstrap/crd_runtime_manager_wrapped.go b/pkg/multicluster/bootstrap/crd_runtime_manager_wrapped.go index 3ccb215..2e20978 100644 --- a/pkg/multicluster/bootstrap/crd_runtime_manager_wrapped.go +++ b/pkg/multicluster/bootstrap/crd_runtime_manager_wrapped.go @@ -161,6 +161,9 @@ func (m *CRDRuntimeManager) ensureSharedRuntime(stopCh <-chan struct{}) (runtime return nil, fmt.Errorf("base apiextensions config is required") } baseGeneric := *m.opts.BaseAPIExtensionsConfig.GenericConfig + if baseGeneric.LoopbackClientConfig != nil { + baseGeneric.LoopbackClientConfig = allClustersLoopbackConfig(baseGeneric.LoopbackClientConfig) + } baseCfg := *m.opts.BaseAPIExtensionsConfig baseCfg.GenericConfig = &baseGeneric completed := baseCfg.Complete() @@ -580,6 +583,11 @@ func allClustersAPIExtensionsClient(base *rest.Config) (apiextensionsclient.Inte if base == nil { return nil, fmt.Errorf("base loopback config is required") } + cfg := allClustersLoopbackConfig(base) + return apiextensionsclient.NewForConfig(cfg) +} + +func allClustersLoopbackConfig(base *rest.Config) *rest.Config { cfg := rest.CopyConfig(base) cfg.Impersonate.UserName = mc.DefaultInternalCrossClusterUser cfg.Impersonate.Groups = []string{"system:authenticated", "system:masters"} @@ -592,7 +600,7 @@ func allClustersAPIExtensionsClient(base *rest.Config) (apiextensionsclient.Inte } else { cfg.UserAgent = mc.DefaultInternalCrossClusterUserAgent + " " + cfg.UserAgent } - return apiextensionsclient.NewForConfig(cfg) + return cfg } func (m *CRDRuntimeManager) sharedStartStopCh(stopCh <-chan struct{}) <-chan struct{} { @@ -650,6 +658,13 @@ func encodeSharedCRDName(clusterID, name string) string { return prefix + name } +// EncodeSharedCRDResourceName returns a cluster-unique resource token that +// preserves the "." CRD name relation used by +// upstream CRD handler lookup. +func EncodeSharedCRDResourceName(clusterID, resource string) string { + return encodeSharedCRDName(clusterID, resource) +} + func decodeSharedCRDName(clusterID, name string) string { if clusterID == "" || name == "" { return name diff --git a/pkg/multicluster/bootstrap/crd_serves_index.go b/pkg/multicluster/bootstrap/crd_serves_index.go index c4fd36a..6b27a9f 100644 --- a/pkg/multicluster/bootstrap/crd_serves_index.go +++ b/pkg/multicluster/bootstrap/crd_serves_index.go @@ -67,6 +67,9 @@ func (i *CRDServesIndex) UpsertCRD(clusterID string, crd *apiextensionsv1.Custom i.mu.Lock() defer i.mu.Unlock() i.setCRDKeysLocked(clusterID, crd.Name, keys) + // New clusters often appear after the initial informer sync/rebuild. + // Mark as synced on first observed event to avoid false "unknown" lookups. + i.clusterSynced[clusterID] = true } func (i *CRDServesIndex) DeleteCRD(clusterID string, crd *apiextensionsv1.CustomResourceDefinition) { @@ -76,6 +79,8 @@ func (i *CRDServesIndex) DeleteCRD(clusterID string, crd *apiextensionsv1.Custom i.mu.Lock() defer i.mu.Unlock() i.setCRDKeysLocked(clusterID, crd.Name, nil) + // Delete events are also a valid synchronization signal for the cluster. + i.clusterSynced[clusterID] = true } func (i *CRDServesIndex) InvalidateCluster(clusterID string) {