Headblock interop (#3995)
* merge feature/unordered-replay

* interoperable head chunks
owen-d authored Jul 19, 2021
1 parent 2107148 commit a4db1a4
Showing 4 changed files with 246 additions and 97 deletions.
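
Editor's note: the renames running through the diff below (isEmpty → IsEmpty, append → Append, serialise → Serialise, clear → Reset, FromCheckpoint → LoadBytes) promote the concrete headBlock methods into an exported interface, so MemChunk.head can hold either an ordered or an unordered head block. The interface declaration itself is in one of the other changed files and is not shown here; the following sketch is reconstructed from the call sites in this file, so treat the exact shape as an approximation rather than the commit's declaration.

// Sketch of the HeadBlock interface implied by the method set in this
// diff; reconstructed, not copied from the commit.
type HeadBlock interface {
	IsEmpty() bool
	Entries() int
	UncompressedSize() int
	Format() HeadBlockFmt
	Bounds() (mint, maxt int64)
	Reset()
	Append(ts int64, line string) error
	Serialise(pool WriterPool) ([]byte, error)
	Convert(HeadBlockFmt) (HeadBlock, error)
	CheckpointTo(w io.Writer) error
	CheckpointBytes(b []byte) ([]byte, error)
	CheckpointSize() int
	LoadBytes(b []byte) error
	Iterator(ctx context.Context, direction logproto.Direction, mint, maxt int64, pipeline log.StreamPipeline) iter.EntryIterator
	SampleIterator(ctx context.Context, mint, maxt int64, extractor log.StreamSampleExtractor) iter.SampleIterator
}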
137 changes: 82 additions & 55 deletions pkg/chunkenc/memchunk.go
@@ -44,6 +44,20 @@ const (
defaultBlockSize = 256 * 1024
)

type HeadBlockFmt byte

func (f HeadBlockFmt) Byte() byte { return byte(f) }

const (
_ HeadBlockFmt = iota
// placeholders to start splitting chunk formats vs head block
// fmts at v3
_
_
OrderedHeadBlockFmt
UnorderedHeadBlockFmt
)

var magicNumber = uint32(0x12EE56A)

// The table gets initialized with sync.Once but may still cause a race
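
Editor's note: the leading blank identifiers in the HeadBlockFmt block above are deliberate. Chunk format versions v1 and v2 already occupy byte values 1 and 2, so head block formats start at 3 (OrderedHeadBlockFmt) and 4 (UnorderedHeadBlockFmt). Since CheckpointTo (further down in this diff) now writes byte(hb.Format()) where it previously wrote the chunk format byte, the first byte of a checkpoint header is enough to tell the two generations apart. A minimal sketch of such a check, assuming only the byte values above:

// Illustrative only, not from this commit: decide whether a checkpoint's
// leading version byte was written as a HeadBlockFmt (>= 3) or as a
// legacy chunk format byte (v1/v2).
func isHeadBlockFmt(version byte) bool {
	return HeadBlockFmt(version) >= OrderedHeadBlockFmt
}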
@@ -74,7 +88,7 @@ type MemChunk struct {
cutBlockSize int

// Current in-mem block being appended to.
head *headBlock
head HeadBlock

// the chunk format default to v2
format byte
@@ -102,11 +116,17 @@ type headBlock struct {
mint, maxt int64
}

func (hb *headBlock) isEmpty() bool {
func (hb *headBlock) Format() HeadBlockFmt { return OrderedHeadBlockFmt }

func (hb *headBlock) IsEmpty() bool {
return len(hb.entries) == 0
}

func (hb *headBlock) clear() {
func (hb *headBlock) Entries() int { return len(hb.entries) }

func (hb *headBlock) UncompressedSize() int { return hb.size }

func (hb *headBlock) Reset() {
if hb.entries != nil {
hb.entries = hb.entries[:0]
}
@@ -115,8 +135,10 @@ func (hb *headBlock) clear() {
hb.maxt = 0
}

func (hb *headBlock) append(ts int64, line string) error {
if !hb.isEmpty() && hb.maxt > ts {
func (hb *headBlock) Bounds() (int64, int64) { return hb.mint, hb.maxt }

func (hb *headBlock) Append(ts int64, line string) error {
if !hb.IsEmpty() && hb.maxt > ts {
return ErrOutOfOrder
}

@@ -130,7 +152,7 @@ func (hb *headBlock) append(ts int64, line string) error {
return nil
}

func (hb *headBlock) serialise(pool WriterPool) ([]byte, error) {
func (hb *headBlock) Serialise(pool WriterPool) ([]byte, error) {
inBuf := serializeBytesBufferPool.Get().(*bytes.Buffer)
defer func() {
inBuf.Reset()
@@ -164,14 +186,14 @@ func (hb *headBlock) serialise(pool WriterPool) ([]byte, error) {
// CheckpointBytes serializes a headblock to []byte. This is used by the WAL checkpointing,
// which does not want to mutate a chunk by cutting it (otherwise risking content address changes), but
// needs to serialize/deserialize the data to disk to ensure data durability.
func (hb *headBlock) CheckpointBytes(version byte, b []byte) ([]byte, error) {
func (hb *headBlock) CheckpointBytes(b []byte) ([]byte, error) {
buf := bytes.NewBuffer(b[:0])
err := hb.CheckpointTo(version, buf)
err := hb.CheckpointTo(buf)
return buf.Bytes(), err
}

// CheckpointSize returns the estimated size of the headblock checkpoint.
func (hb *headBlock) CheckpointSize(version byte) int {
func (hb *headBlock) CheckpointSize() int {
size := 1 // version
size += binary.MaxVarintLen32 * 2 // total entries + total size
size += binary.MaxVarintLen64 * 2 // mint,maxt
@@ -184,13 +206,13 @@ func (hb *headBlock) CheckpointSize(version byte) int {
}

// CheckpointTo serializes a headblock to a `io.Writer`. see `CheckpointBytes`.
func (hb *headBlock) CheckpointTo(version byte, w io.Writer) error {
func (hb *headBlock) CheckpointTo(w io.Writer) error {
eb := EncodeBufferPool.Get().(*encbuf)
defer EncodeBufferPool.Put(eb)

eb.reset()

eb.putByte(version)
eb.putByte(byte(hb.Format()))
_, err := w.Write(eb.get())
if err != nil {
return errors.Wrap(err, "write headBlock version")
@@ -225,7 +247,7 @@ func (hb *headBlock) CheckpointTo(version byte, w io.Writer) error {
return nil
}

func (hb *headBlock) FromCheckpoint(b []byte) error {
func (hb *headBlock) LoadBytes(b []byte) error {
if len(b) < 1 {
return nil
}
@@ -267,6 +289,20 @@ func (hb *headBlock) FromCheckpoint(b []byte) error {
return nil
}

func (hb *headBlock) Convert(version HeadBlockFmt) (HeadBlock, error) {
if version < UnorderedHeadBlockFmt {
return hb, nil
}
out := newUnorderedHeadBlock()

for _, e := range hb.entries {
if err := out.Append(e.t, e.s); err != nil {
return nil, err
}
}
return out, nil
}

type entry struct {
t int64
s string
@@ -511,11 +547,11 @@ func (c *MemChunk) SerializeForCheckpointTo(chk, head io.Writer) error {
return err
}

if c.head.isEmpty() {
if c.head.IsEmpty() {
return nil
}

err = c.head.CheckpointTo(c.format, head)
err = c.head.CheckpointTo(head)
if err != nil {
return err
}
@@ -524,15 +560,15 @@ func (c *MemChunk) SerializeForCheckpointTo(chk, head io.Writer) error {
}

func (c *MemChunk) CheckpointSize() (chunk, head int) {
return c.BytesSize(), c.head.CheckpointSize(c.format)
return c.BytesSize(), c.head.CheckpointSize()
}

func MemchunkFromCheckpoint(chk, head []byte, blockSize int, targetSize int) (*MemChunk, error) {
mc, err := NewByteChunk(chk, blockSize, targetSize)
if err != nil {
return nil, err
}
return mc, mc.head.FromCheckpoint(head)
return mc, mc.head.LoadBytes(head)
}

// Encoding implements Chunk.
@@ -547,9 +583,7 @@ func (c *MemChunk) Size() int {
ne += blk.numEntries
}

if !c.head.isEmpty() {
ne += len(c.head.entries)
}
ne += c.head.Entries()

return ne
}
@@ -564,7 +598,7 @@ func (c *MemChunk) SpaceFor(e *logproto.Entry) bool {
if c.targetSize > 0 {
// This is looking to see if the uncompressed lines will fit which is not
// a great check, but it will guarantee we are always under the target size
newHBSize := c.head.size + len(e.Line)
newHBSize := c.head.UncompressedSize() + len(e.Line)
return (c.cutBlockSize + newHBSize) < c.targetSize
}
// if targetSize is not defined, default to the original behavior of fixed blocks per chunk
@@ -575,9 +609,7 @@ func (c *MemChunk) UncompressedSize() int {
func (c *MemChunk) UncompressedSize() int {
size := 0

if !c.head.isEmpty() {
size += c.head.size
}
size += c.head.UncompressedSize()

for _, b := range c.blocks {
size += b.uncompressedSize
@@ -590,9 +622,7 @@ func (c *MemChunk) CompressedSize() int {
func (c *MemChunk) CompressedSize() int {
size := 0
// Better to account for any uncompressed data than ignore it even though this isn't accurate.
if !c.head.isEmpty() {
size += c.head.size
}
size += c.head.UncompressedSize()
size += c.cutBlockSize
return size
}
@@ -612,15 +642,15 @@ func (c *MemChunk) Append(entry *logproto.Entry) error {

// If the head block is empty but there are cut blocks, we have to make
// sure the new entry is not out of order compared to the previous block
if c.head.isEmpty() && len(c.blocks) > 0 && c.blocks[len(c.blocks)-1].maxt > entryTimestamp {
if c.head.IsEmpty() && len(c.blocks) > 0 && c.blocks[len(c.blocks)-1].maxt > entryTimestamp {
return ErrOutOfOrder
}

if err := c.head.append(entryTimestamp, entry.Line); err != nil {
if err := c.head.Append(entryTimestamp, entry.Line); err != nil {
return err
}

if c.head.size >= c.blockSize {
if c.head.UncompressedSize() >= c.blockSize {
return c.cut()
}

@@ -635,44 +665,41 @@ func (c *MemChunk) Close() error {

// cut a new block and add it to finished blocks.
func (c *MemChunk) cut() error {
if c.head.isEmpty() {
if c.head.IsEmpty() {
return nil
}

b, err := c.head.serialise(getWriterPool(c.encoding))
b, err := c.head.Serialise(getWriterPool(c.encoding))
if err != nil {
return err
}

mint, maxt := c.head.Bounds()
c.blocks = append(c.blocks, block{
b: b,
numEntries: len(c.head.entries),
mint: c.head.mint,
maxt: c.head.maxt,
uncompressedSize: c.head.size,
numEntries: c.head.Entries(),
mint: mint,
maxt: maxt,
uncompressedSize: c.head.UncompressedSize(),
})

c.cutBlockSize += len(b)

c.head.clear()
c.head.Reset()
return nil
}

// Bounds implements Chunk.
func (c *MemChunk) Bounds() (fromT, toT time.Time) {
var from, to int64
if len(c.blocks) > 0 {
from = c.blocks[0].mint
to = c.blocks[len(c.blocks)-1].maxt
}
from, to := c.head.Bounds()

if !c.head.isEmpty() {
if from == 0 || from > c.head.mint {
from = c.head.mint
// need to check all the blocks in case they overlap
for _, b := range c.blocks {
if from == 0 || from > b.mint {
from = b.mint
}

if to < c.head.maxt {
to = c.head.maxt
if to < b.maxt {
to = b.maxt
}
}

Expand All @@ -692,8 +719,8 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
blockItrs = append(blockItrs, encBlock{c.encoding, b}.Iterator(ctx, pipeline))
}

if !c.head.isEmpty() {
headIterator = c.head.iterator(ctx, direction, mint, maxt, pipeline)
if !c.head.IsEmpty() {
headIterator = c.head.Iterator(ctx, direction, mint, maxt, pipeline)
}

if direction == logproto.FORWARD {
@@ -743,8 +770,8 @@ func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time,
its = append(its, encBlock{c.encoding, b}.SampleIterator(ctx, extractor))
}

if !c.head.isEmpty() {
its = append(its, c.head.sampleIterator(ctx, mint, maxt, extractor))
if !c.head.IsEmpty() {
its = append(its, c.head.SampleIterator(ctx, mint, maxt, extractor))
}

return iter.NewTimeRangedSampleIterator(
@@ -837,8 +864,8 @@ func (b block) MaxTime() int64 {
return b.maxt
}

func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction, mint, maxt int64, pipeline log.StreamPipeline) iter.EntryIterator {
if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) {
func (hb *headBlock) Iterator(ctx context.Context, direction logproto.Direction, mint, maxt int64, pipeline log.StreamPipeline) iter.EntryIterator {
if hb.IsEmpty() || (maxt < hb.mint || hb.maxt < mint) {
return iter.NoopIterator
}

@@ -895,8 +922,8 @@ func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction,
return iter.NewStreamsIterator(ctx, streamsResult, direction)
}

func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, extractor log.StreamSampleExtractor) iter.SampleIterator {
if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) {
func (hb *headBlock) SampleIterator(ctx context.Context, mint, maxt int64, extractor log.StreamSampleExtractor) iter.SampleIterator {
if hb.IsEmpty() || (maxt < hb.mint || hb.maxt < mint) {
return iter.NoopIterator
}
chunkStats := stats.GetChunkData(ctx)
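
Editor's note: taken together, LoadBytes plus Convert give the WAL replay path a way to rehydrate a checkpointed ordered head block and upgrade it when the unordered format is in use. A minimal sketch of that flow, with replayHead as a hypothetical driver (the real caller lives in the WAL code, outside this diff):

// Hypothetical driver, assuming the types above: rebuild a head block
// from checkpoint bytes, then convert it to the desired format. Convert
// is a no-op for targets below UnorderedHeadBlockFmt and otherwise
// re-appends every entry into a fresh unordered head block.
func replayHead(b []byte, target HeadBlockFmt) (HeadBlock, error) {
	hb := &headBlock{}
	if err := hb.LoadBytes(b); err != nil {
		return nil, err
	}
	return hb.Convert(target)
}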