Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Headblock interop #3995

Merged
merged 3 commits into from
Jul 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 82 additions & 55 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,20 @@ const (
defaultBlockSize = 256 * 1024
)

type HeadBlockFmt byte

func (f HeadBlockFmt) Byte() byte { return byte(f) }

const (
_ HeadBlockFmt = iota
// placeholders to start splitting chunk formats vs head block
// fmts at v3
_
_
OrderedHeadBlockFmt
UnorderedHeadBlockFmt
)

var magicNumber = uint32(0x12EE56A)

// The table gets initialized with sync.Once but may still cause a race
Expand Down Expand Up @@ -74,7 +88,7 @@ type MemChunk struct {
cutBlockSize int

// Current in-mem block being appended to.
head *headBlock
head HeadBlock

// the chunk format default to v2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While we are at it, If I am not wrong, we default to v3 now, and I think we should update this comment too?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think its not yet active since NewMemChunk still creates an ordered head.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found the DefaultChunkFormat being set to v3. Am I looking at the wrong place?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No I missed it

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is covered in the followup PR #4001 :)

format byte
Expand Down Expand Up @@ -102,11 +116,17 @@ type headBlock struct {
mint, maxt int64
}

func (hb *headBlock) isEmpty() bool {
func (hb *headBlock) Format() HeadBlockFmt { return OrderedHeadBlockFmt }

func (hb *headBlock) IsEmpty() bool {
return len(hb.entries) == 0
}

func (hb *headBlock) clear() {
func (hb *headBlock) Entries() int { return len(hb.entries) }

func (hb *headBlock) UncompressedSize() int { return hb.size }

func (hb *headBlock) Reset() {
if hb.entries != nil {
hb.entries = hb.entries[:0]
}
Expand All @@ -115,8 +135,10 @@ func (hb *headBlock) clear() {
hb.maxt = 0
}

func (hb *headBlock) append(ts int64, line string) error {
if !hb.isEmpty() && hb.maxt > ts {
func (hb *headBlock) Bounds() (int64, int64) { return hb.mint, hb.maxt }

func (hb *headBlock) Append(ts int64, line string) error {
if !hb.IsEmpty() && hb.maxt > ts {
return ErrOutOfOrder
}

Expand All @@ -130,7 +152,7 @@ func (hb *headBlock) append(ts int64, line string) error {
return nil
}

func (hb *headBlock) serialise(pool WriterPool) ([]byte, error) {
func (hb *headBlock) Serialise(pool WriterPool) ([]byte, error) {
inBuf := serializeBytesBufferPool.Get().(*bytes.Buffer)
defer func() {
inBuf.Reset()
Expand Down Expand Up @@ -164,14 +186,14 @@ func (hb *headBlock) serialise(pool WriterPool) ([]byte, error) {
// CheckpointBytes serializes a headblock to []byte. This is used by the WAL checkpointing,
// which does not want to mutate a chunk by cutting it (otherwise risking content address changes), but
// needs to serialize/deserialize the data to disk to ensure data durability.
func (hb *headBlock) CheckpointBytes(version byte, b []byte) ([]byte, error) {
func (hb *headBlock) CheckpointBytes(b []byte) ([]byte, error) {
buf := bytes.NewBuffer(b[:0])
err := hb.CheckpointTo(version, buf)
err := hb.CheckpointTo(buf)
return buf.Bytes(), err
}

// CheckpointSize returns the estimated size of the headblock checkpoint.
func (hb *headBlock) CheckpointSize(version byte) int {
func (hb *headBlock) CheckpointSize() int {
size := 1 // version
size += binary.MaxVarintLen32 * 2 // total entries + total size
size += binary.MaxVarintLen64 * 2 // mint,maxt
Expand All @@ -184,13 +206,13 @@ func (hb *headBlock) CheckpointSize(version byte) int {
}

// CheckpointTo serializes a headblock to a `io.Writer`. see `CheckpointBytes`.
func (hb *headBlock) CheckpointTo(version byte, w io.Writer) error {
func (hb *headBlock) CheckpointTo(w io.Writer) error {
eb := EncodeBufferPool.Get().(*encbuf)
defer EncodeBufferPool.Put(eb)

eb.reset()

eb.putByte(version)
eb.putByte(byte(hb.Format()))
_, err := w.Write(eb.get())
if err != nil {
return errors.Wrap(err, "write headBlock version")
Expand Down Expand Up @@ -225,7 +247,7 @@ func (hb *headBlock) CheckpointTo(version byte, w io.Writer) error {
return nil
}

func (hb *headBlock) FromCheckpoint(b []byte) error {
func (hb *headBlock) LoadBytes(b []byte) error {
if len(b) < 1 {
return nil
}
Expand Down Expand Up @@ -267,6 +289,20 @@ func (hb *headBlock) FromCheckpoint(b []byte) error {
return nil
}

func (hb *headBlock) Convert(version HeadBlockFmt) (HeadBlock, error) {
if version < UnorderedHeadBlockFmt {
return hb, nil
}
out := newUnorderedHeadBlock()

for _, e := range hb.entries {
if err := out.Append(e.t, e.s); err != nil {
return nil, err
}
}
return out, nil
}

type entry struct {
t int64
s string
Expand Down Expand Up @@ -511,11 +547,11 @@ func (c *MemChunk) SerializeForCheckpointTo(chk, head io.Writer) error {
return err
}

if c.head.isEmpty() {
if c.head.IsEmpty() {
return nil
}

err = c.head.CheckpointTo(c.format, head)
err = c.head.CheckpointTo(head)
if err != nil {
return err
}
Expand All @@ -524,15 +560,15 @@ func (c *MemChunk) SerializeForCheckpointTo(chk, head io.Writer) error {
}

func (c *MemChunk) CheckpointSize() (chunk, head int) {
return c.BytesSize(), c.head.CheckpointSize(c.format)
return c.BytesSize(), c.head.CheckpointSize()
}

func MemchunkFromCheckpoint(chk, head []byte, blockSize int, targetSize int) (*MemChunk, error) {
mc, err := NewByteChunk(chk, blockSize, targetSize)
if err != nil {
return nil, err
}
return mc, mc.head.FromCheckpoint(head)
return mc, mc.head.LoadBytes(head)
}

// Encoding implements Chunk.
Expand All @@ -547,9 +583,7 @@ func (c *MemChunk) Size() int {
ne += blk.numEntries
}

if !c.head.isEmpty() {
ne += len(c.head.entries)
}
ne += c.head.Entries()

return ne
}
Expand All @@ -564,7 +598,7 @@ func (c *MemChunk) SpaceFor(e *logproto.Entry) bool {
if c.targetSize > 0 {
// This is looking to see if the uncompressed lines will fit which is not
// a great check, but it will guarantee we are always under the target size
newHBSize := c.head.size + len(e.Line)
newHBSize := c.head.UncompressedSize() + len(e.Line)
return (c.cutBlockSize + newHBSize) < c.targetSize
}
// if targetSize is not defined, default to the original behavior of fixed blocks per chunk
Expand All @@ -575,9 +609,7 @@ func (c *MemChunk) SpaceFor(e *logproto.Entry) bool {
func (c *MemChunk) UncompressedSize() int {
size := 0

if !c.head.isEmpty() {
size += c.head.size
}
size += c.head.UncompressedSize()

for _, b := range c.blocks {
size += b.uncompressedSize
Expand All @@ -590,9 +622,7 @@ func (c *MemChunk) UncompressedSize() int {
func (c *MemChunk) CompressedSize() int {
size := 0
// Better to account for any uncompressed data than ignore it even though this isn't accurate.
if !c.head.isEmpty() {
size += c.head.size
}
size += c.head.UncompressedSize()
size += c.cutBlockSize
return size
}
Expand All @@ -612,15 +642,15 @@ func (c *MemChunk) Append(entry *logproto.Entry) error {

// If the head block is empty but there are cut blocks, we have to make
// sure the new entry is not out of order compared to the previous block
if c.head.isEmpty() && len(c.blocks) > 0 && c.blocks[len(c.blocks)-1].maxt > entryTimestamp {
if c.head.IsEmpty() && len(c.blocks) > 0 && c.blocks[len(c.blocks)-1].maxt > entryTimestamp {
return ErrOutOfOrder
}

if err := c.head.append(entryTimestamp, entry.Line); err != nil {
if err := c.head.Append(entryTimestamp, entry.Line); err != nil {
return err
}

if c.head.size >= c.blockSize {
if c.head.UncompressedSize() >= c.blockSize {
return c.cut()
}

Expand All @@ -635,44 +665,41 @@ func (c *MemChunk) Close() error {

// cut a new block and add it to finished blocks.
func (c *MemChunk) cut() error {
if c.head.isEmpty() {
if c.head.IsEmpty() {
return nil
}

b, err := c.head.serialise(getWriterPool(c.encoding))
b, err := c.head.Serialise(getWriterPool(c.encoding))
if err != nil {
return err
}

mint, maxt := c.head.Bounds()
c.blocks = append(c.blocks, block{
b: b,
numEntries: len(c.head.entries),
mint: c.head.mint,
maxt: c.head.maxt,
uncompressedSize: c.head.size,
numEntries: c.head.Entries(),
mint: mint,
maxt: maxt,
uncompressedSize: c.head.UncompressedSize(),
})

c.cutBlockSize += len(b)

c.head.clear()
c.head.Reset()
return nil
}

// Bounds implements Chunk.
func (c *MemChunk) Bounds() (fromT, toT time.Time) {
var from, to int64
if len(c.blocks) > 0 {
from = c.blocks[0].mint
to = c.blocks[len(c.blocks)-1].maxt
}
from, to := c.head.Bounds()

if !c.head.isEmpty() {
if from == 0 || from > c.head.mint {
from = c.head.mint
// need to check all the blocks in case they overlap
for _, b := range c.blocks {
if from == 0 || from > b.mint {
from = b.mint
}

if to < c.head.maxt {
to = c.head.maxt
if to < b.maxt {
to = b.maxt
}
}

Expand All @@ -692,8 +719,8 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
blockItrs = append(blockItrs, encBlock{c.encoding, b}.Iterator(ctx, pipeline))
}

if !c.head.isEmpty() {
headIterator = c.head.iterator(ctx, direction, mint, maxt, pipeline)
if !c.head.IsEmpty() {
headIterator = c.head.Iterator(ctx, direction, mint, maxt, pipeline)
}

if direction == logproto.FORWARD {
Expand Down Expand Up @@ -743,8 +770,8 @@ func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time,
its = append(its, encBlock{c.encoding, b}.SampleIterator(ctx, extractor))
}

if !c.head.isEmpty() {
its = append(its, c.head.sampleIterator(ctx, mint, maxt, extractor))
if !c.head.IsEmpty() {
its = append(its, c.head.SampleIterator(ctx, mint, maxt, extractor))
}

return iter.NewTimeRangedSampleIterator(
Expand Down Expand Up @@ -837,8 +864,8 @@ func (b block) MaxTime() int64 {
return b.maxt
}

func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction, mint, maxt int64, pipeline log.StreamPipeline) iter.EntryIterator {
if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) {
func (hb *headBlock) Iterator(ctx context.Context, direction logproto.Direction, mint, maxt int64, pipeline log.StreamPipeline) iter.EntryIterator {
if hb.IsEmpty() || (maxt < hb.mint || hb.maxt < mint) {
return iter.NoopIterator
}

Expand Down Expand Up @@ -895,8 +922,8 @@ func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction,
return iter.NewStreamsIterator(ctx, streamsResult, direction)
}

func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, extractor log.StreamSampleExtractor) iter.SampleIterator {
if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) {
func (hb *headBlock) SampleIterator(ctx context.Context, mint, maxt int64, extractor log.StreamSampleExtractor) iter.SampleIterator {
if hb.IsEmpty() || (maxt < hb.mint || hb.maxt < mint) {
return iter.NoopIterator
}
chunkStats := stats.GetChunkData(ctx)
Expand Down
Loading