Skip to content

Commit

Permalink
WIP safe GetRESTConfig using a map
Browse files Browse the repository at this point in the history
Signed-off-by: Tom Wieczorek <twieczorek@mirantis.com>
  • Loading branch information
twz123 committed Jun 12, 2024
1 parent af7d13e commit 887e4c4
Showing 1 changed file with 47 additions and 51 deletions.
98 changes: 47 additions & 51 deletions pkg/kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,46 +44,41 @@ type ClientFactoryInterface interface {

// NewAdminClientFactory creates a new factory that loads the admin kubeconfig based client
func NewAdminClientFactory(kubeconfigPath string) ClientFactoryInterface {
return &ClientFactory{
return &clientFactory{
configPath: kubeconfigPath,
cache: make(map[any]any, 8),
}
}

// ClientFactory implements a cached and lazy-loading ClientFactory for all the different types of kube clients we use
// clientFactory implements a cached and lazy-loading clientFactory for all the different types of kube clients we use
// It's imoplemented as lazy-loading so we can create the factory itself before we have the api, etcd and other components up so we can pass
// the factory itself to components needing kube clients and creation time.
type ClientFactory struct {
type clientFactory struct {
configPath string

client *kubernetes.Clientset
dynamicClient *dynamic.DynamicClient
discoveryClient discovery.CachedDiscoveryInterface
restConfig *rest.Config
configClient cfgClient.ClusterConfigInterface
etcdMemberClient etcdMemberClient.EtcdMemberInterface

mutex sync.Mutex
cache map[any]any
}

func (c *ClientFactory) GetClient() (kubernetes.Interface, error) {
func (c *clientFactory) GetClient() (kubernetes.Interface, error) {
c.mutex.Lock()
defer c.mutex.Unlock()

return lazyLoadClient(&c.client, c.getRESTConfig, kubernetes.NewForConfig)
return getOrCreateClient(c, kubernetes.NewForConfig)
}

func (c *ClientFactory) GetDynamicClient() (dynamic.Interface, error) {
func (c *clientFactory) GetDynamicClient() (dynamic.Interface, error) {
c.mutex.Lock()
defer c.mutex.Unlock()

return lazyLoadClient(&c.dynamicClient, c.getRESTConfig, dynamic.NewForConfig)
return getOrCreateClient(c, dynamic.NewForConfig)
}

func (c *ClientFactory) GetDiscoveryClient() (discovery.CachedDiscoveryInterface, error) {
func (c *clientFactory) GetDiscoveryClient() (discovery.CachedDiscoveryInterface, error) {
c.mutex.Lock()
defer c.mutex.Unlock()

return lazyLoadClient(&c.discoveryClient, c.getRESTConfig, func(c *rest.Config) (discovery.CachedDiscoveryInterface, error) {
return getOrCreateClient(c, func(c *rest.Config) (discovery.CachedDiscoveryInterface, error) {
discoveryClient, err := discovery.NewDiscoveryClientForConfig(c)
if err != nil {
return nil, err
Expand All @@ -92,10 +87,11 @@ func (c *ClientFactory) GetDiscoveryClient() (discovery.CachedDiscoveryInterface
})
}

func (c *ClientFactory) GetConfigClient() (cfgClient.ClusterConfigInterface, error) {
func (c *clientFactory) GetConfigClient() (cfgClient.ClusterConfigInterface, error) {
c.mutex.Lock()
defer c.mutex.Unlock()
return lazyLoadClient(&c.configClient, c.getRESTConfig, func(c *rest.Config) (cfgClient.ClusterConfigInterface, error) {

return getOrCreateClient(c, func(c *rest.Config) (cfgClient.ClusterConfigInterface, error) {
configClient, err := cfgClient.NewForConfig(c)
if err != nil {
return nil, err
Expand All @@ -104,11 +100,11 @@ func (c *ClientFactory) GetConfigClient() (cfgClient.ClusterConfigInterface, err
})
}

func (c *ClientFactory) GetEtcdMemberClient() (etcdMemberClient.EtcdMemberInterface, error) {
func (c *clientFactory) GetEtcdMemberClient() (etcdMemberClient.EtcdMemberInterface, error) {
c.mutex.Lock()
defer c.mutex.Unlock()

return lazyLoadClient(&c.etcdMemberClient, c.getRESTConfig, func(c *rest.Config) (etcdMemberClient.EtcdMemberInterface, error) {
return getOrCreateClient(c, func(c *rest.Config) (etcdMemberClient.EtcdMemberInterface, error) {
etcdMemberClient, err := etcdMemberClient.NewForConfig(c)
if err != nil {
return nil, err
Expand All @@ -117,32 +113,26 @@ func (c *ClientFactory) GetEtcdMemberClient() (etcdMemberClient.EtcdMemberInterf
})
}

func (c *ClientFactory) GetRESTConfig() (*rest.Config, error) {
func (c *clientFactory) GetRESTConfig() (*rest.Config, error) {
c.mutex.Lock()
defer c.mutex.Unlock()
return c.getRESTConfig()
}

func (c *ClientFactory) getRESTConfig() (_ *rest.Config, err error) {
if c.restConfig == nil {
c.restConfig, err = loadRESTConfig(c.configPath)
}

return c.restConfig, err
}

func loadRESTConfig(kubeconfigPath string) (*rest.Config, error) {
restConfig, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
if err != nil {
return nil, fmt.Errorf("failed to load kubeconfig: %w", err)
}
// We're always running the client on the same host as the API, no need to compress
restConfig.DisableCompression = true
// To mitigate stack applier bursts in startup
restConfig.QPS = 40.0
restConfig.Burst = 400.0
func (c *clientFactory) getRESTConfig() (*rest.Config, error) {
return getOrCreate(c, func() (*rest.Config, error) {
restConfig, err := clientcmd.BuildConfigFromFlags("", c.configPath)
if err != nil {
return nil, fmt.Errorf("failed to load kubeconfig: %w", err)
}
// We're always running the client on the same host as the API, no need to compress
restConfig.DisableCompression = true
// To mitigate stack applier bursts in startup
restConfig.QPS = 40.0
restConfig.Burst = 400.0

return restConfig, nil
return restConfig, nil
})
}

// KubeconfigFromFile returns a [clientcmd.KubeconfigGetter] that tries to load
Expand Down Expand Up @@ -178,22 +168,28 @@ func NewClient(getter clientcmd.KubeconfigGetter) (kubernetes.Interface, error)
return kubernetes.NewForConfig(config)
}

func lazyLoadClient[I comparable](loaded *I, loadConfig func() (*rest.Config, error), newForConfig func(*rest.Config) (I, error)) (I, error) {
return lazyLoad(loaded, func() (t I, _ error) {
config, err := loadConfig()
func getOrCreateClient[T any](c *clientFactory, newForConfig func(*rest.Config) (T, error)) (T, error) {
return getOrCreate(c, func() (t T, _ error) {
c, err := c.getRESTConfig()
if err != nil {
return t, err
}
return newForConfig(config)
return newForConfig(c)
})
}

func lazyLoad[T comparable](loaded *T, load func() (T, error)) (t T, err error) {
if *loaded == t {
t, err := load()
if err == nil {
*loaded = t
}
func getOrCreate[T any](c *clientFactory, create func() (T, error)) (T, error) {
// A unique key based on the type of a value.
type keyType[T any] struct{}

k := keyType[T]{}
if stored, ok := c.cache[k].(*T); ok && stored != nil {
return *stored, nil
}

loaded, err := create()
if err == nil {
c.cache[k] = &loaded
}
return *loaded, err
return loaded, err
}

0 comments on commit 887e4c4

Please sign in to comment.