Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

some metrics for measuring performance and failures in boltdb shipper #2034

Merged
merged 3 commits into from
May 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"
"net/http"

"github.com/prometheus/client_golang/prometheus"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/querier/frontend"
"github.com/cortexproject/cortex/pkg/ring"
Expand Down Expand Up @@ -126,7 +128,7 @@ func New(cfg Config) (*Loki, error) {
}

loki.setupAuthMiddleware()
storage.RegisterCustomIndexClients(cfg.StorageConfig)
storage.RegisterCustomIndexClients(cfg.StorageConfig, prometheus.DefaultRegisterer)

serviceMap, err := loki.initModuleServices(cfg.Target)
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func filterChunksByTime(from, through model.Time, chunks []chunk.Chunk) []chunk.
return filtered
}

func RegisterCustomIndexClients(cfg Config) {
func RegisterCustomIndexClients(cfg Config, registerer prometheus.Registerer) {
// BoltDB Shipper is supposed to be run as a singleton.
// This could also be done in NewBoltDBIndexClientWithShipper factory method but we are doing it here because that method is used
// in tests for creating multiple instances of it at a time.
Expand All @@ -222,7 +222,10 @@ func RegisterCustomIndexClients(cfg Config) {
return nil, err
}

boltDBIndexClientWithShipper, err = local.NewBoltDBIndexClientWithShipper(cortex_local.BoltDBConfig{Directory: cfg.BoltDBShipperConfig.ActiveIndexDirectory}, objectClient, cfg.BoltDBShipperConfig)
boltDBIndexClientWithShipper, err = local.NewBoltDBIndexClientWithShipper(
cortex_local.BoltDBConfig{Directory: cfg.BoltDBShipperConfig.ActiveIndexDirectory},
objectClient, cfg.BoltDBShipperConfig, registerer)

return boltDBIndexClientWithShipper, err
}, func() (client chunk.TableClient, e error) {
objectClient, err := storage.NewObjectClient(cfg.BoltDBShipperConfig.SharedStoreType, cfg.Config)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ func TestStore_MultipleBoltDBShippersInConfig(t *testing.T) {
BoltDBShipperConfig: boltdbShipperConfig,
}

RegisterCustomIndexClients(config)
RegisterCustomIndexClients(config, nil)

store, err := NewStore(config, chunk.StoreConfig{}, chunk.SchemaConfig{
Configs: []chunk.PeriodConfig{
Expand Down
12 changes: 8 additions & 4 deletions pkg/storage/stores/local/boltdb_index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/local"
chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/instrument"
"go.etcd.io/bbolt"
)

Expand All @@ -15,13 +17,13 @@ type BoltdbIndexClientWithShipper struct {
}

// NewBoltDBIndexClientWithShipper creates a new IndexClient that uses BoltDB.
func NewBoltDBIndexClientWithShipper(cfg local.BoltDBConfig, archiveStoreClient chunk.ObjectClient, archiverCfg ShipperConfig) (chunk.IndexClient, error) {
func NewBoltDBIndexClientWithShipper(cfg local.BoltDBConfig, archiveStoreClient chunk.ObjectClient, archiverCfg ShipperConfig, registerer prometheus.Registerer) (chunk.IndexClient, error) {
boltDBIndexClient, err := local.NewBoltDBIndexClient(cfg)
if err != nil {
return nil, err
}

shipper, err := NewShipper(archiverCfg, archiveStoreClient, boltDBIndexClient)
shipper, err := NewShipper(archiverCfg, archiveStoreClient, boltDBIndexClient, registerer)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -55,7 +57,9 @@ func (b *BoltdbIndexClientWithShipper) query(ctx context.Context, query chunk.In
}
}

return b.shipper.forEach(ctx, query.TableName, func(db *bbolt.DB) error {
return b.QueryDB(ctx, db, query, callback)
return instrument.CollectedRequest(ctx, "QUERY", instrument.NewHistogramCollector(b.shipper.metrics.requestDurationSeconds), instrument.ErrorCode, func(ctx context.Context) error {
return b.shipper.forEach(ctx, query.TableName, func(db *bbolt.DB) error {
return b.QueryDB(ctx, db, query, callback)
})
})
}
38 changes: 31 additions & 7 deletions pkg/storage/stores/local/downloads.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"path"
"strings"
"time"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/local"
Expand Down Expand Up @@ -156,20 +157,31 @@ func (s *Shipper) getFileFromStorage(ctx context.Context, objectKey, destination

// downloadFilesForPeriod should be called when files for a period do not exist, i.e. they were never downloaded or got cleaned up later on by TTL
// While files are being downloaded it will block all reads/writes on filesCollection by taking an exclusive lock
func (s *Shipper) downloadFilesForPeriod(ctx context.Context, period string, fc *filesCollection) error {
func (s *Shipper) downloadFilesForPeriod(ctx context.Context, period string, fc *filesCollection) (err error) {
fc.Lock()
defer fc.Unlock()

defer func() {
status := statusSuccess
if err != nil {
status = statusFailure
}
s.metrics.filesDownloadOperationTotal.WithLabelValues(status).Inc()
}()

startTime := time.Now()
totalFilesSize := int64(0)

objects, _, err := s.storageClient.List(ctx, period+"/")
if err != nil {
return err
return
}

level.Debug(util.Logger).Log("msg", fmt.Sprintf("list of files to download for period %s: %s", period, objects))

folderPath, err := s.getFolderPathForPeriod(period, true)
if err != nil {
return err
return
}

for _, object := range objects {
Expand All @@ -181,21 +193,33 @@ func (s *Shipper) downloadFilesForPeriod(ctx context.Context, period string, fc
filePath := path.Join(folderPath, uploader)
df := downloadedFiles{}

err := s.getFileFromStorage(ctx, object.Key, filePath)
err = s.getFileFromStorage(ctx, object.Key, filePath)
if err != nil {
return err
return
}

df.mtime = object.ModifiedAt
df.boltdb, err = local.OpenBoltdbFile(filePath)
if err != nil {
return err
return
}

var stat os.FileInfo
stat, err = os.Stat(filePath)
if err != nil {
return
}

totalFilesSize += stat.Size()

fc.files[uploader] = df
}

return nil
duration := time.Since(startTime).Seconds()
s.metrics.filesDownloadDurationSeconds.add(period, duration)
s.metrics.filesDownloadSizeBytes.add(period, totalFilesSize)

return
}

func (s *Shipper) getFolderPathForPeriod(period string, ensureExists bool) (string, error) {
Expand Down
101 changes: 101 additions & 0 deletions pkg/storage/stores/local/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package local

import (
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/instrument"
)

// Values passed as the "status" label of the shipper's operation counters.
const (
statusFailure = "failure"
statusSuccess = "success"
)

// downloadPeriodDurationMetric remembers the last download duration recorded
// for each period and publishes the sum over all periods through one gauge.
// The embedded RWMutex is taken by add before periods is touched.
type downloadPeriodDurationMetric struct {
sync.RWMutex
// gauge is set to the sum of all values in periods on every add call.
gauge prometheus.Gauge
// periods maps a period name to the most recent duration recorded for it.
periods map[string]float64
}

// add records downloadDuration as the current value for period, overwriting
// any value stored earlier for the same period, and then sets the gauge to the
// sum of the values held for all periods.
func (m *downloadPeriodDurationMetric) add(period string, downloadDuration float64) {
	m.Lock()
	defer m.Unlock()

	m.periods[period] = downloadDuration

	// Recompute the aggregate from scratch so that a replaced period is not
	// counted twice.
	var sum float64
	for name := range m.periods {
		sum += m.periods[name]
	}
	m.gauge.Set(sum)
}

// downloadPeriodBytesMetric remembers the last downloaded byte count recorded
// for each period and publishes the sum over all periods through one gauge.
// The embedded RWMutex is taken by add before periods is touched.
type downloadPeriodBytesMetric struct {
sync.RWMutex
// gauge is set to the sum of all values in periods on every add call.
gauge prometheus.Gauge
// periods maps a period name to the most recent byte count recorded for it.
periods map[string]int64
}

// add records downloadedBytes as the current value for period, overwriting any
// value stored earlier for the same period, and then sets the gauge to the sum
// of the values held for all periods.
func (m *downloadPeriodBytesMetric) add(period string, downloadedBytes int64) {
	m.Lock()
	defer m.Unlock()

	m.periods[period] = downloadedBytes

	// Recompute the aggregate from scratch so that a replaced period is not
	// counted twice.
	var total int64
	for _, size := range m.periods {
		total += size
	}
	m.gauge.Set(float64(total))
}

// boltDBShipperMetrics groups the Prometheus metrics recorded by the BoltDB Shipper.
type boltDBShipperMetrics struct {
// metrics for measuring performance of downloading of files per period initially i.e for the first time
filesDownloadDurationSeconds *downloadPeriodDurationMetric
filesDownloadSizeBytes *downloadPeriodBytesMetric

// duration in seconds spent in serving request on index managed by BoltDB Shipper
requestDurationSeconds *prometheus.HistogramVec

// counters of download and upload operations, labelled by status (statusSuccess or statusFailure)
filesDownloadOperationTotal *prometheus.CounterVec
filesUploadOperationTotal *prometheus.CounterVec
}

// newBoltDBShipperMetrics builds all metrics used by the BoltDB Shipper. Every
// metric is created through promauto.With(r) and lives in the
// "loki_boltdb_shipper" namespace.
func newBoltDBShipperMetrics(r prometheus.Registerer) *boltDBShipperMetrics {
	const namespace = "loki_boltdb_shipper"

	factory := promauto.With(r)

	// Metrics are created in the same order as they appear in the struct.
	downloadDurationGauge := factory.NewGauge(prometheus.GaugeOpts{
		Namespace: namespace,
		Name:      "initial_files_download_duration_seconds",
		Help:      "Time (in seconds) spent in downloading of files per period, initially i.e for the first time",
	})
	downloadSizeGauge := factory.NewGauge(prometheus.GaugeOpts{
		Namespace: namespace,
		Name:      "initial_files_download_size_bytes",
		Help:      "Size of files (in bytes) downloaded per period, initially i.e for the first time",
	})
	requestDuration := factory.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: namespace,
		Name:      "request_duration_seconds",
		Help:      "Time (in seconds) spent serving requests when using boltdb shipper",
		Buckets:   instrument.DefBuckets,
	}, []string{"operation", "status_code"})
	downloadOps := factory.NewCounterVec(prometheus.CounterOpts{
		Namespace: namespace,
		Name:      "files_download_operation_total",
		Help:      "Total number of download operations done by status",
	}, []string{"status"})
	uploadOps := factory.NewCounterVec(prometheus.CounterOpts{
		Namespace: namespace,
		Name:      "files_upload_operation_total",
		Help:      "Total number of upload operations done by status",
	}, []string{"status"})

	return &boltDBShipperMetrics{
		filesDownloadDurationSeconds: &downloadPeriodDurationMetric{
			periods: map[string]float64{},
			gauge:   downloadDurationGauge,
		},
		filesDownloadSizeBytes: &downloadPeriodBytesMetric{
			periods: map[string]int64{},
			gauge:   downloadSizeGauge,
		},
		requestDurationSeconds:      requestDuration,
		filesDownloadOperationTotal: downloadOps,
		filesUploadOperationTotal:   uploadOps,
	}
}
23 changes: 18 additions & 5 deletions pkg/storage/stores/local/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
pkg_util "github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"go.etcd.io/bbolt"

"github.com/grafana/loki/pkg/storage/stores/util"
Expand Down Expand Up @@ -88,12 +89,13 @@ type Shipper struct {
uploadedFilesMtime map[string]time.Time
uploadedFilesMtimeMtx sync.RWMutex

done chan struct{}
wait sync.WaitGroup
done chan struct{}
wait sync.WaitGroup
metrics *boltDBShipperMetrics
}

// NewShipper creates a shipper for syncing local objects with a store
func NewShipper(cfg ShipperConfig, storageClient chunk.ObjectClient, boltDBGetter BoltDBGetter) (*Shipper, error) {
func NewShipper(cfg ShipperConfig, storageClient chunk.ObjectClient, boltDBGetter BoltDBGetter, registerer prometheus.Registerer) (*Shipper, error) {
err := chunk_util.EnsureDirectory(cfg.CacheLocation)
if err != nil {
return nil, err
Expand All @@ -106,13 +108,16 @@ func NewShipper(cfg ShipperConfig, storageClient chunk.ObjectClient, boltDBGette
storageClient: util.NewPrefixedObjectClient(storageClient, storageKeyPrefix),
done: make(chan struct{}),
uploadedFilesMtime: map[string]time.Time{},
metrics: newBoltDBShipperMetrics(registerer),
}

shipper.uploader, err = shipper.getUploaderName()
if err != nil {
return nil, err
}

level.Info(pkg_util.Logger).Log("msg", fmt.Sprintf("starting boltdb shipper in %d mode", cfg.Mode))

shipper.wait.Add(1)
go shipper.loop()

Expand Down Expand Up @@ -229,17 +234,25 @@ func (s *Shipper) cleanupCache() error {

// syncLocalWithStorage syncs all the periods that we have in the cache with the storage
// i.e. download new and updated files and remove files which were deleted from the storage.
func (s *Shipper) syncLocalWithStorage(ctx context.Context) error {
func (s *Shipper) syncLocalWithStorage(ctx context.Context) (err error) {
s.downloadedPeriodsMtx.RLock()
defer s.downloadedPeriodsMtx.RUnlock()

defer func() {
status := statusSuccess
if err != nil {
status = statusFailure
}
s.metrics.filesDownloadOperationTotal.WithLabelValues(status).Inc()
}()

for period := range s.downloadedPeriods {
if err := s.syncFilesForPeriod(ctx, period, s.downloadedPeriods[period]); err != nil {
return err
}
}

return nil
return
}

// deleteFileFromCache removes a file from cache.
Expand Down
20 changes: 14 additions & 6 deletions pkg/storage/stores/local/uploads.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,22 @@ import (

// uploadFiles uploads all new and updated files to storage.
// It uploads the files from configured boltdb dir where ingester writes the index.
func (s *Shipper) uploadFiles(ctx context.Context) error {
func (s *Shipper) uploadFiles(ctx context.Context) (err error) {
if s.cfg.Mode == ShipperModeReadOnly {
return nil
return
}

defer func() {
status := statusSuccess
if err != nil {
status = statusFailure
}
s.metrics.filesUploadOperationTotal.WithLabelValues(status).Inc()
}()

filesInfo, err := ioutil.ReadDir(s.cfg.ActiveIndexDirectory)
if err != nil {
return err
return
}

for _, fileInfo := range filesInfo {
Expand All @@ -40,17 +48,17 @@ func (s *Shipper) uploadFiles(ctx context.Context) error {
continue
}

err := s.uploadFile(ctx, fileInfo.Name())
err = s.uploadFile(ctx, fileInfo.Name())
if err != nil {
return err
return
}

s.uploadedFilesMtimeMtx.Lock()
s.uploadedFilesMtime[fileInfo.Name()] = fileInfo.ModTime()
s.uploadedFilesMtimeMtx.Unlock()
}

return nil
return
}

// uploadFile uploads one of the files locally written by ingesters to storage.
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/stores/local/uploads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ func createTestBoltDBWithShipper(t *testing.T, parentTempDir, ingesterName, loca
})
require.NoError(t, err)

boltdbIndexClientWithShipper, err := NewBoltDBIndexClientWithShipper(local.BoltDBConfig{Directory: shipperConfig.ActiveIndexDirectory}, archiveStoreClient, shipperConfig)
boltdbIndexClientWithShipper, err := NewBoltDBIndexClientWithShipper(
local.BoltDBConfig{Directory: shipperConfig.ActiveIndexDirectory}, archiveStoreClient, shipperConfig, nil)
require.NoError(t, err)

return boltdbIndexClientWithShipper.(*BoltdbIndexClientWithShipper)
Expand Down