Skip to content

Commit

Permalink
query: fix retry query case.
Browse files Browse the repository at this point in the history
In case the backend is very unstable and times out the batch we
need to make sure ongoing queryJobs are droped and already
registered queryJobs are removed from the heap as well.
  • Loading branch information
ziggie1984 committed Mar 27, 2024
1 parent 43f5a58 commit f2b8212
Showing 1 changed file with 43 additions and 8 deletions.
51 changes: 43 additions & 8 deletions query/workmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,27 @@ Loop:
// into the work queue if it has not reached the
// maximum number of retries.
case result.err != nil:
// Punish the peer for the failed query.
w.cfg.Ranking.Punish(result.peer.Addr())
// Refresh peer rank on disconnect.
if result.err == ErrPeerDisconnected {
w.cfg.Ranking.ResetRanking(
result.peer.Addr(),
)
} else {
// Punish the peer for the failed query.
w.cfg.Ranking.Punish(result.peer.Addr())
}

if batch == nil {
log.Warnf("Query(%d) from peer %v "+
"failed with retries %d, NOT "+
"rescheduling batch already"+
"canceled: %v",
result.job.index,
result.peer.Addr(),
result.job.tries, result.err)

continue Loop
}

if batch != nil && !batch.noRetryMax {
result.job.tries++
Expand Down Expand Up @@ -380,11 +399,6 @@ Loop:
result.job.timeout = newTimeout
}

// Refresh peer rank on disconnect.
if result.err == ErrPeerDisconnected {
w.cfg.Ranking.ResetRanking(result.peer.Addr())
}

heap.Push(work, result.job)
currentQueries[result.job.index] = batchNum

Expand Down Expand Up @@ -423,11 +437,32 @@ Loop:
batch.errChan <- ErrQueryTimeout
delete(currentBatches, batchNum)

// When deleting the particular batch
// number we need to make sure we delete
// all the ongoing queryJobs in queue.
// Otherwise these queries might never
// be removed (in case they fail because
// of a timeout for example).
//
// NOTE: We do NOT cancel ongoing
// queries with the workers because it's
// easier to let them timeout and just
// be dropped because their
// corresponding batch is already
// removed.
for job, batch := range currentQueries {
if batch == batchNum {
heap.Remove(
work, int(job),
)
}
}

log.Warnf("Query(%d) failed with "+
"error: %v. Timing out.",
result.job.index, result.err)

log.Debugf("Batch %v timed out",
log.Warnf("Batch %v timed out",
batchNum)

default:
Expand Down

0 comments on commit f2b8212

Please sign in to comment.