Skip to content

Commit

Permalink
Remove stop channel usage during reconiliation
Browse files Browse the repository at this point in the history
Use context causes to signal the stop condition. This way, there's no
need to add another special case to the select statements.

Signed-off-by: Tom Wieczorek <twieczorek@mirantis.com>
  • Loading branch information
twz123 committed Sep 4, 2024
1 parent d86e318 commit 04f1a7a
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 20 deletions.
43 changes: 25 additions & 18 deletions pkg/component/controller/workerconfig/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (r *Reconciler) Start(context.Context) error {
// the update channel and apply those to the desired state. Changes will be
// applied whenever the last reconciled state differs from the desired
// state.
reconcilerCtx, cancelReconciler := context.WithCancel(context.Background())
reconcilerCtx, cancelReconciler := context.WithCancelCause(context.Background())
stopped := make(chan struct{})
apply := r.apply
go func() {
Expand All @@ -177,10 +177,10 @@ func (r *Reconciler) Start(context.Context) error {

go func() {
wait.UntilWithContext(reconcilerCtx, func(ctx context.Context) {
err := r.reconcileAPIServers(ctx, updates, stopped)
err := r.reconcileAPIServers(ctx, updates)
// Log any reconciliation errors, but only if they don't
// indicate that the reconciler has been stopped concurrently.
if err != nil && !errors.Is(err, reconcilerCtx.Err()) && !errors.Is(err, errStoppedConcurrently) {
if err != nil && !errors.Is(err, errStoppedConcurrently) {
r.log.WithError(err).Error("Failed to reconcile API server addresses")
}
}, 10*time.Second)
Expand All @@ -189,10 +189,10 @@ func (r *Reconciler) Start(context.Context) error {
// lease is acquired.
r.leaderElector.AddAcquiredLeaseCallback(func() {
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
ctx, cancel := context.WithTimeout(reconcilerCtx, 1*time.Minute)
defer cancel()

err := reconcile(ctx, updates, stopped, func(s *snapshot) {
err := reconcile(ctx, updates, func(s *snapshot) {
s.serial++
})

Expand All @@ -207,7 +207,7 @@ func (r *Reconciler) Start(context.Context) error {
// Store the started state
r.apply = nil
r.updates = updates
r.requestStop = cancelReconciler
r.requestStop = func() { cancelReconciler(errStoppedConcurrently) }
r.stopped = stopped
r.state = reconcilerStarted

Expand Down Expand Up @@ -236,8 +236,8 @@ func (r *Reconciler) runReconcileLoop(ctx context.Context, updates <-chan update
var desiredState, reconciledState snapshot

runReconciliation := func() error {
if err := ctx.Err(); err != nil {
return fmt.Errorf("%w while processing reconciliation", errStoppedConcurrently)
if err := context.Cause(ctx); err != nil {
return fmt.Errorf("%w while processing reconciliation", err)
}

if !r.leaderElector.IsLeader() {
Expand Down Expand Up @@ -322,33 +322,40 @@ func (r *Reconciler) Reconcile(ctx context.Context, cluster *v1beta1.ClusterConf

configSnapshot := takeConfigSnapshot(cluster.Spec)

return reconcile(ctx, updates, stopped, func(s *snapshot) {
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(nil)

go func() {
select {
case <-stopped:
cancel(errStoppedConcurrently)
case <-ctx.Done():
}
}()

return reconcile(ctx, updates, func(s *snapshot) {
s.configSnapshot = &configSnapshot
})
}

var errStoppedConcurrently = errors.New("stopped concurrently")

// reconcile enqueues the given update and awaits its reconciliation.
func reconcile(ctx context.Context, updates chan<- updateFunc, stopped <-chan struct{}, update func(*snapshot)) error {
func reconcile(ctx context.Context, updates chan<- updateFunc, update func(*snapshot)) error {
recoDone := make(chan error, 1)

select {
case updates <- func(s *snapshot) chan<- error { update(s); return recoDone }:
break
case <-stopped:
return fmt.Errorf("%w while trying to enqueue state update", errStoppedConcurrently)
case <-ctx.Done():
return fmt.Errorf("%w while trying to enqueue state update", ctx.Err())
return fmt.Errorf("%w while trying to enqueue state update", context.Cause(ctx))
}

select {
case err := <-recoDone:
return err
case <-stopped:
return fmt.Errorf("%w while waiting for reconciliation to finish", errStoppedConcurrently)
case <-ctx.Done():
return fmt.Errorf("%w while waiting for reconciliation to finish", ctx.Err())
return fmt.Errorf("%w while waiting for reconciliation to finish", context.Cause(ctx))
}
}

Expand Down Expand Up @@ -384,7 +391,7 @@ func (r *Reconciler) Stop() error {
return nil
}

func (r *Reconciler) reconcileAPIServers(ctx context.Context, updates chan<- updateFunc, stopped <-chan struct{}) error {
func (r *Reconciler) reconcileAPIServers(ctx context.Context, updates chan<- updateFunc) error {
client, err := r.clientFactory.GetClient()
if err != nil {
return err
Expand All @@ -398,7 +405,7 @@ func (r *Reconciler) reconcileAPIServers(ctx context.Context, updates chan<- upd
return false, err
}

return false, reconcile(ctx, updates, stopped, func(s *snapshot) { s.apiServers = apiServers })
return false, reconcile(ctx, updates, func(s *snapshot) { s.apiServers = apiServers })
})
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/component/controller/workerconfig/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ func TestReconciler_runReconcileLoop(t *testing.T) {
leaderElector: &leaderelector.Dummy{Leader: true},
}

ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
ctx, cancel := context.WithCancelCause(context.TODO())

// Prepare update channel for two updates.
updates, firstDone, secondDone := make(chan updateFunc, 2), make(chan error, 1), make(chan error, 1)
Expand All @@ -581,7 +581,7 @@ func TestReconciler_runReconcileLoop(t *testing.T) {
updates <- func(s *snapshot) chan<- error { return firstDone }

// Put in the second update that'll cancel the context.
updates <- func(s *snapshot) chan<- error { cancel(); return secondDone }
updates <- func(s *snapshot) chan<- error { cancel(errStoppedConcurrently); return secondDone }

underTest.runReconcileLoop(ctx, updates, nil)

Expand Down

0 comments on commit 04f1a7a

Please sign in to comment.