
Adds WAL support (experimental) #2981

Merged 58 commits on Nov 27, 2020

Commits
1b2b931
marshalable chunks
owen-d Oct 1, 2020
ddecf02
wal record types custom serialization
owen-d Oct 2, 2020
48fa77f
proto types for wal checkpoints
owen-d Oct 15, 2020
7ec0fb0
byteswith output unaffected by buffer
owen-d Oct 15, 2020
d6ccccf
wal & record pool ifcs
owen-d Oct 28, 2020
eb6a519
wal record can hold entries from multiple series
owen-d Oct 28, 2020
f089f09
entry pool
owen-d Oct 28, 2020
211356d
ingester uses noopWal
owen-d Oct 29, 2020
7adf966
removes duplicate argument passing in ingester code. adds ingester co…
owen-d Oct 29, 2020
29fafd6
Merge remote-tracking branch 'upstream/master' into wal/writes
owen-d Nov 4, 2020
5ed8ffb
segment writing
owen-d Nov 4, 2020
107b03b
[WIP] wal recovery from segments
owen-d Nov 5, 2020
fb194f5
replay uses sync.Maps & preserves WAL fingerprints
owen-d Nov 5, 2020
3d36b2d
in memory wal recovery
owen-d Nov 5, 2020
9a7eaa5
wal segment recovery
owen-d Nov 6, 2020
30cdbf2
ingester metrics struct
owen-d Nov 6, 2020
d723c0b
wal replay locks streamsMtx in instances, adds checkpoint codec
owen-d Nov 9, 2020
897196e
ingester metrics
owen-d Nov 9, 2020
8df2598
checkpointer
owen-d Nov 10, 2020
b5c2ef0
WAL checkpoint writer
owen-d Nov 10, 2020
60ebe5e
checkpointwriter can write multiple checkpoints
owen-d Nov 10, 2020
ed14e0c
reorgs checkpointing
owen-d Nov 10, 2020
5acf745
wires up checkpointwriter to wal
owen-d Nov 10, 2020
6dd28b1
ingester SeriesIter impl
owen-d Nov 10, 2020
a7e40fd
wires up ingesterRecoverer to consume checkpoints
owen-d Nov 10, 2020
957717b
generic recovery fn
owen-d Nov 10, 2020
9384e98
generic recovery fn
owen-d Nov 10, 2020
a1beb69
recover from both wal types
owen-d Nov 10, 2020
4e27f7c
cleans up old tmp checkpoints & allows aborting in flight checkpoints
owen-d Nov 12, 2020
45840bc
wires up wal checkpointing
owen-d Nov 12, 2020
d1b7f6c
more granular wal logging
owen-d Nov 12, 2020
2ca359d
fixes off by 1 wal truncation & removes double logging
owen-d Nov 12, 2020
ff6b35b
adds userID to wal records correctly
owen-d Nov 12, 2020
d811aef
wire chunk encoding tests
owen-d Nov 12, 2020
6eb4ec0
more granular wal metrics
owen-d Nov 12, 2020
7962164
checkpoint encoding test
owen-d Nov 12, 2020
1961726
Merge remote-tracking branch 'upstream/master' into wal/writes
owen-d Nov 13, 2020
67f298d
ignores debug bins
owen-d Nov 13, 2020
d4318c7
Merge remote-tracking branch 'upstream/master' into wal/writes
owen-d Nov 13, 2020
d43ecde
segment replay ignores out of orders
owen-d Nov 13, 2020
ce72f33
fixes bug between WAL reading []byte validity and proto unmarshalling…
owen-d Nov 13, 2020
c9614aa
conf validations, removes comments
owen-d Nov 13, 2020
571f8fc
flush on shutdown config
owen-d Nov 18, 2020
2c965f3
POST /ingester/shutdown
owen-d Nov 18, 2020
1c4d526
renames flush on shutdown
owen-d Nov 18, 2020
d9139de
wal & checkpoint use same segment size
owen-d Nov 18, 2020
9a4c810
Merge remote-tracking branch 'upstream/master' into wal/writes
owen-d Nov 19, 2020
bd9cee0
writes entries to wal regardless of tailers
owen-d Nov 19, 2020
e042812
makes wal checkpoint duration default to 5m
owen-d Nov 20, 2020
f632022
recovery metrics
owen-d Nov 20, 2020
a948785
encodes headchunks separately for wal purposes
owen-d Nov 20, 2020
2e972c7
merge upstream
owen-d Nov 23, 2020
43d8a78
typo
owen-d Nov 23, 2020
3166183
linting
owen-d Nov 23, 2020
6059256
Merge remote-tracking branch 'upstream/master' into wal/headblock
owen-d Nov 25, 2020
51ff8d7
addresses pr feedback
owen-d Nov 25, 2020
ff211ce
prevent shared access bug with tailers and entry pool
owen-d Nov 25, 2020
cb9bc46
removes stream push entry pool optimization
owen-d Nov 27, 2020
8 changes: 4 additions & 4 deletions pkg/chunkenc/memchunk.go
@@ -152,17 +152,17 @@ func (hb *headBlock) serialise(pool WriterPool) ([]byte, error) {
return outBuf.Bytes(), nil
}

// CheckpointBytes is used for WAL checkpointing
// CheckpointBytes serializes a headblock to []byte. This is used by the WAL checkpointing,
// which does not want to mutate a chunk by cutting it (otherwise risking content address changes), but
// needs to serialize/deserialize the data to disk to ensure data durability.
func (hb *headBlock) CheckpointBytes(version byte) ([]byte, error) {
// wB is eventually returned via buf.Bytes(), don't return it to the pool.
wB := BytesBufferPool.Get(1 << 10).([]byte)
encB := BytesBufferPool.Get(1 << 10).([]byte)

defer func() {
BytesBufferPool.Put(encB[:0])
}()

buf := bytes.NewBuffer(wB[:0])
buf := bytes.NewBuffer(make([]byte, 0, 1<<10))
eb := encbuf{b: encB}

eb.putByte(version)
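The `CheckpointBytes` hunk above swaps a pooled backing array (`wB`) for a freshly allocated one, because the serialized bytes escape to the caller. A minimal stand-alone sketch of the hazard and the fix — the pool and function names here are illustrative, not Loki's actual `BytesBufferPool` API:

```go
package main

import (
	"bytes"
	"sync"
)

// bufPool is a stand-in for a []byte pool like Loki's BytesBufferPool.
var bufPool = sync.Pool{New: func() interface{} { return make([]byte, 0, 1<<10) }}

// unsafeSerialize returns bytes backed by pooled memory and then
// puts that memory back: a later Get from the pool can overwrite
// the slice the caller is still holding.
func unsafeSerialize(payload []byte) []byte {
	b := bufPool.Get().([]byte)
	buf := bytes.NewBuffer(b[:0])
	buf.Write(payload)
	out := buf.Bytes()
	bufPool.Put(b[:0]) // bug: out may still alias b's backing array
	return out
}

// safeSerialize allocates a fresh buffer for the escaping result,
// mirroring the change to bytes.NewBuffer(make([]byte, 0, 1<<10)).
func safeSerialize(payload []byte) []byte {
	buf := bytes.NewBuffer(make([]byte, 0, 1<<10))
	buf.Write(payload)
	return buf.Bytes()
}
```

Pools are still used for the intermediate `encB` scratch space, which never escapes, so that Put stays safe.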
20 changes: 11 additions & 9 deletions pkg/ingester/checkpoint.go
@@ -11,6 +11,7 @@ import (

"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
@@ -95,13 +96,14 @@ func decodeCheckpointRecord(rec []byte, s *Series) error {
}
}

func encodeWithTypeHeader(m proto.Message, typ RecordType, b []byte) ([]byte, error) {
func encodeWithTypeHeader(m proto.Message, typ RecordType) ([]byte, error) {
buf, err := proto.Marshal(m)
if err != nil {
return b, err
return nil, err
}

b = append(b[:0], byte(typ))
b := make([]byte, 0, len(buf)+1)
b = append(b, byte(typ))
b = append(b, buf...)
return b, nil
}
@@ -112,7 +114,7 @@ type SeriesWithErr struct {
}

type SeriesIter interface {
Num() int
Count() int
Iter() <-chan *SeriesWithErr
Stop()
}
@@ -130,7 +132,7 @@ func newIngesterSeriesIter(ing *Ingester) *ingesterSeriesIter {
}
}

func (i *ingesterSeriesIter) Num() (ct int) {
func (i *ingesterSeriesIter) Count() (ct int) {
for _, inst := range i.ing.getInstances() {
ct += inst.numStreams()
}
@@ -234,8 +236,8 @@ func (w *WALCheckpointWriter) Advance() (bool, error) {
if err := os.MkdirAll(checkpointDirTemp, 0777); err != nil {
return false, errors.Wrap(err, "create checkpoint dir")
}
// checkpoint, err ;= wal.NewSize()
checkpoint, err := wal.NewSize(nil, nil, checkpointDirTemp, walSegmentSize, false)

checkpoint, err := wal.NewSize(log.With(util.Logger, "component", "checkpoint_wal"), nil, checkpointDirTemp, walSegmentSize, false)
if err != nil {
return false, errors.Wrap(err, "open checkpoint")
}
@@ -248,7 +250,7 @@
}

func (w *WALCheckpointWriter) Write(s *Series) error {
b, err := encodeWithTypeHeader(s, CheckpointRecord, recordPool.GetBytes()[:0])
b, err := encodeWithTypeHeader(s, CheckpointRecord)
if err != nil {
return err
}
@@ -430,7 +432,7 @@ func (c *Checkpointer) PerformCheckpoint() (err error) {
}()
// signal whether checkpoint writes should be amortized or burst
var immediate bool
n := c.iter.Num()
n := c.iter.Count()
if n < 1 {
return c.writer.Close(false)
}
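The simplified `encodeWithTypeHeader` above writes one type byte followed by the proto payload, allocating its own output instead of reusing a pooled slice. A hedged sketch of that record layout and its read-path counterpart — the constant names mirror the PR's, but the values and the `decodeTypeHeader` helper are illustrative:

```go
package main

// RecordType distinguishes record kinds in a WAL segment. The
// concrete values here are illustrative, not Loki's actual constants.
type RecordType byte

const (
	WALRecordSeries RecordType = iota
	WALRecordEntriesV1
	CheckpointRecord
)

// encodeWithTypeHeader prepends a single type byte to an already
// marshalled payload, matching the shape of the PR's helper.
func encodeWithTypeHeader(payload []byte, typ RecordType) []byte {
	b := make([]byte, 0, len(payload)+1)
	b = append(b, byte(typ))
	return append(b, payload...)
}

// decodeTypeHeader splits the type byte back off on the read path,
// so replay can dispatch on the record kind.
func decodeTypeHeader(rec []byte) (RecordType, []byte) {
	if len(rec) == 0 {
		return 0, nil
	}
	return RecordType(rec[0]), rec[1:]
}
```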
8 changes: 1 addition & 7 deletions pkg/ingester/encoding.go
@@ -133,15 +133,11 @@ func decodeEntries(b []byte, rec *WALRecord) error {

for len(dec.B) > 0 && dec.Err() == nil {
refEntries := RefEntries{
Ref: dec.Be64(),
Entries: recordPool.GetEntries(),
Ref: dec.Be64(),
}

nEntries := dec.Uvarint()
rem := nEntries
if cap(refEntries.Entries) < nEntries {
refEntries.Entries = make([]logproto.Entry, 0, nEntries)
}
for ; dec.Err() == nil && rem > 0; rem-- {
timeOffset := dec.Varint64()
lineLength := dec.Uvarint()
@@ -181,8 +177,6 @@ func decodeWALRecord(b []byte, walRec *WALRecord) (err error) {
t = RecordType(decbuf.Byte())
)

walRec.Reset()

switch t {
case WALRecordSeries:
userID = decbuf.UvarintStr()
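`decodeEntries` above walks records that pack a per-series ref, an entry count, and then per-entry varint time offsets and line lengths. A stand-alone approximation of that wire shape using `encoding/binary` — the exact field layout of Loki's `encbuf`/`decbuf` records may differ, so treat this as a sketch:

```go
package main

import "encoding/binary"

type entry struct {
	TimeOffset int64 // delta from a base timestamp
	Line       string
}

// encodeEntries appends ref, count, then (varint offset, uvarint
// length, raw bytes) per entry — an approximation of the PR's layout.
func encodeEntries(b []byte, ref uint64, entries []entry) []byte {
	b = binary.BigEndian.AppendUint64(b, ref)
	b = binary.AppendUvarint(b, uint64(len(entries)))
	for _, e := range entries {
		b = binary.AppendVarint(b, e.TimeOffset)
		b = binary.AppendUvarint(b, uint64(len(e.Line)))
		b = append(b, e.Line...)
	}
	return b
}

// decodeEntries reverses encodeEntries, allocating exactly the
// number of entries the record declares up front.
func decodeEntries(b []byte) (uint64, []entry) {
	ref := binary.BigEndian.Uint64(b)
	b = b[8:]
	n, sz := binary.Uvarint(b)
	b = b[sz:]
	out := make([]entry, 0, n)
	for i := uint64(0); i < n; i++ {
		off, sz := binary.Varint(b)
		b = b[sz:]
		l, sz := binary.Uvarint(b)
		b = b[sz:]
		out = append(out, entry{TimeOffset: off, Line: string(b[:l])})
		b = b[l:]
	}
	return ref, out
}
```

Copying each line with `string(b[:l])` is what lets the diff above drop the pooled-entries optimization: the decoded record no longer aliases the WAL read buffer.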
24 changes: 11 additions & 13 deletions pkg/ingester/encoding_test.go
@@ -38,11 +38,11 @@ func Test_Encoding_Series(t *testing.T) {

buf := record.encodeSeries(nil)

var decoded WALRecord
decoded := recordPool.GetRecord()

err := decodeWALRecord(buf, &decoded)
err := decodeWALRecord(buf, decoded)
require.Nil(t, err)
require.Equal(t, record, &decoded)
require.Equal(t, record, decoded)
}

func Test_Encoding_Entries(t *testing.T) {
@@ -81,25 +81,23 @@ func Test_Encoding_Entries(t *testing.T) {

buf := record.encodeEntries(nil)

var decoded WALRecord
decoded := recordPool.GetRecord()

err := decodeWALRecord(buf, &decoded)
err := decodeWALRecord(buf, decoded)
require.Nil(t, err)
require.Equal(t, record, &decoded)
require.Equal(t, record, decoded)
}

func fillChunk(c chunkenc.Chunk) int64 {
func fillChunk(t *testing.T, c chunkenc.Chunk) int64 {
t.Helper()
var i, inserted int64
entry := &logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: "entry for line 0",
}

for c.SpaceFor(entry) {
err := c.Append(entry)
if err != nil {
panic(err)
}
require.NoError(t, c.Append(entry))
i++
inserted += int64(len(entry.Line))
entry.Timestamp = time.Unix(0, i)
@@ -120,7 +118,7 @@

conf := dummyConf()
c := chunkenc.NewMemChunk(chunkenc.EncGZIP, conf.BlockSize, conf.TargetChunkSize)
fillChunk(c)
fillChunk(t, c)

from := []chunkDesc{
{
@@ -187,7 +185,7 @@
},
}

b, err := encodeWithTypeHeader(s, CheckpointRecord, recordPool.GetBytes()[:0])
b, err := encodeWithTypeHeader(s, CheckpointRecord)
require.Nil(t, err)

out := &Series{}
10 changes: 5 additions & 5 deletions pkg/ingester/ingester.go
@@ -31,6 +31,7 @@ import (
"github.com/grafana/loki/pkg/logql/stats"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/stores/shipper"
errUtil "github.com/grafana/loki/pkg/util"
listutil "github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)
@@ -147,7 +148,6 @@ type Ingester struct {

metrics *ingesterMetrics

// WAL
wal WAL
}

@@ -297,9 +297,9 @@ func (i *Ingester) running(ctx context.Context) error {
// At this point, loop no longer runs, but flushers are still running.
func (i *Ingester) stopping(_ error) error {
i.stopIncomingRequests()
i.wal.Stop()

err := services.StopAndAwaitTerminated(context.Background(), i.lifecycler)
var errs errUtil.MultiError
errs.Add(i.wal.Stop())
errs.Add(services.StopAndAwaitTerminated(context.Background(), i.lifecycler))

// Normally, flushers are stopped via lifecycler (in transferOut), but if lifecycler fails,
// we better stop them.
@@ -308,7 +308,7 @@ func (i *Ingester) stopping(_ error) error {
}
i.flushQueuesDone.Wait()

return err
return errs.Err()
}

func (i *Ingester) loop() {
2 changes: 1 addition & 1 deletion pkg/ingester/recovery.go
@@ -353,8 +353,8 @@ func recoverGeneric(
inputs = append(inputs, make(chan recoveryInput))

go func(input <-chan recoveryInput) {
defer wg.Done()
process(recoverer, input, errCh)
wg.Done()
}(inputs[i])

}
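Moving `wg.Done()` into a `defer` guarantees the WaitGroup is decremented on every exit path from the worker goroutine, so `wg.Wait()` cannot hang if `process` panics or returns early. A small self-contained illustration of the pattern — the worker body is invented for the sketch, not Loki's recovery `process`:

```go
package main

import "sync"

// consumeAll drains each input channel with one goroutine and
// returns the total consumed. Each worker defers wg.Done(), so even
// an early return or recovered panic cannot leave Wait() blocked.
func consumeAll(inputs []<-chan int) int {
	var (
		wg  sync.WaitGroup
		mu  sync.Mutex
		sum int
	)
	for _, input := range inputs {
		wg.Add(1)
		go func(in <-chan int) {
			defer wg.Done() // runs on every exit path, unlike a trailing call
			for v := range in {
				mu.Lock()
				sum += v
				mu.Unlock()
			}
		}(input)
	}
	wg.Wait()
	return sum
}
```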
2 changes: 1 addition & 1 deletion pkg/ingester/stream.go
@@ -143,7 +143,7 @@ func (s *stream) Push(
_, lastChunkTimestamp = s.chunks[len(s.chunks)-1].chunk.Bounds()
}

storedEntries := make([]logproto.Entry, 0, len(entries))
storedEntries := recordPool.GetEntries()
failedEntriesWithError := []entryWithError{}

// Don't fail on the first append error - if samples are sent out of order,
1 change: 1 addition & 0 deletions pkg/ingester/tailer.go
@@ -111,6 +111,7 @@ func (t *tailer) loop() {
}

func (t *tailer) send(stream logproto.Stream, lbs labels.Labels) {
defer recordPool.PutEntries(stream.Entries)
if t.isClosed() {
return
}
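Together, the `stream.go` and `tailer.go` hunks establish a get/put discipline: `Push` draws `storedEntries` from the pool, and the tailer returns its batch once it has been sent. The danger is returning a slice while another goroutine still reads it — the shared-access bug fixed near the end of this PR. A hedged sketch of the discipline with a plain `sync.Pool` (the `entry` type and helper names are illustrative; Loki's `recordPool` wraps `sync.Pool` similarly):

```go
package main

import "sync"

type entry struct {
	Line string
}

// entriesPool hands out reusable entry slices. A slice may be Put
// back only once no other goroutine still references it.
var entriesPool = sync.Pool{
	New: func() interface{} { return make([]entry, 0, 128) },
}

func getEntries() []entry  { return entriesPool.Get().([]entry)[:0] }
func putEntries(e []entry) { entriesPool.Put(e[:0]) }

// sendToTailer delivers a batch and recycles it afterwards,
// mirroring the defer recordPool.PutEntries(stream.Entries) change:
// the defer runs only after deliver has finished with the slice.
func sendToTailer(batch []entry, deliver func([]entry)) {
	defer putEntries(batch)
	deliver(batch)
}
```

Putting the recycle in a `defer` inside the consumer (rather than having the producer free it) is what keeps ownership unambiguous across goroutines.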
9 changes: 5 additions & 4 deletions pkg/ingester/wal.go
@@ -50,13 +50,13 @@ type WAL interface {
// Log marshalls the records and writes it into the WAL.
Log(*WALRecord) error
// Stop stops all the WAL operations.
Stop()
Stop() error
}

type noopWAL struct{}

func (noopWAL) Log(*WALRecord) error { return nil }
func (noopWAL) Stop() {}
func (noopWAL) Stop() error { return nil }

type walWrapper struct {
cfg WALConfig
@@ -127,11 +127,12 @@ func (w *walWrapper) Log(record *WALRecord) error {
}
}

func (w *walWrapper) Stop() {
func (w *walWrapper) Stop() error {
close(w.quit)
w.wait.Wait()
_ = w.wal.Close()
err := w.wal.Close()
level.Info(util.Logger).Log("msg", "stopped", "component", "wal")
return err
}

func (w *walWrapper) checkpointWriter() *WALCheckpointWriter {
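The `wal.go` hunks change `Stop()` to return an error, so a failed `wal.Close()` can surface through the ingester's error collection, while `noopWAL` keeps the feature optional without nil checks at call sites. A compact sketch of that interface-plus-no-op pattern — the `newWAL` constructor here is illustrative, with the real `walWrapper` construction elided:

```go
package main

// WAL is the minimal interface the ingester needs. Stop now reports
// close errors instead of swallowing them.
type WAL interface {
	Log(rec []byte) error
	Stop() error
}

// noopWAL satisfies WAL when the feature is disabled, so callers
// never have to nil-check before logging or stopping.
type noopWAL struct{}

func (noopWAL) Log([]byte) error { return nil }
func (noopWAL) Stop() error      { return nil }

// newWAL picks the implementation from config; the real wrapper
// (segment writer, checkpointer, quit channel) is elided here.
func newWAL(enabled bool) WAL {
	if !enabled {
		return noopWAL{}
	}
	// a real walWrapper would be constructed and returned here
	return noopWAL{}
}
```

Because both implementations share the interface, `i.wal.Stop()` in `stopping` stays a single unconditional call whether or not the experimental WAL is turned on.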