diff --git a/pkg/logproto/extensions.go b/pkg/logproto/extensions.go new file mode 100644 index 000000000000..ecd2b4dbe36a --- /dev/null +++ b/pkg/logproto/extensions.go @@ -0,0 +1,12 @@ +package logproto + +import "github.com/prometheus/prometheus/pkg/labels" + +type SeriesIdentifiers []SeriesIdentifier + +func (ids SeriesIdentifiers) Len() int { return len(ids) } +func (ids SeriesIdentifiers) Swap(i, j int) { ids[i], ids[j] = ids[j], ids[i] } +func (ids SeriesIdentifiers) Less(i, j int) bool { + a, b := labels.FromMap(ids[i].Labels), labels.FromMap(ids[j].Labels) + return labels.Compare(a, b) <= 0 +} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 8b9a04a656b2..9da654cdf5bf 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -24,7 +24,6 @@ import ( "github.com/grafana/loki/pkg/loghttp" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" - "github.com/grafana/loki/pkg/logql/marshal" "github.com/grafana/loki/pkg/logql/stats" "github.com/grafana/loki/pkg/storage" "github.com/grafana/loki/pkg/util/validation" @@ -484,12 +483,9 @@ func (q *Querier) seriesForMatchers( groups []string, ) ([]logproto.SeriesIdentifier, error) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - var results []logproto.SeriesIdentifier for _, group := range groups { - iter, err := q.store.LazyQuery(ctx, logql.SelectParams{ + ids, err := q.store.GetSeries(ctx, logql.SelectParams{ QueryRequest: &logproto.QueryRequest{ Selector: group, Limit: 1, @@ -502,19 +498,10 @@ func (q *Querier) seriesForMatchers( return nil, err } - for iter.Next() { - ls, err := marshal.NewLabelSet(iter.Labels()) - if err != nil { - return nil, err - } + results = append(results, ids...) - results = append(results, logproto.SeriesIdentifier{ - Labels: ls.Map(), - }) - } } return results, nil - } func (q *Querier) validateQueryRequest(ctx context.Context, req *logproto.QueryRequest) error { diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index 5c68d5db4b95..a7b4afd9b0a1 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -243,6 +243,15 @@ func (s *storeMock) DeleteSeriesIDs(ctx context.Context, from, through model.Tim panic("don't call me please") } +func (s *storeMock) GetSeries(ctx context.Context, req logql.SelectParams) ([]logproto.SeriesIdentifier, error) { + args := s.Called(ctx, req) + res := args.Get(0) + if res == nil { + return []logproto.SeriesIdentifier(nil), args.Error(1) + } + return res.([]logproto.SeriesIdentifier), args.Error(1) +} + func (s *storeMock) Stop() { } @@ -314,15 +323,6 @@ func mockStreamIterator(from int, quantity int) iter.EntryIterator { return iter.NewStreamIterator(mockStream(from, quantity)) } -func mockStreamIterFromLabelSets(from, quantity int, sets []string) iter.EntryIterator { - var streams []*logproto.Stream - for _, s := range sets { - streams = append(streams, mockStreamWithLabels(from, quantity, s)) - } - - return iter.NewStreamsIterator(context.Background(), streams, logproto.FORWARD) -} - // mockStream return a stream with quantity entries, where entries timestamp and // line string are constructed as sequential numbers starting at from func mockStream(from int, quantity int) *logproto.Stream { diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 6364c9f3a941..6c332a847ca3 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -329,7 +329,7 @@ func TestQuerier_SeriesAPI(t *testing.T) { func(store *storeMock, querier *queryClientMock, ingester *querierClientMock, limits validation.Limits, req *logproto.SeriesRequest) { ingester.On("Series", mock.Anything, req, mock.Anything).Return(nil, errors.New("tst-err")) - store.On("LazyQuery", mock.Anything, mock.Anything).Return(mockStreamIterator(0, 0), nil) + store.On("GetSeries", mock.Anything, mock.Anything).Return(nil, nil) }, func(t *testing.T, q *Querier, req *logproto.SeriesRequest) { ctx := user.InjectOrgID(context.Background(), "test") @@ -345,7 +345,7 @@ func TestQuerier_SeriesAPI(t *testing.T) { {"a": "1"}, }), nil) - store.On("LazyQuery", mock.Anything, mock.Anything).Return(nil, context.DeadlineExceeded) + store.On("GetSeries", mock.Anything, mock.Anything).Return(nil, context.DeadlineExceeded) }, func(t *testing.T, q *Querier, req *logproto.SeriesRequest) { ctx := user.InjectOrgID(context.Background(), "test") @@ -358,9 +358,7 @@ func TestQuerier_SeriesAPI(t *testing.T) { mkReq([]string{`{a="1"}`}), func(store *storeMock, querier *queryClientMock, ingester *querierClientMock, limits validation.Limits, req *logproto.SeriesRequest) { ingester.On("Series", mock.Anything, req, mock.Anything).Return(mockSeriesResponse(nil), nil) - - store.On("LazyQuery", mock.Anything, mock.Anything). - Return(mockStreamIterator(0, 0), nil) + store.On("GetSeries", mock.Anything, mock.Anything).Return(nil, nil) }, func(t *testing.T, q *Querier, req *logproto.SeriesRequest) { ctx := user.InjectOrgID(context.Background(), "test") @@ -378,11 +376,10 @@ func TestQuerier_SeriesAPI(t *testing.T) { {"a": "1", "b": "3"}, }), nil) - store.On("LazyQuery", mock.Anything, mock.Anything). - Return(mockStreamIterFromLabelSets(0, 10, []string{ - `{a="1",b="4"}`, - `{a="1",b="5"}`, - }), nil) + store.On("GetSeries", mock.Anything, mock.Anything).Return([]logproto.SeriesIdentifier{ + {Labels: map[string]string{"a": "1", "b": "4"}}, + {Labels: map[string]string{"a": "1", "b": "5"}}, + }, nil) }, func(t *testing.T, q *Querier, req *logproto.SeriesRequest) { ctx := user.InjectOrgID(context.Background(), "test") @@ -404,11 +401,11 @@ func TestQuerier_SeriesAPI(t *testing.T) { {"a": "1", "b": "2"}, }), nil) - store.On("LazyQuery", mock.Anything, mock.Anything). - Return(mockStreamIterFromLabelSets(0, 10, []string{ - `{a="1",b="2"}`, - `{a="1",b="3"}`, - }), nil) + store.On("GetSeries", mock.Anything, mock.Anything).Return([]logproto.SeriesIdentifier{ + {Labels: map[string]string{"a": "1", "b": "2"}}, + {Labels: map[string]string{"a": "1", "b": "3"}}, + }, nil) + }, func(t *testing.T, q *Querier, req *logproto.SeriesRequest) { ctx := user.InjectOrgID(context.Background(), "test") diff --git a/pkg/storage/store.go b/pkg/storage/store.go index aea3cc0b2a3d..b03992c45b1f 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -3,6 +3,7 @@ package storage import ( "context" "flag" + "sort" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" @@ -13,6 +14,7 @@ import ( "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/iter" + "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logql/stats" "github.com/grafana/loki/pkg/util" @@ -34,6 +36,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { type Store interface { chunk.Store LazyQuery(ctx context.Context, req logql.SelectParams) (iter.EntryIterator, error) + GetSeries(ctx context.Context, req logql.SelectParams) ([]logproto.SeriesIdentifier, error) } type store struct { @@ -53,34 +56,38 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf }, nil } -// LazyQuery returns an iterator that will query the store for more chunks while iterating instead of fetching all chunks upfront -// for that request. -func (s *store) LazyQuery(ctx context.Context, req logql.SelectParams) (iter.EntryIterator, error) { - storeStats := stats.GetStoreData(ctx) - +// decodeReq sanitizes an incoming request, rounds bounds, and appends the __name__ matcher +func decodeReq(req logql.SelectParams) ([]*labels.Matcher, logql.LineFilter, model.Time, model.Time, error) { expr, err := req.LogSelector() if err != nil { - return nil, err + return nil, nil, 0, 0, err } filter, err := expr.Filter() if err != nil { - return nil, err + return nil, nil, 0, 0, err } matchers := expr.Matchers() nameLabelMatcher, err := labels.NewMatcher(labels.MatchEqual, labels.MetricName, "logs") if err != nil { - return nil, err + return nil, nil, 0, 0, err } + matchers = append(matchers, nameLabelMatcher) + from, through := util.RoundToMilliseconds(req.Start, req.End) + return matchers, filter, from, through, nil +} + +// lazyChunks is an internal function used to resolve a set of lazy chunks from the store without actually loading them. It's used internally by `LazyQuery` and `GetSeries` +func (s *store) lazyChunks(ctx context.Context, matchers []*labels.Matcher, from, through model.Time) ([]*chunkenc.LazyChunk, error) { userID, err := user.ExtractOrgID(ctx) if err != nil { return nil, err } - matchers = append(matchers, nameLabelMatcher) - from, through := util.RoundToMilliseconds(req.Start, req.End) + storeStats := stats.GetStoreData(ctx) + chks, fetchers, err := s.GetChunkRefs(ctx, userID, from, through, matchers...) if err != nil { return nil, err @@ -99,6 +106,87 @@ func (s *store) LazyQuery(ctx context.Context, req logql.SelectParams) (iter.Ent lazyChunks = append(lazyChunks, &chunkenc.LazyChunk{Chunk: c, Fetcher: fetchers[i]}) } } + return lazyChunks, nil +} + +func (s *store) GetSeries(ctx context.Context, req logql.SelectParams) ([]logproto.SeriesIdentifier, error) { + matchers, _, from, through, err := decodeReq(req) + if err != nil { + return nil, err + } + + lazyChunks, err := s.lazyChunks(ctx, matchers, from, through) + if err != nil { + return nil, err + } + + // group chunks by series + chunksBySeries := partitionBySeriesChunks(lazyChunks) + + firstChunksPerSeries := make([]*chunkenc.LazyChunk, 0, len(chunksBySeries)) + + // discard all but one chunk per series + for _, chks := range chunksBySeries { + firstChunksPerSeries = append(firstChunksPerSeries, chks[0][0]) + } + + results := make(logproto.SeriesIdentifiers, 0, len(firstChunksPerSeries)) + + // bound concurrency + groups := make([][]*chunkenc.LazyChunk, 0, len(firstChunksPerSeries)/s.cfg.MaxChunkBatchSize+1) + + split := s.cfg.MaxChunkBatchSize + if len(firstChunksPerSeries) < split { + split = len(firstChunksPerSeries) + } + + for split > 0 { + groups = append(groups, firstChunksPerSeries[:split]) + firstChunksPerSeries = firstChunksPerSeries[split:] + if len(firstChunksPerSeries) < split { + split = len(firstChunksPerSeries) + } + } + + for _, group := range groups { + err = fetchLazyChunks(ctx, group) + if err != nil { + return nil, err + } + + outer: + for _, chk := range group { + for _, matcher := range matchers { + if !matcher.Matches(chk.Chunk.Metric.Get(matcher.Name)) { + continue outer + } + } + + m := chk.Chunk.Metric.Map() + delete(m, labels.MetricName) + results = append(results, logproto.SeriesIdentifier{ + Labels: m, + }) + } + } + sort.Sort(results) + return results, nil + +} + +// LazyQuery returns an iterator that will query the store for more chunks while iterating instead of fetching all chunks upfront +// for that request. +func (s *store) LazyQuery(ctx context.Context, req logql.SelectParams) (iter.EntryIterator, error) { + matchers, filter, from, through, err := decodeReq(req) + if err != nil { + return nil, err + } + + lazyChunks, err := s.lazyChunks(ctx, matchers, from, through) + if err != nil { + return nil, err + } + return newBatchChunkIterator(ctx, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, filter, req.QueryRequest), nil } diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index bb3ac17bf9a9..33cd9105c19d 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/chunk" @@ -19,6 +20,7 @@ import ( "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/marshal" "github.com/grafana/loki/pkg/util/validation" ) @@ -355,3 +357,75 @@ func Test_store_LazyQuery(t *testing.T) { }) } } + +func Test_store_GetSeries(t *testing.T) { + + tests := []struct { + name string + req *logproto.QueryRequest + expected []logproto.SeriesIdentifier + batchSize int + }{ + { + "all", + newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), logproto.FORWARD), + []logproto.SeriesIdentifier{ + {Labels: mustParseLabels("{foo=\"bar\"}")}, + {Labels: mustParseLabels("{foo=\"bazz\"}")}, + }, + 1, + }, + { + "all-single-batch", + newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), logproto.FORWARD), + []logproto.SeriesIdentifier{ + {Labels: mustParseLabels("{foo=\"bar\"}")}, + {Labels: mustParseLabels("{foo=\"bazz\"}")}, + }, + 5, + }, + { + "regexp filter (post chunk fetching)", + newQuery("{foo=~\"bar.*\"}", from, from.Add(6*time.Millisecond), logproto.FORWARD), + []logproto.SeriesIdentifier{ + {Labels: mustParseLabels("{foo=\"bar\"}")}, + }, + 1, + }, + { + "filter matcher", + newQuery("{foo=\"bar\"}", from, from.Add(6*time.Millisecond), logproto.FORWARD), + []logproto.SeriesIdentifier{ + {Labels: mustParseLabels("{foo=\"bar\"}")}, + }, + 1, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &store{ + Store: storeFixture, + cfg: Config{ + MaxChunkBatchSize: tt.batchSize, + }, + } + ctx = user.InjectOrgID(context.Background(), "test-user") + out, err := s.GetSeries(ctx, logql.SelectParams{QueryRequest: tt.req}) + if err != nil { + t.Errorf("store.GetSeries() error = %v", err) + return + } + require.Equal(t, tt.expected, out) + }) + } +} + +func mustParseLabels(s string) map[string]string { + l, err := marshal.NewLabelSet(s) + + if err != nil { + log.Fatalf("Failed to parse %s", s) + } + + return l +}