Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds chunk filter hook for ingesters. #3603

Merged
merged 2 commits into from
Apr 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/ingester/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ingester

import (
"bytes"
"context"
fmt "fmt"
"io/ioutil"
"os"
Expand Down Expand Up @@ -206,7 +207,7 @@ func newStreamsIterator(ing ingesterInstances) *streamIterator {
inst.streamsMtx.RLock()
streams := make([]*stream, 0, len(inst.streams))
inst.streamsMtx.RUnlock()
_ = inst.forAllStreams(func(s *stream) error {
_ = inst.forAllStreams(context.Background(), func(s *stream) error {
streams = append(streams, s)
return nil
})
Expand Down
7 changes: 2 additions & 5 deletions pkg/ingester/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ func TestUnflushedChunks(t *testing.T) {
}

func TestIngesterWALBackpressureSegments(t *testing.T) {

walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal")
require.Nil(t, err)
defer os.RemoveAll(walDir)
Expand Down Expand Up @@ -287,7 +286,6 @@ func TestIngesterWALBackpressureSegments(t *testing.T) {
}

func TestIngesterWALBackpressureCheckpoint(t *testing.T) {

walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal")
require.Nil(t, err)
defer os.RemoveAll(walDir)
Expand Down Expand Up @@ -353,7 +351,6 @@ func expectCheckpoint(t *testing.T, walDir string, shouldExist bool, max time.Du
return
}
}

}

// mkPush makes approximately totalSize bytes of log lines across min(500, totalSize) streams
Expand Down Expand Up @@ -456,7 +453,7 @@ func Test_SeriesIterator(t *testing.T) {
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

for i := 0; i < 3; i++ {
inst := newInstance(defaultConfig(), fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil)
inst := newInstance(defaultConfig(), fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil)
require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream1}}))
require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream2}}))
instances = append(instances, inst)
Expand Down Expand Up @@ -506,7 +503,7 @@ func Benchmark_SeriesIterator(b *testing.B) {
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

for i := range instances {
inst := newInstance(defaultConfig(), fmt.Sprintf("instance %d", i), limiter, nil, noopWAL{}, NilMetrics, nil)
inst := newInstance(defaultConfig(), fmt.Sprintf("instance %d", i), limiter, nil, noopWAL{}, NilMetrics, nil, nil)

require.NoError(b,
inst.Push(context.Background(), &logproto.PushRequest{
Expand Down
3 changes: 3 additions & 0 deletions pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/util/runtime"

"github.com/cortexproject/cortex/pkg/chunk"
Expand Down Expand Up @@ -340,6 +341,8 @@ func (s *testStore) GetSchemaConfigs() []chunk.PeriodConfig {

func (s *testStore) Stop() {}

// SetChunkFilterer is a no-op stub; testStore ignores chunk filtering.
func (s *testStore) SetChunkFilterer(_ storage.RequestChunkFilterer) {}

func pushTestSamples(t *testing.T, ing logproto.PusherServer) map[string][]logproto.Stream {
userIDs := []string{"1", "2", "3"}

Expand Down
10 changes: 8 additions & 2 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ type Ingester struct {
metrics *ingesterMetrics

wal WAL

chunkFilter storage.RequestChunkFilterer
}

// ChunkStore is the interface we need to store chunks.
Expand Down Expand Up @@ -220,6 +222,10 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
return i, nil
}

// SetChunkFilterer registers a storage.RequestChunkFilterer on the ingester.
// The filter is handed to every instance created afterwards (see
// getOrCreateInstance), where it is used to drop streams per request.
func (i *Ingester) SetChunkFilterer(chunkFilter storage.RequestChunkFilterer) {
i.chunkFilter = chunkFilter
}

func (i *Ingester) starting(ctx context.Context) error {
if i.cfg.WAL.Enabled {
// Ignore retain period during wal replay.
Expand Down Expand Up @@ -404,7 +410,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
defer i.instancesMtx.Unlock()
inst, ok = i.instances[instanceID]
if !ok {
inst = newInstance(&i.cfg, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch)
inst = newInstance(&i.cfg, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter)
i.instances[instanceID] = inst
}
return inst
Expand Down Expand Up @@ -677,7 +683,7 @@ func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_
return err
}

if err := instance.addNewTailer(tailer); err != nil {
if err := instance.addNewTailer(queryServer.Context(), tailer); err != nil {
return err
}
tailer.loop()
Expand Down
5 changes: 4 additions & 1 deletion pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/util/runtime"
"github.com/grafana/loki/pkg/util/validation"
)
Expand Down Expand Up @@ -284,6 +285,9 @@ func (s *mockStore) GetSchemaConfigs() []chunk.PeriodConfig {
return nil
}

func (s *mockStore) SetChunkFilterer(_ storage.RequestChunkFilterer) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: put the end curly bracket on the same line as the opening one.

}

type mockQuerierServer struct {
ctx context.Context
resps []*logproto.QueryResponse
Expand Down Expand Up @@ -448,7 +452,6 @@ func TestIngester_boltdbShipperMaxLookBack(t *testing.T) {
}

func TestValidate(t *testing.T) {

for i, tc := range []struct {
in Config
err bool
Expand Down
46 changes: 37 additions & 9 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/stats"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/util/runtime"
"github.com/grafana/loki/pkg/util/validation"
)
Expand Down Expand Up @@ -89,9 +90,11 @@ type instance struct {
flushOnShutdownSwitch *OnceSwitch

metrics *ingesterMetrics

chunkFilter storage.RequestChunkFilterer
}

func newInstance(cfg *Config, instanceID string, limiter *Limiter, configs *runtime.TenantConfigs, wal WAL, metrics *ingesterMetrics, flushOnShutdownSwitch *OnceSwitch) *instance {
func newInstance(cfg *Config, instanceID string, limiter *Limiter, configs *runtime.TenantConfigs, wal WAL, metrics *ingesterMetrics, flushOnShutdownSwitch *OnceSwitch, chunkFilter storage.RequestChunkFilterer) *instance {
i := &instance{
cfg: cfg,
streams: map[string]*stream{},
Expand All @@ -110,6 +113,8 @@ func newInstance(cfg *Config, instanceID string, limiter *Limiter, configs *runt
wal: wal,
metrics: metrics,
flushOnShutdownSwitch: flushOnShutdownSwitch,

chunkFilter: chunkFilter,
}
i.mapper = newFPMapper(i.getLabelsFromFingerprint)
return i
Expand Down Expand Up @@ -295,7 +300,9 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) ([]iter

ingStats := stats.GetIngesterData(ctx)
var iters []iter.EntryIterator

err = i.forMatchingStreams(
ctx,
expr.Matchers(),
func(stream *stream) error {
iter, err := stream.Iterator(ctx, ingStats, req.Start, req.End, req.Direction, pipeline.ForStream(stream.labels))
Expand Down Expand Up @@ -326,6 +333,7 @@ func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams
ingStats := stats.GetIngesterData(ctx)
var iters []iter.SampleIterator
err = i.forMatchingStreams(
ctx,
expr.Selector().Matchers(),
func(stream *stream) error {
iter, err := stream.SampleIterator(ctx, ingStats, req.Start, req.End, extractor.ForStream(stream.labels))
Expand Down Expand Up @@ -363,7 +371,7 @@ func (i *instance) Label(_ context.Context, req *logproto.LabelRequest) (*logpro
}, nil
}

func (i *instance) Series(_ context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) {
func (i *instance) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) {
groups, err := loghttp.Match(req.GetGroups())
if err != nil {
return nil, err
Expand All @@ -374,7 +382,7 @@ func (i *instance) Series(_ context.Context, req *logproto.SeriesRequest) (*logp
// If no matchers were supplied we include all streams.
if len(groups) == 0 {
series = make([]logproto.SeriesIdentifier, 0, len(i.streams))
err = i.forAllStreams(func(stream *stream) error {
err = i.forAllStreams(ctx, func(stream *stream) error {
// consider the stream only if it overlaps the request time range
if shouldConsiderStream(stream, req) {
series = append(series, logproto.SeriesIdentifier{
Expand All @@ -389,7 +397,7 @@ func (i *instance) Series(_ context.Context, req *logproto.SeriesRequest) (*logp
} else {
dedupedSeries := make(map[uint64]logproto.SeriesIdentifier)
for _, matchers := range groups {
err = i.forMatchingStreams(matchers, func(stream *stream) error {
err = i.forMatchingStreams(ctx, matchers, func(stream *stream) error {
// consider the stream only if it overlaps the request time range
if shouldConsiderStream(stream, req) {
// exit early when this stream was added by an earlier group
Expand Down Expand Up @@ -426,11 +434,18 @@ func (i *instance) numStreams() int {

// forAllStreams will execute a function for all streams in the instance.
// It uses a function in order to enable generic stream access without accidentally leaking streams under the mutex.
func (i *instance) forAllStreams(fn func(*stream) error) error {
func (i *instance) forAllStreams(ctx context.Context, fn func(*stream) error) error {
i.streamsMtx.RLock()
defer i.streamsMtx.RUnlock()
var chunkFilter storage.ChunkFilterer
if i.chunkFilter != nil {
chunkFilter = i.chunkFilter.ForRequest(ctx)
}

for _, stream := range i.streams {
if chunkFilter != nil && chunkFilter.ShouldFilter(stream.labels) {
continue
}
err := fn(stream)
if err != nil {
return err
Expand All @@ -442,6 +457,7 @@ func (i *instance) forAllStreams(fn func(*stream) error) error {
// forMatchingStreams will execute a function for each stream that satisfies a set of requirements (time range, matchers, etc).
// It uses a function in order to enable generic stream access without accidentally leaking streams under the mutex.
func (i *instance) forMatchingStreams(
ctx context.Context,
matchers []*labels.Matcher,
fn func(*stream) error,
) error {
Expand All @@ -450,7 +466,10 @@ func (i *instance) forMatchingStreams(

filters, matchers := cutil.SplitFiltersAndMatchers(matchers)
ids := i.index.Lookup(matchers)

var chunkFilter storage.ChunkFilterer
if i.chunkFilter != nil {
chunkFilter = i.chunkFilter.ForRequest(ctx)
}
outer:
for _, streamID := range ids {
stream, ok := i.streamsByFP[streamID]
Expand All @@ -462,7 +481,9 @@ outer:
continue outer
}
}

if chunkFilter != nil && chunkFilter.ShouldFilter(stream.labels) {
continue
}
err := fn(stream)
if err != nil {
return err
Expand All @@ -471,8 +492,8 @@ outer:
return nil
}

func (i *instance) addNewTailer(t *tailer) error {
if err := i.forMatchingStreams(t.matchers, func(s *stream) error {
func (i *instance) addNewTailer(ctx context.Context, t *tailer) error {
if err := i.forMatchingStreams(ctx, t.matchers, func(s *stream) error {
s.addTailer(t)
return nil
}); err != nil {
Expand All @@ -494,8 +515,15 @@ func (i *instance) addTailersToNewStream(stream *stream) {
if t.isClosed() {
continue
}
var chunkFilter storage.ChunkFilterer
if i.chunkFilter != nil {
chunkFilter = i.chunkFilter.ForRequest(t.conn.Context())
}

if isMatching(stream.labels, t.matchers) {
if chunkFilter != nil && chunkFilter.ShouldFilter(stream.labels) {
continue
}
stream.addTailer(t)
}
}
Expand Down
Loading