Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 124 additions & 3 deletions cmd/apiserver/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ package app

import (
"net/http"
"strings"

apiextensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/admission"
namespaceplugin "k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle"
genericfilters "k8s.io/apiserver/pkg/endpoints/filters"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/server"
utilfeature "k8s.io/apiserver/pkg/util/feature"
Expand Down Expand Up @@ -121,6 +123,7 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
}
clientPool := mc.NewClientPool(genericConfig.LoopbackClientConfig, mcOpts.PathPrefix, mcOpts.ControlPlaneSegment)
informerPool := mc.NewInformerPoolFromClientPool(clientPool, 0, genericConfig.DrainedNotify())
var crdRuntimeMgr *mcbootstrap.CRDRuntimeManager
systemNamespaceBootstrapper := mcbootstrap.NewSystemNamespaceBootstrapper(mcbootstrap.SystemNamespaceOptions{
ClientForCluster: clientPool.KubeClientForCluster,
Namespaces: opts.SystemNamespaces,
Expand All @@ -138,7 +141,33 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
})
genericConfig.BuildHandlerChainFunc = func(h http.Handler, conf *server.Config) http.Handler {
ex := mc.PathExtractor{PathPrefix: mcOpts.PathPrefix, ControlPlaneSegment: mcOpts.ControlPlaneSegment}
return mc.WithClusterRouting(server.DefaultBuildHandlerChain(h, conf), ex, mcOpts)
base := server.DefaultBuildHandlerChain(h, conf)
dispatch := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
cid, _, _ := mc.FromContext(r.Context())
if cid != "" && cid != mcOpts.DefaultCluster && crdRuntimeMgr != nil {
if group, version, ok := apisGroupVersionFromPath(r.URL.Path); ok {
served, err := crdRuntimeMgr.ServesGroupVersion(cid, group, version, genericConfig.DrainedNotify())
if err != nil {
klog.Errorf("mc.crdRuntime lookup failed at kube cluster=%s path=%s err=%v", cid, r.URL.Path, err)
http.Error(w, "cluster CRD runtime unavailable", http.StatusServiceUnavailable)
return
}
if !served {
base.ServeHTTP(w, r)
return
}
if h, err := crdRuntimeMgr.Runtime(cid, genericConfig.DrainedNotify()); err == nil && h != nil {
genericfilters.WithRequestInfo(h, conf.RequestInfoResolver).ServeHTTP(w, r)
return
}
klog.Errorf("mc.crdRuntime unresolved at kube cluster=%s path=%s", cid, r.URL.Path)
http.Error(w, "cluster CRD runtime unavailable", http.StatusServiceUnavailable)
return
}
}
base.ServeHTTP(w, r)
})
return mc.WithClusterRouting(dispatch, ex, mcOpts)
}

