Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 38 additions & 42 deletions cmd/apiserver/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,10 @@
ControlPlaneSegment: mcOpts.ControlPlaneSegment,
DefaultCluster: mcOpts.DefaultCluster,
})
apiExtensions.ExtraConfig.CRDGetter = crdRuntimeMgr.CRDGetterForRequest

Check failure on line 325 in cmd/apiserver/app/config.go

View workflow job for this annotation

GitHub Actions / lint-test

apiExtensions.ExtraConfig.CRDGetter undefined (type "k8s.io/apiextensions-apiserver/pkg/apiserver".ExtraConfig has no field or method CRDGetter)
apiExtensions.ExtraConfig.CRDListerForRequest = crdRuntimeMgr.CRDListerForRequest

Check failure on line 326 in cmd/apiserver/app/config.go

View workflow job for this annotation

GitHub Actions / lint-test

apiExtensions.ExtraConfig.CRDListerForRequest undefined (type "k8s.io/apiextensions-apiserver/pkg/apiserver".ExtraConfig has no field or method CRDListerForRequest)
crdController := mcbootstrap.NewMulticlusterCRDController(crdRuntimeMgr, mcOpts.DefaultCluster)
crdController.Start(genericConfig.DrainedNotify())
prevOnClusterSelected := mcOpts.OnClusterSelected
mcOpts.OnClusterSelected = func(clusterID string) {
if prevOnClusterSelected != nil {
Expand All @@ -330,7 +334,37 @@
if clusterID == "" || clusterID == mcOpts.DefaultCluster {
return
}
_, _ = crdRuntimeMgr.Runtime(clusterID, genericConfig.DrainedNotify())
crdController.EnsureCluster(clusterID)
}
serveClusterCRD := func(w http.ResponseWriter, r *http.Request, conf *server.Config, clusterID, caller string) bool {
group, version, ok := apisGroupVersionFromPath(r.URL.Path)
if !ok {
return false
}

served, err := crdRuntimeMgr.ServesGroupVersion(clusterID, group, version, genericConfig.DrainedNotify())
if err != nil {
klog.Errorf("mc.crdRuntime lookup failed at %s cluster=%s path=%s err=%v", caller, clusterID, r.URL.Path, err)
http.Error(w, "cluster CRD runtime unavailable", http.StatusServiceUnavailable)
return true
}
if !served {
return false
}

h, err := crdRuntimeMgr.Runtime(clusterID, genericConfig.DrainedNotify())
if err != nil || h == nil {
klog.Errorf("mc.crdRuntime unresolved at %s cluster=%s path=%s err=%v", caller, clusterID, r.URL.Path, err)
http.Error(w, "cluster CRD runtime unavailable", http.StatusServiceUnavailable)
return true
}
// Ensure RequestInfo is computed from the normalized /apis path
// before entering the cluster-scoped CRD runtime handler.
h = genericfilters.WithRequestInfo(h, conf.RequestInfoResolver)
h = genericfilters.WithAuditInit(h)
h = serverfilters.WithPanicRecovery(h, conf.RequestInfoResolver)
h.ServeHTTP(w, r)
return true
}
// Ensure CRDs are also routed through the multicluster handler
apiExtensions.GenericConfig.BuildHandlerChainFunc = func(h http.Handler, conf *server.Config) http.Handler {
Expand All @@ -339,28 +373,7 @@
dispatch := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
cid, _, _ := mc.FromContext(r.Context())
if cid != "" && cid != mcOpts.DefaultCluster {
if group, version, ok := apisGroupVersionFromPath(r.URL.Path); ok {
served, err := crdRuntimeMgr.ServesGroupVersion(cid, group, version, genericConfig.DrainedNotify())
if err != nil {
klog.Errorf("mc.crdRuntime lookup failed at apiextensions cluster=%s path=%s err=%v", cid, r.URL.Path, err)
http.Error(w, "cluster CRD runtime unavailable", http.StatusServiceUnavailable)
return
}
if !served {
base.ServeHTTP(w, r)
return
}
if h, err := crdRuntimeMgr.Runtime(cid, genericConfig.DrainedNotify()); err == nil && h != nil {
// Ensure RequestInfo is computed from the normalized /apis path
// before entering the cluster-scoped CRD runtime handler.
h = genericfilters.WithRequestInfo(h, conf.RequestInfoResolver)
h = genericfilters.WithAuditInit(h)
h = serverfilters.WithPanicRecovery(h, conf.RequestInfoResolver)
h.ServeHTTP(w, r)
return
}
klog.Errorf("mc.crdRuntime unresolved cluster=%s path=%s", cid, r.URL.Path)
http.Error(w, "cluster CRD runtime unavailable", http.StatusServiceUnavailable)
if serveClusterCRD(w, r, conf, cid, "apiextensions") {
return
}
}
Expand Down Expand Up @@ -396,25 +409,8 @@
dispatch := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
cid, _, _ := mc.FromContext(r.Context())
if cid != "" && cid != mcOpts.DefaultCluster && crdRuntimeMgr != nil {
if group, version, ok := apisGroupVersionFromPath(r.URL.Path); ok {
served, err := crdRuntimeMgr.ServesGroupVersion(cid, group, version, genericConfig.DrainedNotify())
if err != nil {
klog.Errorf("mc.crdRuntime lookup failed at aggregator cluster=%s path=%s err=%v", cid, r.URL.Path, err)
http.Error(w, "cluster CRD runtime unavailable", http.StatusServiceUnavailable)
return
}
if served {
if h, err := crdRuntimeMgr.Runtime(cid, genericConfig.DrainedNotify()); err == nil && h != nil {
h = genericfilters.WithRequestInfo(h, conf.RequestInfoResolver)
h = genericfilters.WithAuditInit(h)
h = serverfilters.WithPanicRecovery(h, conf.RequestInfoResolver)
h.ServeHTTP(w, r)
return
}
klog.Errorf("mc.crdRuntime unresolved at aggregator cluster=%s path=%s", cid, r.URL.Path)
http.Error(w, "cluster CRD runtime unavailable", http.StatusServiceUnavailable)
return
}
if serveClusterCRD(w, r, conf, cid, "aggregator") {
return
}
}
base.ServeHTTP(w, r)
Expand Down
80 changes: 71 additions & 9 deletions pkg/multicluster/admission/namespace/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package namespace
import (
"sync"

"github.com/kplane-dev/apiserver/pkg/multicluster/scopedinformer"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -31,10 +32,16 @@ type Manager struct {

mu sync.Mutex
clusters map[string]*clusterEnv

sharedOnce sync.Once
sharedErr error
shared informers.SharedInformerFactory
sharedStop <-chan struct{}
sharedOwn chan struct{}
}

type clusterEnv struct {
stopCh <-chan struct{}
stopCh chan struct{}
cid string

clientset kubernetes.Interface
Expand All @@ -45,9 +52,6 @@ func NewManager(opts Options) *Manager {
if opts.ClientPool == nil && opts.BaseLoopbackClientConfig != nil {
opts.ClientPool = mc.NewClientPool(opts.BaseLoopbackClientConfig, opts.PathPrefix, opts.ControlPlaneSegment)
}
if opts.InformerPool == nil && opts.ClientPool != nil {
opts.InformerPool = mc.NewInformerPoolFromClientPool(opts.ClientPool, 0, nil)
}
return &Manager{
opts: opts,
clusters: map[string]*clusterEnv{},
Expand All @@ -62,24 +66,82 @@ func (m *Manager) envForCluster(clusterID string) (*clusterEnv, error) {
return e, nil
}

if m.opts.InformerPool == nil {
if m.opts.ClientPool == nil {
return nil, mc.ErrMissingClientFactory
}
cs, inf, stopCh, err := m.opts.InformerPool.Get(clusterID)
cs, err := m.opts.ClientPool.KubeClientForCluster(clusterID)
if err != nil {
return nil, err
}
scoped, err := m.scopedNamespaceFactory(clusterID)
if err != nil {
return nil, err
}
stopCh := make(chan struct{})

e := &clusterEnv{
cid: clusterID,
stopCh: stopCh,
clientset: cs,
informers: inf,
informers: scoped,
}

// Warm the namespaces informer (used by NamespaceLifecycle).
_ = inf.Core().V1().Namespaces().Informer()
inf.Start(stopCh)
_ = scoped.Core().V1().Namespaces().Informer()
scoped.Start(stopCh)
m.clusters[clusterID] = e
return e, nil
}

func (m *Manager) scopedNamespaceFactory(clusterID string) (informers.SharedInformerFactory, error) {
shared, err := m.ensureSharedFactory()
if err != nil {
return nil, err
}
return newScopedFactory(clusterID, mc.DefaultClusterAnnotation, shared), nil
}

func (m *Manager) ensureSharedFactory() (informers.SharedInformerFactory, error) {
m.sharedOnce.Do(func() {
if m.opts.BaseLoopbackClientConfig == nil {
m.sharedErr = mc.ErrMissingClientFactory
return
}
cs, err := scopedinformer.NewAllClustersKubeClient(m.opts.BaseLoopbackClientConfig)
if err != nil {
m.sharedErr = err
return
}
factory := informers.NewSharedInformerFactory(cs, 0)
if err := factory.Core().V1().Namespaces().Informer().SetTransform(transformNamespaceForShared(mc.DefaultClusterAnnotation)); err != nil {
m.sharedErr = err
return
}
if err := scopedinformer.EnsureClusterIndex(factory.Core().V1().Namespaces().Informer(), mc.DefaultClusterAnnotation); err != nil {
m.sharedErr = err
return
}
if m.sharedStop == nil {
m.sharedOwn = make(chan struct{})
m.sharedStop = m.sharedOwn
}
factory.Start(m.sharedStop)
m.shared = factory
})
if m.sharedErr != nil {
return nil, m.sharedErr
}
return m.shared, nil
}

// StopCluster is test-oriented cleanup; production can leave informers running.
func (m *Manager) StopCluster(clusterID string) {
m.mu.Lock()
defer m.mu.Unlock()
if e, ok := m.clusters[clusterID]; ok {
if e.stopCh != nil {
close(e.stopCh)
}
delete(m.clusters, clusterID)
}
}
Loading
Loading