Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions cmd/apiserver/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/admission"
namespaceplugin "k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
genericfilters "k8s.io/apiserver/pkg/endpoints/filters"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/server"
Expand Down Expand Up @@ -363,6 +364,7 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
// Ensure RequestInfo is computed from the normalized /apis path
// before entering the cluster-scoped CRD runtime handler.
h = genericfilters.WithRequestInfo(h, conf.RequestInfoResolver)
h = withClusterCRDRequestInfoRewrite(h, clusterID)
h = genericfilters.WithAuditInit(h)
h = serverfilters.WithPanicRecovery(h, conf.RequestInfoResolver)
h.ServeHTTP(w, r)
Expand Down Expand Up @@ -461,6 +463,23 @@ func setCRDRequestResolvers(
}
}

func withClusterCRDRequestInfoRewrite(next http.Handler, clusterID string) http.Handler {
if next == nil || clusterID == "" {
return next
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ri, ok := apirequest.RequestInfoFrom(r.Context())
if !ok || ri == nil || !ri.IsResourceRequest || ri.APIGroup == "" || ri.APIGroup == "apiextensions.k8s.io" {
next.ServeHTTP(w, r)
return
}
rewritten := *ri
rewritten.Resource = mcbootstrap.EncodeSharedCRDResourceName(clusterID, ri.Resource)
ctx := apirequest.WithRequestInfo(r.Context(), &rewritten)
next.ServeHTTP(w, r.WithContext(ctx))
})
}

func decorateRESTOptionsGetter(server string, getter generic.RESTOptionsGetter, opts mc.Options) generic.RESTOptionsGetter {
if _, ok := getter.(mc.RESTOptionsDecorator); ok {
klog.Infof("mc.restOptionsGetter server=%s alreadyDecorated=true", server)
Expand Down
17 changes: 16 additions & 1 deletion pkg/multicluster/bootstrap/crd_runtime_manager_wrapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ func (m *CRDRuntimeManager) ensureSharedRuntime(stopCh <-chan struct{}) (runtime
return nil, fmt.Errorf("base apiextensions config is required")
}
baseGeneric := *m.opts.BaseAPIExtensionsConfig.GenericConfig
if baseGeneric.LoopbackClientConfig != nil {
baseGeneric.LoopbackClientConfig = allClustersLoopbackConfig(baseGeneric.LoopbackClientConfig)
}
baseCfg := *m.opts.BaseAPIExtensionsConfig
baseCfg.GenericConfig = &baseGeneric
completed := baseCfg.Complete()
Expand Down Expand Up @@ -580,6 +583,11 @@ func allClustersAPIExtensionsClient(base *rest.Config) (apiextensionsclient.Inte
if base == nil {
return nil, fmt.Errorf("base loopback config is required")
}
cfg := allClustersLoopbackConfig(base)
return apiextensionsclient.NewForConfig(cfg)
}

func allClustersLoopbackConfig(base *rest.Config) *rest.Config {
cfg := rest.CopyConfig(base)
cfg.Impersonate.UserName = mc.DefaultInternalCrossClusterUser
cfg.Impersonate.Groups = []string{"system:authenticated", "system:masters"}
Expand All @@ -592,7 +600,7 @@ func allClustersAPIExtensionsClient(base *rest.Config) (apiextensionsclient.Inte
} else {
cfg.UserAgent = mc.DefaultInternalCrossClusterUserAgent + " " + cfg.UserAgent
}
return apiextensionsclient.NewForConfig(cfg)
return cfg
}

func (m *CRDRuntimeManager) sharedStartStopCh(stopCh <-chan struct{}) <-chan struct{} {
Expand Down Expand Up @@ -650,6 +658,13 @@ func encodeSharedCRDName(clusterID, name string) string {
return prefix + name
}

// EncodeSharedCRDResourceName returns a cluster-unique resource token that
// preserves the "<encodedResource>.<group>" CRD name relation used by
// upstream CRD handler lookup.
func EncodeSharedCRDResourceName(clusterID, resource string) string {
return encodeSharedCRDName(clusterID, resource)
}

func decodeSharedCRDName(clusterID, name string) string {
if clusterID == "" || name == "" {
return name
Expand Down
5 changes: 5 additions & 0 deletions pkg/multicluster/bootstrap/crd_serves_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ func (i *CRDServesIndex) UpsertCRD(clusterID string, crd *apiextensionsv1.Custom
i.mu.Lock()
defer i.mu.Unlock()
i.setCRDKeysLocked(clusterID, crd.Name, keys)
// New clusters often appear after the initial informer sync/rebuild.
// Mark as synced on first observed event to avoid false "unknown" lookups.
i.clusterSynced[clusterID] = true
}

func (i *CRDServesIndex) DeleteCRD(clusterID string, crd *apiextensionsv1.CustomResourceDefinition) {
Expand All @@ -76,6 +79,8 @@ func (i *CRDServesIndex) DeleteCRD(clusterID string, crd *apiextensionsv1.Custom
i.mu.Lock()
defer i.mu.Unlock()
i.setCRDKeysLocked(clusterID, crd.Name, nil)
// Delete events are also a valid synchronization signal for the cluster.
i.clusterSynced[clusterID] = true
}

func (i *CRDServesIndex) InvalidateCluster(clusterID string) {
Expand Down