diff --git a/cmd/apiserver/app/config.go b/cmd/apiserver/app/config.go index 2576ee8..793dcad 100644 --- a/cmd/apiserver/app/config.go +++ b/cmd/apiserver/app/config.go @@ -313,11 +313,14 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) { if apiExtensions.GenericConfig.RESTOptionsGetter != nil { apiExtensions.GenericConfig.RESTOptionsGetter = decorateRESTOptionsGetter("apiextensions", apiExtensions.GenericConfig.RESTOptionsGetter, mcOpts) } + apiExtensionsClientPool := mc.NewAPIExtensionsClientPool(apiExtensions.GenericConfig.LoopbackClientConfig, mcOpts.PathPrefix, mcOpts.ControlPlaneSegment) + apiExtensionsInformerPool := mc.NewAPIExtensionsInformerPoolFromClientPool(apiExtensionsClientPool, 0, genericConfig.DrainedNotify()) crdRuntimeMgr = mcbootstrap.NewCRDRuntimeManager(mcbootstrap.CRDRuntimeManagerOptions{ - BaseAPIExtensionsConfig: apiExtensions, - PathPrefix: mcOpts.PathPrefix, - ControlPlaneSegment: mcOpts.ControlPlaneSegment, - DefaultCluster: mcOpts.DefaultCluster, + BaseAPIExtensionsConfig: apiExtensions, + APIExtensionsInformerPool: apiExtensionsInformerPool, + PathPrefix: mcOpts.PathPrefix, + ControlPlaneSegment: mcOpts.ControlPlaneSegment, + DefaultCluster: mcOpts.DefaultCluster, }) prevOnClusterSelected := mcOpts.OnClusterSelected mcOpts.OnClusterSelected = func(clusterID string) { @@ -456,5 +459,3 @@ func apisGroupVersionFromPath(path string) (group, version string, ok bool) { } return parts[1], parts[2], true } - - diff --git a/pkg/multicluster/apiextensions_clientpool.go b/pkg/multicluster/apiextensions_clientpool.go new file mode 100644 index 0000000..3586be2 --- /dev/null +++ b/pkg/multicluster/apiextensions_clientpool.go @@ -0,0 +1,73 @@ +package multicluster + +import ( + "fmt" + "net/http" + "sync" + + apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + "k8s.io/client-go/rest" +) + +// APIExtensionsClientPool caches per-cluster apiextensions clients/transports. +type APIExtensionsClientPool struct { + base *rest.Config + pathPrefix string + controlPlaneSegment string + + mu sync.Mutex + clients map[string]*apiExtensionsClientEntry +} + +type apiExtensionsClientEntry struct { + clientset apiextensionsclient.Interface + httpClient *http.Client +} + +func NewAPIExtensionsClientPool(base *rest.Config, pathPrefix, controlPlaneSegment string) *APIExtensionsClientPool { + return &APIExtensionsClientPool{ + base: base, + pathPrefix: pathPrefix, + controlPlaneSegment: controlPlaneSegment, + clients: map[string]*apiExtensionsClientEntry{}, + } +} + +func (p *APIExtensionsClientPool) APIExtensionsClientForCluster(clusterID string) (apiextensionsclient.Interface, error) { + p.mu.Lock() + defer p.mu.Unlock() + + if existing, ok := p.clients[clusterID]; ok { + return existing.clientset, nil + } + if p.base == nil { + return nil, fmt.Errorf("base loopback config is required") + } + + cfg := rest.CopyConfig(p.base) + host, err := ClusterHost(cfg.Host, Options{ + PathPrefix: p.pathPrefix, + ControlPlaneSegment: p.controlPlaneSegment, + }, clusterID) + if err != nil { + return nil, err + } + cfg.Host = host + + transport, err := rest.TransportFor(cfg) + if err != nil { + return nil, err + } + httpClient := &http.Client{Transport: transport} + + cs, err := apiextensionsclient.NewForConfigAndClient(cfg, httpClient) + if err != nil { + return nil, err + } + + p.clients[clusterID] = &apiExtensionsClientEntry{ + clientset: cs, + httpClient: httpClient, + } + return cs, nil +} diff --git a/pkg/multicluster/apiextensions_informerpool.go b/pkg/multicluster/apiextensions_informerpool.go new file mode 100644 index 0000000..8f902e1 --- /dev/null +++ b/pkg/multicluster/apiextensions_informerpool.go @@ -0,0 +1,108 @@ +package multicluster + +import ( + "fmt" + "sync" + "time" + + apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions" +) + +type APIExtensionsInformerPoolOptions struct { + ClientForCluster func(clusterID string) (apiextensionsclient.Interface, error) + ResyncPeriod time.Duration + StopCh <-chan struct{} + StartOnGet bool +} + +type APIExtensionsInformerPool struct { + opts APIExtensionsInformerPoolOptions + + mu sync.Mutex + clusters map[string]*apiExtensionsInformerEntry +} + +type apiExtensionsInformerEntry struct { + clientset apiextensionsclient.Interface + factory apiextensionsinformers.SharedInformerFactory + stopCh <-chan struct{} + ownedCh chan struct{} +} + +func NewAPIExtensionsInformerPool(opts APIExtensionsInformerPoolOptions) *APIExtensionsInformerPool { + if opts.StartOnGet == false { + // keep explicit + } else { + opts.StartOnGet = true + } + return &APIExtensionsInformerPool{ + opts: opts, + clusters: map[string]*apiExtensionsInformerEntry{}, + } +} + +func NewAPIExtensionsInformerPoolFromClientPool(pool *APIExtensionsClientPool, resync time.Duration, stopCh <-chan struct{}) *APIExtensionsInformerPool { + return NewAPIExtensionsInformerPool(APIExtensionsInformerPoolOptions{ + ClientForCluster: pool.APIExtensionsClientForCluster, + ResyncPeriod: resync, + StopCh: stopCh, + StartOnGet: true, + }) +} + +func (p *APIExtensionsInformerPool) Get(clusterID string) (apiextensionsclient.Interface, apiextensionsinformers.SharedInformerFactory, <-chan struct{}, error) { + p.mu.Lock() + defer p.mu.Unlock() + + if entry, ok := p.clusters[clusterID]; ok { + if p.opts.StartOnGet { + entry.start() + } + return entry.clientset, entry.factory, entry.stopCh, nil + } + if p.opts.ClientForCluster == nil { + return nil, nil, nil, ErrMissingAPIExtensionsClientFactory + } + cs, err := p.opts.ClientForCluster(clusterID) + if err != nil { + return nil, nil, nil, err + } + factory := apiextensionsinformers.NewSharedInformerFactory(cs, p.opts.ResyncPeriod) + stopCh := p.opts.StopCh + var ownedCh chan struct{} + if stopCh == nil { + ownedCh = make(chan struct{}) + stopCh = ownedCh + } + entry := &apiExtensionsInformerEntry{ + clientset: cs, + factory: factory, + stopCh: stopCh, + ownedCh: ownedCh, + } + p.clusters[clusterID] = entry + if p.opts.StartOnGet { + entry.start() + } + return entry.clientset, entry.factory, entry.stopCh, nil +} + +func (e *apiExtensionsInformerEntry) start() { + e.factory.Start(e.stopCh) +} + +func (p *APIExtensionsInformerPool) StopCluster(clusterID string) { + p.mu.Lock() + defer p.mu.Unlock() + entry, ok := p.clusters[clusterID] + if !ok { + return + } + if entry.ownedCh != nil { + close(entry.ownedCh) + } + delete(p.clusters, clusterID) +} + +var ErrMissingAPIExtensionsClientFactory = fmt.Errorf("missing apiextensions client factory for informer pool") diff --git a/pkg/multicluster/bootstrap/crd_runtime_manager_wrapped.go b/pkg/multicluster/bootstrap/crd_runtime_manager_wrapped.go index eea6e23..3ad5841 100644 --- a/pkg/multicluster/bootstrap/crd_runtime_manager_wrapped.go +++ b/pkg/multicluster/bootstrap/crd_runtime_manager_wrapped.go @@ -14,6 +14,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apiserver/pkg/server" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "k8s.io/component-base/metrics" "k8s.io/component-base/metrics/legacyregistry" @@ -22,8 +23,6 @@ import ( const ( servesLookupTimeout = 2 * time.Second - servesCacheTTL = 5 * time.Second - watchRetryBackoff = 500 * time.Millisecond ) var ( @@ -36,16 +35,12 @@ var ( ) type CRDRuntimeManagerOptions struct { - BaseAPIExtensionsConfig *apiextensionsapiserver.Config - PathPrefix string - ControlPlaneSegment string - DefaultCluster string - Delegate http.Handler -} - -type servesCacheEntry struct { - served bool - exp time.Time + BaseAPIExtensionsConfig *apiextensionsapiserver.Config + APIExtensionsInformerPool *mc.APIExtensionsInformerPool + PathPrefix string + ControlPlaneSegment string + DefaultCluster string + Delegate http.Handler } type runtimeEntry struct { @@ -56,19 +51,25 @@ type runtimeEntry struct { type clusterState struct { r runtimeEntry - c *apiextensionsclient.Clientset + c apiextensionsclient.Interface } type CRDRuntimeManager struct { opts CRDRuntimeManagerOptions - mu sync.Mutex - runtimes map[string]runtimeEntry - clients map[string]*apiextensionsclient.Clientset - started map[string]bool - cache map[string]servesCacheEntry - clusterKeys map[string]map[string]struct{} - createSF singleflight.Group + mu sync.Mutex + + runtimes map[string]runtimeEntry + clients map[string]apiextensionsclient.Interface + createSF singleflight.Group + + // Informer-backed serves index state. + informerStarted map[string]bool + clusterSynced map[string]bool + serves map[string]bool + clusterKeys map[string]map[string]struct{} + crdKeys map[string]map[string][]string + informerSF singleflight.Group } func NewCRDRuntimeManager(opts CRDRuntimeManagerOptions) *CRDRuntimeManager { @@ -76,12 +77,14 @@ func NewCRDRuntimeManager(opts CRDRuntimeManagerOptions) *CRDRuntimeManager { legacyregistry.MustRegister(crdRuntimeCreateTotal, crdServesLookupTotal, crdServesCacheHit, crdServesCacheMiss, crdServesLookupLat) }) return &CRDRuntimeManager{ - opts: opts, - runtimes: map[string]runtimeEntry{}, - clients: map[string]*apiextensionsclient.Clientset{}, - started: map[string]bool{}, - cache: map[string]servesCacheEntry{}, - clusterKeys: map[string]map[string]struct{}{}, + opts: opts, + runtimes: map[string]runtimeEntry{}, + clients: map[string]apiextensionsclient.Interface{}, + informerStarted: map[string]bool{}, + clusterSynced: map[string]bool{}, + serves: map[string]bool{}, + clusterKeys: map[string]map[string]struct{}{}, + crdKeys: map[string]map[string][]string{}, } } @@ -102,7 +105,7 @@ func (m *CRDRuntimeManager) ServesGroupVersion(clusterID, group, version string, } start := time.Now() key := clusterID + "\x00" + group + "\x00" + version - if served, ok := m.getCache(key); ok { + if served, ok := m.lookupFromInformerIndex(clusterID, key); ok { r := result(served) crdServesCacheHit.WithLabelValues(r).Inc() crdServesLookupTotal.WithLabelValues(r).Inc() @@ -111,12 +114,25 @@ func (m *CRDRuntimeManager) ServesGroupVersion(clusterID, group, version string, } crdServesCacheMiss.WithLabelValues("miss").Inc() + // Prefer shared informer-backed state for served checks. + if err := m.ensureInformerState(clusterID, stopCh); err == nil { + if served, ok := m.lookupFromInformerIndex(clusterID, key); ok { + r := result(served) + crdServesCacheHit.WithLabelValues(r).Inc() + crdServesLookupTotal.WithLabelValues(r).Inc() + crdServesLookupLat.WithLabelValues(r).Observe(time.Since(start).Seconds()) + return served, nil + } + } + + // Fallback to direct API list if informer state is unavailable. state, err := m.ensureClusterState(clusterID, stopCh) if err != nil { crdServesLookupTotal.WithLabelValues("error").Inc() crdServesLookupLat.WithLabelValues("error").Observe(time.Since(start).Seconds()) return false, err } + ctx, cancel := context.WithTimeout(context.Background(), servesLookupTimeout) defer cancel() list, err := state.c.ApiextensionsV1().CustomResourceDefinitions().List(ctx, metav1.ListOptions{}) @@ -142,7 +158,6 @@ func (m *CRDRuntimeManager) ServesGroupVersion(clusterID, group, version string, break } } - m.setCache(clusterID, key, served) r := result(served) crdServesLookupTotal.WithLabelValues(r).Inc() crdServesLookupLat.WithLabelValues(r).Observe(time.Since(start).Seconds()) @@ -222,10 +237,6 @@ func (m *CRDRuntimeManager) ensureClusterState(clusterID string, stopCh <-chan s m.mu.Lock() m.runtimes[clusterID] = entry m.clients[clusterID] = cs - if !m.started[clusterID] { - m.started[clusterID] = true - go m.watchUpdates(clusterID, cs, stopCh) - } m.mu.Unlock() crdRuntimeCreateTotal.WithLabelValues("success").Inc() @@ -241,79 +252,182 @@ func (m *CRDRuntimeManager) ensureClusterState(clusterID string, stopCh <-chan s return state, nil } -func (m *CRDRuntimeManager) watchUpdates(clusterID string, cs *apiextensionsclient.Clientset, stopCh <-chan struct{}) { - for { - select { - case <-stopCh: - return - default: +func (m *CRDRuntimeManager) lookupFromInformerIndex(clusterID, key string) (bool, bool) { + m.mu.Lock() + defer m.mu.Unlock() + if !m.clusterSynced[clusterID] { + return false, false + } + _, served := m.serves[key] + return served, true +} + +func (m *CRDRuntimeManager) ensureInformerState(clusterID string, stopCh <-chan struct{}) error { + if m.opts.APIExtensionsInformerPool == nil { + return fmt.Errorf("apiextensions informer pool not configured") + } + m.mu.Lock() + if m.informerStarted[clusterID] && m.clusterSynced[clusterID] { + m.mu.Unlock() + return nil + } + m.mu.Unlock() + + _, err, _ := m.informerSF.Do(clusterID, func() (any, error) { + m.mu.Lock() + if m.informerStarted[clusterID] && m.clusterSynced[clusterID] { + m.mu.Unlock() + return nil, nil } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) - w, err := cs.ApiextensionsV1().CustomResourceDefinitions().Watch(ctx, metav1.ListOptions{AllowWatchBookmarks: true, ResourceVersion: "0"}) + m.mu.Unlock() + + cs, factory, poolStopCh, err := m.opts.APIExtensionsInformerPool.Get(clusterID) if err != nil { - cancel() - select { - case <-stopCh: - return - case <-time.After(watchRetryBackoff): - continue - } + return nil, err } - m.invalidateCluster(clusterID) - closed := false - for !closed { - select { - case <-stopCh: - w.Stop() - cancel() - return - case _, ok := <-w.ResultChan(): - if !ok { - closed = true - break - } - m.invalidateCluster(clusterID) - } + crdInformer := factory.Apiextensions().V1().CustomResourceDefinitions().Informer() + _, err = crdInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + m.onCRDUpsert(clusterID, obj) + }, + UpdateFunc: func(_, newObj interface{}) { + m.onCRDUpsert(clusterID, newObj) + }, + DeleteFunc: func(obj interface{}) { + m.onCRDDelete(clusterID, obj) + }, + }) + if err != nil { + return nil, err } - w.Stop() - cancel() - select { - case <-stopCh: - return - case <-time.After(watchRetryBackoff): + + stop := poolStopCh + if stop == nil { + stop = stopCh } - } + if stop == nil { + return nil, fmt.Errorf("missing stop channel for apiextensions informer") + } + factory.Start(stop) + if !cache.WaitForCacheSync(stop, crdInformer.HasSynced) { + return nil, fmt.Errorf("failed waiting for CRD informer sync for cluster=%s", clusterID) + } + + m.rebuildClusterIndex(clusterID, crdInformer.GetStore().List()) + + m.mu.Lock() + m.informerStarted[clusterID] = true + m.clusterSynced[clusterID] = true + if _, ok := m.clients[clusterID]; !ok { + m.clients[clusterID] = cs + } + m.mu.Unlock() + return nil, nil + }) + return err } -func (m *CRDRuntimeManager) getCache(key string) (bool, bool) { - now := time.Now() +func (m *CRDRuntimeManager) rebuildClusterIndex(clusterID string, objs []interface{}) { m.mu.Lock() defer m.mu.Unlock() - v, ok := m.cache[key] - if !ok || now.After(v.exp) { - delete(m.cache, key) - return false, false + + for k := range m.clusterKeys[clusterID] { + delete(m.serves, k) + } + m.clusterKeys[clusterID] = map[string]struct{}{} + m.crdKeys[clusterID] = map[string][]string{} + + for _, obj := range objs { + crd, ok := crdFromObj(obj) + if !ok { + continue + } + keys := servedKeysForCRD(clusterID, crd) + m.setCRDKeysLocked(clusterID, crd.Name, keys) } - return v.served, true } -func (m *CRDRuntimeManager) setCache(clusterID, key string, served bool) { +func (m *CRDRuntimeManager) onCRDUpsert(clusterID string, obj interface{}) { + crd, ok := crdFromObj(obj) + if !ok { + return + } + keys := servedKeysForCRD(clusterID, crd) + m.mu.Lock() + defer m.mu.Unlock() + m.setCRDKeysLocked(clusterID, crd.Name, keys) +} + +func (m *CRDRuntimeManager) onCRDDelete(clusterID string, obj interface{}) { + crd, ok := crdFromObj(obj) + if !ok { + return + } m.mu.Lock() defer m.mu.Unlock() - m.cache[key] = servesCacheEntry{served: served, exp: time.Now().Add(servesCacheTTL)} + m.setCRDKeysLocked(clusterID, crd.Name, nil) +} + +func (m *CRDRuntimeManager) setCRDKeysLocked(clusterID, crdName string, keys []string) { if m.clusterKeys[clusterID] == nil { m.clusterKeys[clusterID] = map[string]struct{}{} } - m.clusterKeys[clusterID][key] = struct{}{} + if m.crdKeys[clusterID] == nil { + m.crdKeys[clusterID] = map[string][]string{} + } + for _, old := range m.crdKeys[clusterID][crdName] { + delete(m.serves, old) + delete(m.clusterKeys[clusterID], old) + } + if len(keys) == 0 { + delete(m.crdKeys[clusterID], crdName) + return + } + m.crdKeys[clusterID][crdName] = keys + for _, k := range keys { + m.serves[k] = true + m.clusterKeys[clusterID][k] = struct{}{} + } +} + +func servedKeysForCRD(clusterID string, crd *apiextensionsv1.CustomResourceDefinition) []string { + if crd == nil || !isCRDEstablished(crd) { + return nil + } + keys := make([]string, 0, len(crd.Spec.Versions)) + for _, v := range crd.Spec.Versions { + if !v.Served { + continue + } + keys = append(keys, clusterID+"\x00"+crd.Spec.Group+"\x00"+v.Name) + } + return keys +} + +func crdFromObj(obj interface{}) (*apiextensionsv1.CustomResourceDefinition, bool) { + if obj == nil { + return nil, false + } + if crd, ok := obj.(*apiextensionsv1.CustomResourceDefinition); ok { + return crd, true + } + if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { + if crd, ok := tombstone.Obj.(*apiextensionsv1.CustomResourceDefinition); ok { + return crd, true + } + } + return nil, false } func (m *CRDRuntimeManager) invalidateCluster(clusterID string) { m.mu.Lock() defer m.mu.Unlock() for k := range m.clusterKeys[clusterID] { - delete(m.cache, k) + delete(m.serves, k) } delete(m.clusterKeys, clusterID) + delete(m.crdKeys, clusterID) + delete(m.clusterSynced, clusterID) } func isCRDEstablished(crd *apiextensionsv1.CustomResourceDefinition) bool {