Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions cmd/apiserver/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,14 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
if apiExtensions.GenericConfig.RESTOptionsGetter != nil {
apiExtensions.GenericConfig.RESTOptionsGetter = decorateRESTOptionsGetter("apiextensions", apiExtensions.GenericConfig.RESTOptionsGetter, mcOpts)
}
apiExtensionsClientPool := mc.NewAPIExtensionsClientPool(apiExtensions.GenericConfig.LoopbackClientConfig, mcOpts.PathPrefix, mcOpts.ControlPlaneSegment)
apiExtensionsInformerPool := mc.NewAPIExtensionsInformerPoolFromClientPool(apiExtensionsClientPool, 0, genericConfig.DrainedNotify())
crdRuntimeMgr = mcbootstrap.NewCRDRuntimeManager(mcbootstrap.CRDRuntimeManagerOptions{
BaseAPIExtensionsConfig: apiExtensions,
PathPrefix: mcOpts.PathPrefix,
ControlPlaneSegment: mcOpts.ControlPlaneSegment,
DefaultCluster: mcOpts.DefaultCluster,
BaseAPIExtensionsConfig: apiExtensions,
APIExtensionsInformerPool: apiExtensionsInformerPool,
PathPrefix: mcOpts.PathPrefix,
ControlPlaneSegment: mcOpts.ControlPlaneSegment,
DefaultCluster: mcOpts.DefaultCluster,
})
prevOnClusterSelected := mcOpts.OnClusterSelected
mcOpts.OnClusterSelected = func(clusterID string) {
Expand Down Expand Up @@ -456,5 +459,3 @@ func apisGroupVersionFromPath(path string) (group, version string, ok bool) {
}
return parts[1], parts[2], true
}


73 changes: 73 additions & 0 deletions pkg/multicluster/apiextensions_clientpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package multicluster

import (
"fmt"
"net/http"
"sync"

apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/client-go/rest"
)

// APIExtensionsClientPool caches per-cluster apiextensions clients/transports.
type APIExtensionsClientPool struct {
base *rest.Config
pathPrefix string
controlPlaneSegment string

mu sync.Mutex
clients map[string]*apiExtensionsClientEntry
}

type apiExtensionsClientEntry struct {
clientset apiextensionsclient.Interface
httpClient *http.Client
}

func NewAPIExtensionsClientPool(base *rest.Config, pathPrefix, controlPlaneSegment string) *APIExtensionsClientPool {
return &APIExtensionsClientPool{
base: base,
pathPrefix: pathPrefix,
controlPlaneSegment: controlPlaneSegment,
clients: map[string]*apiExtensionsClientEntry{},
}
}

func (p *APIExtensionsClientPool) APIExtensionsClientForCluster(clusterID string) (apiextensionsclient.Interface, error) {
p.mu.Lock()
defer p.mu.Unlock()

if existing, ok := p.clients[clusterID]; ok {
return existing.clientset, nil
}
if p.base == nil {
return nil, fmt.Errorf("base loopback config is required")
}

cfg := rest.CopyConfig(p.base)
host, err := ClusterHost(cfg.Host, Options{
PathPrefix: p.pathPrefix,
ControlPlaneSegment: p.controlPlaneSegment,
}, clusterID)
if err != nil {
return nil, err
}
cfg.Host = host

transport, err := rest.TransportFor(cfg)
if err != nil {
return nil, err
}
httpClient := &http.Client{Transport: transport}

cs, err := apiextensionsclient.NewForConfigAndClient(cfg, httpClient)
if err != nil {
return nil, err
}

p.clients[clusterID] = &apiExtensionsClientEntry{
clientset: cs,
httpClient: httpClient,
}
return cs, nil
}
108 changes: 108 additions & 0 deletions pkg/multicluster/apiextensions_informerpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package multicluster

import (
"fmt"
"sync"
"time"

apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
)

type APIExtensionsInformerPoolOptions struct {
ClientForCluster func(clusterID string) (apiextensionsclient.Interface, error)
ResyncPeriod time.Duration
StopCh <-chan struct{}
StartOnGet bool
}

type APIExtensionsInformerPool struct {
opts APIExtensionsInformerPoolOptions

mu sync.Mutex
clusters map[string]*apiExtensionsInformerEntry
}

type apiExtensionsInformerEntry struct {
clientset apiextensionsclient.Interface
factory apiextensionsinformers.SharedInformerFactory
stopCh <-chan struct{}
ownedCh chan struct{}
}

func NewAPIExtensionsInformerPool(opts APIExtensionsInformerPoolOptions) *APIExtensionsInformerPool {
if opts.StartOnGet == false {
// keep explicit
} else {
opts.StartOnGet = true
}
return &APIExtensionsInformerPool{
opts: opts,
clusters: map[string]*apiExtensionsInformerEntry{},
}
}

func NewAPIExtensionsInformerPoolFromClientPool(pool *APIExtensionsClientPool, resync time.Duration, stopCh <-chan struct{}) *APIExtensionsInformerPool {
return NewAPIExtensionsInformerPool(APIExtensionsInformerPoolOptions{
ClientForCluster: pool.APIExtensionsClientForCluster,
ResyncPeriod: resync,
StopCh: stopCh,
StartOnGet: true,
})
}

func (p *APIExtensionsInformerPool) Get(clusterID string) (apiextensionsclient.Interface, apiextensionsinformers.SharedInformerFactory, <-chan struct{}, error) {
p.mu.Lock()
defer p.mu.Unlock()

if entry, ok := p.clusters[clusterID]; ok {
if p.opts.StartOnGet {
entry.start()
}
return entry.clientset, entry.factory, entry.stopCh, nil
}
if p.opts.ClientForCluster == nil {
return nil, nil, nil, ErrMissingAPIExtensionsClientFactory
}
cs, err := p.opts.ClientForCluster(clusterID)
if err != nil {
return nil, nil, nil, err
}
factory := apiextensionsinformers.NewSharedInformerFactory(cs, p.opts.ResyncPeriod)
stopCh := p.opts.StopCh
var ownedCh chan struct{}
if stopCh == nil {
ownedCh = make(chan struct{})
stopCh = ownedCh
}
entry := &apiExtensionsInformerEntry{
clientset: cs,
factory: factory,
stopCh: stopCh,
ownedCh: ownedCh,
}
p.clusters[clusterID] = entry
if p.opts.StartOnGet {
entry.start()
}
return entry.clientset, entry.factory, entry.stopCh, nil
}

func (e *apiExtensionsInformerEntry) start() {
e.factory.Start(e.stopCh)
}

func (p *APIExtensionsInformerPool) StopCluster(clusterID string) {
p.mu.Lock()
defer p.mu.Unlock()
entry, ok := p.clusters[clusterID]
if !ok {
return
}
if entry.ownedCh != nil {
close(entry.ownedCh)
}
delete(p.clusters, clusterID)
}

var ErrMissingAPIExtensionsClientFactory = fmt.Errorf("missing apiextensions client factory for informer pool")
Loading