Skip to content

Commit

Permalink
Improved Tailer loop (dropped entries, logic refactoring, introduced …
Browse files Browse the repository at this point in the history
…tests)
  • Loading branch information
pracucci authored and sandeepsukhani committed Aug 12, 2019
1 parent 1329890 commit b918532
Show file tree
Hide file tree
Showing 7 changed files with 411 additions and 103 deletions.
2 changes: 1 addition & 1 deletion pkg/canary/comparator/comparator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func TestEntryNeverReceived(t *testing.T) {
found := []time.Time{t1, t3, t4, t5}

mr := &mockReader{found}
maxWait := 5 * time.Millisecond
maxWait := 50 * time.Millisecond
c := NewComparator(actual, maxWait, 2*time.Millisecond, 1, make(chan time.Time), make(chan time.Time), mr)

c.entrySent(t1)
Expand Down
8 changes: 8 additions & 0 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ import (
"github.com/grafana/loki/pkg/storage"
)

const (
// How long the Tailer should wait - once there are no entries to read from ingesters -
// before checking if a new entry is available (to avoid spinning the CPU in a continuous
// check loop)
tailerWaitEntryThrottle = time.Second / 2
)

var readinessProbeSuccess = []byte("Ready")

// Config for a querier.
Expand Down Expand Up @@ -308,6 +315,7 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer,
return q.tailDisconnectedIngesters(tailCtx, req, connectedIngestersAddr)
},
q.cfg.TailMaxDuration,
tailerWaitEntryThrottle,
), nil
}

Expand Down
45 changes: 44 additions & 1 deletion pkg/querier/querier_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package querier
import (
"context"
"errors"
"fmt"
"time"

"github.com/cortexproject/cortex/pkg/chunk"
Expand Down Expand Up @@ -107,10 +108,13 @@ func (c *queryClientMock) RecvMsg(m interface{}) error {
type tailClientMock struct {
util.ExtendedMock
logproto.Querier_TailClient
recvTrigger chan time.Time
}

func newTailClientMock() *tailClientMock {
return &tailClientMock{}
return &tailClientMock{
recvTrigger: make(chan time.Time, 10),
}
}

func (c *tailClientMock) Recv() (*logproto.TailResponse, error) {
Expand Down Expand Up @@ -138,6 +142,20 @@ func (c *tailClientMock) RecvMsg(m interface{}) error {
return nil
}

func (c *tailClientMock) mockRecvWithTrigger(response *logproto.TailResponse) *tailClientMock {
c.On("Recv").WaitUntil(c.recvTrigger).Return(response, nil)

return c
}

// triggerRecv triggers the Recv() mock to return from the next invocation
// or from the current invocation if was already called and waiting for the
// trigger. This method works if and only if the Recv() has been mocked with
// mockRecvWithTrigger().
func (c *tailClientMock) triggerRecv() {
c.recvTrigger <- time.Now()
}

// storeMock is a mockable version of Loki's storage, used in querier unit tests
// to control the behaviour of the store without really hitting any storage backend
type storeMock struct {
Expand Down Expand Up @@ -227,3 +245,28 @@ func mockReadRingWithOneActiveIngester() *readRingMock {
{Addr: "test", Timestamp: time.Now().UnixNano(), State: ring.ACTIVE, Tokens: []uint32{1, 2, 3}},
})
}

// mockStreamIterator returns an iterator with 1 stream and quantity entries,
// where entries timestamp and line string are constructed as sequential numbers
// starting at from
func mockStreamIterator(from int, quantity int) iter.EntryIterator {
return iter.NewStreamIterator(mockStream(from, quantity))
}

// mockStream return a stream with quantity entries, where entries timestamp and
// line string are constructed as sequential numbers starting at from
func mockStream(from int, quantity int) *logproto.Stream {
entries := make([]logproto.Entry, 0, quantity)

for i := from; i < from+quantity; i++ {
entries = append(entries, logproto.Entry{
Timestamp: time.Unix(int64(i), 0),
Line: fmt.Sprintf("line %d", i),
})
}

return &logproto.Stream{
Entries: entries,
Labels: `{type="test"}`,
}
}
36 changes: 5 additions & 31 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"
"time"

"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
Expand All @@ -30,10 +29,10 @@ func TestQuerier_Query_QueryTimeoutConfigFlag(t *testing.T) {
}

store := newStoreMock()
store.On("LazyQuery", mock.Anything, mock.Anything).Return(mockStreamIterator(), nil)
store.On("LazyQuery", mock.Anything, mock.Anything).Return(mockStreamIterator(1, 2), nil)

queryClient := newQueryClientMock()
queryClient.On("Recv").Return(mockQueryResponse([]*logproto.Stream{mockStream()}), nil)
queryClient.On("Recv").Return(mockQueryResponse([]*logproto.Stream{mockStream(1, 2)}), nil)

ingesterClient := newQuerierClientMock()
ingesterClient.On("Query", mock.Anything, &request, mock.Anything).Return(queryClient, nil)
Expand Down Expand Up @@ -119,13 +118,13 @@ func TestQuerier_Tail_QueryTimeoutConfigFlag(t *testing.T) {
}

store := newStoreMock()
store.On("LazyQuery", mock.Anything, mock.Anything).Return(mockStreamIterator(), nil)
store.On("LazyQuery", mock.Anything, mock.Anything).Return(mockStreamIterator(1, 2), nil)

queryClient := newQueryClientMock()
queryClient.On("Recv").Return(mockQueryResponse([]*logproto.Stream{mockStream()}), nil)
queryClient.On("Recv").Return(mockQueryResponse([]*logproto.Stream{mockStream(1, 2)}), nil)

tailClient := newTailClientMock()
tailClient.On("Recv").Return(mockTailResponse(mockStream()), nil)
tailClient.On("Recv").Return(mockTailResponse(mockStream(1, 2)), nil)

ingesterClient := newQuerierClientMock()
ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).Return(queryClient, nil)
Expand Down Expand Up @@ -170,24 +169,6 @@ func mockQuerierConfig() Config {
}
}

func mockStreamIterator() iter.EntryIterator {
return iter.NewStreamIterator(mockStream())
}

func mockStream() *logproto.Stream {
entries := []logproto.Entry{
{Timestamp: time.Now(), Line: "line 1"},
{Timestamp: time.Now(), Line: "line 2"},
}

labels := "{type=\"test\"}"

return &logproto.Stream{
Entries: entries,
Labels: labels,
}
}

func mockQueryResponse(streams []*logproto.Stream) *logproto.QueryResponse {
return &logproto.QueryResponse{
Streams: streams,
Expand All @@ -199,10 +180,3 @@ func mockLabelResponse(values []string) *logproto.LabelResponse {
Values: values,
}
}

func mockTailResponse(stream *logproto.Stream) *logproto.TailResponse {
return &logproto.TailResponse{
Stream: stream,
DroppedStreams: []*logproto.DroppedStream{},
}
}
Loading

0 comments on commit b918532

Please sign in to comment.