Skip to content

Commit

Permalink
Merge pull request #1712 from FabianKramm/refactor
Browse files Browse the repository at this point in the history
fix: flaking k8sdefaultendpoint syncing
  • Loading branch information
FabianKramm authored Apr 23, 2024
2 parents 464ec88 + d53e3c6 commit 22d44d0
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 19 deletions.
34 changes: 16 additions & 18 deletions pkg/controllers/k8sdefaultendpoint/k8sdefaultendpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package k8sdefaultendpoint
import (
"context"
"fmt"
"time"

"github.com/loft-sh/vcluster/pkg/config"
"github.com/loft-sh/vcluster/pkg/constants"
Expand Down Expand Up @@ -64,13 +63,14 @@ func (e *EndpointController) Register(mgr ctrl.Manager) error {
if err != nil {
return fmt.Errorf("unable to setup pod security controller: %w", err)
}

return nil
}

func (e *EndpointController) Reconcile(ctx context.Context, _ ctrl.Request) (ctrl.Result, error) {
err := e.syncKubernetesServiceEndpoints(ctx, e.VirtualClient, e.ServiceClient, e.ServiceName, e.ServiceNamespace)
err := e.syncKubernetesServiceEndpoints(ctx)
if err != nil {
return ctrl.Result{RequeueAfter: time.Second}, err
return ctrl.Result{}, err
}

return ctrl.Result{}, nil
Expand All @@ -79,37 +79,35 @@ func (e *EndpointController) Reconcile(ctx context.Context, _ ctrl.Request) (ctr
// SetupWithManager adds the controller to the manager
func (e *EndpointController) SetupWithManager(mgr ctrl.Manager) error {
// creating a predicate to receive reconcile requests for kubernetes endpoint only
pp := func(object client.Object) bool {
pPredicates := predicate.NewPredicateFuncs(func(object client.Object) bool {
return object.GetNamespace() == e.ServiceNamespace && object.GetName() == e.ServiceName
}
pfuncs := predicate.NewPredicateFuncs(pp)
})

vp := func(object client.Object) bool {
vPredicates := predicate.NewPredicateFuncs(func(object client.Object) bool {
if object.GetNamespace() == specialservices.DefaultKubernetesSvcKey.Namespace && object.GetName() == specialservices.DefaultKubernetesSvcKey.Name {
return true
}

return false
}
vfuncs := predicate.NewPredicateFuncs(vp)
})

return ctrl.NewControllerManagedBy(mgr).
Named("kubernetes_default_endpoint").
WithOptions(controller.Options{
CacheSyncTimeout: constants.DefaultCacheSyncTimeout,
}).
For(&corev1.Endpoints{}, builder.WithPredicates(pfuncs)).
WatchesRawSource(source.Kind(e.VirtualManagerCache, &corev1.Endpoints{}), &handler.EnqueueRequestForObject{}, builder.WithPredicates(vfuncs)).
WatchesRawSource(source.Kind(e.VirtualManagerCache, e.provider.createClientObject()), &handler.EnqueueRequestForObject{}, builder.WithPredicates(vfuncs)).
For(&corev1.Endpoints{}, builder.WithPredicates(pPredicates)).
WatchesRawSource(source.Kind(e.VirtualManagerCache, &corev1.Endpoints{}), &handler.EnqueueRequestForObject{}, builder.WithPredicates(vPredicates)).
WatchesRawSource(source.Kind(e.VirtualManagerCache, e.provider.createClientObject()), &handler.EnqueueRequestForObject{}, builder.WithPredicates(vPredicates)).
Complete(e)
}

func (e *EndpointController) syncKubernetesServiceEndpoints(ctx context.Context, virtualClient client.Client, localClient client.Client, serviceName, serviceNamespace string) error {
func (e *EndpointController) syncKubernetesServiceEndpoints(ctx context.Context) error {
// get physical service endpoints
pEndpoints := &corev1.Endpoints{}
err := localClient.Get(ctx, types.NamespacedName{
Namespace: serviceNamespace,
Name: serviceName,
err := e.ServiceClient.Get(ctx, types.NamespacedName{
Namespace: e.ServiceNamespace,
Name: e.ServiceName,
}, pEndpoints)
if err != nil {
if kerrors.IsNotFound(err) {
Expand All @@ -125,7 +123,7 @@ func (e *EndpointController) syncKubernetesServiceEndpoints(ctx context.Context,
Name: "kubernetes",
},
}
result, err := controllerutil.CreateOrPatch(ctx, virtualClient, vEndpoints, func() error {
result, err := controllerutil.CreateOrPatch(ctx, e.VirtualClient, vEndpoints, func() error {
if vEndpoints.Labels == nil {
vEndpoints.Labels = map[string]string{}
}
Expand Down Expand Up @@ -173,7 +171,7 @@ func (e *EndpointController) syncKubernetesServiceEndpoints(ctx context.Context,
}

if result == controllerutil.OperationResultCreated || result == controllerutil.OperationResultUpdated {
return e.provider.createOrPatch(ctx, virtualClient, vEndpoints)
return e.provider.createOrPatch(ctx, e.VirtualClient, vEndpoints)
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/setup/controller_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func newCurrentNamespaceClient(ctx context.Context, localManager ctrl.Manager, o
// as the regular cache is scoped to the options.TargetNamespace and cannot return
// objects from the current namespace.
currentNamespaceCache := localManager.GetCache()
if options.WorkloadNamespace != options.WorkloadTargetNamespace {
if !options.Experimental.MultiNamespaceMode.Enabled && options.WorkloadNamespace != options.WorkloadTargetNamespace {
currentNamespaceCache, err = cache.New(localManager.GetConfig(), cache.Options{
Scheme: localManager.GetScheme(),
Mapper: localManager.GetRESTMapper(),
Expand Down

0 comments on commit 22d44d0

Please sign in to comment.