Skip to content

Commit

Permalink
Merge pull request #4950 from twz123/concurrency-safe-clientfactory
Browse files Browse the repository at this point in the history
Concurrency safe client factory
  • Loading branch information
twz123 authored Oct 1, 2024
2 parents 62ef4c6 + ed0bc7b commit 4115f25
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 126 deletions.
22 changes: 20 additions & 2 deletions cmd/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (
"github.com/k0sproject/k0s/pkg/token"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"k8s.io/client-go/rest"
)

type command config.CLIOptions
Expand Down Expand Up @@ -162,7 +163,20 @@ func (c *command) start(ctx context.Context) error {
}()

// common factory to get the admin kube client that's needed in many components
adminClientFactory := kubernetes.NewAdminClientFactory(c.K0sVars.AdminKubeConfigPath)
adminClientFactory := &kubernetes.ClientFactory{LoadRESTConfig: func() (*rest.Config, error) {
config, err := kubernetes.ClientConfig(kubernetes.KubeconfigFromFile(c.K0sVars.AdminKubeConfigPath))
if err != nil {
return nil, err
}

// We're always running the client on the same host as the API, no need to compress
config.DisableCompression = true
// To mitigate stack applier bursts in startup
config.QPS = 40.0
config.Burst = 400.0

return config, nil
}}

certificateManager := certificate.Manager{K0sVars: c.K0sVars}

Expand Down Expand Up @@ -561,7 +575,11 @@ func (c *command) start(ctx context.Context) error {
EnableWorker: c.EnableWorker,
})

apClientFactory, err := apclient.NewClientFactory(adminClientFactory.GetRESTConfig())
restConfig, err := adminClientFactory.GetRESTConfig()
if err != nil {
return err
}
apClientFactory, err := apclient.NewClientFactory(restConfig)
if err != nil {
return err
}
Expand Down
36 changes: 18 additions & 18 deletions internal/testutil/kube_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ limitations under the License.
package testutil

