Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

loki: Improve Tailer loop #877

Merged
merged 1 commit into from
Aug 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/canary/comparator/comparator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func TestEntryNeverReceived(t *testing.T) {
found := []time.Time{t1, t3, t4, t5}

mr := &mockReader{found}
maxWait := 5 * time.Millisecond
maxWait := 50 * time.Millisecond
c := NewComparator(actual, maxWait, 2*time.Millisecond, 1, make(chan time.Time), make(chan time.Time), mr)

c.entrySent(t1)
Expand Down
8 changes: 8 additions & 0 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ import (
"github.com/grafana/loki/pkg/storage"
)

const (
// How long the Tailer should wait - once there are no entries to read from ingesters -
// before checking if a new entry is available (to avoid spinning the CPU in a continuous
// check loop)
tailerWaitEntryThrottle = time.Second / 2
)

var readinessProbeSuccess = []byte("Ready")

// Config for a querier.
Expand Down Expand Up @@ -308,6 +315,7 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer,
return q.tailDisconnectedIngesters(tailCtx, req, connectedIngestersAddr)
},
q.cfg.TailMaxDuration,
tailerWaitEntryThrottle,
), nil
}

Expand Down
45 changes: 44 additions & 1 deletion pkg/querier/querier_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package querier
import (
"context"
"errors"
"fmt"
"time"

"github.com/cortexproject/cortex/pkg/chunk"
Expand Down Expand Up @@ -107,10 +108,13 @@ func (c *queryClientMock) RecvMsg(m interface{}) error {
type tailClientMock struct {
util.ExtendedMock
logproto.Querier_TailClient
recvTrigger chan time.Time
}

func newTailClientMock() *tailClientMock {
return &tailClientMock{}
return &tailClientMock{
recvTrigger: make(chan time.Time, 10),
}
}

func (c *tailClientMock) Recv() (*logproto.TailResponse, error) {
Expand Down Expand Up @@ -138,6 +142,20 @@ func (c *tailClientMock) RecvMsg(m interface{}) error {
return nil
}

func (c *tailClientMock) mockRecvWithTrigger(response *logproto.TailResponse) *tailClientMock {
c.On("Recv").WaitUntil(c.recvTrigger).Return(response, nil)

return c
}

// triggerRecv triggers the Recv() mock to return from the next invocation
// or from the current invocation if was already called and waiting for the
// trigger. This method works if and only if the Recv() has been mocked with
// mockRecvWithTrigger().
func (c *tailClientMock) triggerRecv() {
c.recvTrigger <- time.Now()
}

// storeMock is a mockable version of Loki's storage, used in querier unit tests
// to control the behaviour of the store without really hitting any storage backend
type storeMock struct {
Expand Down Expand Up @@ -227,3 +245,28 @@ func mockReadRingWithOneActiveIngester() *readRingMock {
{Addr: "test", Timestamp: time.Now().UnixNano(), State: ring.ACTIVE, Tokens: []uint32{1, 2, 3}},
})
}

// mockStreamIterator returns an iterator with 1 stream and quantity entries,
// where entries timestamp and line string are constructed as sequential numbers
// starting at from
func mockStreamIterator(from int, quantity int) iter.EntryIterator {
return iter.NewStreamIterator(mockStream(from, quantity))
}

// mockStream return a stream with quantity entries, where entries timestamp and
// line string are constructed as sequential numbers starting at from
func mockStream(from int, quantity int) *logproto.Stream {
entries := make([]logproto.Entry, 0, quantity)

for i := from; i < from+quantity; i++ {
entries = append(entries, logproto.Entry{
Timestamp: time.Unix(int64(i), 0),
Line: fmt.Sprintf("line %d", i),
})
}

return &logproto.Stream{
Entries: entries,
Labels: `{type="test"}`,
}
}
36 changes: 5 additions & 31 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"
"time"

"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
Expand All @@ -30,10 +29,10 @@ func TestQuerier_Query_QueryTimeoutConfigFlag(t *testing.T) {
}

store := newStoreMock()
store.On("LazyQuery", mock.Anything, mock.Anything).Return(mockStreamIterator(), nil)
store.On("LazyQuery", mock.Anything, mock.Anything).Return(mockStreamIterator(1, 2), nil)

queryClient := newQueryClientMock()
queryClient.On("Recv").Return(mockQueryResponse([]*logproto.Stream{mockStream()}), nil)
queryClient.On("Recv").Return(mockQueryResponse([]*logproto.Stream{mockStream(1, 2)}), nil)

ingesterClient := newQuerierClientMock()
ingesterClient.On("Query", mock.Anything, &request, mock.Anything).Return(queryClient, nil)
Expand Down Expand Up @@ -119,13 +118,13 @@ func TestQuerier_Tail_QueryTimeoutConfigFlag(t *testing.T) {
}

store := newStoreMock()
store.On("LazyQuery", mock.Anything, mock.Anything).Return(mockStreamIterator(), nil)
store.On("LazyQuery", mock.Anything, mock.Anything).Return(mockStreamIterator(1, 2), nil)

queryClient := newQueryClientMock()
queryClient.On("Recv").Return(mockQueryResponse([]*logproto.Stream{mockStream()}), nil)
queryClient.On("Recv").Return(mockQueryResponse([]*logproto.Stream{mockStream(1, 2)}), nil)

tailClient := newTailClientMock()
tailClient.On("Recv").Return(mockTailResponse(mockStream()), nil)
tailClient.On("Recv").Return(mockTailResponse(mockStream(1, 2)), nil)

ingesterClient := newQuerierClientMock()
ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).Return(queryClient, nil)
Expand Down Expand Up @@ -170,24 +169,6 @@ func mockQuerierConfig() Config {
}
}

func mockStreamIterator() iter.EntryIterator {
return iter.NewStreamIterator(mockStream())
}

func mockStream() *logproto.Stream {
entries := []logproto.Entry{
{Timestamp: time.Now(), Line: "line 1"},
{Timestamp: time.Now(), Line: "line 2"},
}

labels := "{type=\"test\"}"

return &logproto.Stream{
Entries: entries,
Labels: labels,
}
}

func mockQueryResponse(streams []*logproto.Stream) *logproto.QueryResponse {
return &logproto.QueryResponse{
Streams: streams,
Expand All @@ -199,10 +180,3 @@ func mockLabelResponse(values []string) *logproto.LabelResponse {
Values: values,
}
}

func mockTailResponse(stream *logproto.Stream) *logproto.TailResponse {
return &logproto.TailResponse{
Stream: stream,
DroppedStreams: []*logproto.DroppedStream{},
}
}
Loading