Skip to content

Commit

Permalink
Add ingestion rate global limit support (#1486)
Browse files Browse the repository at this point in the history
* Added ingestion rate global limit support

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Fixed code comments

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Updated config doc

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Updated changelog

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Updated doc

Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci authored and cyriltovena committed Jan 9, 2020
1 parent 33f70b7 commit ce407d3
Show file tree
Hide file tree
Showing 63 changed files with 11,401 additions and 107 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
## master / unreleased

### Notable Changes
* [1486](https://github.com/grafana/loki/pull/1486) **pracucci**: Deprecated `-distributor.limiter-reload-period` flag / distributor's `limiter_reload_period` config option.

### Features

* [FEATURE] promtail positions file corruptions can be ignored with the `positions.ignore-invalid-yaml` flag. In the case the positions yaml is corrupted an empty positions config will be used and should later overwrite the malformed yaml.
* [1486](https://github.com/grafana/loki/pull/1486) **pracucci**: Added `global` ingestion rate limiter strategy support.

# 1.2.0 (2019-12-09)

Expand Down
26 changes: 21 additions & 5 deletions docs/configuration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,9 @@ The `server_config` block configures Promtail's behavior as an HTTP server:
The `distributor_config` block configures the Loki Distributor.

```yaml
# Period at which to reload user ingestion limits.
[limiter_reload_period: <duration> | default = 5m]
# Configures the distributors ring, used when the "global" ingestion rate
# strategy is enabled.
[ring: <ring_config>]
```

## querier_config
Expand Down Expand Up @@ -716,12 +717,27 @@ The `limits_config` block configures global and per-tenant limits for ingesting
logs in Loki.

```yaml
# Whether the ingestion rate limit should be applied individually to each
# distributor instance (local), or evenly shared across the cluster (global).
# The ingestion rate strategy cannot be overridden on a per-tenant basis.
#
# - local: enforces the limit on a per distributor basis. The actual effective
# rate limit will be N times higher, where N is the number of distributor
# replicas.
# - global: enforces the limit globally, configuring a per-distributor local
# rate limiter as "ingestion_rate / N", where N is the number of distributor
# replicas (it's automatically adjusted if the number of replicas change).
# The global strategy requires the distributors to form their own ring, which
# is used to keep track of the current number of healthy distributor replicas.
[ingestion_rate_strategy: <string> | default = "local"]
# Per-user ingestion rate limit in sample size per second. Units in MB.
[ingestion_rate_mb: <float> | default = 4]
# Per-user allowed ingestion burst size (in sample size). Units in MB. Warning,
# very high limits will be reset every limiter_reload_period defined in
# distributor_config.
# Per-user allowed ingestion burst size (in sample size). Units in MB.
# The burst size refers to the per-distributor local rate limiter even in the
# case of the "global" strategy, and should be set at least to the maximum logs
# size expected in a single push request.
[ingestion_burst_size_mb: <int> | default = 6]
# Maximum length of a label name.
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ require (
golang.org/x/net v0.0.0-20190923162816-aa69164e4478
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e // indirect
golang.org/x/sys v0.0.0-20191218084908-4a24b4065292 // indirect
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
golang.org/x/tools v0.0.0-20190925134113-a044388aa56f // indirect
google.golang.org/appengine v1.6.3 // indirect
google.golang.org/genproto v0.0.0-20190916214212-f660b8655731 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/structtag v1.0.0/go.mod h1:IKitwq45uXL/yqi5mYghiD3w9H6eTOvI9vnk8tXMphA=
github.com/fluent/fluent-bit-go v0.0.0-20190925192703-ea13c021720c h1:QwbffUs/+ptC4kTFPEN9Ej2latTq3bZJ5HO/OwPXYMs=
github.com/fluent/fluent-bit-go v0.0.0-20190925192703-ea13c021720c/go.mod h1:WQX+afhrekY9rGK+WT4xvKSlzmia9gDoLYu4GGYGASQ=
github.com/fluent/fluent-logger-golang v1.2.1 h1:CMA+mw2zMiOGEOarZtaqM3GBWT1IVLNncNi0nKELtmU=
github.com/fluent/fluent-logger-golang v1.2.1/go.mod h1:2/HCT/jTy78yGyeNGQLGQsjF3zzzAuy6Xlk6FCMV5eU=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/frankban/quicktest v1.7.2 h1:2QxQoC1TS09S7fhCPsrvqYdvP1H5M1P1ih5ABm3BTYk=
Expand Down Expand Up @@ -563,6 +564,7 @@ github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0Mw
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/philhofer/fwd v0.0.0-20160129035939-98c11a7a6ec8 h1:jkUFVqrKRttbdDqkTrvOmHxfqIsJK0Oe2WGi1ACAE+M=
github.com/philhofer/fwd v0.0.0-20160129035939-98c11a7a6ec8/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4 v2.3.1-0.20191115212037-9085dacd1e1e+incompatible h1:5isCJDRADbeSlWx6KVXAYwrcihyCGVXr7GNCdLEVDr8=
Expand Down Expand Up @@ -666,6 +668,7 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/thanos-io/thanos v0.8.1/go.mod h1:qQDi/6tgypn96+VzSumlxfJIgFX2y3ablfhHHLZ05cg=
github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tinylib/msgp v0.0.0-20161221055906-38a6f61a768d h1:Ninez2SUm08xpmnw7kVxCeOc3DahF6IuMuRMCdM4wTQ=
github.com/tinylib/msgp v0.0.0-20161221055906-38a6f61a768d/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ=
Expand All @@ -687,6 +690,7 @@ github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVM
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/weaveworks/billing-client v0.0.0-20171006123215-be0d55e547b1 h1:qi+YkNiB7T3Ikw1DoDIFhdAPbDU7fUPDsKrUoZdupnQ=
github.com/weaveworks/billing-client v0.0.0-20171006123215-be0d55e547b1/go.mod h1:7gGdEUJaCrSlWi/mjd68CZv0sfqektYPDcro9cE+M9k=
github.com/weaveworks/common v0.0.0-20190822150010-afb9996716e4 h1:O8BmyjqQoByXjAj6XaTfcxxqSIK6DYLmOSiYQPL9yJg=
github.com/weaveworks/common v0.0.0-20190822150010-afb9996716e4/go.mod h1:pSm+0KR57BG3pvGoJWFXJSAC7+sEPewcvdt5StevL3A=
Expand Down
120 changes: 51 additions & 69 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"context"
"flag"
"net/http"
"sync"
"sync/atomic"
"time"

cortex_distributor "github.com/cortexproject/cortex/pkg/distributor"
cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
cortex_util "github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/limiter"
cortex_validation "github.com/cortexproject/cortex/pkg/util/validation"

"github.com/go-kit/kit/log/level"
Expand All @@ -19,7 +20,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"golang.org/x/time/rate"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/pkg/ingester/client"
Expand All @@ -30,7 +30,6 @@ import (

const (
metricName = "logs"
bytesInMB = 1048576
)

var readinessProbeSuccess = []byte("Ready")
Expand Down Expand Up @@ -60,77 +59,78 @@ var (

// Config for a Distributor.
type Config struct {
// For testing.
factory func(addr string) (grpc_health_v1.HealthClient, error)
// Distributors ring
DistributorRing cortex_distributor.RingConfig `yaml:"ring,omitempty"`

LimiterReloadPeriod time.Duration `yaml:"limiter_reload_period"`
// For testing.
factory func(addr string) (grpc_health_v1.HealthClient, error) `yaml:"-"`
}

// RegisterFlags registers the flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.LimiterReloadPeriod, "distributor.limiter-reload-period", 5*time.Minute, "Period at which to reload user ingestion limits.")
cfg.DistributorRing.RegisterFlags(f)
}

// Distributor coordinates replicates and distribution of log streams.
type Distributor struct {
cfg Config
clientCfg client.Config
ring ring.ReadRing
overrides *validation.Overrides
pool *cortex_client.Pool

ingestLimitersMtx sync.RWMutex
ingestLimiters map[string]*rate.Limiter
quit chan struct{}
cfg Config
clientCfg client.Config
ingestersRing ring.ReadRing
overrides *validation.Overrides
pool *cortex_client.Pool

// The global rate limiter requires a distributors ring to count
// the number of healthy instances.
distributorsRing *ring.Lifecycler

// Per-user rate limiter.
ingestionRateLimiter *limiter.RateLimiter
}

// New a distributor creates.
func New(cfg Config, clientCfg client.Config, ring ring.ReadRing, overrides *validation.Overrides) (*Distributor, error) {
func New(cfg Config, clientCfg client.Config, ingestersRing ring.ReadRing, overrides *validation.Overrides) (*Distributor, error) {
factory := cfg.factory
if factory == nil {
factory = func(addr string) (grpc_health_v1.HealthClient, error) {
return client.New(clientCfg, addr)
}
}

d := Distributor{
cfg: cfg,
clientCfg: clientCfg,
ring: ring,
overrides: overrides,
pool: cortex_client.NewPool(clientCfg.PoolConfig, ring, factory, cortex_util.Logger),
ingestLimiters: map[string]*rate.Limiter{},
quit: make(chan struct{}),
}
// Create the configured ingestion rate limit strategy (local or global).
var ingestionRateStrategy limiter.RateLimiterStrategy
var distributorsRing *ring.Lifecycler

go d.loop()
if overrides.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
var err error
distributorsRing, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ring.DistributorRingKey)
if err != nil {
return nil, err
}

return &d, nil
}
distributorsRing.Start()

func (d *Distributor) loop() {
if d.cfg.LimiterReloadPeriod == 0 {
return
ingestionRateStrategy = newGlobalIngestionRateStrategy(overrides, distributorsRing)
} else {
ingestionRateStrategy = newLocalIngestionRateStrategy(overrides)
}

ticker := time.NewTicker(d.cfg.LimiterReloadPeriod)
defer ticker.Stop()

for {
select {
case <-ticker.C:
d.ingestLimitersMtx.Lock()
d.ingestLimiters = make(map[string]*rate.Limiter, len(d.ingestLimiters))
d.ingestLimitersMtx.Unlock()

case <-d.quit:
return
}
d := Distributor{
cfg: cfg,
clientCfg: clientCfg,
ingestersRing: ingestersRing,
distributorsRing: distributorsRing,
overrides: overrides,
pool: cortex_client.NewPool(clientCfg.PoolConfig, ingestersRing, factory, cortex_util.Logger),
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
}

return &d, nil
}

func (d *Distributor) Stop() {
close(d.quit)
if d.distributorsRing != nil {
d.distributorsRing.Shutdown()
}
}

// TODO taken from Cortex, see if we can refactor out an usable interface.
Expand All @@ -153,7 +153,7 @@ type pushTracker struct {
// ReadinessHandler is used to indicate to k8s when the distributor is ready.
// Returns 200 when the distributor is ready, 500 otherwise.
func (d *Distributor) ReadinessHandler(w http.ResponseWriter, r *http.Request) {
_, err := d.ring.GetAll()
_, err := d.ingestersRing.GetAll()
if err != nil {
http.Error(w, "Not ready: "+err.Error(), http.StatusInternalServerError)
return
Expand Down Expand Up @@ -226,13 +226,13 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return &logproto.PushResponse{}, validationErr
}

limiter := d.getOrCreateIngestLimiter(userID)
if !limiter.AllowN(time.Now(), validatedSamplesSize) {
now := time.Now()
if !d.ingestionRateLimiter.AllowN(now, userID, validatedSamplesSize) {
// Return a 4xx here to have the client discard the data and not retry. If a client
// is sending too much data consistently we will unlikely ever catch up otherwise.
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamplesCount))
validation.DiscardedBytes.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamplesSize))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%d) exceeded while adding %d lines", int(limiter.Limit()), validatedSamplesCount)
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%d bytes) exceeded while adding %d lines for a total size of %d bytes", int(d.ingestionRateLimiter.Limit(now, userID)), validatedSamplesCount, validatedSamplesSize)
}

const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck
Expand All @@ -241,7 +241,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
samplesByIngester := map[string][]*streamTracker{}
ingesterDescs := map[string]ring.IngesterDesc{}
for i, key := range keys {
replicationSet, err := d.ring.Get(key, ring.Write, descs[:0])
replicationSet, err := d.ingestersRing.Get(key, ring.Write, descs[:0])
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -349,21 +349,3 @@ func (d *Distributor) sendSamplesErr(ctx context.Context, ingester ring.Ingester
func (*Distributor) Check(_ context.Context, _ *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
}

func (d *Distributor) getOrCreateIngestLimiter(userID string) *rate.Limiter {
d.ingestLimitersMtx.RLock()
limiter, ok := d.ingestLimiters[userID]
d.ingestLimitersMtx.RUnlock()

if ok {
return limiter
}

limiter = rate.NewLimiter(rate.Limit(int64(d.overrides.IngestionRate(userID)*bytesInMB)), int(d.overrides.IngestionBurstSize(userID)*bytesInMB))

d.ingestLimitersMtx.Lock()
d.ingestLimiters[userID] = limiter
d.ingestLimitersMtx.Unlock()

return limiter
}
Loading

0 comments on commit ce407d3

Please sign in to comment.