Retry service-busy errors after a delay to prevent retry storms
Builds on #1167, but adds a delay before retrying service-busy errors.

For now, since our server-side RPS quotas are calculated per second, this delays
at least 1 second per service-busy error.
This is in contrast to the previous behavior, which would have retried up to about
a dozen times in the same period, the root cause of service-busy-based retry
storms that generate even more service-busy errors.
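
As a rough sketch of that behavior (the real handler is errRetryableAfter in
internal/internal_retry.go below; the error type here is a stand-in so the snippet
compiles on its own), a RetryAfter-style handler looks like this:

    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    // serviceBusyError is a stand-in for the generated ServiceBusyError type,
    // used only so this sketch is self-contained.
    type serviceBusyError struct{}

    func (*serviceBusyError) Error() string { return "service busy" }

    // retryAfter mirrors the shape of the new handler: it reports whether an
    // error is retryable and the minimum time to wait before the next attempt.
    func retryAfter(err error) (isRetryable bool, minDelay time.Duration) {
        var busy *serviceBusyError
        if errors.As(err, &busy) {
            // Service-busy: retry, but wait at least a full second so a single
            // caller cannot burn a dozen retries inside one RPS-quota window.
            return true, time.Second
        }
        // Everything else in this sketch retries with no extra minimum delay.
        return true, 0
    }

    func main() {
        ok, delay := retryAfter(&serviceBusyError{})
        fmt.Println(ok, delay) // true 1s
    }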

---

This also gives us an easy way to make use of "retry after" information in errors
we return to the caller, though currently our errors do not carry that information.

Eventually this should probably come from the server, which has a global view of
how many requests this service has sent and can provide a more precise delay to
individual callers.
For example, our server-side ratelimiter currently works in 1-second slices, but that
isn't guaranteed to stay true.  The server could also detect truly large floods of
requests and return jittered values larger than 1 second to stop the storm more
forcefully, or allow prioritizing some requests (like activity responses) over
others simply by returning a lower delay.
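
As a sketch of what that could look like on the client side, assuming a hypothetical
error interface that carries its own server-suggested delay (nothing in the client or
server defines this today):

    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    // retryAfterCarrier is a hypothetical interface for errors that carry a
    // server-suggested delay; it does not exist in the codebase today.
    type retryAfterCarrier interface {
        error
        RetryAfter() time.Duration
    }

    // throttledError is a made-up error type implementing that interface.
    type throttledError struct{ after time.Duration }

    func (e *throttledError) Error() string             { return "throttled, retry later" }
    func (e *throttledError) RetryAfter() time.Duration { return e.after }

    // retryDelay prefers the server's hint when one is attached to the error,
    // and otherwise falls back to the client-side 1-second floor.
    func retryDelay(err error) time.Duration {
        var hint retryAfterCarrier
        if errors.As(err, &hint) {
            return hint.RetryAfter()
        }
        return time.Second
    }

    func main() {
        err := &throttledError{after: 2500 * time.Millisecond}
        fmt.Println(retryDelay(err)) // 2.5s
    }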
Groxx committed Jun 22, 2022
1 parent 1005ea5 commit 647f54f
Showing 12 changed files with 149 additions and 101 deletions.
30 changes: 25 additions & 5 deletions internal/common/backoff/retry.go
@@ -30,8 +30,10 @@ type (
// Operation to retry
Operation func() error

// IsRetryable handler can be used to exclude certain errors during retry
IsRetryable func(error) bool
// RetryAfter handler can be used to exclude certain errors during retry,
// and define how long to wait at a minimum before trying again.
// delays must be 0 or larger.
RetryAfter func(error) (isRetryable bool, retryAfter time.Duration)

// ConcurrentRetrier is used for client-side throttling. It determines whether to
// throttle outgoing traffic in case downstream backend server rejects
@@ -87,7 +89,7 @@ func NewConcurrentRetrier(retryPolicy RetryPolicy) *ConcurrentRetrier {
}

// Retry function can be used to wrap any call with retry logic using the passed in policy
func Retry(ctx context.Context, operation Operation, policy RetryPolicy, isRetryable IsRetryable) error {
func Retry(ctx context.Context, operation Operation, policy RetryPolicy, retryAfter RetryAfter) error {
var err error
var next time.Duration

@@ -104,15 +106,33 @@ Retry_Loop:
}

// Check if the error is retryable
if isRetryable != nil && !isRetryable(err) {
return err
if retryAfter != nil {
retryable, minNext := retryAfter(err)
// fail on non-retryable errors
if !retryable {
return err
}
// update the time to wait until the next attempt.
// as this is a *minimum*, just add it to the current pending time.
// this way repeated service busy errors will take increasing amounts of time before retrying,
// and when the request is not limited it does not further increase the time until trying again.
next += minNext
}

// check if ctx is done
if ctx.Err() != nil {
return err
}

// wait for the next retry period (or context timeout)
if ctxDone := ctx.Done(); ctxDone != nil {
// we could check if this is longer than context deadline and immediately fail...
// ...but wasting time prevents higher-level retries from trying too early.
// this is particularly useful for service-busy, but seems valid for essentially all retried errors.
timer := time.NewTimer(next)
select {
case <-ctxDone:
timer.Stop()
return err
case <-timer.C:
continue Retry_Loop
61 changes: 26 additions & 35 deletions internal/common/backoff/retry_test.go
@@ -36,18 +36,32 @@ func TestRetry(t *testing.T) {
tests := []struct {
name string
maxAttempts int
isRetryable func(error) bool
maxTime time.Duration // context timeout
isRetryable func(error) (bool, time.Duration)

shouldError bool
expectedCalls int
}{
{"success", 2 * succeedOnAttemptNum, nil, false, succeedOnAttemptNum},
{"too many tries", 3, nil, true, 4}, // max 3 retries == 4 calls. must be < succeedOnAttemptNum to work.
{"success with always custom retry", 2 * succeedOnAttemptNum, func(err error) bool {
return true // retry on all errors, same as no custom retry
{"success", 2 * succeedOnAttemptNum, time.Second, nil, false, succeedOnAttemptNum},
{"too many tries", 3, time.Second, nil, true, 4}, // max 3 retries == 4 calls. must be < succeedOnAttemptNum to work.
{"success with always custom retry", 2 * succeedOnAttemptNum, time.Second, func(err error) (bool, time.Duration) {
return true, 0 // retry on all errors, same as no custom retry
}, false, succeedOnAttemptNum},
{"success with never custom retry", 2 * succeedOnAttemptNum, func(err error) bool {
return false // never retry
{"success with never custom retry", 2 * succeedOnAttemptNum, time.Second, func(err error) (bool, time.Duration) {
return false, 0 // never retry
}, true, 1},

// elapsed-time-sensitive tests below.
// consider raising time granularity if flaky, or we could set up a more complete mock
// to resolve flakiness for real, but that's a fair bit more complex.

// try -> sleep(10ms) -> try -> sleep(20ms) -> try -> sleep(40ms) -> timeout == 3 calls.
{"timed out eventually", 5, 50 * time.Millisecond, func(err error) (bool, time.Duration) {
return true, 0
}, true, 3},
// try -> sleep(longer than context timeout) -> timeout == 1 call.
{"timed out due to long minimum delay", 5, 10 * time.Millisecond, func(err error) (bool, time.Duration) {
return true, 20 * time.Millisecond
}, true, 1},
}

@@ -66,11 +80,13 @@ func TestRetry(t *testing.T) {
return &someError{}
}

policy := NewExponentialRetryPolicy(1 * time.Millisecond)
policy.SetMaximumInterval(5 * time.Millisecond)
policy := NewExponentialRetryPolicy(10 * time.Millisecond)
policy.SetMaximumInterval(50 * time.Millisecond)
policy.SetMaximumAttempts(test.maxAttempts)

err := Retry(context.Background(), op, policy, test.isRetryable)
ctx, cancel := context.WithTimeout(context.Background(), test.maxTime)
defer cancel()
err := Retry(ctx, op, policy, test.isRetryable)
if test.shouldError {
assert.Error(t, err)
} else {
@@ -81,31 +97,6 @@ func TestRetry(t *testing.T) {
}
}

func TestNoRetryAfterContextDone(t *testing.T) {
t.Parallel()
retryCounter := 0
op := func() error {
retryCounter++

if retryCounter == 5 {
return nil
}

return &someError{}
}

policy := NewExponentialRetryPolicy(10 * time.Millisecond)
policy.SetMaximumInterval(50 * time.Millisecond)
policy.SetMaximumAttempts(10)

ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()

err := Retry(ctx, op, policy, nil)
assert.Error(t, err)
assert.True(t, retryCounter >= 2, "retryCounter should be at least 2 but was %d", retryCounter) // verify that we did retry
}

func TestConcurrentRetrier(t *testing.T) {
t.Parallel()
a := assert.New(t)
35 changes: 20 additions & 15 deletions internal/internal_retry.go
@@ -62,7 +62,7 @@ func createDynamicServiceRetryPolicy(ctx context.Context) backoff.RetryPolicy {
return policy
}

func isServiceTransientError(err error) bool {
func errRetryableAfter(err error) (isRetryable bool, retryAfter time.Duration) {
// check intentionally-not-retried error types via errors.As.
//
// sadly we cannot build this into a list of error values / types to range over, as that
@@ -72,50 +72,55 @@ func isServiceTransientError(err error) bool {
// so we're left with this mess. it's not even generics-friendly.
// at least this pattern lets it be done inline without exposing the variable.
if target := (*s.AccessDeniedError)(nil); errors.As(err, &target) {
return false
return false, 0
}
if target := (*s.BadRequestError)(nil); errors.As(err, &target) {
return false
return false, 0
}
if target := (*s.CancellationAlreadyRequestedError)(nil); errors.As(err, &target) {
return false
return false, 0
}
if target := (*s.ClientVersionNotSupportedError)(nil); errors.As(err, &target) {
return false
return false, 0
}
if target := (*s.DomainAlreadyExistsError)(nil); errors.As(err, &target) {
return false
return false, 0
}
if target := (*s.DomainNotActiveError)(nil); errors.As(err, &target) {
return false
return false, 0
}
if target := (*s.EntityNotExistsError)(nil); errors.As(err, &target) {
return false
return false, 0
}
if target := (*s.FeatureNotEnabledError)(nil); errors.As(err, &target) {
return false
return false, 0
}
if target := (*s.LimitExceededError)(nil); errors.As(err, &target) {
return false
return false, 0
}
if target := (*s.QueryFailedError)(nil); errors.As(err, &target) {
return false
return false, 0
}
if target := (*s.WorkflowExecutionAlreadyCompletedError)(nil); errors.As(err, &target) {
return false
return false, 0
}
if target := (*s.WorkflowExecutionAlreadyStartedError)(nil); errors.As(err, &target) {
return false
return false, 0
}

// shutdowns are not retryable, of course
if errors.Is(err, errShutdown) {
return false
return false, 0
}

// service busy errors are special, and require a minimum delay
if target := (*s.ServiceBusyError)(nil); errors.As(err, &target) {
return true, time.Second
}

// s.InternalServiceError
// s.ServiceBusyError (must retry after a delay, but it is transient)
// server-side-only error types (as they should not reach clients)
// and all other `error` types
return true
return true, 0
}
6 changes: 4 additions & 2 deletions internal/internal_retry_test.go
@@ -27,7 +27,8 @@ func TestErrRetries(t *testing.T) {
&s.RemoteSyncMatchedError{},
&s.InternalDataInconsistencyError{},
} {
assert.True(t, isServiceTransientError(err), "%T should be transient", err)
retryable, _ := errRetryableAfter(err)
assert.True(t, retryable, "%T should be transient", err)
}
})
t.Run("terminal", func(t *testing.T) {
@@ -47,7 +48,8 @@

errShutdown, // shutdowns can't be stopped
} {
assert.False(t, isServiceTransientError(err), "%T should be fatal", err)
retryable, _ := errRetryableAfter(err)
assert.False(t, retryable, "%T should be fatal", err)
}
})
}
6 changes: 3 additions & 3 deletions internal/internal_task_handlers.go
@@ -2009,7 +2009,7 @@ func signalWorkflow(
tchCtx, cancel, opt := newChannelContext(ctx, featureFlags)
defer cancel()
return service.SignalWorkflowExecution(tchCtx, request, opt...)
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
}, createDynamicServiceRetryPolicy(ctx), errRetryableAfter)
}

func recordActivityHeartbeat(
@@ -2033,7 +2033,7 @@ func recordActivityHeartbeat(
var err error
heartbeatResponse, err = service.RecordActivityTaskHeartbeat(tchCtx, request, opt...)
return err
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
}, createDynamicServiceRetryPolicy(ctx), errRetryableAfter)

if heartbeatErr == nil && heartbeatResponse != nil && heartbeatResponse.GetCancelRequested() {
return NewCanceledError()
@@ -2067,7 +2067,7 @@ func recordActivityHeartbeatByID(
var err error
heartbeatResponse, err = service.RecordActivityTaskHeartbeatByID(tchCtx, request, opt...)
return err
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
}, createDynamicServiceRetryPolicy(ctx), errRetryableAfter)

if heartbeatErr == nil && heartbeatResponse != nil && heartbeatResponse.GetCancelRequested() {
return NewCanceledError()
45 changes: 35 additions & 10 deletions internal/internal_task_pollers.go
@@ -513,7 +513,7 @@ func (wtp *workflowTaskPoller) RespondTaskCompleted(completedRequest interface{}
}

return err1
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
}, createDynamicServiceRetryPolicy(ctx), errRetryableAfter)

return
}
@@ -765,12 +765,25 @@ func (wtp *workflowTaskPoller) poll(ctx context.Context) (interface{}, error) {

response, err := wtp.service.PollForDecisionTask(ctx, request, getYarpcCallOptions(wtp.featureFlags)...)
if err != nil {
if isServiceTransientError(err) {
retryable, retryAfter := errRetryableAfter(err)
if retryable {
wtp.metricsScope.Counter(metrics.DecisionPollTransientFailedCounter).Inc(1)
} else {
wtp.metricsScope.Counter(metrics.DecisionPollFailedCounter).Inc(1)
}
wtp.updateBacklog(request.TaskList.GetKind(), 0)

// pause for the retry delay if present.
// failures also have an exponential backoff, but this ensures a minimum is respected.
if retryAfter > 0 {
t := time.NewTimer(retryAfter)
select {
case <-ctx.Done():
t.Stop()
case <-t.C:
}
}

return nil, err
}

@@ -896,7 +909,7 @@ func newGetHistoryPageFunc(
NextPageToken: nextPageToken,
}, opt...)
return err1
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
}, createDynamicServiceRetryPolicy(ctx), errRetryableAfter)
if err != nil {
metricsScope.Counter(metrics.WorkflowGetHistoryFailedCounter).Inc(1)
return nil, nil, err
@@ -990,11 +1003,23 @@ func (atp *activityTaskPoller) poll(ctx context.Context) (*s.PollForActivityTask
response, err := atp.service.PollForActivityTask(ctx, request, getYarpcCallOptions(atp.featureFlags)...)

if err != nil {
if isServiceTransientError(err) {
retryable, retryAfter := errRetryableAfter(err)
if retryable {
atp.metricsScope.Counter(metrics.ActivityPollTransientFailedCounter).Inc(1)
} else {
atp.metricsScope.Counter(metrics.ActivityPollFailedCounter).Inc(1)
}

// pause for the retry delay if present
if retryAfter > 0 {
t := time.NewTimer(retryAfter)
select {
case <-ctx.Done():
t.Stop()
case <-t.C:
}
}

return nil, startTime, err
}
if response == nil || len(response.TaskToken) == 0 {
@@ -1169,23 +1194,23 @@ func reportActivityComplete(
defer cancel()

return service.RespondActivityTaskCanceled(tchCtx, request, opt...)
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
}, createDynamicServiceRetryPolicy(ctx), errRetryableAfter)
case *s.RespondActivityTaskFailedRequest:
reportErr = backoff.Retry(ctx,
func() error {
tchCtx, cancel, opt := newChannelContext(ctx, featureFlags)
defer cancel()

return service.RespondActivityTaskFailed(tchCtx, request, opt...)
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
}, createDynamicServiceRetryPolicy(ctx), errRetryableAfter)
case *s.RespondActivityTaskCompletedRequest:
reportErr = backoff.Retry(ctx,
func() error {
tchCtx, cancel, opt := newChannelContext(ctx, featureFlags)
defer cancel()

return service.RespondActivityTaskCompleted(tchCtx, request, opt...)
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
}, createDynamicServiceRetryPolicy(ctx), errRetryableAfter)
}
if reportErr == nil {
switch request.(type) {
@@ -1222,23 +1247,23 @@ func reportActivityCompleteByID(
defer cancel()

return service.RespondActivityTaskCanceledByID(tchCtx, request, opt...)
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
}, createDynamicServiceRetryPolicy(ctx), errRetryableAfter)
case *s.RespondActivityTaskFailedByIDRequest:
reportErr = backoff.Retry(ctx,
func() error {
tchCtx, cancel, opt := newChannelContext(ctx, featureFlags)
defer cancel()

return service.RespondActivityTaskFailedByID(tchCtx, request, opt...)
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
}, createDynamicServiceRetryPolicy(ctx), errRetryableAfter)
case *s.RespondActivityTaskCompletedByIDRequest:
reportErr = backoff.Retry(ctx,
func() error {
tchCtx, cancel, opt := newChannelContext(ctx, featureFlags)
defer cancel()

return service.RespondActivityTaskCompletedByID(tchCtx, request, opt...)
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
}, createDynamicServiceRetryPolicy(ctx), errRetryableAfter)
}
if reportErr == nil {
switch request.(type) {