Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply better defaults when boltdb shipper is being used #4508

Merged
merged 7 commits into from
Oct 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 90 additions & 56 deletions pkg/loki/config_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"github.com/grafana/dskit/flagext"
"github.com/pkg/errors"

"github.com/grafana/loki/pkg/storage/chunk/storage"
loki_storage "github.com/grafana/loki/pkg/storage"
chunk_storage "github.com/grafana/loki/pkg/storage/chunk/storage"

"github.com/grafana/loki/pkg/util/cfg"
)

Expand Down Expand Up @@ -74,7 +76,14 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
}

applyMemberlistConfig(r)
applyStorageConfig(r, &defaults)
err := applyStorageConfig(r, &defaults)
if err != nil {
return err
}

if len(r.SchemaConfig.Configs) > 0 && loki_storage.UsingBoltdbShipper(r.SchemaConfig.Configs) {
betterBoltdbShipperDefaults(r, &defaults)
}

return nil
}
Expand All @@ -93,96 +102,121 @@ func applyMemberlistConfig(r *ConfigWrapper) {
}
}

var ErrTooManyStorageConfigs = errors.New("too many storage configs provided in the common config, please only define one storage backend")

// applyStorageConfig will attempt to apply a common storage config for either
// s3, gcs, azure, or swift to all the places we create an object storage client.
// s3, gcs, azure, or swift to all the places we create a storage client.
// If any specific configs for an object storage client have been provided elsewhere in the
// configuration file, applyStorageConfig will not override them.
// If multiple storage configurations are provided, applyStorageConfig will apply
// all of them, and will set the value for the Ruler's StoreConfig `type` to the
// last one (alphabetically) that was defined.
func applyStorageConfig(cfg, defaults *ConfigWrapper) {
rulerStoreConfigsToApply := make([]func(*ConfigWrapper), 0, 4)
chunkStorageConfigsToApply := make([]func(*ConfigWrapper), 0, 4)
// If multiple storage configurations are provided, applyStorageConfig will return an error
func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
var applyRulerStoreConfig func(*ConfigWrapper)
var applyChunkStorageConfig func(*ConfigWrapper)

//only one config is allowed
configsFound := 0

if cfg.Common.Storage.Azure != nil {
rulerStoreConfigsToApply = append(rulerStoreConfigsToApply, func(r *ConfigWrapper) {
configsFound++

applyRulerStoreConfig = func(r *ConfigWrapper) {
r.Ruler.StoreConfig.Type = "azure"
r.Ruler.StoreConfig.Azure = r.Common.Storage.Azure.ToCortexAzureConfig()
})
}

chunkStorageConfigsToApply = append(chunkStorageConfigsToApply, func(r *ConfigWrapper) {
applyChunkStorageConfig = func(r *ConfigWrapper) {
r.StorageConfig.AzureStorageConfig = *r.Common.Storage.Azure
r.CompactorConfig.SharedStoreType = storage.StorageTypeAzure
})
}

if cfg.Common.Storage.GCS != nil {
rulerStoreConfigsToApply = append(rulerStoreConfigsToApply, func(r *ConfigWrapper) {
r.Ruler.StoreConfig.Type = "gcs"
r.Ruler.StoreConfig.GCS = r.Common.Storage.GCS.ToCortexGCSConfig()
})

chunkStorageConfigsToApply = append(chunkStorageConfigsToApply, func(r *ConfigWrapper) {
r.StorageConfig.GCSConfig = *r.Common.Storage.GCS
r.CompactorConfig.SharedStoreType = storage.StorageTypeGCS
})
r.CompactorConfig.SharedStoreType = chunk_storage.StorageTypeAzure
}
}

if cfg.Common.Storage.FSConfig != nil {
trevorwhitney marked this conversation as resolved.
Show resolved Hide resolved
rulerStoreConfigsToApply = append(rulerStoreConfigsToApply, func(r *ConfigWrapper) {
configsFound++

applyRulerStoreConfig = func(r *ConfigWrapper) {
r.Ruler.StoreConfig.Type = "local"
r.Ruler.StoreConfig.Local = r.Common.Storage.FSConfig.ToCortexLocalConfig()
})
}

chunkStorageConfigsToApply = append(chunkStorageConfigsToApply, func(r *ConfigWrapper) {
applyChunkStorageConfig = func(r *ConfigWrapper) {
r.StorageConfig.FSConfig = *r.Common.Storage.FSConfig
r.CompactorConfig.SharedStoreType = storage.StorageTypeFileSystem
})
r.CompactorConfig.SharedStoreType = chunk_storage.StorageTypeFileSystem
}
}

if cfg.Common.Storage.GCS != nil {
configsFound++

applyRulerStoreConfig = func(r *ConfigWrapper) {
r.Ruler.StoreConfig.Type = "gcs"
r.Ruler.StoreConfig.GCS = r.Common.Storage.GCS.ToCortexGCSConfig()
}

applyChunkStorageConfig = func(r *ConfigWrapper) {
r.StorageConfig.GCSConfig = *r.Common.Storage.GCS
r.CompactorConfig.SharedStoreType = chunk_storage.StorageTypeGCS
}
}

if cfg.Common.Storage.S3 != nil {
rulerStoreConfigsToApply = append(rulerStoreConfigsToApply, func(r *ConfigWrapper) {
configsFound++

applyRulerStoreConfig = func(r *ConfigWrapper) {
r.Ruler.StoreConfig.Type = "s3"
r.Ruler.StoreConfig.S3 = r.Common.Storage.S3.ToCortexS3Config()
})
}

chunkStorageConfigsToApply = append(chunkStorageConfigsToApply, func(r *ConfigWrapper) {
applyChunkStorageConfig = func(r *ConfigWrapper) {
r.StorageConfig.AWSStorageConfig.S3Config = *r.Common.Storage.S3
r.CompactorConfig.SharedStoreType = storage.StorageTypeS3
})
r.CompactorConfig.SharedStoreType = chunk_storage.StorageTypeS3
}
}

if cfg.Common.Storage.Swift != nil {
rulerStoreConfigsToApply = append(rulerStoreConfigsToApply, func(r *ConfigWrapper) {
configsFound++

applyRulerStoreConfig = func(r *ConfigWrapper) {
r.Ruler.StoreConfig.Type = "swift"
r.Ruler.StoreConfig.Swift = r.Common.Storage.Swift.ToCortexSwiftConfig()
})
}

chunkStorageConfigsToApply = append(chunkStorageConfigsToApply, func(r *ConfigWrapper) {
applyChunkStorageConfig = func(r *ConfigWrapper) {
r.StorageConfig.Swift = *r.Common.Storage.Swift
r.CompactorConfig.SharedStoreType = storage.StorageTypeSwift
})
r.CompactorConfig.SharedStoreType = chunk_storage.StorageTypeSwift
}
}

// store change funcs in slices and apply all at once, because once we change the
// config we can no longer compare it to the default, this allows us to only
// do that comparison once
applyRulerStoreConfigs(cfg, defaults, rulerStoreConfigsToApply)
applyChunkStorageConfigs(cfg, defaults, chunkStorageConfigsToApply)
if configsFound > 1 {
return ErrTooManyStorageConfigs
}

applyRulerStoreConfigs(cfg, defaults, applyRulerStoreConfig)
applyChunkStorageConfigs(cfg, defaults, applyChunkStorageConfig)

return nil
}

