Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send logs to multiple loki instances #536

Merged
merged 5 commits into from
May 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/promtail/promtail-docker-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ server:
positions:
filename: /tmp/positions.yaml

client:
url: http://loki:3100/api/prom/push
clients:
- url: http://loki:3100/api/prom/push

scrape_configs:
- job_name: system
Expand Down
4 changes: 2 additions & 2 deletions cmd/promtail/promtail-local-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ server:
positions:
filename: /tmp/positions.yaml

client:
url: http://localhost:3100/api/prom/push
clients:
- url: http://localhost:3100/api/prom/push

scrape_configs:
- job_name: system
Expand Down
67 changes: 25 additions & 42 deletions pkg/promtail/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ import (
"bufio"
"bytes"
"context"
"flag"
"fmt"
"io"
"net/http"
"strconv"
"sync"
"time"

"github.com/grafana/loki/pkg/promtail/api"

"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto"
Expand All @@ -29,21 +29,21 @@ const contentType = "application/x-protobuf"
const maxErrMsgLen = 1024

var (
encodedBytes = prometheus.NewCounter(prometheus.CounterOpts{
encodedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "encoded_bytes_total",
Help: "Number of bytes encoded and ready to send.",
})
sentBytes = prometheus.NewCounter(prometheus.CounterOpts{
}, []string{"host"})
sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "sent_bytes_total",
Help: "Number of bytes sent.",
})
}, []string{"host"})
requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "promtail",
Name: "request_duration_seconds",
Help: "Duration of send requests.",
}, []string{"status_code"})
}, []string{"status_code", "host"})
)

