Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add zstd and flate compressions algorithms. #3064

Merged
merged 3 commits into from
Jan 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ require (
github.com/jmespath/go-jmespath v0.4.0
github.com/joncrlsn/dque v2.2.1-0.20200515025108-956d14155fa2+incompatible
github.com/json-iterator/go v1.1.10
github.com/klauspost/compress v1.9.5
github.com/klauspost/compress v1.11.3
github.com/mitchellh/mapstructure v1.3.3
github.com/moby/term v0.0.0-20200915141129-7f0af18e79f2 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f
github.com/opentracing/opentracing-go v1.2.0
// github.com/pierrec/lz4 v2.0.5+incompatible
github.com/pierrec/lz4/v4 v4.0.2-0.20200813132121-22f5d580d5c4
github.com/pierrec/lz4/v4 v4.1.1
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.8.0
github.com/prometheus/client_model v0.2.0
Expand Down
12 changes: 4 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0 h1:STgFzyU5/8miMl0//zKh2aQeTyeaUH3WN9bSUiJ09bA=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
code.cloudfoundry.org/bytefmt v0.0.0-20200131002437-cf55d5288a48/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc=
collectd.org v0.3.0/go.mod h1:A/8DzQBkF6abtvrT2j/AU/4tiBgJWYyh0y/oB/4MlWE=
contrib.go.opencensus.io/exporter/ocagent v0.6.0/go.mod h1:zmKjrJcdo0aYcVS7bmEeSEBLPA9YJp5bjrofdU3pIXs=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
Expand Down Expand Up @@ -825,7 +824,6 @@ github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4d
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
github.com/jwilder/encoding v0.0.0-20170811194829-b4e1701a28ef/go.mod h1:Ct9fl0F6iIOGgxJ5npU/IUOhOhqlVrGjyIZc8/MagT0=
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw=
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8=
github.com/karrick/godirwalk v1.8.0/go.mod h1:H5KPZjojv4lE+QYImBI8xVtrBRgYrIVsaRPx4tDPEn4=
github.com/karrick/godirwalk v1.10.3/go.mod h1:RoGL9dQei4vP9ilrpETWE8CLOZ1kiN0LhBygSwrAsHA=
Expand All @@ -835,6 +833,8 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.9.5 h1:U+CaK85mrNNb4k8BNOfgJtJ/gr6kswUCFj6miSzVC6M=
github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.11.3 h1:dB4Bn0tN3wdCzQxnS8r06kV74qN/TAfaIS0bVE8h3jc=
github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid v1.2.3/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid v1.3.1 h1:5JNjFYYQrZeKRJ0734q51WCEEn2huer72Dc7K+R/b6s=
Expand Down Expand Up @@ -911,7 +911,6 @@ github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzp
github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.6/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/mattn/go-sqlite3 v1.11.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/mattn/go-tty v0.0.0-20180907095812-13ff1204f104/go.mod h1:XPvLUNfbS4fJH25nqRHfWLMa1ONC8Amw+mIA639KxkE=
Expand Down Expand Up @@ -948,7 +947,6 @@ github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKU
github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
github.com/mitchellh/cli v1.1.0/go.mod h1:xcISNoH86gajksDmfB23e/pu+B+GeFRMYmoHXxx3xhI=
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw=
github.com/mitchellh/go-homedir v1.0.0 h1:vKb8ShqSby24Yrqr/yDYkuFz8d0WUjys40rvnGC8aR0=
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
Expand Down Expand Up @@ -1069,12 +1067,11 @@ github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/peterh/liner v1.0.1-0.20180619022028-8c1271fcf47f/go.mod h1:xIteQHvHuaLYG9IFj6mSxM0fCKrs34IrEQUhOYuGPHc=
github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pierrec/cmdflag v0.0.2/go.mod h1:a3zKGZ3cdQUfxjd0RGMLZr8xI3nvpJOB+m6o/1X5BmU=
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4/v4 v4.0.2-0.20200813132121-22f5d580d5c4 h1:qId52nKbgRXnPooGimqIBgQCECmjgQedP8YsrOFPOOo=
github.com/pierrec/lz4/v4 v4.0.2-0.20200813132121-22f5d580d5c4/go.mod h1:vvUajMAuienWCEdMnA5Zb5mp0VIa9M8VvKcVEOkoAh8=
github.com/pierrec/lz4/v4 v4.1.1 h1:cS6aGkNLJr4u+UwaA21yp+gbWN3WJWtKo1axmPDObMA=
github.com/pierrec/lz4/v4 v4.1.1/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down Expand Up @@ -1192,7 +1189,6 @@ github.com/samuel/go-zookeeper v0.0.0-20200724154423-2164a8ac840e/go.mod h1:gi+0
github.com/santhosh-tekuri/jsonschema v1.2.4/go.mod h1:TEAUOeZSmIxTTuHatJzrvARHiuO9LYd+cIxzgEHCQI4=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/schollz/progressbar/v3 v3.3.4/go.mod h1:Rp5lZwpgtYmlvmGo1FyDwXMqagyRBQYSDwzlP9QDu84=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e/go.mod h1:tm/wZFQ8e24NYaBGIlnO2WGCAi67re4HHuOm0sftE/M=
Expand Down
8 changes: 8 additions & 0 deletions pkg/chunkenc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const (
EncLZ4_256k
EncLZ4_1M
EncLZ4_4M
EncFlate
EncZstd
)

var supportedEncoding = []Encoding{
Expand All @@ -46,6 +48,8 @@ var supportedEncoding = []Encoding{
EncLZ4_256k,
EncLZ4_1M,
EncLZ4_4M,
EncFlate,
EncZstd,
}

func (e Encoding) String() string {
Expand All @@ -66,6 +70,10 @@ func (e Encoding) String() string {
return "lz4"
case EncSnappy:
return "snappy"
case EncFlate:
return "flate"
case EncZstd:
return "zstd"
default:
return "unknown"
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ var testEncoding = []Encoding{
EncLZ4_1M,
EncLZ4_4M,
EncSnappy,
EncFlate,
EncZstd,
}

var (
Expand Down Expand Up @@ -619,6 +621,13 @@ func BenchmarkWrite(b *testing.B) {

}

type nomatchPipeline struct{}

func (nomatchPipeline) Process(line []byte) ([]byte, log.LabelsResult, bool) { return line, nil, false }
func (nomatchPipeline) ProcessString(line string) (string, log.LabelsResult, bool) {
return line, nil, false
}

func BenchmarkRead(b *testing.B) {
for _, enc := range testEncoding {
b.Run(enc.String(), func(b *testing.B) {
Expand All @@ -629,7 +638,7 @@ func BenchmarkRead(b *testing.B) {
for n := 0; n < b.N; n++ {
for _, c := range chunks {
// use forward iterator for benchmark -- backward iterator does extra allocations by keeping entries in memory
iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.FORWARD, noopStreamPipeline)
iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.FORWARD, nomatchPipeline{})
if err != nil {
panic(err)
}
Expand Down
106 changes: 105 additions & 1 deletion pkg/chunkenc/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"sync"

"github.com/golang/snappy"
"github.com/klauspost/compress/flate"
"github.com/klauspost/compress/gzip"
"github.com/klauspost/compress/zstd"
"github.com/pierrec/lz4/v4"
"github.com/prometheus/prometheus/pkg/pool"
)
Expand All @@ -32,7 +34,8 @@ var (
Lz4_256k = LZ4Pool{bufferSize: 1 << 18} // Lz4_256k uses 256k buffer
Lz4_1M = LZ4Pool{bufferSize: 1 << 20} // Lz4_1M uses 1M buffer
Lz4_4M = LZ4Pool{bufferSize: 1 << 22} // Lz4_4M uses 4M buffer

Flate = FlatePool{}
Zstd = ZstdPool{}
// Snappy is the snappy compression pool
Snappy SnappyPool
// Noop is the no compression pool
Expand Down Expand Up @@ -74,6 +77,10 @@ func getReaderPool(enc Encoding) ReaderPool {
return &Snappy
case EncNone:
return &Noop
case EncFlate:
return &Flate
case EncZstd:
return &Zstd
default:
panic("unknown encoding")
}
Expand Down Expand Up @@ -132,6 +139,103 @@ func (pool *GzipPool) PutWriter(writer io.WriteCloser) {
pool.writers.Put(writer)
}

// FlatePool is a flate compression pool
type FlatePool struct {
readers sync.Pool
writers sync.Pool
level int
}

// GetReader gets or creates a new CompressionReader and reset it to read from src
func (pool *FlatePool) GetReader(src io.Reader) io.Reader {
if r := pool.readers.Get(); r != nil {
reader := r.(flate.Resetter)
err := reader.Reset(src, nil)
if err != nil {
panic(err)
}
return reader.(io.Reader)
}
return flate.NewReader(src)
}

// PutReader places back in the pool a CompressionReader
func (pool *FlatePool) PutReader(reader io.Reader) {
pool.readers.Put(reader)
}

// GetWriter gets or creates a new CompressionWriter and reset it to write to dst
func (pool *FlatePool) GetWriter(dst io.Writer) io.WriteCloser {
if w := pool.writers.Get(); w != nil {
writer := w.(*flate.Writer)
writer.Reset(dst)
return writer
}

level := pool.level
if level == 0 {
level = flate.DefaultCompression
}
w, err := flate.NewWriter(dst, level)
if err != nil {
panic(err) // never happens, error is only returned on wrong compression level.
}
return w
}

// PutWriter places back in the pool a CompressionWriter
func (pool *FlatePool) PutWriter(writer io.WriteCloser) {
pool.writers.Put(writer)
}

// GzipPool is a gun zip compression pool
type ZstdPool struct {
readers sync.Pool
writers sync.Pool
}

// GetReader gets or creates a new CompressionReader and reset it to read from src
func (pool *ZstdPool) GetReader(src io.Reader) io.Reader {
if r := pool.readers.Get(); r != nil {
reader := r.(*zstd.Decoder)
err := reader.Reset(src)
if err != nil {
panic(err)
}
return reader
}
reader, err := zstd.NewReader(src)
if err != nil {
panic(err)
}
return reader
}

// PutReader places back in the pool a CompressionReader
func (pool *ZstdPool) PutReader(reader io.Reader) {
pool.readers.Put(reader)
}

// GetWriter gets or creates a new CompressionWriter and reset it to write to dst
func (pool *ZstdPool) GetWriter(dst io.Writer) io.WriteCloser {
if w := pool.writers.Get(); w != nil {
writer := w.(*zstd.Encoder)
writer.Reset(dst)
return writer
}

w, err := zstd.NewWriter(dst)
if err != nil {
panic(err) // never happens, error is only returned on wrong compression level.
}
return w
}

// PutWriter places back in the pool a CompressionWriter
func (pool *ZstdPool) PutWriter(writer io.WriteCloser) {
pool.writers.Put(writer)
}

type LZ4Pool struct {
readers sync.Pool
writers sync.Pool
Expand Down
Loading