Skip to content

Commit

Permalink
WIP safe getrestclient without a map again
Browse files Browse the repository at this point in the history
Signed-off-by: Tom Wieczorek <twieczorek@mirantis.com>
  • Loading branch information
twz123 committed Sep 7, 2024
1 parent ef25228 commit f193886
Showing 1 changed file with 49 additions and 47 deletions.
96 changes: 49 additions & 47 deletions pkg/kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package kubernetes

import (
"fmt"
"sync"

k0sclientset "github.com/k0sproject/k0s/pkg/client/clientset"
Expand Down Expand Up @@ -48,30 +47,34 @@ type ClientFactoryInterface interface {
func NewAdminClientFactory(kubeconfigPath string) ClientFactoryInterface {
return &clientFactory{
configPath: kubeconfigPath,
cache: make(map[any]any, 8),
}
}

// clientFactory implements a cached and lazy-loading clientFactory for all the different types of kube clients we use
// It's imoplemented as lazy-loading so we can create the factory itself before we have the api, etcd and other components up so we can pass
// It's implemented as lazy-loading so we can create the factory itself before we have the api, etcd and other components up so we can pass
// the factory itself to components needing kube clients and creation time.
type clientFactory struct {
configPath string

client *kubernetes.Clientset
dynamicClient *dynamic.DynamicClient
discoveryClient discovery.CachedDiscoveryInterface
k0sClient *k0sclientset.Clientset
restConfig *rest.Config

mutex sync.Mutex
cache map[any]any
}

func (c *clientFactory) GetClient() (kubernetes.Interface, error) {
return getOrCreateClient(c, kubernetes.NewForConfig)
return getOrCreateClient(c, &c.client, kubernetes.NewForConfig)
}

func (c *clientFactory) GetDynamicClient() (dynamic.Interface, error) {
return getOrCreateClient(c, dynamic.NewForConfig)
return getOrCreateClient(c, &c.dynamicClient, dynamic.NewForConfig)
}

func (c *clientFactory) GetDiscoveryClient() (discovery.CachedDiscoveryInterface, error) {
return getOrCreateClient(c, func(c *rest.Config) (discovery.CachedDiscoveryInterface, error) {
return getOrCreateClient(c, &c.discoveryClient, func(c *rest.Config) (discovery.CachedDiscoveryInterface, error) {
discoveryClient, err := discovery.NewDiscoveryClientForConfig(c)
if err != nil {
return nil, err
Expand All @@ -81,10 +84,10 @@ func (c *clientFactory) GetDiscoveryClient() (discovery.CachedDiscoveryInterface
}

func (c *clientFactory) GetK0sClient() (k0sclientset.Interface, error) {
return getOrCreateClient(c, k0sclientset.NewForConfig)
return getOrCreateClient(c, &c.k0sClient, k0sclientset.NewForConfig)
}

// Deprecated: Use [ClientFactory.GetK0sClient] instead.
// Deprecated: Use [clientFactory.GetK0sClient] instead.
func (c *clientFactory) GetConfigClient() (cfgClient.ClusterConfigInterface, error) {
k0sClient, err := c.GetK0sClient()
if err != nil {
Expand All @@ -94,7 +97,7 @@ func (c *clientFactory) GetConfigClient() (cfgClient.ClusterConfigInterface, err
return k0sClient.K0sV1beta1().ClusterConfigs(constant.ClusterConfigNamespace), nil
}

// Deprecated: Use [ClientFactory.GetK0sClient] instead.
// Deprecated: Use [clientFactory.GetK0sClient] instead.
func (c *clientFactory) GetEtcdMemberClient() (etcdMemberClient.EtcdMemberInterface, error) {
k0sClient, err := c.GetK0sClient()
if err != nil {
Expand All @@ -107,25 +110,53 @@ func (c *clientFactory) GetEtcdMemberClient() (etcdMemberClient.EtcdMemberInterf
func (c *clientFactory) GetRESTConfig() (*rest.Config, error) {
c.mutex.Lock()
defer c.mutex.Unlock()
return c.getRESTConfig()
config, err := c.getRESTConfig()
if err != nil {
return nil, err
}
return rest.CopyConfig(config), nil
}

func (c *clientFactory) getRESTConfig() (*rest.Config, error) {
return getOrCreate(c, func() (*rest.Config, error) {
restConfig, err := clientcmd.BuildConfigFromFlags("", c.configPath)
return getOrCreate(&c.restConfig, func() (*rest.Config, error) {
config, err := ClientConfig(KubeconfigFromFile(c.configPath))
if err != nil {
return nil, fmt.Errorf("failed to load kubeconfig: %w", err)
return nil, err
}

// We're always running the client on the same host as the API, no need to compress
restConfig.DisableCompression = true
config.DisableCompression = true
// To mitigate stack applier bursts in startup
restConfig.QPS = 40.0
restConfig.Burst = 400.0
config.QPS = 40.0
config.Burst = 400.0

return config, nil
})
}

func getOrCreateClient[T comparable](cf *clientFactory, loaded *T, newForConfig func(*rest.Config) (T, error)) (T, error) {
cf.mutex.Lock()
defer cf.mutex.Unlock()

return restConfig, nil
return getOrCreate(loaded, func() (t T, _ error) {
config, err := cf.getRESTConfig()
if err != nil {
return t, err
}
return newForConfig(config)
})
}

func getOrCreate[T comparable](loaded *T, load func() (T, error)) (t T, err error) {
if *loaded == t {
t, err = load()
if err == nil {
*loaded = t
}
}
return *loaded, err
}

// KubeconfigFromFile returns a [clientcmd.KubeconfigGetter] that tries to load
// a kubeconfig from the given path.
func KubeconfigFromFile(path string) clientcmd.KubeconfigGetter {
Expand Down Expand Up @@ -158,32 +189,3 @@ func NewClient(getter clientcmd.KubeconfigGetter) (kubernetes.Interface, error)

return kubernetes.NewForConfig(config)
}

func getOrCreateClient[T any](c *clientFactory, newForConfig func(*rest.Config) (T, error)) (T, error) {
c.mutex.Lock()
defer c.mutex.Unlock()

return getOrCreate(c, func() (t T, _ error) {
c, err := c.getRESTConfig()
if err != nil {
return t, err
}
return newForConfig(c)
})
}

func getOrCreate[T any](c *clientFactory, create func() (T, error)) (T, error) {
// A unique key based on the type of a value.
type keyType[T any] struct{}

k := keyType[T]{}
if stored, ok := c.cache[k].(*T); ok && stored != nil {
return *stored, nil
}

loaded, err := create()
if err == nil {
c.cache[k] = &loaded
}
return loaded, err
}

0 comments on commit f193886

Please sign in to comment.