Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve protobuf serialization #2031

Merged
merged 3 commits into from
May 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (d *Distributor) stopping(_ error) error {

// TODO taken from Cortex, see if we can refactor out an usable interface.
type streamTracker struct {
stream *logproto.Stream
stream logproto.Stream
minSuccess int
maxFailures int
succeeded int32
Expand Down Expand Up @@ -329,7 +329,7 @@ func (d *Distributor) sendSamplesErr(ctx context.Context, ingester ring.Ingester
}

req := &logproto.PushRequest{
Streams: make([]*logproto.Stream, len(streams)),
Streams: make([]logproto.Stream, len(streams)),
}
for i, s := range streams {
req.Streams[i] = s.stream
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func prepare(t *testing.T, limits *validation.Limits, kvStore kv.Client) *Distri

func makeWriteRequest(lines int, size int) *logproto.PushRequest {
req := logproto.PushRequest{
Streams: []*logproto.Stream{
Streams: []logproto.Stream{
{
Labels: `{foo="bar"}`,
},
Expand Down
9 changes: 5 additions & 4 deletions pkg/distributor/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package distributor

import (
"errors"
cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/weaveworks/common/httpgrpc"
"net/http"
"strings"
"time"

cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/weaveworks/common/httpgrpc"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
Expand Down Expand Up @@ -52,7 +53,7 @@ func (v Validator) ValidateEntry(userID string, labels string, entry logproto.En
}

// Validate labels returns an error if the labels are invalid
func (v Validator) ValidateLabels(userID string, stream *logproto.Stream) error {
func (v Validator) ValidateLabels(userID string, stream logproto.Stream) error {
ls, err := util.ToClientLabels(stream.Labels)
if err != nil {
// I wish we didn't return httpgrpc errors here as it seems
Expand Down Expand Up @@ -92,7 +93,7 @@ func (v Validator) ValidateLabels(userID string, stream *logproto.Stream) error
return nil
}

func updateMetrics(reason, userID string, stream *logproto.Stream) {
func updateMetrics(reason, userID string, stream logproto.Stream) {
validation.DiscardedSamples.WithLabelValues(reason, userID).Inc()
bytes := 0
for _, e := range stream.Entries {
Expand Down
14 changes: 8 additions & 6 deletions pkg/distributor/validator_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package distributor

import (
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util/validation"
"github.com/stretchr/testify/assert"
"github.com/weaveworks/common/httpgrpc"
"net/http"
"testing"
"time"

"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/stretchr/testify/assert"
"github.com/weaveworks/common/httpgrpc"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util/validation"
)

var testStreamLabels = "FIXME"
Expand Down Expand Up @@ -147,7 +149,7 @@ func TestValidator_ValidateLabels(t *testing.T) {
v, err := NewValidator(o)
assert.NoError(t, err)

err = v.ValidateLabels(tt.userID, &logproto.Stream{Labels: tt.labels})
err = v.ValidateLabels(tt.userID, logproto.Stream{Labels: tt.labels})
assert.Equal(t, tt.expected, err)
})
}
Expand Down
32 changes: 16 additions & 16 deletions pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestFlushingCollidingLabels(t *testing.T) {
// checkData only iterates between unix seconds 0 and 1000
now := time.Unix(0, 0)

req := &logproto.PushRequest{Streams: []*logproto.Stream{
req := &logproto.PushRequest{Streams: []logproto.Stream{
// some colliding label sets
{Labels: model.LabelSet{"app": "l", "uniq0": "0", "uniq1": "1"}.String(), Entries: entries(5, now.Add(time.Minute))},
{Labels: model.LabelSet{"app": "m", "uniq0": "1", "uniq1": "1"}.String(), Entries: entries(5, now)},
Expand All @@ -95,7 +95,7 @@ func TestFlushingCollidingLabels(t *testing.T) {
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))

// verify that we get all the data back
store.checkData(t, map[string][]*logproto.Stream{userID: req.Streams})
store.checkData(t, map[string][]logproto.Stream{userID: req.Streams})

// make sure all chunks have different fingerprint, even colliding ones.
chunkFingerprints := map[model.Fingerprint]bool{}
Expand Down Expand Up @@ -125,7 +125,7 @@ func TestFlushMaxAge(t *testing.T) {
{Timestamp: now.Add(time.Second * 61), Line: "3"},
}

req := &logproto.PushRequest{Streams: []*logproto.Stream{
req := &logproto.PushRequest{Streams: []logproto.Stream{
{Labels: model.LabelSet{"app": "l"}.String(), Entries: firstEntries},
}}

Expand All @@ -138,9 +138,9 @@ func TestFlushMaxAge(t *testing.T) {
time.Sleep(2 * cfg.FlushCheckPeriod)

// ensure chunk is not flushed after flush period elapses
store.checkData(t, map[string][]*logproto.Stream{})
store.checkData(t, map[string][]logproto.Stream{})

req2 := &logproto.PushRequest{Streams: []*logproto.Stream{
req2 := &logproto.PushRequest{Streams: []logproto.Stream{
{Labels: model.LabelSet{"app": "l"}.String(), Entries: secondEntries},
}}

Expand All @@ -150,7 +150,7 @@ func TestFlushMaxAge(t *testing.T) {
time.Sleep(2 * cfg.FlushCheckPeriod)

// assert stream is now both batches
store.checkData(t, map[string][]*logproto.Stream{
store.checkData(t, map[string][]logproto.Stream{
userID: {
{Labels: model.LabelSet{"app": "l"}.String(), Entries: append(firstEntries, secondEntries...)},
},
Expand Down Expand Up @@ -235,11 +235,11 @@ func (s *testStore) LazyQuery(ctx context.Context, req logql.SelectParams) (iter

func (s *testStore) Stop() {}

func pushTestSamples(t *testing.T, ing logproto.PusherServer) map[string][]*logproto.Stream {
func pushTestSamples(t *testing.T, ing logproto.PusherServer) map[string][]logproto.Stream {
userIDs := []string{"1", "2", "3"}

// Create test samples.
testData := map[string][]*logproto.Stream{}
testData := map[string][]logproto.Stream{}
for i, userID := range userIDs {
testData[userID] = buildTestStreams(i)
}
Expand All @@ -255,8 +255,8 @@ func pushTestSamples(t *testing.T, ing logproto.PusherServer) map[string][]*logp
return testData
}

func buildTestStreams(offset int) []*logproto.Stream {
var m []*logproto.Stream
func buildTestStreams(offset int) []logproto.Stream {
var m []logproto.Stream
for i := 0; i < numSeries; i++ {
ss := logproto.Stream{
Labels: model.Metric{
Expand All @@ -270,7 +270,7 @@ func buildTestStreams(offset int) []*logproto.Stream {
Line: "line",
})
}
m = append(m, &ss)
m = append(m, ss)
}

sort.Slice(m, func(i, j int) bool {
Expand All @@ -281,15 +281,15 @@ func buildTestStreams(offset int) []*logproto.Stream {
}

// check that the store is holding data equivalent to what we expect
func (s *testStore) checkData(t *testing.T, testData map[string][]*logproto.Stream) {
func (s *testStore) checkData(t *testing.T, testData map[string][]logproto.Stream) {
for userID, expected := range testData {
streams := s.getStreamsForUser(t, userID)
require.Equal(t, expected, streams)
}
}

func (s *testStore) getStreamsForUser(t *testing.T, userID string) []*logproto.Stream {
var streams []*logproto.Stream
func (s *testStore) getStreamsForUser(t *testing.T, userID string) []logproto.Stream {
var streams []logproto.Stream
for _, c := range s.getChunksForUser(userID) {
lokiChunk := c.Data.(*chunkenc.Facade).LokiChunk()
streams = append(streams, buildStreamsFromChunk(t, c.Metric.String(), lokiChunk))
Expand All @@ -307,11 +307,11 @@ func (s *testStore) getChunksForUser(userID string) []chunk.Chunk {
return s.chunks[userID]
}

func buildStreamsFromChunk(t *testing.T, labels string, chk chunkenc.Chunk) *logproto.Stream {
func buildStreamsFromChunk(t *testing.T, labels string, chk chunkenc.Chunk) logproto.Stream {
it, err := chk.Iterator(context.TODO(), time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD, nil)
require.NoError(t, err)

stream := &logproto.Stream{
stream := logproto.Stream{
Labels: labels,
}
for it.Next() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestIngester(t *testing.T) {
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

req := logproto.PushRequest{
Streams: []*logproto.Stream{
Streams: []logproto.Stream{
{
Labels: `{foo="bar",bar="baz1"}`,
},
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestIngesterStreamLimitExceeded(t *testing.T) {
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

req := logproto.PushRequest{
Streams: []*logproto.Stream{
Streams: []logproto.Stream{
{
Labels: `{foo="bar",bar="baz1"}`,
},
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ package ingester

import (
"context"
"github.com/go-kit/kit/log/level"
"github.com/grafana/loki/pkg/util/validation"
"net/http"
"sync"
"time"

"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand All @@ -27,6 +26,7 @@ import (
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/stats"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)

const queryBatchSize = 128
Expand Down Expand Up @@ -150,7 +150,7 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
return appendErr
}

func (i *instance) getOrCreateStream(pushReqStream *logproto.Stream) (*stream, error) {
func (i *instance) getOrCreateStream(pushReqStream logproto.Stream) (*stream, error) {
labels, err := util.ToClientLabels(pushReqStream.Labels)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestLabelsCollisions(t *testing.T) {
tt := time.Now().Add(-5 * time.Minute)

// Notice how labels aren't sorted.
err = i.Push(context.Background(), &logproto.PushRequest{Streams: []*logproto.Stream{
err = i.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{
// both label sets have FastFingerprint=e002a3a451262627
{Labels: "{app=\"l\",uniq0=\"0\",uniq1=\"1\"}", Entries: entries(5, tt.Add(time.Minute))},
{Labels: "{uniq0=\"1\",app=\"m\",uniq1=\"1\"}", Entries: entries(5, tt)},
Expand Down Expand Up @@ -82,7 +82,7 @@ func TestConcurrentPushes(t *testing.T) {
tt := time.Now().Add(-5 * time.Minute)

for i := 0; i < iterations; i++ {
err := inst.Push(context.Background(), &logproto.PushRequest{Streams: []*logproto.Stream{
err := inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{
{Labels: labels, Entries: entries(entriesPerIteration, tt)},
}})

Expand Down Expand Up @@ -122,7 +122,7 @@ func TestSyncPeriod(t *testing.T) {
result = append(result, logproto.Entry{Timestamp: tt, Line: fmt.Sprintf("hello %d", i)})
tt = tt.Add(time.Duration(1 + rand.Int63n(randomStep.Nanoseconds())))
}
pr := &logproto.PushRequest{Streams: []*logproto.Stream{{Labels: lbls, Entries: result}}}
pr := &logproto.PushRequest{Streams: []logproto.Stream{{Labels: lbls, Entries: result}}}
err = inst.Push(context.Background(), pr)
require.NoError(t, err)

Expand Down
28 changes: 15 additions & 13 deletions pkg/ingester/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,12 @@ type stream struct {
cfg *Config
// Newest chunk at chunks[n-1].
// Not thread-safe; assume accesses to this are locked by caller.
chunks []chunkDesc
fp model.Fingerprint // possibly remapped fingerprint, used in the streams map
labels labels.Labels
factory func() chunkenc.Chunk
lastLine line
chunks []chunkDesc
fp model.Fingerprint // possibly remapped fingerprint, used in the streams map
labels labels.Labels
labelsString string
factory func() chunkenc.Chunk
lastLine line

tailers map[uint32]*tailer
tailerMtx sync.RWMutex
Expand All @@ -86,11 +87,12 @@ type entryWithError struct {

func newStream(cfg *Config, fp model.Fingerprint, labels labels.Labels, factory func() chunkenc.Chunk) *stream {
return &stream{
cfg: cfg,
fp: fp,
labels: labels,
factory: factory,
tailers: map[uint32]*tailer{},
cfg: cfg,
fp: fp,
labels: labels,
labelsString: labels.String(),
factory: factory,
tailers: map[uint32]*tailer{},
}
}

Expand Down Expand Up @@ -172,7 +174,7 @@ func (s *stream) Push(ctx context.Context, entries []logproto.Entry, synchronize

if len(storedEntries) != 0 {
go func() {
stream := logproto.Stream{Labels: s.labels.String(), Entries: storedEntries}
stream := logproto.Stream{Labels: s.labelsString, Entries: storedEntries}

closedTailers := []uint32{}

Expand Down Expand Up @@ -202,7 +204,7 @@ func (s *stream) Push(ctx context.Context, entries []logproto.Entry, synchronize
if lastEntryWithErr.e == chunkenc.ErrOutOfOrder {
// return bad http status request response with all failed entries
buf := bytes.Buffer{}
streamName := s.labels.String()
streamName := s.labelsString

limitedFailedEntries := failedEntriesWithError
if maxIgnore := s.cfg.MaxReturnedErrors; maxIgnore > 0 && len(limitedFailedEntries) > maxIgnore {
Expand Down Expand Up @@ -272,7 +274,7 @@ func (s *stream) Iterator(ctx context.Context, from, through time.Time, directio
}
}

return iter.NewNonOverlappingIterator(iterators, s.labels.String()), nil
return iter.NewNonOverlappingIterator(iterators, s.labelsString), nil
}

func (s *stream) addTailer(t *tailer) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ingester/transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestTransferOut(t *testing.T) {
// Push some data into our original ingester
ctx := user.InjectOrgID(context.Background(), "test")
_, err := ing.Push(ctx, &logproto.PushRequest{
Streams: []*logproto.Stream{
Streams: []logproto.Stream{
{
Entries: []logproto.Entry{
{Line: "line 0", Timestamp: time.Unix(0, 0)},
Expand All @@ -59,7 +59,7 @@ func TestTransferOut(t *testing.T) {

// verify we get out of order exception on adding an entry with older timestamps
_, err2 := ing.Push(ctx, &logproto.PushRequest{
Streams: []*logproto.Stream{
Streams: []logproto.Stream{
{
Entries: []logproto.Entry{
{Line: "out of order line", Timestamp: time.Unix(0, 0)},
Expand Down
8 changes: 4 additions & 4 deletions pkg/iter/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type streamIterator struct {
}

// NewStreamIterator iterates over entries in a stream.
func NewStreamIterator(stream *logproto.Stream) EntryIterator {
func NewStreamIterator(stream logproto.Stream) EntryIterator {
return &streamIterator{
i: -1,
entries: stream.Entries,
Expand Down Expand Up @@ -355,7 +355,7 @@ func (i *heapIterator) Len() int {
}

// NewStreamsIterator returns an iterator over logproto.Stream
func NewStreamsIterator(ctx context.Context, streams []*logproto.Stream, direction logproto.Direction) EntryIterator {
func NewStreamsIterator(ctx context.Context, streams []logproto.Stream, direction logproto.Direction) EntryIterator {
is := make([]EntryIterator, 0, len(streams))
for i := range streams {
is = append(is, NewStreamIterator(streams[i]))
Expand Down Expand Up @@ -603,10 +603,10 @@ func ReadBatch(i EntryIterator, size uint32) (*logproto.QueryResponse, uint32, e
}

result := logproto.QueryResponse{
Streams: make([]*logproto.Stream, 0, len(streams)),
Streams: make([]logproto.Stream, 0, len(streams)),
}
for _, stream := range streams {
result.Streams = append(result.Streams, stream)
result.Streams = append(result.Streams, *stream)
}
return &result, respSize, i.Error()
}
Expand Down
Loading