authManager := mcauth.NewManager(wait.ContextForChannel(genericConfig.DrainedNotify()), mcauth.Options{
Expand Down Expand Up @@ -280,10 +309,54 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
if apiExtensions.GenericConfig.RESTOptionsGetter != nil {
apiExtensions.GenericConfig.RESTOptionsGetter = decorateRESTOptionsGetter("apiextensions", apiExtensions.GenericConfig.RESTOptionsGetter, mcOpts)
}
crdRuntimeMgr = mcbootstrap.NewCRDRuntimeManager(mcbootstrap.CRDRuntimeManagerOptions{
BaseAPIExtensionsConfig: apiExtensions,
PathPrefix: mcOpts.PathPrefix,
ControlPlaneSegment: mcOpts.ControlPlaneSegment,
DefaultCluster: mcOpts.DefaultCluster,
})
prevOnClusterSelected := mcOpts.OnClusterSelected
mcOpts.OnClusterSelected = func(clusterID string) {
if prevOnClusterSelected != nil {
prevOnClusterSelected(clusterID)
}
if clusterID == "" || clusterID == mcOpts.DefaultCluster {
return
}
_, _ = crdRuntimeMgr.Runtime(clusterID, genericConfig.DrainedNotify())
}
// Ensure CRDs are also routed through the multicluster handler
apiExtensions.GenericConfig.BuildHandlerChainFunc = func(h http.Handler, conf *server.Config) http.Handler {
ex := mc.PathExtractor{PathPrefix: mcOpts.PathPrefix, ControlPlaneSegment: mcOpts.ControlPlaneSegment}
return mc.WithClusterRouting(server.DefaultBuildHandlerChain(h, conf), ex, mcOpts)
base := server.DefaultBuildHandlerChain(h, conf)
dispatch := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
cid, _, _ := mc.FromContext(r.Context())
if cid != "" && cid != mcOpts.DefaultCluster {
if group, version, ok := apisGroupVersionFromPath(r.URL.Path); ok {
served, err := crdRuntimeMgr.ServesGroupVersion(cid, group, version, genericConfig.DrainedNotify())
if err != nil {
klog.Errorf("mc.crdRuntime lookup failed at apiextensions cluster=%s path=%s err=%v", cid, r.URL.Path, err)
http.Error(w, "cluster CRD runtime unavailable", http.StatusServiceUnavailable)
return
}
if !served {
base.ServeHTTP(w, r)
return
}
if h, err := crdRuntimeMgr.Runtime(cid, genericConfig.DrainedNotify()); err == nil && h != nil {
// Ensure RequestInfo is computed from the normalized /apis path
// before entering the cluster-scoped CRD runtime handler.
genericfilters.WithRequestInfo(h, conf.RequestInfoResolver).ServeHTTP(w, r)
return
}
klog.Errorf("mc.crdRuntime unresolved cluster=%s path=%s", cid, r.URL.Path)
http.Error(w, "cluster CRD runtime unavailable", http.StatusServiceUnavailable)
return
}
}
base.ServeHTTP(w, r)
})
return mc.WithClusterRouting(dispatch, ex, mcOpts)
}
// Install admission chain on apiextensions as well
{
Expand All @@ -309,7 +382,31 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
// Ensure aggregator also receives multicluster routing
aggregator.GenericConfig.BuildHandlerChainFunc = func(h http.Handler, conf *server.Config) http.Handler {
ex := mc.PathExtractor{PathPrefix: mcOpts.PathPrefix, ControlPlaneSegment: mcOpts.ControlPlaneSegment}
return mc.WithClusterRouting(server.DefaultBuildHandlerChain(h, conf), ex, mcOpts)
base := server.DefaultBuildHandlerChain(h, conf)
dispatch := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
cid, _, _ := mc.FromContext(r.Context())
if r.Method == http.MethodGet && cid != "" && cid != mcOpts.DefaultCluster && crdRuntimeMgr != nil {
if group, version, ok := exactAPIsGroupVersionDiscoveryPath(r.URL.Path); ok {
served, err := crdRuntimeMgr.ServesGroupVersion(cid, group, version, genericConfig.DrainedNotify())
if err != nil {
klog.Errorf("mc.crdRuntime lookup failed at aggregator cluster=%s path=%s err=%v", cid, r.URL.Path, err)
http.Error(w, "cluster CRD runtime unavailable", http.StatusServiceUnavailable)
return
}
if served {
if h, err := crdRuntimeMgr.Runtime(cid, genericConfig.DrainedNotify()); err == nil && h != nil {
genericfilters.WithRequestInfo(h, conf.RequestInfoResolver).ServeHTTP(w, r)
return
}
klog.Errorf("mc.crdRuntime unresolved at aggregator cluster=%s path=%s", cid, r.URL.Path)
http.Error(w, "cluster CRD runtime unavailable", http.StatusServiceUnavailable)
return
}
}
}
base.ServeHTTP(w, r)
})
return mc.WithClusterRouting(dispatch, ex, mcOpts)
}
// Install admission chain on aggregator
{
Expand Down Expand Up @@ -338,3 +435,27 @@ func decorateRESTOptionsGetter(server string, getter generic.RESTOptionsGetter,
klog.Infof("mc.restOptionsGetter server=%s decorated=%t", server, true)
return decorated
}

func apisGroupVersionFromPath(path string) (group, version string, ok bool) {
parts := strings.Split(strings.Trim(path, "/"), "/")
if len(parts) < 3 || parts[0] != "apis" || parts[1] == "" || parts[2] == "" {
return "", "", false
}
if parts[1] == "apiextensions.k8s.io" {
return "", "", false
}
return parts[1], parts[2], true
}

func exactAPIsGroupVersionDiscoveryPath(path string) (group, version string, ok bool) {
group, version, ok = apisGroupVersionFromPath(path)
if !ok {
return "", "", false
}
parts := strings.Split(strings.Trim(path, "/"), "/")
if len(parts) != 3 {
return "", "", false
}
return group, version, true
}

Loading