TSDB shipper + WAL (#6049)
* begins speccing out TSDB Head

* auto incrementing series ref + mempostings

* mintime/maxtime methods

* tsdb head IndexReader impl

* head correctly populates ref lookup

* tsdb head tests

* adds prometheus license to tsdb head

* linting

* [WIP] speccing out tsdb head wal

* fix length check and adds tsdb wal encoding tests

* exposes wal structs & removes closed semantics

* logs start time in the tsdb wal

* wal interface + testing

* exports walrecord + returns ref when appending

* specs out head manager

* tsdb head manager wal initialization

* tsdb wal rotation

* wals don't use node name, but tsdb files do

* cleans up fn signature

* multi tsdb idx now just wraps Index interfaces

* no longer sorts indices when creating multi-idx

* tenantHeads & HeadManager index impls

* head mgr tests

* bugfixes & head manager tests

* tsdb dir selection now helper fns

* period utility

* pulls out more code to helpers, fixes some var races

* head recovery is more generic

* tsdb manager builds from wals

* pulls more helpers out of headmanager

* lockedIdx, Close() on idx, tsdbManager update

* removes mmap from index reader implementation

* tsdb file

* adds tsdb shipper config and refactors initStore

* removes unused tsdbManager code

* implements stores.Index and stores.ChunkWriter for tsdb

* chunk.Data now supports an Entries() method

* moves walreader to new util/wal pkg to avoid circular dep + tsdb storage alignment

* tsdb store

* passes indexWriter to chunkWriter

* build a tsdb per index bucket in accordance with shipper conventions

* don't open tsdb files until necessary for indexshipper

* tsdbManager Index impl

* tsdb defaults + initStore fix for invalid looping

* fixes UsingTSDB helper

* disables deleteRequestStore when using TSDB

* pass limits to tsdb store

* always start headmanager for tsdb
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* fixes copy bug
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* more logging
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* fixes duplicate tenant label bug
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* debug logs, uses label builder, removes __name__=logs for tsdb
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* tsdb fixes labels at earlier pt
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* account for setting tenant label in head manager test

* changing tsdb dir names

* identifier interface, builder to tsdb pkg

* tsdb version path prefix

* fixes buildfromwals identifier

* fixes tsdb shipper paths

* split buckets once per user set

* refactors combining single and multi tenant tsdb indices on shipper reads

* indexshipper ignores old gzip logic

* method name refactor

* remove unused record type

* removes v1 prefix in tsdb paths and refactors indices method

* ignores double optimization in tsdb looking for multitenant idx, shipper handles this

* removes 5-ln requirement on shipper tablename regexp

* groups identifiers, begins removing multitenant prefix in shipped files

* passes open fn to indexshipper

* exposes RealByteSlice

* TSDBFile no longer needs a file descriptor, parses gzip extensions

* method signature fixing

* stop masquerading as compressed indices post-download in indexshipper

* variable bucket regexp

* removes accidental configs committed

* label matcher handling for multitenancy and metricname in tsdb

* explicitly require fingerprint when creating tsdb index

* only add tenant label when creating multitenant tsdb

write fingerprints without synthetic tenant label

strip out tenant labels from queries

* linting + unused removal

* more linting :(

* goimports

* removes uploadername from indexshipper

* maxuint32 for arm32 builds

* tsdb chunk filterer support

* always set ingester name when using object storage index

Co-authored-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
owen-d and sandeepsukhani authored May 5, 2022
1 parent 03153e8 commit b45efd4
Showing 49 changed files with 2,926 additions and 297 deletions.
11 changes: 10 additions & 1 deletion pkg/chunkenc/facade.go
@@ -73,7 +73,9 @@ func (f Facade) Utilization() float64 {
return f.c.Utilization()
}

// Size implements encoding.Chunk.
// Size implements encoding.Chunk, which unfortunately uses
// the Size method to refer to the byte size and not the entry count
// like chunkenc.Chunk does.
func (f Facade) Size() int {
if f.c == nil {
return 0
@@ -82,6 +84,13 @@ func (f Facade) Size() int {
return f.c.CompressedSize()
}

func (f Facade) Entries() int {
if f.c == nil {
return 0
}
return f.c.Size()
}

// LokiChunk returns the chunkenc.Chunk.
func (f Facade) LokiChunk() Chunk {
return f.c
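For context on the new Entries() accessor: encoding.Chunk's Size() refers to byte size, while the wrapped chunkenc.Chunk's Size() counts log entries, so Facade.Entries() forwards to the latter. A minimal, self-contained sketch of that byte-size vs. entry-count split (toy types for illustration, not the real Loki interfaces):

```go
package main

import "fmt"

// Data mirrors the split the diff relies on: Size() reports byte size,
// Entries() reports the number of log lines. Toy stand-in types only.
type Data interface {
	Size() int    // byte size of the chunk
	Entries() int // number of entries (log lines) in the chunk
}

type memChunk struct{ lines []string }

func (c memChunk) Size() int {
	n := 0
	for _, l := range c.lines {
		n += len(l)
	}
	return n
}

func (c memChunk) Entries() int { return len(c.lines) }

func main() {
	var d Data = memChunk{lines: []string{"first line", "second"}}
	fmt.Printf("size=%dB entries=%d\n", d.Size(), d.Entries())
}
```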
3 changes: 2 additions & 1 deletion pkg/ingester/ingester.go
@@ -38,6 +38,7 @@ import (
"github.com/grafana/loki/pkg/util"
errUtil "github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/wal"
"github.com/grafana/loki/pkg/validation"
)

@@ -420,7 +421,7 @@ func (i *Ingester) starting(ctx context.Context) error {
)

level.Info(util_log.Logger).Log("msg", "recovering from WAL")
segmentReader, segmentCloser, err := newWalReader(i.cfg.WAL.Dir, -1)
segmentReader, segmentCloser, err := wal.NewWalReader(i.cfg.WAL.Dir, -1)
if err != nil {
return err
}
34 changes: 0 additions & 34 deletions pkg/ingester/recovery.go
Expand Up @@ -30,40 +30,6 @@ func (NoopWALReader) Err() error { return nil }
func (NoopWALReader) Record() []byte { return nil }
func (NoopWALReader) Close() error { return nil }

// If startSegment is <0, it means all the segments.
func newWalReader(dir string, startSegment int) (*wal.Reader, io.Closer, error) {
var (
segmentReader io.ReadCloser
err error
)
if startSegment < 0 {
segmentReader, err = wal.NewSegmentsReader(dir)
if err != nil {
return nil, nil, err
}
} else {
first, last, err := wal.Segments(dir)
if err != nil {
return nil, nil, err
}
if startSegment > last {
return nil, nil, errors.New("start segment is beyond the last WAL segment")
}
if first > startSegment {
startSegment = first
}
segmentReader, err = wal.NewSegmentsRangeReader(wal.SegmentRange{
Dir: dir,
First: startSegment,
Last: -1, // Till the end.
})
if err != nil {
return nil, nil, err
}
}
return wal.NewReader(segmentReader), segmentReader, nil
}

func newCheckpointReader(dir string) (WALReader, io.Closer, error) {
lastCheckpointDir, idx, err := lastCheckpoint(dir)
if err != nil {
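The helper deleted here is not gone; per the import change in ingester.go above, it now lives in the new util/wal package, avoiding a circular dependency. A hedged usage sketch, assuming the relocated wal.NewWalReader keeps the signature shown in the removed code and returns a prometheus-style WAL reader:

```go
package main

import (
	"log"

	"github.com/grafana/loki/pkg/util/wal"
)

// replayWAL sketches how callers use the relocated reader: startSegment < 0
// means "read all segments", mirroring the wal.NewWalReader(dir, -1) call in
// Ingester.starting above. Record decoding is elided.
func replayWAL(dir string) error {
	reader, closer, err := wal.NewWalReader(dir, -1)
	if err != nil {
		return err
	}
	defer closer.Close()

	for reader.Next() {
		rec := reader.Record()
		_ = rec // decode and replay the record here
	}
	return reader.Err()
}

func main() {
	if err := replayWAL("/loki/wal"); err != nil {
		log.Fatal(err)
	}
}
```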
29 changes: 29 additions & 0 deletions pkg/loki/config_wrapper.go
@@ -103,6 +103,10 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
betterBoltdbShipperDefaults(r, &defaults)
}

if len(r.SchemaConfig.Configs) > 0 && config.UsingTSDB(r.SchemaConfig.Configs) {
betterTSDBShipperDefaults(r, &defaults)
}

applyFIFOCacheConfig(r)
applyIngesterFinalSleep(r)
applyIngesterReplicationFactor(r)
@@ -497,6 +501,31 @@ func betterBoltdbShipperDefaults(cfg, defaults *ConfigWrapper) {
}
}

func betterTSDBShipperDefaults(cfg, defaults *ConfigWrapper) {
currentSchemaIdx := config.ActivePeriodConfig(cfg.SchemaConfig.Configs)
currentSchema := cfg.SchemaConfig.Configs[currentSchemaIdx]

if cfg.StorageConfig.TSDBShipperConfig.SharedStoreType == defaults.StorageConfig.TSDBShipperConfig.SharedStoreType {
cfg.StorageConfig.TSDBShipperConfig.SharedStoreType = currentSchema.ObjectType
}

if cfg.CompactorConfig.SharedStoreType == defaults.CompactorConfig.SharedStoreType {
cfg.CompactorConfig.SharedStoreType = currentSchema.ObjectType
}

if cfg.Common.PathPrefix != "" {
prefix := strings.TrimSuffix(cfg.Common.PathPrefix, "/")

if cfg.StorageConfig.TSDBShipperConfig.ActiveIndexDirectory == "" {
cfg.StorageConfig.TSDBShipperConfig.ActiveIndexDirectory = fmt.Sprintf("%s/tsdb-shipper-active", prefix)
}

if cfg.StorageConfig.TSDBShipperConfig.CacheLocation == "" {
cfg.StorageConfig.TSDBShipperConfig.CacheLocation = fmt.Sprintf("%s/tsdb-shipper-cache", prefix)
}
}
}

// applyFIFOCacheConfig turns on FIFO cache for the chunk store and for the query range results,
// but only if no other cache storage is configured (redis or memcache).
//
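A small sketch of the directory defaulting introduced above: when the common path_prefix is set and the TSDB shipper directories are left empty, they are derived from the prefix. The function and parameter names below are illustrative stand-ins, not the real config structs:

```go
package main

import (
	"fmt"
	"strings"
)

// deriveTSDBShipperDirs mirrors betterTSDBShipperDefaults' path handling:
// trim a trailing slash from the prefix and fill in any empty directory.
func deriveTSDBShipperDirs(pathPrefix, activeDir, cacheDir string) (string, string) {
	prefix := strings.TrimSuffix(pathPrefix, "/")
	if activeDir == "" {
		activeDir = fmt.Sprintf("%s/tsdb-shipper-active", prefix)
	}
	if cacheDir == "" {
		cacheDir = fmt.Sprintf("%s/tsdb-shipper-cache", prefix)
	}
	return activeDir, cacheDir
}

func main() {
	active, cache := deriveTSDBShipperDirs("/loki/", "", "")
	fmt.Println(active) // /loki/tsdb-shipper-active
	fmt.Println(cache)  // /loki/tsdb-shipper-cache
}
```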
111 changes: 79 additions & 32 deletions pkg/loki/modules.go
@@ -48,6 +48,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/cache"
chunk_util "github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"
@@ -382,19 +383,25 @@ func (t *Loki) initTableManager() (services.Service, error) {
}

func (t *Loki) initStore() (_ services.Service, err error) {
// Always set these configs
// TODO(owen-d): Do the same for TSDB when we add IndexGatewayClientConfig
t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Mode = t.Cfg.IndexGateway.Mode
t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Ring = t.indexGatewayRing

// If RF > 1 and current or upcoming index type is boltdb-shipper then disable index dedupe and write dedupe cache.
// This is to ensure that index entries are replicated to all the boltdb files in ingesters flushing replicated data.
if t.Cfg.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor > 1 && config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) {
if t.Cfg.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor > 1 && config.UsingObjectStorageIndex(t.Cfg.SchemaConfig.Configs) {
t.Cfg.ChunkStoreConfig.DisableIndexDeduplication = true
t.Cfg.ChunkStoreConfig.WriteDedupeCacheConfig = cache.Config{}
}

if config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) {
// Set configs pertaining to object storage based indices
if config.UsingObjectStorageIndex(t.Cfg.SchemaConfig.Configs) {
t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterName = t.Cfg.Ingester.LifecyclerConfig.ID
t.Cfg.StorageConfig.TSDBShipperConfig.IngesterName = t.Cfg.Ingester.LifecyclerConfig.ID

switch true {
case t.Cfg.isModuleEnabled(Ingester), t.Cfg.isModuleEnabled(Write):
// We do not want ingester to unnecessarily keep downloading files
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeWriteOnly
// Use fifo cache for caching index in memory, this also significantly helps performance.
t.Cfg.StorageConfig.IndexQueriesCacheConfig = cache.Config{
EnableFifoCache: true,
@@ -412,22 +419,53 @@ func (t *Loki) initStore() (_ services.Service, err error) {
// have query gaps on chunks flushed after an index entry is cached by keeping them retained in the ingester
// and queried as part of live data until the cache TTL expires on the index entry.
t.Cfg.Ingester.RetainPeriod = t.Cfg.StorageConfig.IndexCacheValidity + 1*time.Minute
t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = boltdbShipperQuerierIndexUpdateDelay(t.Cfg)

// We do not want ingester to unnecessarily keep downloading files
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeWriteOnly
t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = shipperQuerierIndexUpdateDelay(t.Cfg.StorageConfig.IndexCacheValidity, t.Cfg.StorageConfig.BoltDBShipperConfig.ResyncInterval)

t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeWriteOnly
t.Cfg.StorageConfig.TSDBShipperConfig.IngesterDBRetainPeriod = shipperQuerierIndexUpdateDelay(t.Cfg.StorageConfig.IndexCacheValidity, t.Cfg.StorageConfig.TSDBShipperConfig.ResyncInterval)

case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler), t.Cfg.isModuleEnabled(Read), t.isModuleActive(IndexGateway):
// We do not want query to do any updates to index
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeReadOnly
default:
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadWrite
t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = boltdbShipperQuerierIndexUpdateDelay(t.Cfg)
t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = shipperQuerierIndexUpdateDelay(t.Cfg.StorageConfig.IndexCacheValidity, t.Cfg.StorageConfig.BoltDBShipperConfig.ResyncInterval)
t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeReadWrite
t.Cfg.StorageConfig.TSDBShipperConfig.IngesterDBRetainPeriod = shipperQuerierIndexUpdateDelay(t.Cfg.StorageConfig.IndexCacheValidity, t.Cfg.StorageConfig.TSDBShipperConfig.ResyncInterval)

}
}

t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Mode = t.Cfg.IndexGateway.Mode
t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Ring = t.indexGatewayRing
if config.UsingObjectStorageIndex(t.Cfg.SchemaConfig.Configs) {
var asyncStore bool

shipperConfigIdx := config.ActivePeriodConfig(t.Cfg.SchemaConfig.Configs)
iTy := t.Cfg.SchemaConfig.Configs[shipperConfigIdx].IndexType
if iTy != config.BoltDBShipperType && iTy != config.TSDBType {
shipperConfigIdx++
}

// TODO(owen-d): make helper more agnostic between boltdb|tsdb
var resyncInterval time.Duration
switch t.Cfg.SchemaConfig.Configs[shipperConfigIdx].IndexType {
case config.BoltDBShipperType:
resyncInterval = t.Cfg.StorageConfig.BoltDBShipperConfig.ResyncInterval
case config.TSDBType:
resyncInterval = t.Cfg.StorageConfig.TSDBShipperConfig.ResyncInterval
}

minIngesterQueryStoreDuration := shipperMinIngesterQueryStoreDuration(
t.Cfg.Ingester.MaxChunkAge,
shipperQuerierIndexUpdateDelay(
t.Cfg.StorageConfig.IndexCacheValidity,
resyncInterval,
),
)

var asyncStore bool
if config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) {
boltdbShipperMinIngesterQueryStoreDuration := boltdbShipperMinIngesterQueryStoreDuration(t.Cfg)
switch true {
case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler), t.Cfg.isModuleEnabled(Read):
// Do not use the AsyncStore if the querier is configured with QueryStoreOnly set to true
@@ -439,30 +477,34 @@ func (t *Loki) initStore() (_ services.Service, err error) {
asyncStore = true
case t.Cfg.isModuleEnabled(IndexGateway):
// we want to use the actual storage when running the index-gateway, so we remove the Addr from the config
// TODO(owen-d): Do the same for TSDB when we add IndexGatewayClientConfig
t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Disabled = true
case t.Cfg.isModuleEnabled(All):
// We want ingester to also query the store when using boltdb-shipper but only when running with target All.
// We do not want to use AsyncStore otherwise it would start spiraling around doing queries over and over again to the ingesters and store.
// ToDo: See if we can avoid doing this when not running loki in clustered mode.
t.Cfg.Ingester.QueryStore = true
boltdbShipperConfigIdx := config.ActivePeriodConfig(t.Cfg.SchemaConfig.Configs)
if t.Cfg.SchemaConfig.Configs[boltdbShipperConfigIdx].IndexType != config.BoltDBShipperType {
boltdbShipperConfigIdx++
}
mlb, err := calculateMaxLookBack(t.Cfg.SchemaConfig.Configs[boltdbShipperConfigIdx], t.Cfg.Ingester.QueryStoreMaxLookBackPeriod,
boltdbShipperMinIngesterQueryStoreDuration)

mlb, err := calculateMaxLookBack(
t.Cfg.SchemaConfig.Configs[shipperConfigIdx],
t.Cfg.Ingester.QueryStoreMaxLookBackPeriod,
minIngesterQueryStoreDuration,
)
if err != nil {
return nil, err
}
t.Cfg.Ingester.QueryStoreMaxLookBackPeriod = mlb
}
}

if asyncStore {
t.Cfg.StorageConfig.EnableAsyncStore = true
t.Cfg.StorageConfig.AsyncStoreConfig = storage.AsyncStoreCfg{
IngesterQuerier: t.ingesterQuerier,
QueryIngestersWithin: calculateAsyncStoreQueryIngestersWithin(t.Cfg.Querier.QueryIngestersWithin, boltdbShipperMinIngesterQueryStoreDuration(t.Cfg)),
if asyncStore {
t.Cfg.StorageConfig.EnableAsyncStore = true
t.Cfg.StorageConfig.AsyncStoreConfig = storage.AsyncStoreCfg{
IngesterQuerier: t.ingesterQuerier,
QueryIngestersWithin: calculateAsyncStoreQueryIngestersWithin(
t.Cfg.Querier.QueryIngestersWithin,
minIngesterQueryStoreDuration,
),
}
}
}

@@ -908,6 +950,11 @@ func (t *Loki) initUsageReport() (services.Service, error) {
}

func (t *Loki) deleteRequestsStore() (deletion.DeleteRequestsStore, error) {
// TODO(owen-d): enable delete request storage in tsdb
if config.UsingTSDB(t.Cfg.SchemaConfig.Configs) {
return deletion.NewNoOpDeleteRequestsStore(), nil
}

filteringEnabled, err := deletion.FilteringEnabled(t.Cfg.CompactorConfig.DeletionMode)
if err != nil {
return nil, err
@@ -954,24 +1001,24 @@ func calculateAsyncStoreQueryIngestersWithin(queryIngestersWithinConfig, minDura
return queryIngestersWithinConfig
}

// boltdbShipperQuerierIndexUpdateDelay returns duration it could take for queriers to serve the index since it was uploaded.
// shipperQuerierIndexUpdateDelay returns duration it could take for queriers to serve the index since it was uploaded.
// It considers up to 3 sync attempts for the indexgateway/queriers to be successful in syncing the files to factor in worst case scenarios like
// failures in sync, low download throughput, various kinds of caches in between etc. which can delay the sync operation from getting all the updates from the storage.
// It also considers index cache validity because a querier could have cached index just before it was going to resync which means
// it would keep serving index until the cache entries expire.
func boltdbShipperQuerierIndexUpdateDelay(cfg Config) time.Duration {
return cfg.StorageConfig.IndexCacheValidity + cfg.StorageConfig.BoltDBShipperConfig.ResyncInterval*3
func shipperQuerierIndexUpdateDelay(cacheValidity, resyncInterval time.Duration) time.Duration {
return cacheValidity + resyncInterval*3
}

// boltdbShipperIngesterIndexUploadDelay returns duration it could take for an index file containing id of a chunk to be uploaded to the shared store since it got flushed.
func boltdbShipperIngesterIndexUploadDelay() time.Duration {
// shipperIngesterIndexUploadDelay returns duration it could take for an index file containing id of a chunk to be uploaded to the shared store since it got flushed.
func shipperIngesterIndexUploadDelay() time.Duration {
return uploads.ShardDBsByDuration + shipper.UploadInterval
}

// boltdbShipperMinIngesterQueryStoreDuration returns minimum duration(with some buffer) ingesters should query their stores to
// avoid queriers from missing any logs or chunk ids due to async nature of BoltDB Shipper.
func boltdbShipperMinIngesterQueryStoreDuration(cfg Config) time.Duration {
return cfg.Ingester.MaxChunkAge + boltdbShipperIngesterIndexUploadDelay() + boltdbShipperQuerierIndexUpdateDelay(cfg) + 5*time.Minute
// shipperMinIngesterQueryStoreDuration returns minimum duration(with some buffer) ingesters should query their stores to
// avoid missing any logs or chunk ids due to async nature of shipper.
func shipperMinIngesterQueryStoreDuration(maxChunkAge, querierUpdateDelay time.Duration) time.Duration {
return maxChunkAge + shipperIngesterIndexUploadDelay() + querierUpdateDelay + 5*time.Minute
}

// NewServerService constructs service from Server component.
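To make the retention arithmetic concrete, here is a self-contained rerun of the generalized helpers above with example durations. The shipper's internal upload constants (uploads.ShardDBsByDuration, shipper.UploadInterval) are not shown in this diff, so they are stubbed with assumed values for illustration only:

```go
package main

import (
	"fmt"
	"time"
)

// Assumed stand-ins for uploads.ShardDBsByDuration and shipper.UploadInterval;
// the real values live in the shipper packages and may differ.
const (
	assumedShardDBsByDuration = 15 * time.Minute
	assumedUploadInterval     = 1 * time.Minute
)

func shipperQuerierIndexUpdateDelay(cacheValidity, resyncInterval time.Duration) time.Duration {
	return cacheValidity + resyncInterval*3
}

func shipperIngesterIndexUploadDelay() time.Duration {
	return assumedShardDBsByDuration + assumedUploadInterval
}

func shipperMinIngesterQueryStoreDuration(maxChunkAge, querierUpdateDelay time.Duration) time.Duration {
	return maxChunkAge + shipperIngesterIndexUploadDelay() + querierUpdateDelay + 5*time.Minute
}

func main() {
	// Example: 5m index cache validity, 5m resync interval, 2h max chunk age.
	delay := shipperQuerierIndexUpdateDelay(5*time.Minute, 5*time.Minute) // 5m + 3*5m = 20m
	fmt.Println(delay)
	fmt.Println(shipperMinIngesterQueryStoreDuration(2*time.Hour, delay)) // 2h + 16m + 20m + 5m = 2h41m
}
```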
4 changes: 4 additions & 0 deletions pkg/storage/chunk/bigchunk.go
@@ -34,6 +34,10 @@ func newBigchunk() *bigchunk {
return &bigchunk{}
}

// TODO(owen-d): remove bigchunk from our code, we don't use it.
// Hack an Entries() impl
func (b *bigchunk) Entries() int { return 0 }

func (b *bigchunk) Add(sample model.SamplePair) (Data, error) {
if b.remainingSamples == 0 {
if bigchunkSizeCapBytes > 0 && b.Size() > bigchunkSizeCapBytes {
2 changes: 2 additions & 0 deletions pkg/storage/chunk/interface.go
@@ -52,6 +52,8 @@ type Data interface {
Rebound(start, end model.Time, filter filter.Func) (Data, error)
// Size returns the approximate length of the chunk in bytes.
Size() int
// Entries returns the number of entries in a chunk
Entries() int
Utilization() float64
}

39 changes: 35 additions & 4 deletions pkg/storage/config/schema_config.go
@@ -39,6 +39,7 @@ const (
StorageTypeSwift = "swift"
// BoltDBShipperType holds the index type for using boltdb with shipper which keeps flushing them to a shared storage
BoltDBShipperType = "boltdb-shipper"
TSDBType = "tsdb"
)

var (
@@ -184,17 +185,47 @@ func ActivePeriodConfig(configs []PeriodConfig) int {
return i
}

// UsingBoltdbShipper checks whether current or the next index type is boltdb-shipper, returns true if yes.
func UsingBoltdbShipper(configs []PeriodConfig) bool {
func usingForPeriodConfigs(configs []PeriodConfig, fn func(PeriodConfig) bool) bool {
activePCIndex := ActivePeriodConfig(configs)
if configs[activePCIndex].IndexType == BoltDBShipperType ||
(len(configs)-1 > activePCIndex && configs[activePCIndex+1].IndexType == BoltDBShipperType) {

if fn(configs[activePCIndex]) ||
(len(configs)-1 > activePCIndex && fn(configs[activePCIndex+1])) {
return true
}

return false
}

func UsingObjectStorageIndex(configs []PeriodConfig) bool {
fn := func(cfg PeriodConfig) bool {
switch cfg.IndexType {
case BoltDBShipperType, TSDBType:
return true
default:
return false
}
}

return usingForPeriodConfigs(configs, fn)
}

// UsingBoltdbShipper checks whether current or the next index type is boltdb-shipper, returns true if yes.
func UsingBoltdbShipper(configs []PeriodConfig) bool {
fn := func(cfg PeriodConfig) bool {
return cfg.IndexType == BoltDBShipperType
}

return usingForPeriodConfigs(configs, fn)
}

func UsingTSDB(configs []PeriodConfig) bool {
fn := func(cfg PeriodConfig) bool {
return cfg.IndexType == TSDBType
}

return usingForPeriodConfigs(configs, fn)
}

func defaultRowShards(schema string) uint32 {
switch schema {
case "v1", "v2", "v3", "v4", "v5", "v6", "v9":
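The refactor above generalizes the "current or upcoming period" check so it can be shared by UsingBoltdbShipper, UsingTSDB, and UsingObjectStorageIndex. A simplified, self-contained sketch of that behavior (stand-in types; the real PeriodConfig and ActivePeriodConfig carry more fields):

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// PeriodConfig is a stripped-down stand-in holding only what the check needs.
type PeriodConfig struct {
	From      time.Time
	IndexType string
}

// activePeriodConfig returns the index of the period in effect at "now",
// assuming configs are sorted by From in ascending order.
func activePeriodConfig(configs []PeriodConfig, now time.Time) int {
	i := sort.Search(len(configs), func(i int) bool { return configs[i].From.After(now) }) - 1
	if i < 0 {
		i = 0
	}
	return i
}

// usingForPeriodConfigs reports whether fn matches the active period or the
// one immediately after it, mirroring the generalized helper in the diff.
func usingForPeriodConfigs(configs []PeriodConfig, now time.Time, fn func(PeriodConfig) bool) bool {
	i := activePeriodConfig(configs, now)
	return fn(configs[i]) || (len(configs)-1 > i && fn(configs[i+1]))
}

func main() {
	now := time.Now()
	configs := []PeriodConfig{
		{From: now.Add(-48 * time.Hour), IndexType: "boltdb-shipper"},
		{From: now.Add(24 * time.Hour), IndexType: "tsdb"}, // upcoming period
	}
	usingTSDB := usingForPeriodConfigs(configs, now, func(c PeriodConfig) bool { return c.IndexType == "tsdb" })
	fmt.Println(usingTSDB) // true: the *next* period uses TSDB
}
```

Returning true for an upcoming period is what lets TSDB-specific setup (shipper mode, defaults, head manager) kick in before the schema switch takes effect.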