Skip to content

Commit

Permalink
Send logs to multiple loki instances (#536)
Browse files Browse the repository at this point in the history
* Adds the ability to provide multiple Loki URLs

For backward compatibility `client:` still works with flag.

* add some tests for multi client

* update ksonnet module to support multiple clients

* fix comment

* fix lint issues
  • Loading branch information
cyriltovena authored May 7, 2019
1 parent fa4f936 commit 53075db
Show file tree
Hide file tree
Showing 13 changed files with 342 additions and 68 deletions.
4 changes: 2 additions & 2 deletions cmd/promtail/promtail-docker-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ server:
positions:
filename: /tmp/positions.yaml

client:
url: http://loki:3100/api/prom/push
clients:
- url: http://loki:3100/api/prom/push

scrape_configs:
- job_name: system
Expand Down
4 changes: 2 additions & 2 deletions cmd/promtail/promtail-local-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ server:
positions:
filename: /tmp/positions.yaml

client:
url: http://localhost:3100/api/prom/push
clients:
- url: http://localhost:3100/api/prom/push

scrape_configs:
- job_name: system
Expand Down
67 changes: 25 additions & 42 deletions pkg/promtail/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ import (
"bufio"
"bytes"
"context"
"flag"
"fmt"
"io"
"net/http"
"strconv"
"sync"
"time"

"github.com/grafana/loki/pkg/promtail/api"

"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto"
Expand All @@ -29,21 +29,21 @@ const contentType = "application/x-protobuf"
const maxErrMsgLen = 1024

var (
encodedBytes = prometheus.NewCounter(prometheus.CounterOpts{
encodedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "encoded_bytes_total",
Help: "Number of bytes encoded and ready to send.",
})
sentBytes = prometheus.NewCounter(prometheus.CounterOpts{
}, []string{"host"})
sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "sent_bytes_total",
Help: "Number of bytes sent.",
})
}, []string{"host"})
requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "promtail",
Name: "request_duration_seconds",
Help: "Duration of send requests.",
}, []string{"status_code"})
}, []string{"status_code", "host"})
)

