Skip to content

Commit

Permalink
safe starting of batchChunkIterator
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d committed Aug 5, 2020
1 parent 2b8ab0a commit f862fa0
Showing 1 changed file with 20 additions and 1 deletion.
21 changes: 20 additions & 1 deletion pkg/storage/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type batchChunkIterator struct {
lastOverlapping []*LazyChunk
iterFactory chunksIteratorFactory

begun bool
ctx context.Context
cancel context.CancelFunc
start, end time.Time
direction logproto.Direction
Expand All @@ -69,6 +71,7 @@ func newBatchChunkIterator(
start: start,
end: end,
direction: direction,
ctx: ctx,
cancel: cancel,
iterFactory: iterFactory,
chunks: lazyChunks{direction: direction, chunks: chunks},
Expand All @@ -78,10 +81,17 @@ func newBatchChunkIterator(
}),
}
sort.Sort(res.chunks)
go res.loop(ctx)
return res
}

// Start is idempotent and will begin the processing thread which seeds the iterator data.
func (it *batchChunkIterator) Start() {
if !it.begun {
it.begun = true
go it.loop(it.ctx)
}
}

func (it *batchChunkIterator) loop(ctx context.Context) {
for {
if it.chunks.Len() == 0 {
Expand Down Expand Up @@ -111,6 +121,8 @@ func (it *batchChunkIterator) loop(ctx context.Context) {
}

func (it *batchChunkIterator) Next() bool {
it.Start() // Ensure the iterator has started.

var err error
// for loop to avoid recursion
for {
Expand Down Expand Up @@ -300,7 +312,10 @@ func newLogBatchIterator(
}

batch := newBatchChunkIterator(ctx, chunks, batchSize, direction, start, end, logbatch.newChunksIterator)
// Important: since the batchChunkIterator is bound to the LogBatchIterator,
// ensure embedded fields are present before it's started.
logbatch.batchChunkIterator = batch
batch.Start()
return logbatch, nil
}

Expand Down Expand Up @@ -398,7 +413,11 @@ func newSampleBatchIterator(
ctx: ctx,
}
batch := newBatchChunkIterator(ctx, chunks, batchSize, logproto.FORWARD, start, end, samplebatch.newChunksIterator)

// Important: since the batchChunkIterator is bound to the SampleBatchIterator,
// ensure embedded fields are present before it's started.
samplebatch.batchChunkIterator = batch
batch.Start()
return samplebatch, nil
}

Expand Down

0 comments on commit f862fa0

Please sign in to comment.