diff --git a/pkg/canary/comparator/comparator_test.go b/pkg/canary/comparator/comparator_test.go
index 452f39b2114c..7f17a1338bd5 100644
--- a/pkg/canary/comparator/comparator_test.go
+++ b/pkg/canary/comparator/comparator_test.go
@@ -156,7 +156,7 @@ func TestEntryNeverReceived(t *testing.T) {
 	found := []time.Time{t1, t3, t4, t5}
 	mr := &mockReader{found}
-	maxWait := 5 * time.Millisecond
+	maxWait := 50 * time.Millisecond
 	c := NewComparator(actual, maxWait, 2*time.Millisecond, 1, make(chan time.Time), make(chan time.Time), mr)
 
 	c.entrySent(t1)
diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go
index 107263f18686..57cfb7129334 100644
--- a/pkg/querier/querier.go
+++ b/pkg/querier/querier.go
@@ -20,6 +20,13 @@ import (
 	"github.com/grafana/loki/pkg/storage"
 )
 
+const (
+	// How long the Tailer should wait - once there are no entries to read from ingesters -
+	// before checking if a new entry is available (to avoid spinning the CPU in a continuous
+	// check loop)
+	tailerWaitEntryThrottle = time.Second / 2
+)
+
 var readinessProbeSuccess = []byte("Ready")
 
 // Config for a querier.
@@ -308,6 +315,7 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer,
 			return q.tailDisconnectedIngesters(tailCtx, req, connectedIngestersAddr)
 		},
 		q.cfg.TailMaxDuration,
+		tailerWaitEntryThrottle,
 	), nil
 }
 
diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go
index b431725b5b46..37f15457e871 100644
--- a/pkg/querier/querier_mock_test.go
+++ b/pkg/querier/querier_mock_test.go
@@ -3,6 +3,7 @@ package querier
 import (
 	"context"
 	"errors"
+	"fmt"
 	"time"
 
 	"github.com/cortexproject/cortex/pkg/chunk"
@@ -107,10 +108,13 @@ func (c *queryClientMock) RecvMsg(m interface{}) error {
 type tailClientMock struct {
 	util.ExtendedMock
 	logproto.Querier_TailClient
+	recvTrigger chan time.Time
 }
 
 func newTailClientMock() *tailClientMock {
-	return &tailClientMock{}
+	return &tailClientMock{
+		recvTrigger: make(chan time.Time, 10),
+	}
 }
 
 func (c *tailClientMock) Recv() (*logproto.TailResponse, error) {
@@ -138,6 +142,20 @@ func (c *tailClientMock) RecvMsg(m interface{}) error {
 	return nil
 }
 
+func (c *tailClientMock) mockRecvWithTrigger(response *logproto.TailResponse) *tailClientMock {
+	c.On("Recv").WaitUntil(c.recvTrigger).Return(response, nil)
+
+	return c
+}
+
+// triggerRecv triggers the Recv() mock to return from the next invocation
+// or from the current invocation if it was already called and is waiting for
+// the trigger. This method works if and only if Recv() has been mocked with
+// mockRecvWithTrigger().
+func (c *tailClientMock) triggerRecv() {
+	c.recvTrigger <- time.Now()
+}
+
 // storeMock is a mockable version of Loki's storage, used in querier unit tests
 // to control the behaviour of the store without really hitting any storage backend
 type storeMock struct {
@@ -227,3 +245,28 @@ func mockReadRingWithOneActiveIngester() *readRingMock {
 		{Addr: "test", Timestamp: time.Now().UnixNano(), State: ring.ACTIVE, Tokens: []uint32{1, 2, 3}},
 	})
 }
+
+// mockStreamIterator returns an iterator with 1 stream and quantity entries,
+// where each entry's timestamp and line string are constructed as sequential numbers
+// starting at from
+func mockStreamIterator(from int, quantity int) iter.EntryIterator {
+	return iter.NewStreamIterator(mockStream(from, quantity))
+}
+
+// mockStream returns a stream with quantity entries, where each entry's timestamp and
+// line string are constructed as sequential numbers starting at from
+func mockStream(from int, quantity int) *logproto.Stream {
+	entries := make([]logproto.Entry, 0, quantity)
+
+	for i := from; i < from+quantity; i++ {
+		entries = append(entries, logproto.Entry{
+			Timestamp: time.Unix(int64(i), 0),
+			Line:      fmt.Sprintf("line %d", i),
+		})
+	}
+
+	return &logproto.Stream{
+		Entries: entries,
+		Labels:  `{type="test"}`,
+	}
+}
diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go
index 541949b0486c..8b57d5f8c4b2 100644
--- a/pkg/querier/querier_test.go
+++ b/pkg/querier/querier_test.go
@@ -5,7 +5,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/grafana/loki/pkg/iter"
 	"github.com/grafana/loki/pkg/logproto"
 	"github.com/prometheus/common/model"
 	"github.com/stretchr/testify/assert"
@@ -30,10 +29,10 @@ func TestQuerier_Query_QueryTimeoutConfigFlag(t *testing.T) {
 	}
 
 	store := newStoreMock()
-	store.On("LazyQuery", mock.Anything, mock.Anything).Return(mockStreamIterator(), nil)
+	store.On("LazyQuery", mock.Anything, mock.Anything).Return(mockStreamIterator(1, 2), nil)
 
 	queryClient := newQueryClientMock()
-	queryClient.On("Recv").Return(mockQueryResponse([]*logproto.Stream{mockStream()}), nil)
+	queryClient.On("Recv").Return(mockQueryResponse([]*logproto.Stream{mockStream(1, 2)}), nil)
 
 	ingesterClient := newQuerierClientMock()
 	ingesterClient.On("Query", mock.Anything, &request, mock.Anything).Return(queryClient, nil)
@@ -119,13 +118,13 @@ func TestQuerier_Tail_QueryTimeoutConfigFlag(t *testing.T) {
 	}
 
 	store := newStoreMock()
-	store.On("LazyQuery", mock.Anything, mock.Anything).Return(mockStreamIterator(), nil)
+	store.On("LazyQuery", mock.Anything, mock.Anything).Return(mockStreamIterator(1, 2), nil)
 
 	queryClient := newQueryClientMock()
-	queryClient.On("Recv").Return(mockQueryResponse([]*logproto.Stream{mockStream()}), nil)
+	queryClient.On("Recv").Return(mockQueryResponse([]*logproto.Stream{mockStream(1, 2)}), nil)
 
 	tailClient := newTailClientMock()
-	tailClient.On("Recv").Return(mockTailResponse(mockStream()), nil)
+	tailClient.On("Recv").Return(mockTailResponse(mockStream(1, 2)), nil)
 
 	ingesterClient := newQuerierClientMock()
 	ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).Return(queryClient, nil)
@@ -170,24 +169,6 @@ func mockQuerierConfig() Config {
 	}
 }
 
-func mockStreamIterator() iter.EntryIterator {
-	return iter.NewStreamIterator(mockStream())
-}
-
-func mockStream() *logproto.Stream {
-	entries := []logproto.Entry{
-		{Timestamp: time.Now(), Line: "line 1"},
-		{Timestamp: time.Now(), Line: "line 2"},
-	}
-
-	labels := "{type=\"test\"}"
-
-	return &logproto.Stream{
-		Entries: entries,
-		Labels:  labels,
-	}
-}
-
 func mockQueryResponse(streams []*logproto.Stream) *logproto.QueryResponse {
 	return &logproto.QueryResponse{
 		Streams: streams,
@@ -199,10 +180,3 @@ func mockLabelResponse(values []string) *logproto.LabelResponse {
 		Values: values,
 	}
 }
-
-func mockTailResponse(stream *logproto.Stream) *logproto.TailResponse {
-	return &logproto.TailResponse{
-		Stream:         stream,
-		DroppedStreams: []*logproto.DroppedStream{},
-	}
-}
diff --git a/pkg/querier/tail.go b/pkg/querier/tail.go
index b788af1e5422..4505ad9fd77c 100644
--- a/pkg/querier/tail.go
+++ b/pkg/querier/tail.go
@@ -13,13 +13,20 @@ import (
 )
 
 const (
-	// if we are not seeing any response from ingester, how long do we want to wait by going into sleep
-	nextEntryWait = time.Second / 2
-
 	// keep checking connections with ingesters in duration
 	checkConnectionsWithIngestersPeriod = time.Second * 5
-	bufferSizeForTailResponse = 10
+
+	// the size of the channel buffer used to send tailing streams
+	// back to the requesting client
+	maxBufferedTailResponses = 10
+
+	// the maximum number of entries to return in a TailResponse
+	maxEntriesPerTailResponse = 100
+
+	// the maximum number of dropped entries to keep in memory that will be sent along
+	// with the next successfully pushed response. Once the dropped entries memory buffer
+	// exceeds this value, we start skipping dropped entries too.
+	maxDroppedEntriesPerTailResponse = 1000
 )
 
 type droppedEntry struct {
@@ -48,15 +55,14 @@ type Tailer struct {
 	querierTailClientsMtx sync.Mutex
 	stopped               bool
-	blocked               bool
-	blockedMtx            sync.RWMutex
 	delayFor              time.Duration
 	responseChan          chan *TailResponse
 	closeErrChan          chan error
 	tailMaxDuration       time.Duration
-	// when tail client is slow, drop entry and store its details in droppedEntries to notify client
-	droppedEntries []droppedEntry
+
+	// if we are not seeing any response from ingester,
+	// how long do we want to wait by going into sleep
+	waitEntryThrottle time.Duration
 }
 
 func (t *Tailer) readTailClients() {
@@ -74,13 +80,9 @@ func (t *Tailer) loop() {
 	tailMaxDurationTicker := time.NewTicker(t.tailMaxDuration)
 	defer tailMaxDurationTicker.Stop()
 
-	tailResponse := new(TailResponse)
-
-	for {
-		if t.stopped {
-			return
-		}
+	droppedEntries := make([]droppedEntry, 0)
 
+	for !t.stopped {
 		select {
 		case <-checkConnectionTicker.C:
 			// Try to reconnect dropped ingesters and connect to new ingesters
@@ -96,44 +98,66 @@
 		default:
 		}
 
-		if !t.next() {
-			if len(tailResponse.Streams) == 0 {
-				if len(t.querierTailClients) == 0 {
-					// All the connections to ingesters are dropped, try reconnecting or return error
-					if err := t.checkIngesterConnections(); err != nil {
-						level.Error(util.Logger).Log("Error reconnecting to ingesters", fmt.Sprintf("%v", err))
-					} else {
-						continue
-					}
-					if err := t.close(); err != nil {
-						level.Error(util.Logger).Log("Error closing Tailer", fmt.Sprintf("%v", err))
-					}
-					t.closeErrChan <- errors.New("all ingesters closed the connection")
-					return
-				}
-				time.Sleep(nextEntryWait)
-				continue
-			}
-		} else {
-			// If channel is blocked already, drop current entry directly to save the effort
-			if t.isBlocked() {
-				t.dropEntry(t.currEntry.Timestamp, t.currLabels, nil)
+		// Read as many entries as we can (up to the max allowed) and populate the
+		// tail response we'll send over the response channel
+		tailResponse := new(TailResponse)
+		entriesCount := 0
+
+		for ; entriesCount < maxEntriesPerTailResponse && t.next(); entriesCount++ {
+			// If the response channel is blocked, we drop the current entry directly
+			// to save the effort
+			if t.isResponseChanBlocked() {
+				droppedEntries = dropEntry(droppedEntries, t.currEntry.Timestamp, t.currLabels)
 				continue
 			}
 
-			tailResponse.Streams = append(tailResponse.Streams, logproto.Stream{Labels: t.currLabels, Entries: []logproto.Entry{t.currEntry}})
-			if len(tailResponse.Streams) != 100 {
-				continue
+			tailResponse.Streams = append(tailResponse.Streams, logproto.Stream{
+				Labels:  t.currLabels,
+				Entries: []logproto.Entry{t.currEntry},
+			})
+		}
+
+		// If all consumed entries have been dropped because the response channel is blocked,
+		// we should reiterate on the loop
+		if len(tailResponse.Streams) == 0 && entriesCount > 0 {
+			continue
+		}
+
+		// If no entry has been consumed, we should ensure it's not caused by all ingesters'
+		// connections being dropped, and then throttle for a while
+		if len(tailResponse.Streams) == 0 {
+			if len(t.querierTailClients) == 0 {
+				// All the connections to ingesters are dropped, try reconnecting or return error
+				if err := t.checkIngesterConnections(); err != nil {
+					level.Error(util.Logger).Log("Error reconnecting to ingesters", fmt.Sprintf("%v", err))
+				} else {
+					continue
+				}
+				if err := t.close(); err != nil {
+					level.Error(util.Logger).Log("Error closing Tailer", fmt.Sprintf("%v", err))
+				}
+				t.closeErrChan <- errors.New("all ingesters closed the connection")
+				return
 			}
-			tailResponse.DroppedEntries = t.popDroppedEntries()
+
+			time.Sleep(t.waitEntryThrottle)
+			continue
+		}
+
+		// Send the tail response through the response channel without blocking.
+		// Drop the entry if the response channel buffer is full.
+		if len(droppedEntries) > 0 {
+			tailResponse.DroppedEntries = droppedEntries
 		}
 
 		select {
 		case t.responseChan <- tailResponse:
+			if len(droppedEntries) > 0 {
+				droppedEntries = make([]droppedEntry, 0)
+			}
 		default:
-			t.dropEntry(t.currEntry.Timestamp, t.currLabels, tailResponse.DroppedEntries)
+			droppedEntries = dropEntries(droppedEntries, tailResponse.Streams)
 		}
-		tailResponse = new(TailResponse)
 	}
 }
@@ -219,33 +243,10 @@ func (t *Tailer) close() error {
 	return t.openStreamIterator.Close()
 }
 
-func (t *Tailer) dropEntry(timestamp time.Time, labels string, alreadyDroppedEntries []droppedEntry) {
-	t.blockedMtx.Lock()
-	defer t.blockedMtx.Unlock()
-
-	t.droppedEntries = append(t.droppedEntries, alreadyDroppedEntries...)
-	t.droppedEntries = append(t.droppedEntries, droppedEntry{timestamp, labels})
-}
-
-func (t *Tailer) isBlocked() bool {
-	t.blockedMtx.RLock()
-	defer t.blockedMtx.RUnlock()
-
-	return t.blocked
-}
-
-func (t *Tailer) popDroppedEntries() []droppedEntry {
-	t.blockedMtx.Lock()
-	defer t.blockedMtx.Unlock()
-
-	t.blocked = false
-	if len(t.droppedEntries) == 0 {
-		return nil
-	}
-	droppedEntries := t.droppedEntries
-	t.droppedEntries = []droppedEntry{}
-
-	return droppedEntries
+func (t *Tailer) isResponseChanBlocked() bool {
+	// Thread-safety: len() and cap() on a channel are thread-safe. The cap() doesn't
+	// change over time, while len() does.
+	return len(t.responseChan) == cap(t.responseChan)
 }
 
 func (t *Tailer) getResponseChan() <-chan *TailResponse {
@@ -262,18 +263,38 @@ func newTailer(
 	historicEntries iter.EntryIterator,
 	tailDisconnectedIngesters func([]string) (map[string]logproto.Querier_TailClient, error),
 	tailMaxDuration time.Duration,
+	waitEntryThrottle time.Duration,
 ) *Tailer {
 	t := Tailer{
 		openStreamIterator:        iter.NewHeapIterator([]iter.EntryIterator{historicEntries}, logproto.FORWARD),
 		querierTailClients:        querierTailClients,
 		delayFor:                  delayFor,
-		responseChan:              make(chan *TailResponse, bufferSizeForTailResponse),
+		responseChan:              make(chan *TailResponse, maxBufferedTailResponses),
 		closeErrChan:              make(chan error),
 		tailDisconnectedIngesters: tailDisconnectedIngesters,
 		tailMaxDuration:           tailMaxDuration,
+		waitEntryThrottle:         waitEntryThrottle,
 	}
 
 	t.readTailClients()
 	go t.loop()
 	return &t
 }
+
+func dropEntry(droppedEntries []droppedEntry, timestamp time.Time, labels string) []droppedEntry {
+	if len(droppedEntries) >= maxDroppedEntriesPerTailResponse {
+		return droppedEntries
+	}
+
+	return append(droppedEntries, droppedEntry{timestamp, labels})
+}
+
+func dropEntries(droppedEntries []droppedEntry, streams []logproto.Stream) []droppedEntry {
+	for _, stream := range streams {
+		for _, entry := range stream.Entries {
+			droppedEntries = dropEntry(droppedEntries, entry.Timestamp, entry.Line)
+		}
+	}
+
+	return droppedEntries
+}
diff --git a/pkg/querier/tail_mock_test.go b/pkg/querier/tail_mock_test.go
new file mode 100644
index 000000000000..5db3db1271b6
--- /dev/null
+++ b/pkg/querier/tail_mock_test.go
@@ -0,0 +1,10 @@
+package querier
+
+import "github.com/grafana/loki/pkg/logproto"
+
+func mockTailResponse(stream *logproto.Stream) *logproto.TailResponse {
+	return &logproto.TailResponse{
+		Stream:         stream,
+		DroppedStreams: []*logproto.DroppedStream{},
+	}
+}
diff --git a/pkg/querier/tail_test.go b/pkg/querier/tail_test.go
new file mode 100644
index 000000000000..56fffc4e514f
--- /dev/null
+++ b/pkg/querier/tail_test.go
@@ -0,0 +1,252 @@
+package querier
+
+import (
+	"errors"
+	"testing"
+	"time"
+
+	"github.com/grafana/loki/pkg/iter"
+	"github.com/grafana/loki/pkg/logproto"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+const (
+	timeout  = 1 * time.Second
+	throttle = 10 * time.Millisecond
+)
+
+func TestTailer(t *testing.T) {
+	t.Parallel()
+
+	tests := map[string]struct {
+		historicEntries iter.EntryIterator
+		tailClient      *tailClientMock
+		tester          func(t *testing.T, tailer *Tailer, tailClient *tailClientMock)
+	}{
+		"tail logs from historic entries only (no tail clients provided)": {
+			historicEntries: mockStreamIterator(1, 2),
+			tailClient:      nil,
+			tester: func(t *testing.T, tailer *Tailer, tailClient *tailClientMock) {
+				responses, err := readFromTailer(tailer, 2)
+				require.NoError(t, err)
+
+				actual := flattenStreamsFromResponses(responses)
+
+				assert.Equal(t, []logproto.Stream{
+					*mockStream(1, 1),
+					*mockStream(2, 1),
+				}, actual)
+			},
+		},
+		"tail logs from tail clients only (no historic entries provided)": {
+			historicEntries: mockStreamIterator(0, 0),
+			tailClient:      newTailClientMock().mockRecvWithTrigger(mockTailResponse(mockStream(1, 1))),
+			tester: func(t *testing.T, tailer *Tailer, tailClient *tailClientMock) {
+				tailClient.triggerRecv()
+
+				responses, err := readFromTailer(tailer, 1)
+				require.NoError(t, err)
+
+				actual := flattenStreamsFromResponses(responses)
+
+				assert.Equal(t, []logproto.Stream{
+					*mockStream(1, 1),
+				}, actual)
+			},
+		},
+		"tail logs both from historic entries and tail clients": {
+			historicEntries: mockStreamIterator(1, 2),
+			tailClient:      newTailClientMock().mockRecvWithTrigger(mockTailResponse(mockStream(3, 1))),
+			tester: func(t *testing.T, tailer *Tailer, tailClient *tailClientMock) {
+				tailClient.triggerRecv()
+
+				responses, err := readFromTailer(tailer, 3)
+				require.NoError(t, err)
+
+				actual := flattenStreamsFromResponses(responses)
+
+				assert.Equal(t, []logproto.Stream{
+					*mockStream(1, 1),
+					*mockStream(2, 1),
+					*mockStream(3, 1),
+				}, actual)
+			},
+		},
+		"honor max entries per tail response": {
+			historicEntries: mockStreamIterator(1, maxEntriesPerTailResponse+1),
+			tailClient:      nil,
+			tester: func(t *testing.T, tailer *Tailer, tailClient *tailClientMock) {
+				responses, err := readFromTailer(tailer, maxEntriesPerTailResponse+1)
+				require.NoError(t, err)
+
+				require.Equal(t, 2, len(responses))
+				assert.Equal(t, maxEntriesPerTailResponse, countEntriesInStreams(responses[0].Streams))
+				assert.Equal(t, 1, countEntriesInStreams(responses[1].Streams))
+				assert.Equal(t, 0, len(responses[1].DroppedEntries))
+			},
+		},
+		"honor max buffered tail responses": {
+			historicEntries: mockStreamIterator(1, (maxEntriesPerTailResponse*maxBufferedTailResponses)+5),
+			tailClient:      newTailClientMock().mockRecvWithTrigger(mockTailResponse(mockStream(1, 1))),
+			tester: func(t *testing.T, tailer *Tailer, tailClient *tailClientMock) {
+				err := waitUntilTailerOpenStreamsHaveBeenConsumed(tailer)
+				require.NoError(t, err)
+
+				// Since the response channel is full/blocked, we do expect that all responses
+				// are "full" and extra entries from historic entries have been dropped
+				responses, err := readFromTailer(tailer, (maxEntriesPerTailResponse * maxBufferedTailResponses))
+				require.NoError(t, err)
+
+				require.Equal(t, maxBufferedTailResponses, len(responses))
+				for i := 0; i < maxBufferedTailResponses; i++ {
+					assert.Equal(t, maxEntriesPerTailResponse, countEntriesInStreams(responses[i].Streams))
+					assert.Equal(t, 0, len(responses[i].DroppedEntries))
+				}
+
+				// Since we'll not receive dropped entries until the next tail response, we're now
+				// going to trigger a Recv() from the tail client
+				tailClient.triggerRecv()
+
+				responses, err = readFromTailer(tailer, 1)
+				require.NoError(t, err)
+
+				require.Equal(t, 1, len(responses))
+				assert.Equal(t, 1, countEntriesInStreams(responses[0].Streams))
+				assert.Equal(t, 5, len(responses[0].DroppedEntries))
+			},
+		},
+		"honor max dropped entries per tail response": {
+			historicEntries: mockStreamIterator(1, (maxEntriesPerTailResponse*maxBufferedTailResponses)+maxDroppedEntriesPerTailResponse+5),
+			tailClient:      newTailClientMock().mockRecvWithTrigger(mockTailResponse(mockStream(1, 1))),
+			tester: func(t *testing.T, tailer *Tailer, tailClient *tailClientMock) {
+				err := waitUntilTailerOpenStreamsHaveBeenConsumed(tailer)
+				require.NoError(t, err)
+
+				// Since the response channel is full/blocked, we do expect that all responses
+				// are "full" and extra entries from historic entries have been dropped
+				responses, err := readFromTailer(tailer, (maxEntriesPerTailResponse * maxBufferedTailResponses))
+				require.NoError(t, err)
+
+				require.Equal(t, maxBufferedTailResponses, len(responses))
+				for i := 0; i < maxBufferedTailResponses; i++ {
+					assert.Equal(t, maxEntriesPerTailResponse, countEntriesInStreams(responses[i].Streams))
+					assert.Equal(t, 0, len(responses[i].DroppedEntries))
+				}
+
+				// Since we'll not receive dropped entries until the next tail response, we're now
+				// going to trigger a Recv() from the tail client
+				tailClient.triggerRecv()
+
+				responses, err = readFromTailer(tailer, 1)
+				require.NoError(t, err)
+
+				require.Equal(t, 1, len(responses))
+				assert.Equal(t, 1, countEntriesInStreams(responses[0].Streams))
+				assert.Equal(t, maxDroppedEntriesPerTailResponse, len(responses[0].DroppedEntries))
+			},
+		},
+	}
+
+	for testName, test := range tests {
+		t.Run(testName, func(t *testing.T) {
+			tailDisconnectedIngesters := func([]string) (map[string]logproto.Querier_TailClient, error) {
+				return map[string]logproto.Querier_TailClient{}, nil
+			}
+
+			tailClients := map[string]logproto.Querier_TailClient{}
+			if test.tailClient != nil {
+				tailClients["test"] = test.tailClient
+			}
+
+			tailer := newTailer(0, tailClients, test.historicEntries, tailDisconnectedIngesters, timeout, throttle)
+			defer tailer.close()
+
+			test.tester(t, tailer, test.tailClient)
+		})
+	}
+}
+
+func readFromTailer(tailer *Tailer, maxEntries int) ([]*TailResponse, error) {
+	responses := make([]*TailResponse, 0)
+	entriesCount := 0
+
+	// Ensure we do not wait indefinitely
+	timeoutTicker := time.NewTicker(timeout)
+	defer timeoutTicker.Stop()
+
+	for !tailer.stopped && entriesCount < maxEntries {
+		select {
+		case <-timeoutTicker.C:
+			return nil, errors.New("timeout expired while reading responses from Tailer")
+		case response := <-tailer.getResponseChan():
+			responses = append(responses, response)
+			entriesCount += countEntriesInStreams(response.Streams)
+		default:
+			time.Sleep(throttle)
+		}
+	}
+
+	return responses, nil
+}
+
+func waitUntilTailerOpenStreamsHaveBeenConsumed(tailer *Tailer) error {
+	// Ensure we do not wait indefinitely
+	timeoutTicker := time.NewTicker(timeout)
+	defer timeoutTicker.Stop()
+
+	for {
+		if isTailerOpenStreamsConsumed(tailer) {
+			return nil
+		}
+
+		select {
+		case <-timeoutTicker.C:
+			return errors.New("timeout expired while waiting for the Tailer to consume all open streams")
+		default:
+			time.Sleep(throttle)
+		}
+	}
+}
+
+// isTailerOpenStreamsConsumed returns whether the input Tailer has fully
+// consumed all streams from the openStreamIterator, which means the
+// Tailer.loop() is now throttling
+func isTailerOpenStreamsConsumed(tailer *Tailer) bool {
+	tailer.streamMtx.Lock()
+	defer tailer.streamMtx.Unlock()
+
+	return tailer.openStreamIterator.Len() == 0 || tailer.openStreamIterator.Peek() == time.Unix(0, 0)
+}
+
+func countEntriesInStreams(streams []logproto.Stream) int {
+	count := 0
+
+	for _, stream := range streams {
+		count += len(stream.Entries)
+	}
+
+	return count
+}
+
+// flattenStreamsFromResponses returns an array of streams, each one containing
+// one and only one entry from the input list of responses. This function is used
+// to abstract away implementation details in the Tailer when testing for the output
+// regardless of how the responses have been generated (i.e. multiple entries grouped
+// into the same stream)
+func flattenStreamsFromResponses(responses []*TailResponse) []logproto.Stream {
+	result := make([]logproto.Stream, 0)
+
+	for _, response := range responses {
+		for _, stream := range response.Streams {
+			for _, entry := range stream.Entries {
+				result = append(result, logproto.Stream{
+					Entries: []logproto.Entry{entry},
+					Labels:  stream.Labels,
+				})
+			}
+		}
+	}
+
+	return result
+}
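
For reference, the following is a minimal standalone Go sketch (not part of the patch above; all names are illustrative) of the non-blocking pattern the rewritten Tailer.loop() relies on: detecting a full buffered channel with len() == cap(), as isResponseChanBlocked() does, and dropping work instead of blocking when a send cannot proceed, as the select/default on responseChan does.

package main

import "fmt"

func main() {
	// A small buffered channel standing in for Tailer.responseChan.
	responses := make(chan string, 2)
	dropped := 0

	for i := 0; i < 5; i++ {
		// Mirrors Tailer.isResponseChanBlocked(): len() and cap() on a channel
		// are safe to call concurrently, and cap() never changes.
		if len(responses) == cap(responses) {
			dropped++
			continue
		}

		// Mirrors the select/default in Tailer.loop(): send without blocking and
		// count the message as dropped if the buffer filled up in the meantime.
		select {
		case responses <- fmt.Sprintf("response %d", i):
		default:
			dropped++
		}
	}

	fmt.Println("buffered:", len(responses), "dropped:", dropped)
}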