func init() {
Expand All @@ -52,32 +52,15 @@ func init() {
prometheus.MustRegister(requestDuration)
}

// Config describes configuration for a HTTP pusher client.
type Config struct {
URL flagext.URLValue
BatchWait time.Duration
BatchSize int

BackoffConfig util.BackoffConfig `yaml:"backoff_config"`
// The labels to add to any time series or alerts when communicating with loki
ExternalLabels model.LabelSet `yaml:"external_labels,omitempty"`
Timeout time.Duration `yaml:"timeout"`
}

// RegisterFlags registers flags.
func (c *Config) RegisterFlags(flags *flag.FlagSet) {
flags.Var(&c.URL, "client.url", "URL of log server")
flags.DurationVar(&c.BatchWait, "client.batch-wait", 1*time.Second, "Maximum wait period before sending batch.")
flags.IntVar(&c.BatchSize, "client.batch-size-bytes", 100*1024, "Maximum batch size to accrue before sending. ")

flag.IntVar(&c.BackoffConfig.MaxRetries, "client.max-retries", 5, "Maximum number of retires when sending batches.")
flag.DurationVar(&c.BackoffConfig.MinBackoff, "client.min-backoff", 100*time.Millisecond, "Initial backoff time between retries.")
flag.DurationVar(&c.BackoffConfig.MaxBackoff, "client.max-backoff", 5*time.Second, "Maximum backoff time between retries.")
flag.DurationVar(&c.Timeout, "client.timeout", 10*time.Second, "Maximum time to wait for server to respond to a request")
// Client pushes entries to Loki and can be stopped
type Client interface {
api.EntryHandler
// Stop goroutine sending batch of entries.
Stop()
}

// Client for pushing logs in snappy-compressed protos over HTTP.
type Client struct {
type client struct {
logger log.Logger
cfg Config
quit chan struct{}
Expand All @@ -93,9 +76,9 @@ type entry struct {
}

// New makes a new Client.
func New(cfg Config, logger log.Logger) (*Client, error) {
c := &Client{
logger: logger,
func New(cfg Config, logger log.Logger) Client {
c := &client{
logger: log.With(logger, "component", "client", "host", cfg.URL.Host),
cfg: cfg,
quit: make(chan struct{}),
entries: make(chan entry),
Expand All @@ -104,10 +87,10 @@ func New(cfg Config, logger log.Logger) (*Client, error) {
}
c.wg.Add(1)
go c.run()
return c, nil
return c
}

func (c *Client) run() {
func (c *client) run() {
batch := map[model.Fingerprint]*logproto.Stream{}
batchSize := 0
maxWait := time.NewTimer(c.cfg.BatchWait)
Expand Down Expand Up @@ -151,25 +134,25 @@ func (c *Client) run() {
}
}

func (c *Client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) {
func (c *client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) {
buf, err := encodeBatch(batch)
if err != nil {
level.Error(c.logger).Log("msg", "error encoding batch", "error", err)
return
}
bufBytes := float64(len(buf))
encodedBytes.Add(bufBytes)
encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)

ctx := context.Background()
backoff := util.NewBackoff(ctx, c.cfg.BackoffConfig)
var status int
for backoff.Ongoing() {
start := time.Now()
status, err = c.send(ctx, buf)
requestDuration.WithLabelValues(strconv.Itoa(status)).Observe(time.Since(start).Seconds())
requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds())

if err == nil {
sentBytes.Add(bufBytes)
sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
return
}

Expand Down Expand Up @@ -202,7 +185,7 @@ func encodeBatch(batch map[model.Fingerprint]*logproto.Stream) ([]byte, error) {
return buf, nil
}

func (c *Client) send(ctx context.Context, buf []byte) (int, error) {
func (c *client) send(ctx context.Context, buf []byte) (int, error) {
ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout)
defer cancel()
req, err := http.NewRequest("POST", c.cfg.URL.String(), bytes.NewReader(buf))
Expand Down Expand Up @@ -230,13 +213,13 @@ func (c *Client) send(ctx context.Context, buf []byte) (int, error) {
}

// Stop the client.
func (c *Client) Stop() {
func (c *client) Stop() {
close(c.quit)
c.wg.Wait()
}

// Handle implement EntryHandler; adds a new line to the next batch; send is async.
func (c *Client) Handle(ls model.LabelSet, t time.Time, s string) error {
func (c *client) Handle(ls model.LabelSet, t time.Time, s string) error {
if len(c.externalLabels) > 0 {
ls = c.externalLabels.Merge(ls)
}
Expand Down
57 changes: 57 additions & 0 deletions pkg/promtail/client/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package client

import (
"flag"
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/prometheus/common/model"
)

// Config describes configuration for a HTTP pusher client.
type Config struct {
URL flagext.URLValue
BatchWait time.Duration
BatchSize int

BackoffConfig util.BackoffConfig `yaml:"backoff_config"`
// The labels to add to any time series or alerts when communicating with loki
ExternalLabels model.LabelSet `yaml:"external_labels,omitempty"`
Timeout time.Duration `yaml:"timeout"`
}

// RegisterFlags registers flags.
func (c *Config) RegisterFlags(flags *flag.FlagSet) {
flags.Var(&c.URL, "client.url", "URL of log server")
flags.DurationVar(&c.BatchWait, "client.batch-wait", 1*time.Second, "Maximum wait period before sending batch.")
flags.IntVar(&c.BatchSize, "client.batch-size-bytes", 100*1024, "Maximum batch size to accrue before sending. ")

flag.IntVar(&c.BackoffConfig.MaxRetries, "client.max-retries", 5, "Maximum number of retires when sending batches.")
flag.DurationVar(&c.BackoffConfig.MinBackoff, "client.min-backoff", 100*time.Millisecond, "Initial backoff time between retries.")
flag.DurationVar(&c.BackoffConfig.MaxBackoff, "client.max-backoff", 5*time.Second, "Maximum backoff time between retries.")
flag.DurationVar(&c.Timeout, "client.timeout", 10*time.Second, "Maximum time to wait for server to respond to a request")

}

// UnmarshalYAML implement Yaml Unmarshaler
func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
type raw Config
// force sane defaults.
cfg := raw{
BackoffConfig: util.BackoffConfig{
MaxBackoff: 5 * time.Second,
MaxRetries: 5,
MinBackoff: 100 * time.Millisecond,
},
BatchSize: 100 * 1024,
BatchWait: 1 * time.Second,
Timeout: 10 * time.Second,
}
if err := unmarshal(&cfg); err != nil {
return err
}

*c = Config(cfg)
return nil
}
24 changes: 24 additions & 0 deletions pkg/promtail/client/fake/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package fake

import (
"time"

"github.com/grafana/loki/pkg/promtail/api"
"github.com/prometheus/common/model"
)

// Client is a fake client used for testing.
type Client struct {
OnHandleEntry api.EntryHandlerFunc
OnStop func()
}

// Stop implements client.Client
func (c *Client) Stop() {
c.OnStop()
}

// Handle implements client.Client
func (c *Client) Handle(labels model.LabelSet, time time.Time, entry string) error {
return c.OnHandleEntry.Handle(labels, time, entry)
}
43 changes: 43 additions & 0 deletions pkg/promtail/client/multi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package client

import (
"errors"
"time"

"github.com/go-kit/kit/log"
"github.com/grafana/loki/pkg/util"
"github.com/prometheus/common/model"
)

// MultiClient is client pushing to one or more loki instances.
type MultiClient []Client

// NewMulti creates a new client
func NewMulti(logger log.Logger, cfgs ...Config) (Client, error) {
if len(cfgs) == 0 {
return nil, errors.New("at least one client config should be provided")
}
var clients []Client
for _, cfg := range cfgs {
clients = append(clients, New(cfg, logger))
}
return MultiClient(clients), nil
}

// Handle Implements api.EntryHandler
func (m MultiClient) Handle(labels model.LabelSet, time time.Time, entry string) error {
var result util.MultiError
for _, client := range m {
if err := client.Handle(labels, time, entry); err != nil {
result.Add(err)
}
}
return result.Err()
}

// Stop implements Client
func (m MultiClient) Stop() {
for _, c := range m {
c.Stop()
}
}
Loading