func init() {
Expand All @@ -52,32 +52,15 @@ func init() {
prometheus.MustRegister(requestDuration)
}

// Config describes configuration for a HTTP pusher client.
type Config struct {
URL flagext.URLValue
BatchWait time.Duration
BatchSize int

BackoffConfig util.BackoffConfig `yaml:"backoff_config"`
// The labels to add to any time series or alerts when communicating with loki
ExternalLabels model.LabelSet `yaml:"external_labels,omitempty"`
Timeout time.Duration `yaml:"timeout"`
}

// RegisterFlags registers flags.
func (c *Config) RegisterFlags(flags *flag.FlagSet) {
flags.Var(&c.URL, "client.url", "URL of log server")
flags.DurationVar(&c.BatchWait, "client.batch-wait", 1*time.Second, "Maximum wait period before sending batch.")
flags.IntVar(&c.BatchSize, "client.batch-size-bytes", 100*1024, "Maximum batch size to accrue before sending. ")

flag.IntVar(&c.BackoffConfig.MaxRetries, "client.max-retries", 5, "Maximum number of retires when sending batches.")
flag.DurationVar(&c.BackoffConfig.MinBackoff, "client.min-backoff", 100*time.Millisecond, "Initial backoff time between retries.")
flag.DurationVar(&c.BackoffConfig.MaxBackoff, "client.max-backoff", 5*time.Second, "Maximum backoff time between retries.")
flag.DurationVar(&c.Timeout, "client.timeout", 10*time.Second, "Maximum time to wait for server to respond to a request")
// Client pushes entries to Loki and can be stopped
type Client interface {
api.EntryHandler
// Stop goroutine sending batch of entries.
Stop()
}

// Client for pushing logs in snappy-compressed protos over HTTP.
type Client struct {
type client struct {
logger log.Logger
cfg Config
quit chan struct{}
Expand All @@ -93,9 +76,9 @@ type entry struct {
}

// New makes a new Client.
func New(cfg Config, logger log.Logger) (*Client, error) {
c := &Client{
logger: logger,
func New(cfg Config, logger log.Logger) Client {
c := &client{
logger: log.With(logger, "component", "client", "host", cfg.URL.Host),
cfg: cfg,
quit: make(chan struct{}),
entries: make(chan entry),
Expand All @@ -104,10 +87,10 @@ func New(cfg Config, logger log.Logger) (*Client, error) {
}
c.wg.Add(1)
go c.run()
return c, nil
return c
}

func (c *Client) run() {
func (c *client) run() {
batch := map[model.Fingerprint]*logproto.Stream{}
batchSize := 0
maxWait := time.NewTimer(c.cfg.BatchWait)
Expand Down Expand Up @@ -151,25 +134,25 @@ func (c *Client) run() {
}
}

func (c *Client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) {
func (c *client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) {
buf, err := encodeBatch(batch)
if err != nil {
level.Error(c.logger).Log("msg", "error encoding batch", "error", err)
return
}
bufBytes := float64(len(buf))
encodedBytes.Add(bufBytes)
encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)

ctx := context.Background()
backoff := util.NewBackoff(ctx, c.cfg.BackoffConfig)
var status int
for backoff.Ongoing() {
start := time.Now()
status, err = c.send(ctx, buf)
requestDuration.WithLabelValues(strconv.Itoa(status)).Observe(time.Since(start).Seconds())
requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds())

if err == nil {
sentBytes.Add(bufBytes)
sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
return
}

Expand Down Expand Up @@ -202,7 +185,7 @@ func encodeBatch(batch map[model.Fingerprint]*logproto.Stream) ([]byte, error) {
return buf, nil
}

func (c *Client) send(ctx context.Context, buf []byte) (int, error) {
func (c *client) send(ctx context.Context, buf []byte) (int, error) {
ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout)
defer cancel()
req, err := http.NewRequest("POST", c.cfg.URL.String(), bytes.NewReader(buf))
Expand Down Expand Up @@ -230,13 +213,13 @@ func (c *Client) send(ctx context.Context, buf []byte) (int, error) {
}

// Stop the client.
func (c *Client) Stop() {
func (c *client) Stop() {
close(c.quit)
c.wg.Wait()
}

// Handle implement EntryHandler; adds a new line to the next batch; send is async.
func (c *Client) Handle(ls model.LabelSet, t time.Time, s string) error {
func (c *client) Handle(ls model.LabelSet, t time.Time, s string) error {
if len(c.externalLabels) > 0 {
ls = c.externalLabels.Merge(ls)
}
Expand Down
57 changes: 57 additions & 0 deletions pkg/promtail/client/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package client

import (
"flag"
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/prometheus/common/model"
)

// Config describes configuration for an HTTP pusher client.
//
// The same defaults are applied both by RegisterFlags and by UnmarshalYAML:
// BatchWait 1s, BatchSize 100KiB, Timeout 10s, and a backoff of 100ms to 5s
// with at most 5 retries.
type Config struct {
// URL of the log server that batches are pushed to.
URL flagext.URLValue
// BatchWait is the maximum wait period before sending a batch.
BatchWait time.Duration
// BatchSize is the maximum batch size, in bytes, to accrue before sending.
BatchSize int

// BackoffConfig controls the retry count and the min/max backoff between
// retries when sending a batch.
BackoffConfig util.BackoffConfig `yaml:"backoff_config"`
// The labels to add to any time series or alerts when communicating with loki
// NOTE(review): wording looks inherited from Prometheus; here these are
// presumably the labels merged into every entry before it is sent — confirm.
ExternalLabels model.LabelSet `yaml:"external_labels,omitempty"`
// Timeout is the maximum time to wait for the server to respond to a request.
Timeout time.Duration `yaml:"timeout"`
}

// RegisterFlags registers the client's command-line flags on flags, binding
// each one to the matching field of c and installing its default value.
//
// Every flag is registered on the supplied flag set. Registering the backoff
// and timeout flags on the process-wide flag.CommandLine instead would hide
// them from callers that parse a private FlagSet and would panic with
// "flag redefined" the second time RegisterFlags runs in the same process.
func (c *Config) RegisterFlags(flags *flag.FlagSet) {
	flags.Var(&c.URL, "client.url", "URL of log server")
	flags.DurationVar(&c.BatchWait, "client.batch-wait", 1*time.Second, "Maximum wait period before sending batch.")
	flags.IntVar(&c.BatchSize, "client.batch-size-bytes", 100*1024, "Maximum batch size to accrue before sending. ")

	flags.IntVar(&c.BackoffConfig.MaxRetries, "client.max-retries", 5, "Maximum number of retries when sending batches.")
	flags.DurationVar(&c.BackoffConfig.MinBackoff, "client.min-backoff", 100*time.Millisecond, "Initial backoff time between retries.")
	flags.DurationVar(&c.BackoffConfig.MaxBackoff, "client.max-backoff", 5*time.Second, "Maximum backoff time between retries.")
	flags.DurationVar(&c.Timeout, "client.timeout", 10*time.Second, "Maximum time to wait for server to respond to a request")
}

// UnmarshalYAML implements the YAML Unmarshaler interface. It decodes into a
// copy of Config that is pre-populated with default values and, on success,
// overwrites the receiver with the result. On a decode error the receiver is
// left untouched and the error is returned as-is.
func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
	// plain has Config's fields but none of its methods, so it does not
	// itself carry this UnmarshalYAML method.
	type plain Config

	// Seed the defaults first; whatever the decoder does not overwrite
	// keeps these values.
	var decoded plain
	decoded.BatchWait = 1 * time.Second
	decoded.BatchSize = 100 * 1024
	decoded.Timeout = 10 * time.Second
	decoded.BackoffConfig = util.BackoffConfig{
		MinBackoff: 100 * time.Millisecond,
		MaxBackoff: 5 * time.Second,
		MaxRetries: 5,
	}

	err := unmarshal(&decoded)
	if err != nil {
		return err
	}

	*c = Config(decoded)
	return nil
}
24 changes: 24 additions & 0 deletions pkg/promtail/client/fake/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package fake

import (
"time"

"github.com/grafana/loki/pkg/promtail/api"
"github.com/prometheus/common/model"
)

// Client is a fake client used for testing.
// Its behavior is supplied entirely by the two callbacks below; both Stop and
// Handle call their callback unconditionally, so each must be set before the
// corresponding method is used.
type Client struct {
// OnHandleEntry receives every call made to Handle.
OnHandleEntry api.EntryHandlerFunc
// OnStop is invoked by Stop.
OnStop func()
}

// Stop implements client.Client by running the OnStop callback configured on
// the fake.
func (c *Client) Stop() {
	onStop := c.OnStop
	onStop()
}

// Handle implements client.Client by passing the entry to the OnHandleEntry
// callback and returning whatever error it reports.
func (c *Client) Handle(labels model.LabelSet, time time.Time, entry string) error {
	handler := c.OnHandleEntry
	err := handler.Handle(labels, time, entry)
	return err
}
43 changes: 43 additions & 0 deletions pkg/promtail/client/multi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package client

import (
"errors"
"time"

"github.com/go-kit/kit/log"
"github.com/grafana/loki/pkg/util"
"github.com/prometheus/common/model"
)

// MultiClient is a client pushing to one or more Loki instances.
// It is a slice of clients; Handle and Stop are applied to each element in
// slice order.
type MultiClient []Client

// NewMulti creates a Client that forwards to one underlying client per
// supplied config.
//
// It returns an error when cfgs is empty. Otherwise it calls New once for
// each config, in order, passing the same logger to every call, and returns
// the resulting clients as a MultiClient.
func NewMulti(logger log.Logger, cfgs ...Config) (Client, error) {
	if len(cfgs) == 0 {
		return nil, errors.New("at least one client config should be provided")
	}
	// The final length is known up front, so allocate once instead of
	// growing the slice on append.
	clients := make(MultiClient, 0, len(cfgs))
	for _, cfg := range cfgs {
		clients = append(clients, New(cfg, logger))
	}
	return clients, nil
}

// Handle implements api.EntryHandler. The entry is offered to every client in
// order; a client that returns an error does not prevent the remaining
// clients from receiving it. All errors are accumulated in a util.MultiError
// and its Err() result is returned.
func (m MultiClient) Handle(labels model.LabelSet, time time.Time, entry string) error {
	var errs util.MultiError
	for i := range m {
		err := m[i].Handle(labels, time, entry)
		if err == nil {
			continue
		}
		errs.Add(err)
	}
	return errs.Err()
}

// Stop implements Client by stopping each underlying client, one after the
// other, in slice order.
func (m MultiClient) Stop() {
	for i := 0; i < len(m); i++ {
		m[i].Stop()
	}
}
Loading

0 comments on commit 53075db

Please sign in to comment.