Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

promtail: add metrics on sent and dropped log entries #952

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/operations.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ Promtail metrics:
- `promtail_read_bytes_total` Number of bytes read.
- `promtail_read_lines_total` Number of lines read.
- `promtail_request_duration_seconds_count` Number of send requests.
- `promtail_encoded_bytes_total` Number of bytes encoded and ready to send.
- `promtail_sent_bytes_total` Number of bytes sent.
- `promtail_dropped_bytes_total` Number of bytes dropped because they failed to be sent to the ingester after all retries.
- `promtail_sent_entries_total` Number of log entries sent to the ingester.
- `promtail_dropped_entries_total` Number of log entries dropped because they failed to be sent to the ingester after all retries.

Most of these metrics are counters and should continuously increase during normal operations:

Expand Down
33 changes: 29 additions & 4 deletions pkg/promtail/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,21 @@ var (
Name: "sent_bytes_total",
Help: "Number of bytes sent.",
}, []string{"host"})
droppedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "dropped_bytes_total",
Help: "Number of bytes dropped because failed to be sent to the ingester after all retries.",
}, []string{"host"})
sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "sent_entries_total",
Help: "Number of log entries sent to the ingester.",
}, []string{"host"})
droppedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "dropped_entries_total",
Help: "Number of log entries dropped because failed to be sent to the ingester after all retries.",
}, []string{"host"})
requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "promtail",
Name: "request_duration_seconds",
Expand All @@ -50,6 +65,9 @@ var (
func init() {
prometheus.MustRegister(encodedBytes)
prometheus.MustRegister(sentBytes)
prometheus.MustRegister(droppedBytes)
prometheus.MustRegister(sentEntries)
prometheus.MustRegister(droppedEntries)
prometheus.MustRegister(requestDuration)
}

Expand Down Expand Up @@ -154,7 +172,7 @@ func (c *client) run() {
}

func (c *client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) {
buf, err := encodeBatch(batch)
buf, entriesCount, err := encodeBatch(batch)
if err != nil {
level.Error(c.logger).Log("msg", "error encoding batch", "error", err)
return
Expand All @@ -172,6 +190,7 @@ func (c *client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) {

if err == nil {
sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
return
}

Expand All @@ -186,22 +205,28 @@ func (c *client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) {

if err != nil {
level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "error", err)
droppedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
droppedEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
}
}

func encodeBatch(batch map[model.Fingerprint]*logproto.Stream) ([]byte, error) {
func encodeBatch(batch map[model.Fingerprint]*logproto.Stream) ([]byte, int, error) {
req := logproto.PushRequest{
Streams: make([]*logproto.Stream, 0, len(batch)),
}

entriesCount := 0
for _, stream := range batch {
req.Streams = append(req.Streams, stream)
entriesCount += len(stream.Entries)
}

buf, err := proto.Marshal(&req)
if err != nil {
return nil, err
return nil, 0, err
}
buf = snappy.Encode(nil, buf)
return buf, nil
return buf, entriesCount, nil
}

func (c *client) send(ctx context.Context, buf []byte) (int, error) {
Expand Down
83 changes: 81 additions & 2 deletions pkg/promtail/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client
import (
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

Expand All @@ -14,17 +15,21 @@ import (
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/grafana/loki/pkg/logproto"
lokiflag "github.com/grafana/loki/pkg/util/flagext"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
)

func TestClient_Handle(t *testing.T) {
logEntries := []entry{
var (
logEntries = []entry{
{labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(1, 0).UTC(), Line: "line1"}},
{labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(2, 0).UTC(), Line: "line2"}},
{labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(3, 0).UTC(), Line: "line3"}},
}
)

func TestClient_Handle(t *testing.T) {
tests := map[string]struct {
clientBatchSize int
clientBatchWait time.Duration
Expand All @@ -33,6 +38,7 @@ func TestClient_Handle(t *testing.T) {
inputEntries []entry
inputDelay time.Duration
expectedBatches [][]*logproto.Stream
expectedMetrics string
}{
"batch log entries together until the batch size is reached": {
clientBatchSize: 10,
Expand All @@ -48,6 +54,11 @@ func TestClient_Handle(t *testing.T) {
{Labels: "{}", Entries: []logproto.Entry{logEntries[2].Entry}},
},
},
expectedMetrics: `
# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
# TYPE promtail_sent_entries_total counter
promtail_sent_entries_total{host="__HOST__"} 3.0
`,
},
"batch log entries together until the batch wait time is reached": {
clientBatchSize: 10,
Expand All @@ -64,6 +75,11 @@ func TestClient_Handle(t *testing.T) {
{Labels: "{}", Entries: []logproto.Entry{logEntries[1].Entry}},
},
},
expectedMetrics: `
# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
# TYPE promtail_sent_entries_total counter
promtail_sent_entries_total{host="__HOST__"} 2.0
`,
},
"retry send a batch up to backoff's max retries in case the server responds with a 5xx": {
clientBatchSize: 10,
Expand All @@ -82,6 +98,11 @@ func TestClient_Handle(t *testing.T) {
{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}},
},
},
expectedMetrics: `
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="__HOST__"} 1.0
`,
},
"do not retry send a batch in case the server responds with a 4xx": {
clientBatchSize: 10,
Expand All @@ -94,11 +115,20 @@ func TestClient_Handle(t *testing.T) {
{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}},
},
},
expectedMetrics: `
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="__HOST__"} 1.0
`,
},
}

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
// Reset metrics
sentEntries.Reset()
droppedEntries.Reset()

// Create a buffer channel where we do enqueue received requests
receivedReqsChan := make(chan logproto.PushRequest, 10)

Expand Down Expand Up @@ -156,6 +186,55 @@ func TestClient_Handle(t *testing.T) {
for i, batch := range receivedReqs {
assert.Equal(t, testData.expectedBatches[i], batch.Streams)
}

expectedMetrics := strings.Replace(testData.expectedMetrics, "__HOST__", serverURL.Host, -1)
err = testutil.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(expectedMetrics), "promtail_sent_entries_total", "promtail_dropped_entries_total")
assert.NoError(t, err)
})
}
}

func TestClient_encodeBatch(t *testing.T) {
t.Parallel()

tests := map[string]struct {
inputBatch map[model.Fingerprint]*logproto.Stream
expectedEntriesCount int
}{
"empty batch": {
inputBatch: map[model.Fingerprint]*logproto.Stream{},
expectedEntriesCount: 0,
},
"single stream with single log entry": {
inputBatch: map[model.Fingerprint]*logproto.Stream{
model.Fingerprint(1): {Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}},
},
expectedEntriesCount: 1,
},
"single stream with multiple log entries": {
inputBatch: map[model.Fingerprint]*logproto.Stream{
model.Fingerprint(1): {Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry, logEntries[1].Entry}},
},
expectedEntriesCount: 2,
},
"multiple streams with multiple log entries": {
inputBatch: map[model.Fingerprint]*logproto.Stream{
model.Fingerprint(1): {Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry, logEntries[1].Entry}},
model.Fingerprint(2): {Labels: "{}", Entries: []logproto.Entry{logEntries[2].Entry}},
},
expectedEntriesCount: 3,
},
}

for testName, testData := range tests {
testData := testData

t.Run(testName, func(t *testing.T) {
t.Parallel()

_, entriesCount, err := encodeBatch(testData.inputBatch)
require.NoError(t, err)
assert.Equal(t, testData.expectedEntriesCount, entriesCount)
})
}
}
Expand Down