From ca0c6bc4ae0103363e181d73ca412b68e442f04f Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Mon, 4 May 2020 19:04:59 +0530 Subject: [PATCH 1/3] some metrics for measuring performance and failures in boltdb shipper --- pkg/loki/loki.go | 4 +- pkg/storage/store.go | 7 ++- pkg/storage/store_test.go | 2 +- .../stores/local/boltdb_index_client.go | 14 +++-- pkg/storage/stores/local/downloads.go | 15 ++++++ pkg/storage/stores/local/metrics.go | 52 +++++++++++++++++++ pkg/storage/stores/local/shipper.go | 16 ++++-- pkg/storage/stores/local/uploads_test.go | 3 +- 8 files changed, 101 insertions(+), 12 deletions(-) create mode 100644 pkg/storage/stores/local/metrics.go diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index eafe0ea88f35..c8d490db90ce 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -7,6 +7,8 @@ import ( "fmt" "net/http" + "github.com/prometheus/client_golang/prometheus" + "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/querier/frontend" "github.com/cortexproject/cortex/pkg/ring" @@ -126,7 +128,7 @@ func New(cfg Config) (*Loki, error) { } loki.setupAuthMiddleware() - storage.RegisterCustomIndexClients(cfg.StorageConfig) + storage.RegisterCustomIndexClients(cfg.StorageConfig, prometheus.DefaultRegisterer) serviceMap, err := loki.initModuleServices(cfg.Target) if err != nil { diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 3df298c8356f..595f9a44a0a5 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -206,7 +206,7 @@ func filterChunksByTime(from, through model.Time, chunks []chunk.Chunk) []chunk. return filtered } -func RegisterCustomIndexClients(cfg Config) { +func RegisterCustomIndexClients(cfg Config, registerer prometheus.Registerer) { // BoltDB Shipper is supposed to be run as a singleton. // This could also be done in NewBoltDBIndexClientWithShipper factory method but we are doing it here because that method is used // in tests for creating multiple instances of it at a time. @@ -222,7 +222,10 @@ func RegisterCustomIndexClients(cfg Config) { return nil, err } - boltDBIndexClientWithShipper, err = local.NewBoltDBIndexClientWithShipper(cortex_local.BoltDBConfig{Directory: cfg.BoltDBShipperConfig.ActiveIndexDirectory}, objectClient, cfg.BoltDBShipperConfig) + boltDBIndexClientWithShipper, err = local.NewBoltDBIndexClientWithShipper( + cortex_local.BoltDBConfig{Directory: cfg.BoltDBShipperConfig.ActiveIndexDirectory}, + objectClient, cfg.BoltDBShipperConfig, registerer) + return boltDBIndexClientWithShipper, err }, func() (client chunk.TableClient, e error) { objectClient, err := storage.NewObjectClient(cfg.BoltDBShipperConfig.SharedStoreType, cfg.Config) diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 3e8e851b3575..df0d978e1b75 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -460,7 +460,7 @@ func TestStore_MultipleBoltDBShippersInConfig(t *testing.T) { BoltDBShipperConfig: boltdbShipperConfig, } - RegisterCustomIndexClients(config) + RegisterCustomIndexClients(config, nil) store, err := NewStore(config, chunk.StoreConfig{}, chunk.SchemaConfig{ Configs: []chunk.PeriodConfig{ diff --git a/pkg/storage/stores/local/boltdb_index_client.go b/pkg/storage/stores/local/boltdb_index_client.go index ef60ff4e5726..703f5bd2f66b 100644 --- a/pkg/storage/stores/local/boltdb_index_client.go +++ b/pkg/storage/stores/local/boltdb_index_client.go @@ -3,6 +3,10 @@ package local import ( "context" + "github.com/prometheus/client_golang/prometheus" + + "github.com/weaveworks/common/instrument" + "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/local" chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" @@ -15,13 +19,13 @@ type BoltdbIndexClientWithShipper struct { } // NewBoltDBIndexClientWithShipper creates a new IndexClient that used BoltDB. -func NewBoltDBIndexClientWithShipper(cfg local.BoltDBConfig, archiveStoreClient chunk.ObjectClient, archiverCfg ShipperConfig) (chunk.IndexClient, error) { +func NewBoltDBIndexClientWithShipper(cfg local.BoltDBConfig, archiveStoreClient chunk.ObjectClient, archiverCfg ShipperConfig, registerer prometheus.Registerer) (chunk.IndexClient, error) { boltDBIndexClient, err := local.NewBoltDBIndexClient(cfg) if err != nil { return nil, err } - shipper, err := NewShipper(archiverCfg, archiveStoreClient, boltDBIndexClient) + shipper, err := NewShipper(archiverCfg, archiveStoreClient, boltDBIndexClient, registerer) if err != nil { return nil, err } @@ -55,7 +59,9 @@ func (b *BoltdbIndexClientWithShipper) query(ctx context.Context, query chunk.In } } - return b.shipper.forEach(ctx, query.TableName, func(db *bbolt.DB) error { - return b.QueryDB(ctx, db, query, callback) + return instrument.CollectedRequest(ctx, "QUERY", instrument.NewHistogramCollector(b.shipper.metrics.requestDurationSeconds), instrument.ErrorCode, func(ctx context.Context) error { + return b.shipper.forEach(ctx, query.TableName, func(db *bbolt.DB) error { + return b.QueryDB(ctx, db, query, callback) + }) }) } diff --git a/pkg/storage/stores/local/downloads.go b/pkg/storage/stores/local/downloads.go index bd7112ddbaa6..25d87cf20d93 100644 --- a/pkg/storage/stores/local/downloads.go +++ b/pkg/storage/stores/local/downloads.go @@ -7,6 +7,7 @@ import ( "os" "path" "strings" + "time" "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/local" @@ -160,6 +161,9 @@ func (s *Shipper) downloadFilesForPeriod(ctx context.Context, period string, fc fc.Lock() defer fc.Unlock() + startTime := time.Now() + totalFilesSize := int64(0) + objects, _, err := s.storageClient.List(ctx, period+"/") if err != nil { return err @@ -192,9 +196,20 @@ func (s *Shipper) downloadFilesForPeriod(ctx context.Context, period string, fc return err } + stat, err := os.Stat(filePath) + if err != nil { + return err + } + + totalFilesSize += stat.Size() + fc.files[uploader] = df } + duration := time.Since(startTime).Seconds() + s.metrics.initialFilesDownloadDurationSeconds.WithLabelValues(period).Set(duration) + s.metrics.initialFilesDownloadSizeBytes.WithLabelValues(period).Set(float64(totalFilesSize)) + return nil } diff --git a/pkg/storage/stores/local/metrics.go b/pkg/storage/stores/local/metrics.go new file mode 100644 index 000000000000..4c5dbbe267d3 --- /dev/null +++ b/pkg/storage/stores/local/metrics.go @@ -0,0 +1,52 @@ +package local + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/weaveworks/common/instrument" +) + +type boltDBShipperMetrics struct { + // metrics for measuring performance of downloading of files per period initially i.e for the first time + initialFilesDownloadDurationSeconds *prometheus.GaugeVec + initialFilesDownloadSizeBytes *prometheus.GaugeVec + + // duration in seconds spent in serving request on index managed by BoltDB Shipper + requestDurationSeconds *prometheus.HistogramVec + + filesDownloadFailures prometheus.Counter + filesUploadFailures prometheus.Counter +} + +func newBoltDBShipperMetrics(r prometheus.Registerer) *boltDBShipperMetrics { + m := &boltDBShipperMetrics{ + initialFilesDownloadDurationSeconds: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "loki_boltdb_shipper", + Name: "initial_files_download_duration_seconds", + Help: "Time (in seconds) spent in downloading of files per period, initially i.e for the first time", + }, []string{"period"}), + initialFilesDownloadSizeBytes: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "loki_boltdb_shipper", + Name: "initial_files_download_size_bytes", + Help: "Size of files (in bytes) downloaded per period, initially i.e for the first time", + }, []string{"period"}), + requestDurationSeconds: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "loki_boltdb_shipper", + Name: "request_duration_seconds", + Help: "Time (in seconds) spent serving requests when using boltdb shipper", + Buckets: instrument.DefBuckets, + }, []string{"operation", "status_code"}), + filesDownloadFailures: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: "loki_boltdb_shipper", + Name: "files_download_failures_total", + Help: "Total number of failures in downloading of files", + }), + filesUploadFailures: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: "loki_boltdb_shipper", + Name: "files_upload_failures_total", + Help: "TTotal number of failures in downloading of files", + }), + } + + return m +} diff --git a/pkg/storage/stores/local/shipper.go b/pkg/storage/stores/local/shipper.go index 1856e3698a6e..2c526a8241c6 100644 --- a/pkg/storage/stores/local/shipper.go +++ b/pkg/storage/stores/local/shipper.go @@ -10,6 +10,8 @@ import ( "sync" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/cortexproject/cortex/pkg/chunk" chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" pkg_util "github.com/cortexproject/cortex/pkg/util" @@ -88,12 +90,13 @@ type Shipper struct { uploadedFilesMtime map[string]time.Time uploadedFilesMtimeMtx sync.RWMutex - done chan struct{} - wait sync.WaitGroup + done chan struct{} + wait sync.WaitGroup + metrics *boltDBShipperMetrics } // NewShipper creates a shipper for syncing local objects with a store -func NewShipper(cfg ShipperConfig, storageClient chunk.ObjectClient, boltDBGetter BoltDBGetter) (*Shipper, error) { +func NewShipper(cfg ShipperConfig, storageClient chunk.ObjectClient, boltDBGetter BoltDBGetter, registerer prometheus.Registerer) (*Shipper, error) { err := chunk_util.EnsureDirectory(cfg.CacheLocation) if err != nil { return nil, err @@ -106,6 +109,7 @@ func NewShipper(cfg ShipperConfig, storageClient chunk.ObjectClient, boltDBGette storageClient: util.NewPrefixedObjectClient(storageClient, storageKeyPrefix), done: make(chan struct{}), uploadedFilesMtime: map[string]time.Time{}, + metrics: newBoltDBShipperMetrics(registerer), } shipper.uploader, err = shipper.getUploaderName() @@ -113,6 +117,8 @@ func NewShipper(cfg ShipperConfig, storageClient chunk.ObjectClient, boltDBGette return nil, err } + level.Info(pkg_util.Logger).Log("msg", fmt.Sprintf("starting boltdb shipper in %d mode", cfg.Mode)) + shipper.wait.Add(1) go shipper.loop() @@ -171,6 +177,7 @@ func (s *Shipper) loop() { case <-uploadFilesTicker.C: err := s.uploadFiles(context.Background()) if err != nil { + s.metrics.filesUploadFailures.Inc() level.Error(pkg_util.Logger).Log("msg", "error pushing archivable files to store", "err", err) } case <-cacheCleanupTicker.C: @@ -192,6 +199,7 @@ func (s *Shipper) Stop() { // Push all boltdb files to storage before returning err := s.uploadFiles(context.Background()) if err != nil { + s.metrics.filesUploadFailures.Inc() level.Error(pkg_util.Logger).Log("msg", "error pushing archivable files to store", "err", err) } @@ -235,6 +243,7 @@ func (s *Shipper) syncLocalWithStorage(ctx context.Context) error { for period := range s.downloadedPeriods { if err := s.syncFilesForPeriod(ctx, period, s.downloadedPeriods[period]); err != nil { + s.metrics.filesDownloadFailures.Inc() return err } } @@ -275,6 +284,7 @@ func (s *Shipper) forEach(ctx context.Context, period string, callback func(db * s.downloadedPeriodsMtx.Unlock() if err := s.downloadFilesForPeriod(ctx, period, fc); err != nil { + s.metrics.filesDownloadFailures.Inc() return err } } diff --git a/pkg/storage/stores/local/uploads_test.go b/pkg/storage/stores/local/uploads_test.go index 40404ae2773c..b21249dfbe77 100644 --- a/pkg/storage/stores/local/uploads_test.go +++ b/pkg/storage/stores/local/uploads_test.go @@ -38,7 +38,8 @@ func createTestBoltDBWithShipper(t *testing.T, parentTempDir, ingesterName, loca }) require.NoError(t, err) - boltdbIndexClientWithShipper, err := NewBoltDBIndexClientWithShipper(local.BoltDBConfig{Directory: shipperConfig.ActiveIndexDirectory}, archiveStoreClient, shipperConfig) + boltdbIndexClientWithShipper, err := NewBoltDBIndexClientWithShipper( + local.BoltDBConfig{Directory: shipperConfig.ActiveIndexDirectory}, archiveStoreClient, shipperConfig, nil) require.NoError(t, err) return boltdbIndexClientWithShipper.(*BoltdbIndexClientWithShipper) From bade80ca122b1dc7113760508a4fe2b35527b4c4 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Wed, 6 May 2020 16:55:53 +0530 Subject: [PATCH 2/3] counting both failures and success in upload and download operation in boltdb shipper --- .../stores/local/boltdb_index_client.go | 6 ++--- pkg/storage/stores/local/metrics.go | 25 +++++++++++-------- pkg/storage/stores/local/shipper.go | 18 ++++++++----- 3 files changed, 29 insertions(+), 20 deletions(-) diff --git a/pkg/storage/stores/local/boltdb_index_client.go b/pkg/storage/stores/local/boltdb_index_client.go index 703f5bd2f66b..2fa21615fd46 100644 --- a/pkg/storage/stores/local/boltdb_index_client.go +++ b/pkg/storage/stores/local/boltdb_index_client.go @@ -3,13 +3,11 @@ package local import ( "context" - "github.com/prometheus/client_golang/prometheus" - - "github.com/weaveworks/common/instrument" - "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/local" chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" + "github.com/prometheus/client_golang/prometheus" + "github.com/weaveworks/common/instrument" "go.etcd.io/bbolt" ) diff --git a/pkg/storage/stores/local/metrics.go b/pkg/storage/stores/local/metrics.go index 4c5dbbe267d3..e8b165db9214 100644 --- a/pkg/storage/stores/local/metrics.go +++ b/pkg/storage/stores/local/metrics.go @@ -6,6 +6,11 @@ import ( "github.com/weaveworks/common/instrument" ) +const ( + statusFailure = "failure" + statusSuccess = "success" +) + type boltDBShipperMetrics struct { // metrics for measuring performance of downloading of files per period initially i.e for the first time initialFilesDownloadDurationSeconds *prometheus.GaugeVec @@ -14,8 +19,8 @@ type boltDBShipperMetrics struct { // duration in seconds spent in serving request on index managed by BoltDB Shipper requestDurationSeconds *prometheus.HistogramVec - filesDownloadFailures prometheus.Counter - filesUploadFailures prometheus.Counter + filesDownloadOperationTotal *prometheus.CounterVec + filesUploadOperationTotal *prometheus.CounterVec } func newBoltDBShipperMetrics(r prometheus.Registerer) *boltDBShipperMetrics { @@ -36,16 +41,16 @@ func newBoltDBShipperMetrics(r prometheus.Registerer) *boltDBShipperMetrics { Help: "Time (in seconds) spent serving requests when using boltdb shipper", Buckets: instrument.DefBuckets, }, []string{"operation", "status_code"}), - filesDownloadFailures: promauto.With(r).NewCounter(prometheus.CounterOpts{ + filesDownloadOperationTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Namespace: "loki_boltdb_shipper", - Name: "files_download_failures_total", - Help: "Total number of failures in downloading of files", - }), - filesUploadFailures: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "files_download_operation_total", + Help: "Total number of download operations done by status", + }, []string{"status"}), + filesUploadOperationTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Namespace: "loki_boltdb_shipper", - Name: "files_upload_failures_total", - Help: "TTotal number of failures in downloading of files", - }), + Name: "files_upload_operation_total", + Help: "Total number of upload operations done by status", + }, []string{"status"}), } return m diff --git a/pkg/storage/stores/local/shipper.go b/pkg/storage/stores/local/shipper.go index 2c526a8241c6..3c2fe5151e2d 100644 --- a/pkg/storage/stores/local/shipper.go +++ b/pkg/storage/stores/local/shipper.go @@ -10,12 +10,11 @@ import ( "sync" "time" - "github.com/prometheus/client_golang/prometheus" - "github.com/cortexproject/cortex/pkg/chunk" chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" pkg_util "github.com/cortexproject/cortex/pkg/util" "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" "go.etcd.io/bbolt" "github.com/grafana/loki/pkg/storage/stores/util" @@ -170,16 +169,21 @@ func (s *Shipper) loop() { for { select { case <-resyncTicker.C: + status := statusSuccess err := s.syncLocalWithStorage(context.Background()) if err != nil { + status = statusFailure level.Error(pkg_util.Logger).Log("msg", "error syncing local boltdb files with storage", "err", err) } + s.metrics.filesDownloadOperationTotal.WithLabelValues(status).Inc() case <-uploadFilesTicker.C: + status := statusSuccess err := s.uploadFiles(context.Background()) if err != nil { - s.metrics.filesUploadFailures.Inc() + status = statusFailure level.Error(pkg_util.Logger).Log("msg", "error pushing archivable files to store", "err", err) } + s.metrics.filesUploadOperationTotal.WithLabelValues(status).Inc() case <-cacheCleanupTicker.C: err := s.cleanupCache() if err != nil { @@ -197,11 +201,13 @@ func (s *Shipper) Stop() { s.wait.Wait() // Push all boltdb files to storage before returning + status := statusSuccess err := s.uploadFiles(context.Background()) if err != nil { - s.metrics.filesUploadFailures.Inc() + status = statusFailure level.Error(pkg_util.Logger).Log("msg", "error pushing archivable files to store", "err", err) } + s.metrics.filesUploadOperationTotal.WithLabelValues(status).Inc() s.downloadedPeriodsMtx.Lock() defer s.downloadedPeriodsMtx.Unlock() @@ -243,7 +249,6 @@ func (s *Shipper) syncLocalWithStorage(ctx context.Context) error { for period := range s.downloadedPeriods { if err := s.syncFilesForPeriod(ctx, period, s.downloadedPeriods[period]); err != nil { - s.metrics.filesDownloadFailures.Inc() return err } } @@ -284,9 +289,10 @@ func (s *Shipper) forEach(ctx context.Context, period string, callback func(db * s.downloadedPeriodsMtx.Unlock() if err := s.downloadFilesForPeriod(ctx, period, fc); err != nil { - s.metrics.filesDownloadFailures.Inc() + s.metrics.filesDownloadOperationTotal.WithLabelValues(statusFailure).Inc() return err } + s.metrics.filesDownloadOperationTotal.WithLabelValues(statusSuccess).Inc() } } From 126ae8ee47dcbf0907cc2a2fd3ead7ae07ebdc55 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Wed, 13 May 2020 11:22:19 +0530 Subject: [PATCH 3/3] changes suggested from PR review --- pkg/storage/stores/local/downloads.go | 31 +++++++----- pkg/storage/stores/local/metrics.go | 68 ++++++++++++++++++++++----- pkg/storage/stores/local/shipper.go | 23 ++++----- pkg/storage/stores/local/uploads.go | 20 +++++--- 4 files changed, 100 insertions(+), 42 deletions(-) diff --git a/pkg/storage/stores/local/downloads.go b/pkg/storage/stores/local/downloads.go index 25d87cf20d93..48319eae2422 100644 --- a/pkg/storage/stores/local/downloads.go +++ b/pkg/storage/stores/local/downloads.go @@ -157,23 +157,31 @@ func (s *Shipper) getFileFromStorage(ctx context.Context, objectKey, destination // downloadFilesForPeriod should be called when files for a period does not exist i.e they were never downloaded or got cleaned up later on by TTL // While files are being downloaded it will block all reads/writes on filesCollection by taking an exclusive lock -func (s *Shipper) downloadFilesForPeriod(ctx context.Context, period string, fc *filesCollection) error { +func (s *Shipper) downloadFilesForPeriod(ctx context.Context, period string, fc *filesCollection) (err error) { fc.Lock() defer fc.Unlock() + defer func() { + status := statusSuccess + if err != nil { + status = statusFailure + } + s.metrics.filesDownloadOperationTotal.WithLabelValues(status).Inc() + }() + startTime := time.Now() totalFilesSize := int64(0) objects, _, err := s.storageClient.List(ctx, period+"/") if err != nil { - return err + return } level.Debug(util.Logger).Log("msg", fmt.Sprintf("list of files to download for period %s: %s", period, objects)) folderPath, err := s.getFolderPathForPeriod(period, true) if err != nil { - return err + return } for _, object := range objects { @@ -185,20 +193,21 @@ func (s *Shipper) downloadFilesForPeriod(ctx context.Context, period string, fc filePath := path.Join(folderPath, uploader) df := downloadedFiles{} - err := s.getFileFromStorage(ctx, object.Key, filePath) + err = s.getFileFromStorage(ctx, object.Key, filePath) if err != nil { - return err + return } df.mtime = object.ModifiedAt df.boltdb, err = local.OpenBoltdbFile(filePath) if err != nil { - return err + return } - stat, err := os.Stat(filePath) + var stat os.FileInfo + stat, err = os.Stat(filePath) if err != nil { - return err + return } totalFilesSize += stat.Size() @@ -207,10 +216,10 @@ func (s *Shipper) downloadFilesForPeriod(ctx context.Context, period string, fc } duration := time.Since(startTime).Seconds() - s.metrics.initialFilesDownloadDurationSeconds.WithLabelValues(period).Set(duration) - s.metrics.initialFilesDownloadSizeBytes.WithLabelValues(period).Set(float64(totalFilesSize)) + s.metrics.filesDownloadDurationSeconds.add(period, duration) + s.metrics.filesDownloadSizeBytes.add(period, totalFilesSize) - return nil + return } func (s *Shipper) getFolderPathForPeriod(period string, ensureExists bool) (string, error) { diff --git a/pkg/storage/stores/local/metrics.go b/pkg/storage/stores/local/metrics.go index e8b165db9214..426b0ba8fc57 100644 --- a/pkg/storage/stores/local/metrics.go +++ b/pkg/storage/stores/local/metrics.go @@ -1,6 +1,8 @@ package local import ( + "sync" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/instrument" @@ -11,10 +13,48 @@ const ( statusSuccess = "success" ) +type downloadPeriodDurationMetric struct { + sync.RWMutex + gauge prometheus.Gauge + periods map[string]float64 +} + +func (m *downloadPeriodDurationMetric) add(period string, downloadDuration float64) { + m.Lock() + defer m.Unlock() + m.periods[period] = downloadDuration + + totalDuration := float64(0) + for _, dur := range m.periods { + totalDuration += dur + } + + m.gauge.Set(totalDuration) +} + +type downloadPeriodBytesMetric struct { + sync.RWMutex + gauge prometheus.Gauge + periods map[string]int64 +} + +func (m *downloadPeriodBytesMetric) add(period string, downloadedBytes int64) { + m.Lock() + defer m.Unlock() + m.periods[period] = downloadedBytes + + totalDownloadedBytes := int64(0) + for _, downloadedBytes := range m.periods { + totalDownloadedBytes += downloadedBytes + } + + m.gauge.Set(float64(totalDownloadedBytes)) +} + type boltDBShipperMetrics struct { // metrics for measuring performance of downloading of files per period initially i.e for the first time - initialFilesDownloadDurationSeconds *prometheus.GaugeVec - initialFilesDownloadSizeBytes *prometheus.GaugeVec + filesDownloadDurationSeconds *downloadPeriodDurationMetric + filesDownloadSizeBytes *downloadPeriodBytesMetric // duration in seconds spent in serving request on index managed by BoltDB Shipper requestDurationSeconds *prometheus.HistogramVec @@ -25,16 +65,20 @@ type boltDBShipperMetrics struct { func newBoltDBShipperMetrics(r prometheus.Registerer) *boltDBShipperMetrics { m := &boltDBShipperMetrics{ - initialFilesDownloadDurationSeconds: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{ - Namespace: "loki_boltdb_shipper", - Name: "initial_files_download_duration_seconds", - Help: "Time (in seconds) spent in downloading of files per period, initially i.e for the first time", - }, []string{"period"}), - initialFilesDownloadSizeBytes: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{ - Namespace: "loki_boltdb_shipper", - Name: "initial_files_download_size_bytes", - Help: "Size of files (in bytes) downloaded per period, initially i.e for the first time", - }, []string{"period"}), + filesDownloadDurationSeconds: &downloadPeriodDurationMetric{ + periods: map[string]float64{}, + gauge: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Namespace: "loki_boltdb_shipper", + Name: "initial_files_download_duration_seconds", + Help: "Time (in seconds) spent in downloading of files per period, initially i.e for the first time", + })}, + filesDownloadSizeBytes: &downloadPeriodBytesMetric{ + periods: map[string]int64{}, + gauge: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Namespace: "loki_boltdb_shipper", + Name: "initial_files_download_size_bytes", + Help: "Size of files (in bytes) downloaded per period, initially i.e for the first time", + })}, requestDurationSeconds: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ Namespace: "loki_boltdb_shipper", Name: "request_duration_seconds", diff --git a/pkg/storage/stores/local/shipper.go b/pkg/storage/stores/local/shipper.go index 3c2fe5151e2d..bd60ea2be1ba 100644 --- a/pkg/storage/stores/local/shipper.go +++ b/pkg/storage/stores/local/shipper.go @@ -169,21 +169,15 @@ func (s *Shipper) loop() { for { select { case <-resyncTicker.C: - status := statusSuccess err := s.syncLocalWithStorage(context.Background()) if err != nil { - status = statusFailure level.Error(pkg_util.Logger).Log("msg", "error syncing local boltdb files with storage", "err", err) } - s.metrics.filesDownloadOperationTotal.WithLabelValues(status).Inc() case <-uploadFilesTicker.C: - status := statusSuccess err := s.uploadFiles(context.Background()) if err != nil { - status = statusFailure level.Error(pkg_util.Logger).Log("msg", "error pushing archivable files to store", "err", err) } - s.metrics.filesUploadOperationTotal.WithLabelValues(status).Inc() case <-cacheCleanupTicker.C: err := s.cleanupCache() if err != nil { @@ -201,13 +195,10 @@ func (s *Shipper) Stop() { s.wait.Wait() // Push all boltdb files to storage before returning - status := statusSuccess err := s.uploadFiles(context.Background()) if err != nil { - status = statusFailure level.Error(pkg_util.Logger).Log("msg", "error pushing archivable files to store", "err", err) } - s.metrics.filesUploadOperationTotal.WithLabelValues(status).Inc() s.downloadedPeriodsMtx.Lock() defer s.downloadedPeriodsMtx.Unlock() @@ -243,17 +234,25 @@ func (s *Shipper) cleanupCache() error { // syncLocalWithStorage syncs all the periods that we have in the cache with the storage // i.e download new and updated files and remove files which were delete from the storage. -func (s *Shipper) syncLocalWithStorage(ctx context.Context) error { +func (s *Shipper) syncLocalWithStorage(ctx context.Context) (err error) { s.downloadedPeriodsMtx.RLock() defer s.downloadedPeriodsMtx.RUnlock() + defer func() { + status := statusSuccess + if err != nil { + status = statusFailure + } + s.metrics.filesDownloadOperationTotal.WithLabelValues(status).Inc() + }() + for period := range s.downloadedPeriods { if err := s.syncFilesForPeriod(ctx, period, s.downloadedPeriods[period]); err != nil { return err } } - return nil + return } // deleteFileFromCache removes a file from cache. @@ -289,10 +288,8 @@ func (s *Shipper) forEach(ctx context.Context, period string, callback func(db * s.downloadedPeriodsMtx.Unlock() if err := s.downloadFilesForPeriod(ctx, period, fc); err != nil { - s.metrics.filesDownloadOperationTotal.WithLabelValues(statusFailure).Inc() return err } - s.metrics.filesDownloadOperationTotal.WithLabelValues(statusSuccess).Inc() } } diff --git a/pkg/storage/stores/local/uploads.go b/pkg/storage/stores/local/uploads.go index b44e52928900..1990f3dcf025 100644 --- a/pkg/storage/stores/local/uploads.go +++ b/pkg/storage/stores/local/uploads.go @@ -16,14 +16,22 @@ import ( // uploadFiles uploads all new and updated files to storage. // It uploads the files from configured boltdb dir where ingester writes the index. -func (s *Shipper) uploadFiles(ctx context.Context) error { +func (s *Shipper) uploadFiles(ctx context.Context) (err error) { if s.cfg.Mode == ShipperModeReadOnly { - return nil + return } + defer func() { + status := statusSuccess + if err != nil { + status = statusFailure + } + s.metrics.filesUploadOperationTotal.WithLabelValues(status).Inc() + }() + filesInfo, err := ioutil.ReadDir(s.cfg.ActiveIndexDirectory) if err != nil { - return err + return } for _, fileInfo := range filesInfo { @@ -40,9 +48,9 @@ func (s *Shipper) uploadFiles(ctx context.Context) error { continue } - err := s.uploadFile(ctx, fileInfo.Name()) + err = s.uploadFile(ctx, fileInfo.Name()) if err != nil { - return err + return } s.uploadedFilesMtimeMtx.Lock() @@ -50,7 +58,7 @@ func (s *Shipper) uploadFiles(ctx context.Context) error { s.uploadedFilesMtimeMtx.Unlock() } - return nil + return } // uploadFile uploads one of the files locally written by ingesters to storage.