Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ coverage.*
*.coverprofile
profile.cov

# Profiling artifacts
dev/profiles/

# Dependency directories (remove the comment below to include it)
# vendor/

Expand Down
27 changes: 6 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,29 +147,14 @@ keyFunc := func(obj runtime.Object) (string, error) {
}
```

#### Admission: server-owned cluster label
We stamp a server-owned cluster label on persisted objects and validate it on
create/update to prevent cross-cluster writes and support watchcache keying.
#### Admission: internal cluster identity
We store cluster identity in managed fields and validate it on create/update
to prevent cross-cluster writes and support watchcache keying.

```go
lbls := accessor.GetLabels()
if lbls == nil {
lbls = map[string]string{}
}
key := m.Options.ClusterAnnotationKey
if key == "" {
key = mcv1.DefaultClusterAnnotation
}
lbls[key] = cid
accessor.SetLabels(lbls)
```
Note: the `ClusterAnnotationKey` fallback to `DefaultClusterAnnotation` is
temporary and not ideal. It exists to preserve compatibility while we migrate
callers to an explicit label key; avoid relying on this fallback long term.

```go
if cid := acc.GetLabels()[key]; cid != reqCID {
return fmt.Errorf("cluster label %q=%q must match request cluster %q", key, cid, reqCID)
mc.SetObjectClusterIdentity(obj, reqCID)
if cid := mc.ObjectClusterIdentity(obj); cid != reqCID {
return fmt.Errorf("cluster identity %q must match request cluster %q", cid, reqCID)
}
```

Expand Down
80 changes: 45 additions & 35 deletions cmd/apiserver/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
mcOpts.DefaultCluster = opts.RootControlPlaneName
}
clientPool := mc.NewClientPool(genericConfig.LoopbackClientConfig, mcOpts.PathPrefix, mcOpts.ControlPlaneSegment)
informerPool := mc.NewInformerPoolFromClientPool(clientPool, 0, genericConfig.DrainedNotify())
informerRegistry := mc.NewInformerRegistry(wait.ContextForChannel(genericConfig.DrainedNotify()))
mcOpts.InformerRegistry = informerRegistry
var crdRuntimeMgr *mcbootstrap.CRDRuntimeManager
systemNamespaceBootstrapper := mcbootstrap.NewSystemNamespaceBootstrapper(mcbootstrap.SystemNamespaceOptions{
ClientForCluster: clientPool.KubeClientForCluster,
Expand All @@ -146,7 +147,7 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
})
genericConfig.BuildHandlerChainFunc = func(h http.Handler, conf *server.Config) http.Handler {
ex := mc.PathExtractor{PathPrefix: mcOpts.PathPrefix, ControlPlaneSegment: mcOpts.ControlPlaneSegment}
base := server.DefaultBuildHandlerChain(h, conf)
base := withVersionOverride(server.DefaultBuildHandlerChain(h, conf))
dispatch := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
cid, _, _ := mc.FromContext(r.Context())
if cid != "" && cid != mcOpts.DefaultCluster && crdRuntimeMgr != nil {
Expand All @@ -162,10 +163,7 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
return
}
if h, err := crdRuntimeMgr.Runtime(cid, genericConfig.DrainedNotify()); err == nil && h != nil {
h = genericfilters.WithRequestInfo(h, conf.RequestInfoResolver)
h = genericfilters.WithAuditInit(h)
h = serverfilters.WithPanicRecovery(h, conf.RequestInfoResolver)
h.ServeHTTP(w, r)
wrapClusterCRDHandler(h, conf, cid, false).ServeHTTP(w, r)
return
}
klog.Errorf("mc.crdRuntime unresolved at kube cluster=%s path=%s", cid, r.URL.Path)
Expand All @@ -187,13 +185,15 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
EgressSelector: genericConfig.EgressSelector,
APIServerID: genericConfig.APIServerID,
ClientPool: clientPool,
InformerPool: informerPool,
InformerRegistry: informerRegistry,
})
if genericConfig.Authentication.Authenticator != nil {
genericConfig.Authentication.Authenticator = mcauth.NewClusterAuthenticator(mcOpts.DefaultCluster, genericConfig.Authentication.Authenticator, authManager)
}
if genericConfig.Authorization.Authorizer != nil {
clusterAuthorizer := mcauth.NewClusterAuthorizer(mcOpts.DefaultCluster, genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, authManager)
// Route root and tenant authorization through the same multicluster
// manager-backed path to avoid root-only stale lister divergence.
clusterAuthorizer := mcauth.NewClusterAuthorizer(mcOpts.DefaultCluster, nil, nil, authManager)
genericConfig.Authorization.Authorizer = clusterAuthorizer
genericConfig.RuleResolver = clusterAuthorizer
}
Expand All @@ -209,7 +209,7 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
PathPrefix: mcOpts.PathPrefix,
ControlPlaneSegment: mcOpts.ControlPlaneSegment,
ClientPool: clientPool,
InformerPool: informerPool,
InformerRegistry: informerRegistry,
})
mcNamespaceLifecycle := mcnsl.NewLifecycle(mcOpts, mcNamespaceMgr)

Expand Down Expand Up @@ -239,11 +239,7 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
targetPort = opts.SecureServing.BindPort
}
stopChForCluster := func(clusterID string) (<-chan struct{}, error) {
_, _, stopCh, err := informerPool.Get(clusterID)
if err != nil {
return nil, err
}
return stopCh, nil
return genericConfig.DrainedNotify(), nil
}
internalControllerMgr := mcbootstrap.NewInternalControllerManager(mcbootstrap.InternalControllerOptions{
ClientForCluster: clientPool.KubeClientForCluster,
Expand Down Expand Up @@ -291,7 +287,7 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
ControlPlaneSegment: mcOpts.ControlPlaneSegment,
CelRuntime: celRuntime,
ClientPool: clientPool,
InformerPool: informerPool,
InformerRegistry: informerRegistry,
})
mcMutatingWebhook := mcwh.NewMutating(mcOpts, mcWebhookMgr)
mcValidatingWebhook := mcwh.NewValidating(mcOpts, mcWebhookMgr)
Expand All @@ -317,14 +313,15 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
if apiExtensions.GenericConfig.RESTOptionsGetter != nil {
apiExtensions.GenericConfig.RESTOptionsGetter = decorateRESTOptionsGetter("apiextensions", apiExtensions.GenericConfig.RESTOptionsGetter, mcOpts)
}
apiExtensionsClientPool := mc.NewAPIExtensionsClientPool(apiExtensions.GenericConfig.LoopbackClientConfig, mcOpts.PathPrefix, mcOpts.ControlPlaneSegment)
apiExtensionsInformerPool := mc.NewAPIExtensionsInformerPoolFromClientPool(apiExtensionsClientPool, 0, genericConfig.DrainedNotify())
if apiExtensions.ExtraConfig.CRDRESTOptionsGetter != nil {
apiExtensions.ExtraConfig.CRDRESTOptionsGetter = decorateRESTOptionsGetter("apiextensions-crd", apiExtensions.ExtraConfig.CRDRESTOptionsGetter, mcOpts)
}
crdRuntimeMgr = mcbootstrap.NewCRDRuntimeManager(mcbootstrap.CRDRuntimeManagerOptions{
BaseAPIExtensionsConfig: apiExtensions,
APIExtensionsInformerPool: apiExtensionsInformerPool,
PathPrefix: mcOpts.PathPrefix,
ControlPlaneSegment: mcOpts.ControlPlaneSegment,
DefaultCluster: mcOpts.DefaultCluster,
BaseAPIExtensionsConfig: apiExtensions,
InformerRegistry: informerRegistry,
PathPrefix: mcOpts.PathPrefix,
ControlPlaneSegment: mcOpts.ControlPlaneSegment,
DefaultCluster: mcOpts.DefaultCluster,
})
setCRDRequestResolvers(apiExtensions, crdRuntimeMgr.CRDGetterForRequest, crdRuntimeMgr.CRDListerForRequest)
crdController := mcbootstrap.NewMulticlusterCRDController(crdRuntimeMgr, mcOpts.DefaultCluster)
Expand Down Expand Up @@ -361,19 +358,13 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
http.Error(w, "cluster CRD runtime unavailable", http.StatusServiceUnavailable)
return true
}
// Ensure RequestInfo is computed from the normalized /apis path
// before entering the cluster-scoped CRD runtime handler.
h = genericfilters.WithRequestInfo(h, conf.RequestInfoResolver)
h = withClusterCRDRequestInfoRewrite(h, clusterID)
h = genericfilters.WithAuditInit(h)
h = serverfilters.WithPanicRecovery(h, conf.RequestInfoResolver)
h.ServeHTTP(w, r)
wrapClusterCRDHandler(h, conf, clusterID, false).ServeHTTP(w, r)
return true
}
// Ensure CRDs are also routed through the multicluster handler
apiExtensions.GenericConfig.BuildHandlerChainFunc = func(h http.Handler, conf *server.Config) http.Handler {
ex := mc.PathExtractor{PathPrefix: mcOpts.PathPrefix, ControlPlaneSegment: mcOpts.ControlPlaneSegment}
base := server.DefaultBuildHandlerChain(h, conf)
base := withVersionOverride(server.DefaultBuildHandlerChain(h, conf))
dispatch := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
cid, _, _ := mc.FromContext(r.Context())
if cid != "" && cid != mcOpts.DefaultCluster {
Expand All @@ -390,11 +381,11 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
mut := mca.NewMutating(mcOpts)
val := mca.NewValidating(mcOpts)
base := apiExtensions.GenericConfig.AdmissionControl
chain := []admission.Interface{mut, mcNamespaceLifecycle, mcMutatingWebhook}
chain := []admission.Interface{mut, mcNamespaceLifecycle}
if base != nil {
chain = append(chain, base)
}
chain = append(chain, mcValidatingWebhook, val)
chain = append(chain, val)
apiExtensions.GenericConfig.AdmissionControl = admission.NewChainHandler(chain...)
}
c.ApiExtensions = apiExtensions
Expand All @@ -409,7 +400,7 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
// Ensure aggregator also receives multicluster routing
aggregator.GenericConfig.BuildHandlerChainFunc = func(h http.Handler, conf *server.Config) http.Handler {
ex := mc.PathExtractor{PathPrefix: mcOpts.PathPrefix, ControlPlaneSegment: mcOpts.ControlPlaneSegment}
base := server.DefaultBuildHandlerChain(h, conf)
base := withVersionOverride(server.DefaultBuildHandlerChain(h, conf))
dispatch := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
cid, _, _ := mc.FromContext(r.Context())
if cid != "" && cid != mcOpts.DefaultCluster && crdRuntimeMgr != nil {
Expand All @@ -426,11 +417,11 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
mut := mca.NewMutating(mcOpts)
val := mca.NewValidating(mcOpts)
base := aggregator.GenericConfig.AdmissionControl
chain := []admission.Interface{mut, mcNamespaceLifecycle, mcMutatingWebhook}
chain := []admission.Interface{mut, mcNamespaceLifecycle}
if base != nil {
chain = append(chain, base)
}
chain = append(chain, mcValidatingWebhook, val)
chain = append(chain, val)
aggregator.GenericConfig.AdmissionControl = admission.NewChainHandler(chain...)
}
c.Aggregator = aggregator
Expand Down Expand Up @@ -480,6 +471,25 @@ func withClusterCRDRequestInfoRewrite(next http.Handler, clusterID string) http.
})
}

func wrapClusterCRDHandler(next http.Handler, conf *server.Config, clusterID string, rewriteRequestInfo bool) http.Handler {
if next == nil || conf == nil {
return next
}
h := next
if rewriteRequestInfo {
h = withClusterCRDRequestInfoRewrite(h, clusterID)
}
h = genericfilters.WithAuthorization(h, conf.Authorization.Authorizer, conf.Serializer)
failedHandler := genericfilters.Unauthorized(conf.Serializer)
failedHandler = genericfilters.WithFailedAuthenticationAudit(failedHandler, conf.AuditBackend, conf.AuditPolicyRuleEvaluator)
h = genericfilters.WithAuthentication(h, conf.Authentication.Authenticator, failedHandler, conf.Authentication.APIAudiences, conf.Authentication.RequestHeaderConfig)
// RequestInfo must be available before rewrite/authn/authz wrappers execute.
h = genericfilters.WithRequestInfo(h, conf.RequestInfoResolver)
h = genericfilters.WithAuditInit(h)
h = serverfilters.WithPanicRecovery(h, conf.RequestInfoResolver)
return h
}

func decorateRESTOptionsGetter(server string, getter generic.RESTOptionsGetter, opts mc.Options) generic.RESTOptionsGetter {
if _, ok := getter.(mc.RESTOptionsDecorator); ok {
klog.Infof("mc.restOptionsGetter server=%s alreadyDecorated=true", server)
Expand Down
5 changes: 2 additions & 3 deletions cmd/apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,8 @@ import (
logsapi "k8s.io/component-base/logs/api/v1"
_ "k8s.io/component-base/metrics/prometheus/workqueue"
"k8s.io/component-base/term"
utilversion "k8s.io/component-base/version"
"k8s.io/component-base/version/verflag"
"k8s.io/component-base/zpages/flagz"
"k8s.io/apiserver/pkg/server/flagz"
"k8s.io/klog/v2"
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
"k8s.io/kubernetes/pkg/capabilities"
Expand Down Expand Up @@ -148,7 +147,7 @@ cluster's shared state through which all other components interact.`,
// Run runs the specified APIServer. This should never exit.
func Run(ctx context.Context, opts options.CompletedOptions) error {
// To help debugging, immediately log version
klog.Infof("Version: %+v", utilversion.Get())
klog.Infof("Version: %+v", normalizedServerVersion())

klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))

