From 9f3ab5180125468976c233347622c66c23e89b6d Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Tue, 7 Apr 2020 15:08:25 -0400 Subject: [PATCH 1/4] only fetches one chunk per series in /series --- pkg/querier/querier.go | 17 +---- pkg/querier/querier_mock_test.go | 18 +++--- pkg/querier/querier_test.go | 27 ++++---- pkg/storage/store.go | 105 ++++++++++++++++++++++++++++--- pkg/storage/store_test.go | 61 ++++++++++++++++++ 5 files changed, 179 insertions(+), 49 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 8b9a04a656b2..9da654cdf5bf 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -24,7 +24,6 @@ import ( "github.com/grafana/loki/pkg/loghttp" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" - "github.com/grafana/loki/pkg/logql/marshal" "github.com/grafana/loki/pkg/logql/stats" "github.com/grafana/loki/pkg/storage" "github.com/grafana/loki/pkg/util/validation" @@ -484,12 +483,9 @@ func (q *Querier) seriesForMatchers( groups []string, ) ([]logproto.SeriesIdentifier, error) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - var results []logproto.SeriesIdentifier for _, group := range groups { - iter, err := q.store.LazyQuery(ctx, logql.SelectParams{ + ids, err := q.store.GetSeries(ctx, logql.SelectParams{ QueryRequest: &logproto.QueryRequest{ Selector: group, Limit: 1, @@ -502,19 +498,10 @@ func (q *Querier) seriesForMatchers( return nil, err } - for iter.Next() { - ls, err := marshal.NewLabelSet(iter.Labels()) - if err != nil { - return nil, err - } + results = append(results, ids...) 
- results = append(results, logproto.SeriesIdentifier{ - Labels: ls.Map(), - }) - } } return results, nil - } func (q *Querier) validateQueryRequest(ctx context.Context, req *logproto.QueryRequest) error { diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index 5c68d5db4b95..a7b4afd9b0a1 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -243,6 +243,15 @@ func (s *storeMock) DeleteSeriesIDs(ctx context.Context, from, through model.Tim panic("don't call me please") } +func (s *storeMock) GetSeries(ctx context.Context, req logql.SelectParams) ([]logproto.SeriesIdentifier, error) { + args := s.Called(ctx, req) + res := args.Get(0) + if res == nil { + return []logproto.SeriesIdentifier(nil), args.Error(1) + } + return res.([]logproto.SeriesIdentifier), args.Error(1) +} + func (s *storeMock) Stop() { } @@ -314,15 +323,6 @@ func mockStreamIterator(from int, quantity int) iter.EntryIterator { return iter.NewStreamIterator(mockStream(from, quantity)) } -func mockStreamIterFromLabelSets(from, quantity int, sets []string) iter.EntryIterator { - var streams []*logproto.Stream - for _, s := range sets { - streams = append(streams, mockStreamWithLabels(from, quantity, s)) - } - - return iter.NewStreamsIterator(context.Background(), streams, logproto.FORWARD) -} - // mockStream return a stream with quantity entries, where entries timestamp and // line string are constructed as sequential numbers starting at from func mockStream(from int, quantity int) *logproto.Stream { diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 6364c9f3a941..6c332a847ca3 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -329,7 +329,7 @@ func TestQuerier_SeriesAPI(t *testing.T) { func(store *storeMock, querier *queryClientMock, ingester *querierClientMock, limits validation.Limits, req *logproto.SeriesRequest) { ingester.On("Series", mock.Anything, req, mock.Anything).Return(nil, 
errors.New("tst-err")) - store.On("LazyQuery", mock.Anything, mock.Anything).Return(mockStreamIterator(0, 0), nil) + store.On("GetSeries", mock.Anything, mock.Anything).Return(nil, nil) }, func(t *testing.T, q *Querier, req *logproto.SeriesRequest) { ctx := user.InjectOrgID(context.Background(), "test") @@ -345,7 +345,7 @@ func TestQuerier_SeriesAPI(t *testing.T) { {"a": "1"}, }), nil) - store.On("LazyQuery", mock.Anything, mock.Anything).Return(nil, context.DeadlineExceeded) + store.On("GetSeries", mock.Anything, mock.Anything).Return(nil, context.DeadlineExceeded) }, func(t *testing.T, q *Querier, req *logproto.SeriesRequest) { ctx := user.InjectOrgID(context.Background(), "test") @@ -358,9 +358,7 @@ func TestQuerier_SeriesAPI(t *testing.T) { mkReq([]string{`{a="1"}`}), func(store *storeMock, querier *queryClientMock, ingester *querierClientMock, limits validation.Limits, req *logproto.SeriesRequest) { ingester.On("Series", mock.Anything, req, mock.Anything).Return(mockSeriesResponse(nil), nil) - - store.On("LazyQuery", mock.Anything, mock.Anything). - Return(mockStreamIterator(0, 0), nil) + store.On("GetSeries", mock.Anything, mock.Anything).Return(nil, nil) }, func(t *testing.T, q *Querier, req *logproto.SeriesRequest) { ctx := user.InjectOrgID(context.Background(), "test") @@ -378,11 +376,10 @@ func TestQuerier_SeriesAPI(t *testing.T) { {"a": "1", "b": "3"}, }), nil) - store.On("LazyQuery", mock.Anything, mock.Anything). 
- Return(mockStreamIterFromLabelSets(0, 10, []string{ - `{a="1",b="4"}`, - `{a="1",b="5"}`, - }), nil) + store.On("GetSeries", mock.Anything, mock.Anything).Return([]logproto.SeriesIdentifier{ + {Labels: map[string]string{"a": "1", "b": "4"}}, + {Labels: map[string]string{"a": "1", "b": "5"}}, + }, nil) }, func(t *testing.T, q *Querier, req *logproto.SeriesRequest) { ctx := user.InjectOrgID(context.Background(), "test") @@ -404,11 +401,11 @@ func TestQuerier_SeriesAPI(t *testing.T) { {"a": "1", "b": "2"}, }), nil) - store.On("LazyQuery", mock.Anything, mock.Anything). - Return(mockStreamIterFromLabelSets(0, 10, []string{ - `{a="1",b="2"}`, - `{a="1",b="3"}`, - }), nil) + store.On("GetSeries", mock.Anything, mock.Anything).Return([]logproto.SeriesIdentifier{ + {Labels: map[string]string{"a": "1", "b": "2"}}, + {Labels: map[string]string{"a": "1", "b": "3"}}, + }, nil) + }, func(t *testing.T, q *Querier, req *logproto.SeriesRequest) { ctx := user.InjectOrgID(context.Background(), "test") diff --git a/pkg/storage/store.go b/pkg/storage/store.go index aea3cc0b2a3d..5530dee6784f 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -13,6 +13,7 @@ import ( "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/iter" + "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logql/stats" "github.com/grafana/loki/pkg/util" @@ -34,6 +35,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { type Store interface { chunk.Store LazyQuery(ctx context.Context, req logql.SelectParams) (iter.EntryIterator, error) + GetSeries(ctx context.Context, req logql.SelectParams) ([]logproto.SeriesIdentifier, error) } type store struct { @@ -53,34 +55,37 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf }, nil } -// LazyQuery returns an iterator that will query the store for more chunks while iterating instead of fetching all chunks upfront -// for that request. 
-func (s *store) LazyQuery(ctx context.Context, req logql.SelectParams) (iter.EntryIterator, error) { - storeStats := stats.GetStoreData(ctx) - +// decodeReq sanitizes an incoming request, rounds bounds, and appends the __name__ matcher +func decodeReq(req logql.SelectParams) ([]*labels.Matcher, logql.LineFilter, model.Time, model.Time, error) { expr, err := req.LogSelector() if err != nil { - return nil, err + return nil, nil, 0, 0, err } filter, err := expr.Filter() if err != nil { - return nil, err + return nil, nil, 0, 0, err } matchers := expr.Matchers() nameLabelMatcher, err := labels.NewMatcher(labels.MatchEqual, labels.MetricName, "logs") if err != nil { - return nil, err + return nil, nil, 0, 0, err } + matchers = append(matchers, nameLabelMatcher) + from, through := util.RoundToMilliseconds(req.Start, req.End) + return matchers, filter, from, through, nil +} + +func (s *store) lazyChunks(ctx context.Context, matchers []*labels.Matcher, from, through model.Time) ([]*chunkenc.LazyChunk, error) { userID, err := user.ExtractOrgID(ctx) if err != nil { return nil, err } - matchers = append(matchers, nameLabelMatcher) - from, through := util.RoundToMilliseconds(req.Start, req.End) + storeStats := stats.GetStoreData(ctx) + chks, fetchers, err := s.GetChunkRefs(ctx, userID, from, through, matchers...) 
if err != nil { return nil, err @@ -99,6 +104,86 @@ func (s *store) LazyQuery(ctx context.Context, req logql.SelectParams) (iter.Ent lazyChunks = append(lazyChunks, &chunkenc.LazyChunk{Chunk: c, Fetcher: fetchers[i]}) } } + return lazyChunks, nil +} + +func (s *store) GetSeries(ctx context.Context, req logql.SelectParams) ([]logproto.SeriesIdentifier, error) { + matchers, _, from, through, err := decodeReq(req) + if err != nil { + return nil, err + } + + lazyChunks, err := s.lazyChunks(ctx, matchers, from, through) + if err != nil { + return nil, err + } + + // group chunks by series + m := partitionBySeriesChunks(lazyChunks) + + firstChunksPerSeries := make([]*chunkenc.LazyChunk, 0, len(m)) + + // discard all but one chunk per series + for _, chks := range m { + firstChunksPerSeries = append(firstChunksPerSeries, chks[0][0]) + } + + results := make([]logproto.SeriesIdentifier, 0, len(firstChunksPerSeries)) + + // bound concurrency + groups := make([][]*chunkenc.LazyChunk, 0, len(firstChunksPerSeries)/s.cfg.MaxChunkBatchSize+1) + + split := s.cfg.MaxChunkBatchSize + if len(firstChunksPerSeries) < split { + split = len(firstChunksPerSeries) + } + + for split > 0 { + groups = append(groups, firstChunksPerSeries[:split]) + firstChunksPerSeries = firstChunksPerSeries[split:] + if len(firstChunksPerSeries) < split { + split = len(firstChunksPerSeries) + } + } + + for _, group := range groups { + err = fetchLazyChunks(ctx, group) + if err != nil { + return nil, err + } + + outer: + for _, chk := range group { + for _, matcher := range matchers { + if !matcher.Matches(chk.Chunk.Metric.Get(matcher.Name)) { + continue outer + } + } + + m := chk.Chunk.Metric.Map() + delete(m, labels.MetricName) + results = append(results, logproto.SeriesIdentifier{ + Labels: m, + }) + } + } + return results, nil + +} + +// LazyQuery returns an iterator that will query the store for more chunks while iterating instead of fetching all chunks upfront +// for that request. 
+func (s *store) LazyQuery(ctx context.Context, req logql.SelectParams) (iter.EntryIterator, error) { + matchers, filter, from, through, err := decodeReq(req) + if err != nil { + return nil, err + } + + lazyChunks, err := s.lazyChunks(ctx, matchers, from, through) + if err != nil { + return nil, err + } + return newBatchChunkIterator(ctx, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, filter, req.QueryRequest), nil } diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index bb3ac17bf9a9..5d178174d8ea 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/chunk" @@ -19,6 +20,7 @@ import ( "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/marshal" "github.com/grafana/loki/pkg/util/validation" ) @@ -355,3 +357,62 @@ func Test_store_LazyQuery(t *testing.T) { }) } } + +func Test_store_GetSeries(t *testing.T) { + + tests := []struct { + name string + req *logproto.QueryRequest + expected []logproto.SeriesIdentifier + }{ + { + "all", + newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), logproto.FORWARD), + []logproto.SeriesIdentifier{ + {Labels: mustParseLabels("{foo=\"bar\"}")}, + {Labels: mustParseLabels("{foo=\"bazz\"}")}, + }, + }, + { + "regexp filter (post chunk fetching)", + newQuery("{foo=~\"bar.*\"}", from, from.Add(6*time.Millisecond), logproto.FORWARD), + []logproto.SeriesIdentifier{ + {Labels: mustParseLabels("{foo=\"bar\"}")}, + }, + }, + { + "filter matcher", + newQuery("{foo=\"bar\"}", from, from.Add(6*time.Millisecond), logproto.FORWARD), + []logproto.SeriesIdentifier{ + {Labels: mustParseLabels("{foo=\"bar\"}")}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &store{ + Store: storeFixture, + cfg: 
Config{ + MaxChunkBatchSize: 1, + }, + } + ctx = user.InjectOrgID(context.Background(), "test-user") + out, err := s.GetSeries(ctx, logql.SelectParams{QueryRequest: tt.req}) + if err != nil { + t.Errorf("store.GetSeries() error = %v", err) + return + } + require.Equal(t, tt.expected, out) + }) + } +} + +func mustParseLabels(s string) map[string]string { + l, err := marshal.NewLabelSet(s) + + if err != nil { + log.Fatalf("Failed to parse %s", s) + } + + return l +} From 61e00a96aa73e129ec408b2c6438cc595e848163 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 8 Apr 2020 09:50:57 -0400 Subject: [PATCH 2/4] Update pkg/storage/store.go --- pkg/storage/store.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 5530dee6784f..9ed453c61659 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -78,6 +78,7 @@ func decodeReq(req logql.SelectParams) ([]*labels.Matcher, logql.LineFilter, mod return matchers, filter, from, through, nil } +// lazyChunks is an internal function used to resolve a set of lazy chunks from the store without actually loading them. 
It's used internally by `LazyQuery` and `GetSeries` func (s *store) lazyChunks(ctx context.Context, matchers []*labels.Matcher, from, through model.Time) ([]*chunkenc.LazyChunk, error) { userID, err := user.ExtractOrgID(ctx) if err != nil { From 6369140a99ee9e549fc14a5df3082bf8ecbc8036 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 8 Apr 2020 09:59:12 -0400 Subject: [PATCH 3/4] chunksBySeries naming --- pkg/storage/store.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 9ed453c61659..b28cd65b1ff7 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -120,12 +120,12 @@ func (s *store) GetSeries(ctx context.Context, req logql.SelectParams) ([]logpro } // group chunks by series - m := partitionBySeriesChunks(lazyChunks) + chunksBySeries := partitionBySeriesChunks(lazyChunks) - firstChunksPerSeries := make([]*chunkenc.LazyChunk, 0, len(m)) + firstChunksPerSeries := make([]*chunkenc.LazyChunk, 0, len(chunksBySeries)) // discard all but one chunk per series - for _, chks := range m { + for _, chks := range chunksBySeries { firstChunksPerSeries = append(firstChunksPerSeries, chks[0][0]) } From 8c1cc144db8a72f7484301aca48fe7f2cdf237a3 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 8 Apr 2020 10:49:02 -0400 Subject: [PATCH 4/4] sorts series response, adds tests for chunks < batchsize --- pkg/logproto/extensions.go | 12 ++++++++++++ pkg/storage/store.go | 4 +++- pkg/storage/store_test.go | 21 +++++++++++++++++---- 3 files changed, 32 insertions(+), 5 deletions(-) create mode 100644 pkg/logproto/extensions.go diff --git a/pkg/logproto/extensions.go b/pkg/logproto/extensions.go new file mode 100644 index 000000000000..ecd2b4dbe36a --- /dev/null +++ b/pkg/logproto/extensions.go @@ -0,0 +1,12 @@ +package logproto + +import "github.com/prometheus/prometheus/pkg/labels" + +type SeriesIdentifiers []SeriesIdentifier + +func (ids SeriesIdentifiers) Len() int { return len(ids) } +func (ids 
SeriesIdentifiers) Swap(i, j int) { ids[i], ids[j] = ids[j], ids[i] } +func (ids SeriesIdentifiers) Less(i, j int) bool { + a, b := labels.FromMap(ids[i].Labels), labels.FromMap(ids[j].Labels) + return labels.Compare(a, b) < 0 +} diff --git a/pkg/storage/store.go b/pkg/storage/store.go index b28cd65b1ff7..b03992c45b1f 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -3,6 +3,7 @@ package storage import ( "context" "flag" + "sort" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" @@ -129,7 +130,7 @@ func (s *store) GetSeries(ctx context.Context, req logql.SelectParams) ([]logpro firstChunksPerSeries = append(firstChunksPerSeries, chks[0][0]) } - results := make([]logproto.SeriesIdentifier, 0, len(firstChunksPerSeries)) + results := make(logproto.SeriesIdentifiers, 0, len(firstChunksPerSeries)) // bound concurrency groups := make([][]*chunkenc.LazyChunk, 0, len(firstChunksPerSeries)/s.cfg.MaxChunkBatchSize+1) @@ -168,6 +169,7 @@ func (s *store) GetSeries(ctx context.Context, req logql.SelectParams) ([]logpro }) } } + sort.Sort(results) return results, nil } diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 5d178174d8ea..33cd9105c19d 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -361,9 +361,10 @@ func Test_store_LazyQuery(t *testing.T) { func Test_store_GetSeries(t *testing.T) { tests := []struct { - name string - req *logproto.QueryRequest - expected []logproto.SeriesIdentifier + name string + req *logproto.QueryRequest + expected []logproto.SeriesIdentifier + batchSize int }{ { "all", @@ -372,6 +373,16 @@ func Test_store_GetSeries(t *testing.T) { {Labels: mustParseLabels("{foo=\"bar\"}")}, {Labels: mustParseLabels("{foo=\"bazz\"}")}, }, + 1, + }, + { + "all-single-batch", + newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), logproto.FORWARD), + []logproto.SeriesIdentifier{ + {Labels: mustParseLabels("{foo=\"bar\"}")}, + {Labels: 
mustParseLabels("{foo=\"bazz\"}")}, + }, + 5, }, { "regexp filter (post chunk fetching)", @@ -379,6 +390,7 @@ func Test_store_GetSeries(t *testing.T) { []logproto.SeriesIdentifier{ {Labels: mustParseLabels("{foo=\"bar\"}")}, }, + 1, }, { "filter matcher", @@ -386,6 +398,7 @@ func Test_store_GetSeries(t *testing.T) { []logproto.SeriesIdentifier{ {Labels: mustParseLabels("{foo=\"bar\"}")}, }, + 1, }, } for _, tt := range tests { @@ -393,7 +406,7 @@ func Test_store_GetSeries(t *testing.T) { s := &store{ Store: storeFixture, cfg: Config{ - MaxChunkBatchSize: 1, + MaxChunkBatchSize: tt.batchSize, }, } ctx = user.InjectOrgID(context.Background(), "test-user")