Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Safe per tenant overrides loading #4421

Merged
merged 2 commits into from
Oct 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions pkg/ruler/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ func newFakeLimits() fakeLimits {
RulerRemoteWriteDisabled: true,
},
additionalHeadersRWTenant: {
RulerRemoteWriteHeaders: map[string]string{
user.OrgIDHeaderName: "overridden",
strings.ToLower(user.OrgIDHeaderName): "overridden-lower",
strings.ToUpper(user.OrgIDHeaderName): "overridden-upper",
"Additional": "Header",
RulerRemoteWriteHeaders: validation.OverwriteMarshalingStringMap{
M: map[string]string{
user.OrgIDHeaderName: "overridden",
strings.ToLower(user.OrgIDHeaderName): "overridden-lower",
strings.ToUpper(user.OrgIDHeaderName): "overridden-upper",
"Additional": "Header",
},
},
},
customRelabelsTenant: {
Expand Down
11 changes: 11 additions & 0 deletions pkg/util/flagext/bytesize.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ func (bs *ByteSize) UnmarshalYAML(unmarshal func(interface{}) error) error {
return bs.Set(str)
}

// MarshalYAML implements yaml.Marshaller.
// Use a string representation for consistency
func (bs *ByteSize) MarshalYAML() (interface{}, error) {
return bs.String(), nil
}

// UnmarshalJSON implements json.Unmarsal interface to work with JSON.
func (bs *ByteSize) UnmarshalJSON(val []byte) error {
var str string
Expand All @@ -56,3 +62,8 @@ func (bs *ByteSize) UnmarshalJSON(val []byte) error {

return bs.Set(str)
}

// Use a string representation for consistency
func (bs *ByteSize) MarshalJSON() ([]byte, error) {
return json.Marshal(bs.String())
}
88 changes: 71 additions & 17 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package validation

import (
"encoding/json"
"flag"
"fmt"
"strconv"
"time"

"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"golang.org/x/time/rate"
"gopkg.in/yaml.v2"

"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/ruler/util"
Expand Down Expand Up @@ -81,23 +84,23 @@ type Limits struct {

// this field is the inversion of the general remote_write.enabled because the zero value of a boolean is false,
// and if it were ruler_remote_write_enabled, it would be impossible to know if the value was explicitly set or default
RulerRemoteWriteDisabled bool `yaml:"ruler_remote_write_disabled" json:"ruler_remote_write_disabled"`
RulerRemoteWriteURL string `yaml:"ruler_remote_write_url" json:"ruler_remote_write_url"`
RulerRemoteWriteTimeout time.Duration `yaml:"ruler_remote_write_timeout" json:"ruler_remote_write_timeout"`
RulerRemoteWriteHeaders map[string]string `yaml:"ruler_remote_write_headers" json:"ruler_remote_write_headers"`
RulerRemoteWriteRelabelConfigs []*util.RelabelConfig `yaml:"ruler_remote_write_relabel_configs" json:"ruler_remote_write_relabel_configs"`
RulerRemoteWriteQueueCapacity int `yaml:"ruler_remote_write_queue_capacity" json:"ruler_remote_write_queue_capacity"`
RulerRemoteWriteQueueMinShards int `yaml:"ruler_remote_write_queue_min_shards" json:"ruler_remote_write_queue_min_shards"`
RulerRemoteWriteQueueMaxShards int `yaml:"ruler_remote_write_queue_max_shards" json:"ruler_remote_write_queue_max_shards"`
RulerRemoteWriteQueueMaxSamplesPerSend int `yaml:"ruler_remote_write_queue_max_samples_per_send" json:"ruler_remote_write_queue_max_samples_per_send"`
RulerRemoteWriteQueueBatchSendDeadline time.Duration `yaml:"ruler_remote_write_queue_batch_send_deadline" json:"ruler_remote_write_queue_batch_send_deadline"`
RulerRemoteWriteQueueMinBackoff time.Duration `yaml:"ruler_remote_write_queue_min_backoff" json:"ruler_remote_write_queue_min_backoff"`
RulerRemoteWriteQueueMaxBackoff time.Duration `yaml:"ruler_remote_write_queue_max_backoff" json:"ruler_remote_write_queue_max_backoff"`
RulerRemoteWriteQueueRetryOnRateLimit bool `yaml:"ruler_remote_write_queue_retry_on_ratelimit" json:"ruler_remote_write_queue_retry_on_ratelimit"`
RulerRemoteWriteDisabled bool `yaml:"ruler_remote_write_disabled" json:"ruler_remote_write_disabled"`
RulerRemoteWriteURL string `yaml:"ruler_remote_write_url" json:"ruler_remote_write_url"`
RulerRemoteWriteTimeout time.Duration `yaml:"ruler_remote_write_timeout" json:"ruler_remote_write_timeout"`
RulerRemoteWriteHeaders OverwriteMarshalingStringMap `yaml:"ruler_remote_write_headers" json:"ruler_remote_write_headers"`
RulerRemoteWriteRelabelConfigs []*util.RelabelConfig `yaml:"ruler_remote_write_relabel_configs,omitempty" json:"ruler_remote_write_relabel_configs,omitempty"`
RulerRemoteWriteQueueCapacity int `yaml:"ruler_remote_write_queue_capacity" json:"ruler_remote_write_queue_capacity"`
RulerRemoteWriteQueueMinShards int `yaml:"ruler_remote_write_queue_min_shards" json:"ruler_remote_write_queue_min_shards"`
RulerRemoteWriteQueueMaxShards int `yaml:"ruler_remote_write_queue_max_shards" json:"ruler_remote_write_queue_max_shards"`
RulerRemoteWriteQueueMaxSamplesPerSend int `yaml:"ruler_remote_write_queue_max_samples_per_send" json:"ruler_remote_write_queue_max_samples_per_send"`
RulerRemoteWriteQueueBatchSendDeadline time.Duration `yaml:"ruler_remote_write_queue_batch_send_deadline" json:"ruler_remote_write_queue_batch_send_deadline"`
RulerRemoteWriteQueueMinBackoff time.Duration `yaml:"ruler_remote_write_queue_min_backoff" json:"ruler_remote_write_queue_min_backoff"`
RulerRemoteWriteQueueMaxBackoff time.Duration `yaml:"ruler_remote_write_queue_max_backoff" json:"ruler_remote_write_queue_max_backoff"`
RulerRemoteWriteQueueRetryOnRateLimit bool `yaml:"ruler_remote_write_queue_retry_on_ratelimit" json:"ruler_remote_write_queue_retry_on_ratelimit"`

// Global and per tenant retention
RetentionPeriod model.Duration `yaml:"retention_period" json:"retention_period"`
StreamRetention []StreamRetention `yaml:"retention_stream" json:"retention_stream"`
StreamRetention []StreamRetention `yaml:"retention_stream,omitempty" json:"retention_stream,omitempty"`

// Config for overrides, convenient if it goes here.
PerTenantOverrideConfig string `yaml:"per_tenant_override_config" json:"per_tenant_override_config"`
Expand Down Expand Up @@ -179,12 +182,18 @@ func (l *Limits) UnmarshalYAML(unmarshal func(interface{}) error) error {
// We want to set c to the defaults and then overwrite it with the input.
// To make unmarshal fill the plain data struct rather than calling UnmarshalYAML
// again, we have to hide it using a type indirection. See prometheus/config.
type plain Limits

// During startup we wont have a default value so we don't want to overwrite them
if defaultLimits != nil {
*l = *defaultLimits
b, err := yaml.Marshal(defaultLimits)
if err != nil {
return errors.Wrap(err, "cloning limits (marshaling)")
}
if err := yaml.Unmarshal(b, (*plain)(l)); err != nil {
return errors.Wrap(err, "cloning limits (unmarshaling)")
}
}
type plain Limits
return unmarshal((*plain)(l))
}

Expand Down Expand Up @@ -434,7 +443,7 @@ func (o *Overrides) RulerRemoteWriteTimeout(userID string) time.Duration {

// RulerRemoteWriteHeaders returns the headers to use in a remote-write for a given user.
func (o *Overrides) RulerRemoteWriteHeaders(userID string) map[string]string {
return o.getOverridesForUser(userID).RulerRemoteWriteHeaders
return o.getOverridesForUser(userID).RulerRemoteWriteHeaders.Map()
}

// RulerRemoteWriteRelabelConfigs returns the write relabel configs to use in a remote-write for a given user.
Expand Down Expand Up @@ -522,3 +531,48 @@ func (o *Overrides) getOverridesForUser(userID string) *Limits {
}
return o.defaultLimits
}

// OverwriteMarshalingStringMap will overwrite the src map when unmarshaling
// as opposed to merging.
type OverwriteMarshalingStringMap struct {
M map[string]string
dannykopping marked this conversation as resolved.
Show resolved Hide resolved
}

func (sm *OverwriteMarshalingStringMap) Map() map[string]string {
return sm.M
}

// MarshalJSON explicitly uses the the type receiver and not pointer receiver
// or it won't be called
func (sm OverwriteMarshalingStringMap) MarshalJSON() ([]byte, error) {
return json.Marshal(sm.M)
}

func (sm *OverwriteMarshalingStringMap) UnmarshalJSON(val []byte) error {
var def map[string]string
if err := json.Unmarshal(val, &def); err != nil {
return err
}
sm.M = def

return nil

}

// MarshalYAML explicitly uses the the type receiver and not pointer receiver
// or it won't be called
func (sm OverwriteMarshalingStringMap) MarshalYAML() (interface{}, error) {
return sm.M, nil
}

func (sm *OverwriteMarshalingStringMap) UnmarshalYAML(unmarshal func(interface{}) error) error {
var def map[string]string

err := unmarshal(&def)
if err != nil {
return err
}
sm.M = def

return nil
}
134 changes: 134 additions & 0 deletions pkg/validation/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"encoding/json"
"reflect"
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"
Expand Down Expand Up @@ -109,3 +111,135 @@ per_tenant_override_period: 230s

assert.Equal(t, limitsYAML, limitsJSON)
}

func TestOverwriteMarshalingStringMapJSON(t *testing.T) {
m := OverwriteMarshalingStringMap{
M: map[string]string{"foo": "bar"},
}

require.Nil(t, json.Unmarshal([]byte(`{"bazz": "buzz"}`), &m))
require.Equal(t, map[string]string{"bazz": "buzz"}, m.Map())
out, err := json.Marshal(m)
require.Nil(t, err)
var back OverwriteMarshalingStringMap
require.Nil(t, json.Unmarshal(out, &back))
require.Equal(t, m, back)
}

func TestOverwriteMarshalingStringMapYAML(t *testing.T) {
m := OverwriteMarshalingStringMap{
M: map[string]string{"foo": "bar"},
}

require.Nil(t, yaml.Unmarshal([]byte(`{"bazz": "buzz"}`), &m))
require.Equal(t, map[string]string{"bazz": "buzz"}, m.Map())
out, err := yaml.Marshal(m)
require.Nil(t, err)
var back OverwriteMarshalingStringMap
require.Nil(t, yaml.Unmarshal(out, &back))
require.Equal(t, m, back)
}

func TestLimitsDoesNotMutate(t *testing.T) {
initialDefault := defaultLimits
defer func() {
defaultLimits = initialDefault
}()

// Set new defaults with non-nil values for non-scalar types
newDefaults := Limits{
RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}},
StreamRetention: []StreamRetention{
{
Period: model.Duration(24 * time.Hour),
Selector: `{a="b"}`,
},
},
}
SetDefaultLimitsForYAMLUnmarshalling(newDefaults)

for _, tc := range []struct {
desc string
yaml string
exp Limits
}{
{
desc: "map",
yaml: `
ruler_remote_write_headers:
foo: "bar"
`,
exp: Limits{
RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"foo": "bar"}},

// Rest from new defaults
StreamRetention: []StreamRetention{
{
Period: model.Duration(24 * time.Hour),
Selector: `{a="b"}`,
},
},
},
},
{
desc: "empty map overrides defaults",
yaml: `
ruler_remote_write_headers:
`,
exp: Limits{

// Rest from new defaults
StreamRetention: []StreamRetention{
{
Period: model.Duration(24 * time.Hour),
Selector: `{a="b"}`,
},
},
},
},
{
desc: "slice",
yaml: `
retention_stream:
- period: '24h'
selector: '{foo="bar"}'
`,
exp: Limits{
StreamRetention: []StreamRetention{
{
Period: model.Duration(24 * time.Hour),
Selector: `{foo="bar"}`,
},
},

// Rest from new defaults
RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}},
},
},
{
desc: "scalar field",
yaml: `
reject_old_samples: true
`,
exp: Limits{
RejectOldSamples: true,

// Rest from new defaults
RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}},
StreamRetention: []StreamRetention{
{
Period: model.Duration(24 * time.Hour),
Selector: `{a="b"}`,
},
},
},
},
} {

t.Run(tc.desc, func(t *testing.T) {
var out Limits
require.Nil(t, yaml.UnmarshalStrict([]byte(tc.yaml), &out))
require.Equal(t, tc.exp, out)
})
}
}