func applyRulerStoreConfigs(cfg, defaults *ConfigWrapper, apply []func(*ConfigWrapper)) {
if reflect.DeepEqual(cfg.Ruler.StoreConfig, defaults.Ruler.StoreConfig) {
for _, ap := range apply {
ap(cfg)
}
func applyRulerStoreConfigs(cfg, defaults *ConfigWrapper, apply func(*ConfigWrapper)) {
if apply != nil && reflect.DeepEqual(cfg.Ruler.StoreConfig, defaults.Ruler.StoreConfig) {
apply(cfg)
}
}

func applyChunkStorageConfigs(cfg, defaults *ConfigWrapper, apply []func(*ConfigWrapper)) {
if reflect.DeepEqual(cfg.StorageConfig, defaults.StorageConfig) {
for _, ap := range apply {
ap(cfg)
}
func applyChunkStorageConfigs(cfg, defaults *ConfigWrapper, apply func(*ConfigWrapper)) {
if apply != nil && reflect.DeepEqual(cfg.StorageConfig, defaults.StorageConfig) {
apply(cfg)
}
}

func betterBoltdbShipperDefaults(cfg, defaults *ConfigWrapper) {
currentSchemaIdx := loki_storage.ActivePeriodConfig(cfg.SchemaConfig.Configs)
currentSchema := cfg.SchemaConfig.Configs[currentSchemaIdx]

if cfg.StorageConfig.BoltDBShipperConfig.SharedStoreType == defaults.StorageConfig.BoltDBShipperConfig.SharedStoreType {
cfg.StorageConfig.BoltDBShipperConfig.SharedStoreType = currentSchema.ObjectType
}

if cfg.CompactorConfig.SharedStoreType == defaults.CompactorConfig.SharedStoreType {
cfg.CompactorConfig.SharedStoreType = currentSchema.ObjectType
}
}
102 changes: 90 additions & 12 deletions pkg/loki/config_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"github.com/grafana/loki/pkg/util/cfg"
)

func Test_CommonConfig(t *testing.T) {
testContext := func(configFileString string, args []string) (ConfigWrapper, ConfigWrapper) {
func Test_ApplyDynamicConfig(t *testing.T) {
testContextExposingErrs := func(configFileString string, args []string) (error, ConfigWrapper, ConfigWrapper) {
config := ConfigWrapper{}
fs := flag.NewFlagSet(t.Name(), flag.PanicOnError)

Expand All @@ -42,13 +42,22 @@ func Test_CommonConfig(t *testing.T) {
args = append(args, configFileArgs...)
}
err = cfg.DynamicUnmarshal(&config, args, fs)
require.NoError(t, err)
if err != nil {
return err, ConfigWrapper{}, ConfigWrapper{}
}

defaults := ConfigWrapper{}
freshFlags := flag.NewFlagSet(t.Name(), flag.PanicOnError)
err = cfg.DefaultUnmarshal(&defaults, args, freshFlags)
require.NoError(t, err)

return nil, config, defaults
}

testContext := func(configFileString string, args []string) (ConfigWrapper, ConfigWrapper) {
err, config, defaults := testContextExposingErrs(configFileString, args)
require.NoError(t, err)

return config, defaults
}

Expand Down Expand Up @@ -185,7 +194,7 @@ memberlist:
assert.EqualValues(t, defaults.StorageConfig.FSConfig, config.StorageConfig.FSConfig)
})

t.Run("when multiple configs are provided, the last (alphabetically) is used as the ruler store type", func(t *testing.T) {
t.Run("when multiple configs are provided, an error is returned", func(t *testing.T) {
multipleConfig := `common:
storage:
s3:
Expand All @@ -199,14 +208,8 @@ memberlist:
chunk_buffer_size: 27
request_timeout: 5m`

config, _ := testContext(multipleConfig, nil)
assert.Equal(t, "s3", config.Ruler.StoreConfig.Type)

assert.Equal(t, "s3://foo-bucket", config.Ruler.StoreConfig.S3.Endpoint)
assert.Equal(t, "foobar", config.Ruler.StoreConfig.GCS.BucketName)

assert.Equal(t, "s3://foo-bucket", config.StorageConfig.AWSStorageConfig.S3Config.Endpoint)
assert.Equal(t, "foobar", config.StorageConfig.GCSConfig.BucketName)
err, _, _ := testContextExposingErrs(multipleConfig, nil)
assert.ErrorIs(t, err, ErrTooManyStorageConfigs)
})

t.Run("when common s3 storage config is provided, ruler and storage config are defaulted to use it", func(t *testing.T) {
Expand Down Expand Up @@ -576,6 +579,81 @@ compactor:
assert.Equal(t, "gcs", config.CompactorConfig.SharedStoreType)
})
})

t.Run("when using boltdb storage type", func(t *testing.T) {
t.Run("default storage_config.boltdb.shared_store to the value of current_schema.object_store", func(t *testing.T) {
const boltdbSchemaConfig = `---
schema_config:
configs:
- from: 2021-08-01
store: boltdb-shipper
object_store: s3
schema: v11
index:
prefix: index_
period: 24h`
config, _ := testContext(boltdbSchemaConfig, nil)

assert.Equal(t, storage.StorageTypeS3, config.StorageConfig.BoltDBShipperConfig.SharedStoreType)
})

t.Run("default compactor.shared_store to the value of current_schema.object_store", func(t *testing.T) {
const boltdbSchemaConfig = `---
schema_config:
configs:
- from: 2021-08-01
store: boltdb-shipper
object_store: gcs
schema: v11
index:
prefix: index_
period: 24h`
config, _ := testContext(boltdbSchemaConfig, nil)

assert.Equal(t, storage.StorageTypeGCS, config.CompactorConfig.SharedStoreType)
})

t.Run("shared store types provided via config file take precedence", func(t *testing.T) {
const boltdbSchemaConfig = `---
schema_config:
configs:
- from: 2021-08-01
store: boltdb-shipper
object_store: gcs
schema: v11
index:
prefix: index_
period: 24h

storage_config:
boltdb_shipper:
shared_store: s3

compactor:
shared_store: s3`
config, _ := testContext(boltdbSchemaConfig, nil)

assert.Equal(t, storage.StorageTypeS3, config.StorageConfig.BoltDBShipperConfig.SharedStoreType)
assert.Equal(t, storage.StorageTypeS3, config.CompactorConfig.SharedStoreType)
})

t.Run("shared store types provided via command line take precedence", func(t *testing.T) {
const boltdbSchemaConfig = `---
schema_config:
configs:
- from: 2021-08-01
store: boltdb-shipper
object_store: gcs
schema: v11
index:
prefix: index_
period: 24h`
config, _ := testContext(boltdbSchemaConfig, []string{"-boltdb.shipper.compactor.shared-store", "s3", "-boltdb.shipper.shared-store", "s3"})

assert.Equal(t, storage.StorageTypeS3, config.StorageConfig.BoltDBShipperConfig.SharedStoreType)
assert.Equal(t, storage.StorageTypeS3, config.CompactorConfig.SharedStoreType)
})
})
}

// Can't use a totally empty yaml file or it causes weird behavior in the unmarhsalling
Expand Down