import (
"errors"
"reflect"
"strings"

"github.com/k0sproject/k0s/internal/testutil/fakeclient"
k0sclientset "github.com/k0sproject/k0s/pkg/client/clientset"
k0sfake "github.com/k0sproject/k0s/pkg/client/clientset/fake"
k0sscheme "github.com/k0sproject/k0s/pkg/client/clientset/scheme"
etcdv1beta1 "github.com/k0sproject/k0s/pkg/client/clientset/typed/etcd/v1beta1"
Expand Down Expand Up @@ -69,20 +69,18 @@ func NewFakeClientFactory(objects ...runtime.Object) *FakeClientFactory {
k0sClients := fakeclient.NewClientset[k0sfake.Clientset](fakeDiscovery, tracker)

return &FakeClientFactory{
DynamicClient: fakeDynamic,
Client: kubeClients,
DiscoveryClient: memory.NewMemCacheClient(fakeDiscovery),
ConfigClient: k0sClients.K0sV1beta1().ClusterConfigs(constant.ClusterConfigNamespace),
EtcdMemberClient: k0sClients.EtcdV1beta1().EtcdMembers(),
DynamicClient: fakeDynamic,
Client: kubeClients,
DiscoveryClient: memory.NewMemCacheClient(fakeDiscovery),
K0sClient: k0sClients,
}
}

type FakeClientFactory struct {
DynamicClient *dynamicfake.FakeDynamicClient
Client kubernetes.Interface
DiscoveryClient discovery.CachedDiscoveryInterface
ConfigClient k0sv1beta1.ClusterConfigInterface
EtcdMemberClient etcdv1beta1.EtcdMemberInterface
DynamicClient *dynamicfake.FakeDynamicClient
Client kubernetes.Interface
DiscoveryClient discovery.CachedDiscoveryInterface
K0sClient k0sclientset.Interface
}

func (f *FakeClientFactory) GetClient() (kubernetes.Interface, error) {
Expand All @@ -97,20 +95,22 @@ func (f *FakeClientFactory) GetDiscoveryClient() (discovery.CachedDiscoveryInter
return f.DiscoveryClient, nil
}

func (f *FakeClientFactory) GetConfigClient() (k0sv1beta1.ClusterConfigInterface, error) {
return f.ConfigClient, nil
func (f *FakeClientFactory) GetK0sClient() (k0sclientset.Interface, error) {
return f.K0sClient, nil
}

func (f *FakeClientFactory) GetRESTClient() (rest.Interface, error) {
return nil, errors.ErrUnsupported
// Deprecated: Use [FakeClientFactory.GetK0sClient] instead.
func (f *FakeClientFactory) GetConfigClient() (k0sv1beta1.ClusterConfigInterface, error) {
return f.K0sClient.K0sV1beta1().ClusterConfigs(constant.ClusterConfigNamespace), nil
}

func (f FakeClientFactory) GetRESTConfig() *rest.Config {
return &rest.Config{}
func (f FakeClientFactory) GetRESTConfig() (*rest.Config, error) {
panic("GetRESTConfig not implemented for FakeClientFactory")
}

// Deprecated: Use [FakeClientFactory.GetK0sClient] instead.
func (f FakeClientFactory) GetEtcdMemberClient() (etcdv1beta1.EtcdMemberInterface, error) {
return f.EtcdMemberClient, nil
return f.K0sClient.EtcdV1beta1().EtcdMembers(), nil
}

// Extracts all kinds from scheme and builds API resource lists for fake discovery clients.
Expand Down
6 changes: 5 additions & 1 deletion pkg/applier/stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,11 @@ func (s *Stack) Apply(ctx context.Context, prune bool) error {

// waitForCRD waits 5 seconds for a CRD to become established on a best-effort basis.
func (s *Stack) waitForCRD(ctx context.Context, crdName string) {
client, err := extensionsclient.NewForConfig(s.Clients.GetRESTConfig())
config, err := s.Clients.GetRESTConfig()
if err != nil {
return
}
client, err := extensionsclient.NewForConfig(config)
if err != nil {
return
}
Expand Down
17 changes: 12 additions & 5 deletions pkg/autopilot/checks/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,7 @@ func CanUpdate(ctx context.Context, log logrus.FieldLogger, clientFactory kubern
}
}

metaClient, err := metadata.NewForConfig(clientFactory.GetRESTConfig())
if err != nil {
return err
}

var metaClient metadata.Interface
for _, r := range resources {
gv, err := schema.ParseGroupVersion(r.GroupVersion)
if err != nil {
Expand All @@ -69,6 +65,17 @@ func CanUpdate(ctx context.Context, log logrus.FieldLogger, clientFactory kubern
continue
}

if metaClient == nil {
restConfig, err := clientFactory.GetRESTConfig()
if err != nil {
return err
}

if metaClient, err = metadata.NewForConfig(restConfig); err != nil {
return err
}
}

metas, err := metaClient.Resource(gv.WithResource(ar.Name)).
Namespace(metav1.NamespaceAll).
List(ctx, metav1.ListOptions{})
Expand Down
6 changes: 5 additions & 1 deletion pkg/component/controller/autopilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ func (a *Autopilot) Init(ctx context.Context) error {
func (a *Autopilot) Start(ctx context.Context) error {
log := logrus.WithFields(logrus.Fields{"component": "autopilot"})

autopilotClientFactory, err := apcli.NewClientFactory(a.AdminClientFactory.GetRESTConfig())
restConfig, err := a.AdminClientFactory.GetRESTConfig()
if err != nil {
return fmt.Errorf("creating autopilot client factory error: %w", err)
}
autopilotClientFactory, err := apcli.NewClientFactory(restConfig)
if err != nil {
return fmt.Errorf("creating autopilot client factory error: %w", err)
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/component/controller/etcd_member_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,10 @@ func (e *EtcdMemberReconciler) Stop() error {
}

func (e *EtcdMemberReconciler) waitForCRD(ctx context.Context) error {
rc := e.clientFactory.GetRESTConfig()

rc, err := e.clientFactory.GetRESTConfig()
if err != nil {
return err
}
ec, err := extclient.NewForConfig(rc)
if err != nil {
return err
Expand Down
14 changes: 10 additions & 4 deletions pkg/component/controller/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"context"
"crypto/tls"
"errors"
"fmt"
"net/http"
"os"
Expand All @@ -32,7 +33,7 @@ import (
"github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1"
"github.com/k0sproject/k0s/pkg/component/manager"
"github.com/k0sproject/k0s/pkg/config"
"github.com/k0sproject/k0s/pkg/kubernetes"
kubeutil "github.com/k0sproject/k0s/pkg/kubernetes"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -60,15 +61,20 @@ var _ manager.Component = (*Metrics)(nil)
var _ manager.Reconciler = (*Metrics)(nil)

// NewMetrics creates new Metrics reconciler
func NewMetrics(k0sVars *config.CfgVars, saver manifestsSaver, clientCF kubernetes.ClientFactoryInterface, storageType v1beta1.StorageType) (*Metrics, error) {
func NewMetrics(k0sVars *config.CfgVars, saver manifestsSaver, clientCF kubeutil.ClientFactoryInterface, storageType v1beta1.StorageType) (*Metrics, error) {
hostname, err := os.Hostname()
if err != nil {
return nil, err
}

restClient, err := clientCF.GetRESTClient()
if err != nil {
var restClient rest.Interface
if client, err := clientCF.GetDiscoveryClient(); err != nil {
return nil, fmt.Errorf("error getting REST client for metrics: %w", err)
} else {
restClient = client.RESTClient()
}
if restClient == nil {
return nil, errors.New("no REST client for metrics")
}

return &Metrics{
Expand Down
Loading

0 comments on commit 4115f25

Please sign in to comment.