Expand Down
51 changes: 51 additions & 0 deletions cmd/apiserver/app/version_override.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package app

import (
"encoding/json"
"net/http"
"strings"

apimachineryversion "k8s.io/apimachinery/pkg/version"
utilversion "k8s.io/component-base/version"
)

const gitVersionArchivePlaceholder = "v0.0.0-master+$Format:%H$"

func withVersionOverride(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if isVersionPath(r.URL.Path) {
info := normalizedServerVersion()
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(info)
return
}
next.ServeHTTP(w, r)
})
}

func isVersionPath(path string) bool {
if path == "/version" || path == "/version/" {
return true
}
trimmed := strings.Trim(path, "/")
parts := strings.Split(trimmed, "/")
return len(parts) == 3 && parts[0] == "clusters" && parts[2] == "version"
}

func normalizedServerVersion() apimachineryversion.Info {
info := utilversion.Get()
if info.GitVersion != gitVersionArchivePlaceholder {
return info
}

parts := strings.SplitN(utilversion.DefaultKubeBinaryVersion, ".", 2)
if len(parts) != 2 {
return info
}

info.Major = parts[0]
info.Minor = parts[1]
info.GitVersion = "v" + utilversion.DefaultKubeBinaryVersion + ".0"
return info
}

30 changes: 30 additions & 0 deletions cmd/apiserver/app/version_override_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package app

import "testing"

func TestIsVersionPath(t *testing.T) {
tests := []struct {
path string
want bool
}{
{path: "/version", want: true},
{path: "/version/", want: true},
{path: "/clusters/kpt-1/version", want: true},
{path: "/clusters/kpt-1/version/", want: true},
{path: "/apis", want: false},
}

for _, tt := range tests {
if got := isVersionPath(tt.path); got != tt.want {
t.Fatalf("isVersionPath(%q)=%v, want %v", tt.path, got, tt.want)
}
}
}

func TestNormalizedServerVersion(t *testing.T) {
info := normalizedServerVersion()
if info.GitVersion == gitVersionArchivePlaceholder {
t.Fatalf("expected non-placeholder git version, got %q", info.GitVersion)
}
}

Loading
Loading