From 704ae773a5dbe3b05c01f0380b8534be8dcbb8e4 Mon Sep 17 00:00:00 2001 From: fuling Date: Mon, 10 Jan 2022 13:34:22 +0800 Subject: [PATCH 01/10] [enhancement] querier cache: WriteBackCache should be off query path #5072 --- pkg/storage/chunk/chunk_store_utils.go | 63 +++++++++++++++++++++++--- 1 file changed, 57 insertions(+), 6 deletions(-) diff --git a/pkg/storage/chunk/chunk_store_utils.go b/pkg/storage/chunk/chunk_store_utils.go index 392941d78e53..289d82fd0800 100644 --- a/pkg/storage/chunk/chunk_store_utils.go +++ b/pkg/storage/chunk/chunk_store_utils.go @@ -5,6 +5,9 @@ import ( "sync" "github.com/go-kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" @@ -16,6 +19,15 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/cache" ) +var ( + errMemcachedAsyncBufferFull = errors.New("the async buffer is full") + reasonAsyncBufferFull = "async-buffer-full" + skipped = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "loki_chunk_fetcher_cache_skipped_total", + Help: "Total number of operations against cache that have been skipped.", + }, []string{"reason"}) +) + const chunkDecodeParallelism = 16 func filterChunksByTime(from, through model.Time, chunks []Chunk) []Chunk { @@ -89,6 +101,11 @@ type Fetcher struct { wait sync.WaitGroup decodeRequests chan decodeRequest + + maxAsyncConcurrency int + maxAsyncBufferSize int + + asyncQueue chan []Chunk } type decodeRequest struct { @@ -105,11 +122,13 @@ type decodeResponse struct { // NewChunkFetcher makes a new ChunkFetcher. func NewChunkFetcher(cacher cache.Cache, cacheStubs bool, schema SchemaConfig, storage Client) (*Fetcher, error) { c := &Fetcher{ - schema: schema, - storage: storage, - cache: cacher, - cacheStubs: cacheStubs, - decodeRequests: make(chan decodeRequest), + schema: schema, + storage: storage, + cache: cacher, + cacheStubs: cacheStubs, + decodeRequests: make(chan decodeRequest), + maxAsyncConcurrency: 16, + maxAsyncBufferSize: 1000, } c.wait.Add(chunkDecodeParallelism) @@ -117,14 +136,43 @@ func NewChunkFetcher(cacher cache.Cache, cacheStubs bool, schema SchemaConfig, s go c.worker() } + // Start a number of goroutines - processing async operations - equal + // to the max concurrency we have. + c.asyncQueue = make(chan []Chunk, c.maxAsyncBufferSize) + for i := 0; i < c.maxAsyncConcurrency; i++ { + go c.asyncQueueProcessLoop() + } + return c, nil } +func (c *Fetcher) writeBackCacheAsync(fromStorage []Chunk) error { + select { + case c.asyncQueue <- fromStorage: + return nil + default: + return errMemcachedAsyncBufferFull + } +} + +func (c *Fetcher) asyncQueueProcessLoop() { + for { + select { + case fromStorage := <-c.asyncQueue: + cacheErr := c.writeBackCache(context.Background(), fromStorage) + if cacheErr != nil { + level.Warn(util_log.Logger).Log("msg", "could not store chunks in chunk cache", "err", cacheErr) + } + } + } +} + // Stop the ChunkFetcher. 
func (c *Fetcher) Stop() { close(c.decodeRequests) c.wait.Wait() c.cache.Stop() + close(c.asyncQueue) } func (c *Fetcher) worker() { @@ -162,7 +210,10 @@ func (c *Fetcher) FetchChunks(ctx context.Context, chunks []Chunk, keys []string } // Always cache any chunks we did get - if cacheErr := c.writeBackCache(ctx, fromStorage); cacheErr != nil { + if cacheErr := c.writeBackCacheAsync(fromStorage); cacheErr != nil { + if cacheErr == errMemcachedAsyncBufferFull { + skipped.WithLabelValues(reasonAsyncBufferFull).Inc() + } level.Warn(log).Log("msg", "could not store chunks in chunk cache", "err", cacheErr) } From 9ef4b45b4e2f9dd77377abeaeeae549dba362107 Mon Sep 17 00:00:00 2001 From: fuling Date: Mon, 10 Jan 2022 13:57:39 +0800 Subject: [PATCH 02/10] [enhancement] querier cache: WriteBackCache should be off query path #5083 --- pkg/storage/chunk/chunk_store_utils.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/storage/chunk/chunk_store_utils.go b/pkg/storage/chunk/chunk_store_utils.go index 289d82fd0800..1f0786a61b75 100644 --- a/pkg/storage/chunk/chunk_store_utils.go +++ b/pkg/storage/chunk/chunk_store_utils.go @@ -106,6 +106,7 @@ type Fetcher struct { maxAsyncBufferSize int asyncQueue chan []Chunk + stop chan struct{} } type decodeRequest struct { @@ -129,6 +130,7 @@ func NewChunkFetcher(cacher cache.Cache, cacheStubs bool, schema SchemaConfig, s decodeRequests: make(chan decodeRequest), maxAsyncConcurrency: 16, maxAsyncBufferSize: 1000, + stop: make(chan struct{}, 1), } c.wait.Add(chunkDecodeParallelism) @@ -163,6 +165,8 @@ func (c *Fetcher) asyncQueueProcessLoop() { if cacheErr != nil { level.Warn(util_log.Logger).Log("msg", "could not store chunks in chunk cache", "err", cacheErr) } + case <-c.stop: + return } } } @@ -172,7 +176,7 @@ func (c *Fetcher) Stop() { close(c.decodeRequests) c.wait.Wait() c.cache.Stop() - close(c.asyncQueue) + close(c.stop) } func (c *Fetcher) worker() { From 65ee8bbd4c6370fbaf34f2eacd220a304be6ef25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=9B=BD=E5=BF=A0?= <249032432@qq.com> Date: Mon, 10 Jan 2022 15:44:10 +0800 Subject: [PATCH 03/10] Update pkg/storage/chunk/chunk_store_utils.go Co-authored-by: Danny Kopping --- pkg/storage/chunk/chunk_store_utils.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/chunk/chunk_store_utils.go b/pkg/storage/chunk/chunk_store_utils.go index 1f0786a61b75..0237f8317bef 100644 --- a/pkg/storage/chunk/chunk_store_utils.go +++ b/pkg/storage/chunk/chunk_store_utils.go @@ -163,7 +163,7 @@ func (c *Fetcher) asyncQueueProcessLoop() { case fromStorage := <-c.asyncQueue: cacheErr := c.writeBackCache(context.Background(), fromStorage) if cacheErr != nil { - level.Warn(util_log.Logger).Log("msg", "could not store chunks in chunk cache", "err", cacheErr) + level.Warn(util_log.Logger).Log("msg", "could not write fetched chunks from storage into chunk cache", "err", cacheErr) } case <-c.stop: return From ed7c3cef91e5ab05663fc10e708455d9f723a1b7 Mon Sep 17 00:00:00 2001 From: fuling Date: Mon, 10 Jan 2022 15:53:06 +0800 Subject: [PATCH 04/10] [enhancement] querier cache: WriteBackCache should be off query path #5083 --- pkg/storage/chunk/cache/cache.go | 7 +++++++ pkg/storage/chunk/cache/cache_test.go | 2 +- pkg/storage/chunk/chunk_store.go | 2 +- pkg/storage/chunk/chunk_store_utils.go | 20 ++++++++++---------- pkg/storage/util_test.go | 2 +- 5 files changed, 20 insertions(+), 13 deletions(-) diff --git a/pkg/storage/chunk/cache/cache.go b/pkg/storage/chunk/cache/cache.go index 
7a66097b8d38..52faeae01084 100644 --- a/pkg/storage/chunk/cache/cache.go +++ b/pkg/storage/chunk/cache/cache.go @@ -41,6 +41,11 @@ type Config struct { // For tests to inject specific implementations. Cache Cache `yaml:"-"` + + // MaxAsyncConcurrency specifies the maximum number of SetAsync goroutines. + MaxAsyncConcurrency int `yaml:"max_async_concurrency"` + // MaxAsyncBufferSize specifies the queue buffer size for SetAsync operations. + MaxAsyncBufferSize int `yaml:"max_async_buffer_size"` } // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet @@ -50,6 +55,8 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, description string, f cfg.MemcacheClient.RegisterFlagsWithPrefix(prefix, description, f) cfg.Redis.RegisterFlagsWithPrefix(prefix, description, f) cfg.Fifocache.RegisterFlagsWithPrefix(prefix, description, f) + f.IntVar(&cfg.MaxAsyncConcurrency, prefix+"max-async-concurrency", 16, "The maximum number of concurrent asynchronous operations can occur.") + f.IntVar(&cfg.MaxAsyncBufferSize, prefix+"max-async-buffer-size", 500, "The maximum number of enqueued asynchronous operations allowed.") f.DurationVar(&cfg.DefaultValidity, prefix+"default-validity", time.Hour, description+"The default validity of entries for caches unless overridden.") f.BoolVar(&cfg.EnableFifoCache, prefix+"cache.enable-fifocache", false, description+"Enable in-memory cache (auto-enabled for the chunks & query results cache if no other cache is configured).") diff --git a/pkg/storage/chunk/cache/cache_test.go b/pkg/storage/chunk/cache/cache_test.go index 5267a66dce4f..3e9a5cb9271b 100644 --- a/pkg/storage/chunk/cache/cache_test.go +++ b/pkg/storage/chunk/cache/cache_test.go @@ -124,7 +124,7 @@ func testChunkFetcher(t *testing.T, c cache.Cache, keys []string, chunks []chunk }, } - fetcher, err := chunk.NewChunkFetcher(c, false, s, nil) + fetcher, err := chunk.NewChunkFetcher(c, false, s, nil, 10, 100) require.NoError(t, err) defer fetcher.Stop() diff --git a/pkg/storage/chunk/chunk_store.go b/pkg/storage/chunk/chunk_store.go index f9aa983ce828..833800b7e101 100644 --- a/pkg/storage/chunk/chunk_store.go +++ b/pkg/storage/chunk/chunk_store.go @@ -99,7 +99,7 @@ type baseStore struct { } func newBaseStore(cfg StoreConfig, scfg SchemaConfig, schema BaseSchema, index IndexClient, chunks Client, limits StoreLimits, chunksCache cache.Cache) (baseStore, error) { - fetcher, err := NewChunkFetcher(chunksCache, cfg.chunkCacheStubs, scfg, chunks) + fetcher, err := NewChunkFetcher(chunksCache, cfg.chunkCacheStubs, scfg, chunks, cfg.ChunkCacheConfig.MaxAsyncConcurrency, cfg.ChunkCacheConfig.MaxAsyncBufferSize) if err != nil { return baseStore{}, err } diff --git a/pkg/storage/chunk/chunk_store_utils.go b/pkg/storage/chunk/chunk_store_utils.go index 0237f8317bef..925e2417769e 100644 --- a/pkg/storage/chunk/chunk_store_utils.go +++ b/pkg/storage/chunk/chunk_store_utils.go @@ -20,9 +20,9 @@ import ( ) var ( - errMemcachedAsyncBufferFull = errors.New("the async buffer is full") - reasonAsyncBufferFull = "async-buffer-full" - skipped = promauto.NewCounterVec(prometheus.CounterOpts{ + errAsyncBufferFull = errors.New("the async buffer is full") + reasonAsyncBufferFull = "async-buffer-full" + skipped = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "loki_chunk_fetcher_cache_skipped_total", Help: "Total number of operations against cache that have been skipped.", }, []string{"reason"}) @@ -121,15 +121,15 @@ type decodeResponse struct { } // NewChunkFetcher makes a new ChunkFetcher. 
-func NewChunkFetcher(cacher cache.Cache, cacheStubs bool, schema SchemaConfig, storage Client) (*Fetcher, error) { +func NewChunkFetcher(cacher cache.Cache, cacheStubs bool, schema SchemaConfig, storage Client, maxAsyncConcurrency int, maxAsyncBufferSize int) (*Fetcher, error) { c := &Fetcher{ schema: schema, storage: storage, cache: cacher, cacheStubs: cacheStubs, decodeRequests: make(chan decodeRequest), - maxAsyncConcurrency: 16, - maxAsyncBufferSize: 1000, + maxAsyncConcurrency: maxAsyncConcurrency, + maxAsyncBufferSize: maxAsyncBufferSize, stop: make(chan struct{}, 1), } @@ -142,7 +142,7 @@ func NewChunkFetcher(cacher cache.Cache, cacheStubs bool, schema SchemaConfig, s // to the max concurrency we have. c.asyncQueue = make(chan []Chunk, c.maxAsyncBufferSize) for i := 0; i < c.maxAsyncConcurrency; i++ { - go c.asyncQueueProcessLoop() + go c.asyncWriteBackCacheQueueProcessLoop() } return c, nil @@ -153,11 +153,11 @@ func (c *Fetcher) writeBackCacheAsync(fromStorage []Chunk) error { case c.asyncQueue <- fromStorage: return nil default: - return errMemcachedAsyncBufferFull + return errAsyncBufferFull } } -func (c *Fetcher) asyncQueueProcessLoop() { +func (c *Fetcher) asyncWriteBackCacheQueueProcessLoop() { for { select { case fromStorage := <-c.asyncQueue: @@ -215,7 +215,7 @@ func (c *Fetcher) FetchChunks(ctx context.Context, chunks []Chunk, keys []string // Always cache any chunks we did get if cacheErr := c.writeBackCacheAsync(fromStorage); cacheErr != nil { - if cacheErr == errMemcachedAsyncBufferFull { + if cacheErr == errAsyncBufferFull { skipped.WithLabelValues(reasonAsyncBufferFull).Inc() } level.Warn(log).Log("msg", "could not store chunks in chunk cache", "err", cacheErr) diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index 50a4e34e86c9..4044126bfb14 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -210,7 +210,7 @@ func (m *mockChunkStore) GetChunkRefs(ctx context.Context, userID string, from, panic(err) } - f, err := chunk.NewChunkFetcher(cache, false, m.schemas, m.client) + f, err := chunk.NewChunkFetcher(cache, false, m.schemas, m.client, 10, 100)) if err != nil { panic(err) } From f139a052c45716caff6ac2093090c9b43bd6edd6 Mon Sep 17 00:00:00 2001 From: fuling Date: Mon, 10 Jan 2022 15:53:38 +0800 Subject: [PATCH 05/10] [enhancement] querier cache: WriteBackCache should be off query path #5083 --- pkg/storage/util_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index 4044126bfb14..8c3545abf521 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -210,7 +210,7 @@ func (m *mockChunkStore) GetChunkRefs(ctx context.Context, userID string, from, panic(err) } - f, err := chunk.NewChunkFetcher(cache, false, m.schemas, m.client, 10, 100)) + f, err := chunk.NewChunkFetcher(cache, false, m.schemas, m.client, 10, 100) if err != nil { panic(err) } From 21bae260be1e834de3db2cef80517ce18950e0f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=9B=BD=E5=BF=A0?= <249032432@qq.com> Date: Mon, 10 Jan 2022 16:22:36 +0800 Subject: [PATCH 06/10] Update pkg/storage/chunk/cache/cache.go Co-authored-by: Danny Kopping --- pkg/storage/chunk/cache/cache.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/storage/chunk/cache/cache.go b/pkg/storage/chunk/cache/cache.go index 52faeae01084..d26bfefb677e 100644 --- a/pkg/storage/chunk/cache/cache.go +++ b/pkg/storage/chunk/cache/cache.go @@ -42,10 +42,10 @@ type Config struct { // For 
tests to inject specific implementations. Cache Cache `yaml:"-"` - // MaxAsyncConcurrency specifies the maximum number of SetAsync goroutines. - MaxAsyncConcurrency int `yaml:"max_async_concurrency"` - // MaxAsyncBufferSize specifies the queue buffer size for SetAsync operations. - MaxAsyncBufferSize int `yaml:"max_async_buffer_size"` + // AsyncCacheWriteBackConcurrency specifies the number of goroutines to use when asynchronously writing chunks fetched from the store to the chunk cache. + AsyncCacheWriteBackConcurrency int `yaml:"async_cache_write_back_concurrency"` + // AsyncCacheWriteBackBufferSize specifies the maximum number of fetched chunks to buffer for writing back to the chunk cache. + AsyncCacheWriteBackBufferSize int `yaml:"async_cache_write_back_buffer_size"` } // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet From e797b8ba0c571bfa76677a66769e1d9541f36c14 Mon Sep 17 00:00:00 2001 From: fuling Date: Mon, 10 Jan 2022 17:26:12 +0800 Subject: [PATCH 07/10] [enhancement] querier cache: WriteBackCache should be off query path #5083 --- pkg/storage/chunk/chunk_store_utils.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/pkg/storage/chunk/chunk_store_utils.go b/pkg/storage/chunk/chunk_store_utils.go index 925e2417769e..0e59a6f66ad8 100644 --- a/pkg/storage/chunk/chunk_store_utils.go +++ b/pkg/storage/chunk/chunk_store_utils.go @@ -23,9 +23,17 @@ var ( errAsyncBufferFull = errors.New("the async buffer is full") reasonAsyncBufferFull = "async-buffer-full" skipped = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "loki_chunk_fetcher_cache_skipped_total", + Name: "loki_chunk_fetcher_cache_skipped_buffer_full_total", Help: "Total number of operations against cache that have been skipped.", }, []string{"reason"}) + chunkFetcherCacheQueueEnqueue = promauto.NewCounter(prometheus.CounterOpts{ + Name: "loki_chunk_fetcher_cache_enqueue_total", + Help: "Total number of chunk enqueue cache queue.", + }) + chunkFetcherCacheQueueDequeue = promauto.NewCounter(prometheus.CounterOpts{ + Name: "loki_chunk_fetcher_cache_dequeue_total", + Help: "Total number of chunk dequeue cache queue.", + }) ) const chunkDecodeParallelism = 16 @@ -151,6 +159,7 @@ func NewChunkFetcher(cacher cache.Cache, cacheStubs bool, schema SchemaConfig, s func (c *Fetcher) writeBackCacheAsync(fromStorage []Chunk) error { select { case c.asyncQueue <- fromStorage: + chunkFetcherCacheQueueEnqueue.Inc() return nil default: return errAsyncBufferFull @@ -161,6 +170,7 @@ func (c *Fetcher) asyncWriteBackCacheQueueProcessLoop() { for { select { case fromStorage := <-c.asyncQueue: + chunkFetcherCacheQueueDequeue.Inc() cacheErr := c.writeBackCache(context.Background(), fromStorage) if cacheErr != nil { level.Warn(util_log.Logger).Log("msg", "could not write fetched chunks from storage into chunk cache", "err", cacheErr) From 862383925365a8b3bf3277b17d5c6b53570d80de Mon Sep 17 00:00:00 2001 From: fuling Date: Mon, 10 Jan 2022 17:33:22 +0800 Subject: [PATCH 08/10] [enhancement] querier cache: WriteBackCache should be off query path #5083 --- pkg/storage/chunk/cache/cache.go | 4 ++-- pkg/storage/chunk/chunk_store.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/storage/chunk/cache/cache.go b/pkg/storage/chunk/cache/cache.go index d26bfefb677e..216d39316dde 100644 --- a/pkg/storage/chunk/cache/cache.go +++ b/pkg/storage/chunk/cache/cache.go @@ -55,8 +55,8 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, description string, f 
cfg.MemcacheClient.RegisterFlagsWithPrefix(prefix, description, f) cfg.Redis.RegisterFlagsWithPrefix(prefix, description, f) cfg.Fifocache.RegisterFlagsWithPrefix(prefix, description, f) - f.IntVar(&cfg.MaxAsyncConcurrency, prefix+"max-async-concurrency", 16, "The maximum number of concurrent asynchronous operations can occur.") - f.IntVar(&cfg.MaxAsyncBufferSize, prefix+"max-async-buffer-size", 500, "The maximum number of enqueued asynchronous operations allowed.") + f.IntVar(&cfg.AsyncCacheWriteBackConcurrency, prefix+"max-async-concurrency", 16, "The maximum number of concurrent asynchronous operations can occur.") + f.IntVar(&cfg.AsyncCacheWriteBackBufferSize, prefix+"max-async-buffer-size", 500, "The maximum number of enqueued asynchronous operations allowed.") f.DurationVar(&cfg.DefaultValidity, prefix+"default-validity", time.Hour, description+"The default validity of entries for caches unless overridden.") f.BoolVar(&cfg.EnableFifoCache, prefix+"cache.enable-fifocache", false, description+"Enable in-memory cache (auto-enabled for the chunks & query results cache if no other cache is configured).") diff --git a/pkg/storage/chunk/chunk_store.go b/pkg/storage/chunk/chunk_store.go index 833800b7e101..8b691ab6885f 100644 --- a/pkg/storage/chunk/chunk_store.go +++ b/pkg/storage/chunk/chunk_store.go @@ -99,7 +99,7 @@ type baseStore struct { } func newBaseStore(cfg StoreConfig, scfg SchemaConfig, schema BaseSchema, index IndexClient, chunks Client, limits StoreLimits, chunksCache cache.Cache) (baseStore, error) { - fetcher, err := NewChunkFetcher(chunksCache, cfg.chunkCacheStubs, scfg, chunks, cfg.ChunkCacheConfig.MaxAsyncConcurrency, cfg.ChunkCacheConfig.MaxAsyncBufferSize) + fetcher, err := NewChunkFetcher(chunksCache, cfg.chunkCacheStubs, scfg, chunks, cfg.ChunkCacheConfig.AsyncCacheWriteBackConcurrency, cfg.ChunkCacheConfig.AsyncCacheWriteBackBufferSize) if err != nil { return baseStore{}, err } From 28126efe1ff42907d70b528cbbd9f474d502523d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=9B=BD=E5=BF=A0?= <249032432@qq.com> Date: Tue, 11 Jan 2022 12:10:31 +0800 Subject: [PATCH 09/10] Update pkg/storage/chunk/chunk_store_utils.go Co-authored-by: Danny Kopping --- pkg/storage/chunk/chunk_store_utils.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/storage/chunk/chunk_store_utils.go b/pkg/storage/chunk/chunk_store_utils.go index 0e59a6f66ad8..df58cae627ea 100644 --- a/pkg/storage/chunk/chunk_store_utils.go +++ b/pkg/storage/chunk/chunk_store_utils.go @@ -27,12 +27,12 @@ var ( Help: "Total number of operations against cache that have been skipped.", }, []string{"reason"}) chunkFetcherCacheQueueEnqueue = promauto.NewCounter(prometheus.CounterOpts{ - Name: "loki_chunk_fetcher_cache_enqueue_total", - Help: "Total number of chunk enqueue cache queue.", + Name: "loki_chunk_fetcher_cache_enqueued_total", + Help: "Total number of chunks enqueued to a buffer to be asynchronously written back to the chunk cache.", }) chunkFetcherCacheQueueDequeue = promauto.NewCounter(prometheus.CounterOpts{ - Name: "loki_chunk_fetcher_cache_dequeue_total", - Help: "Total number of chunk dequeue cache queue.", + Name: "loki_chunk_fetcher_cache_dequeued_total", + Help: "Total number of chunks asynchronously dequeued from a buffer and written back to the chunk cache.", }) ) From 49b83aebd164f741932bf76f05e743785a6ceff8 Mon Sep 17 00:00:00 2001 From: fuling Date: Tue, 11 Jan 2022 12:24:08 +0800 Subject: [PATCH 10/10] [enhancement] querier cache: WriteBackCache should be off query 
path #5083 --- pkg/storage/chunk/cache/cache.go | 4 ++-- pkg/storage/chunk/chunk_store_utils.go | 15 +++++++-------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/pkg/storage/chunk/cache/cache.go b/pkg/storage/chunk/cache/cache.go index 216d39316dde..64a1a722bda9 100644 --- a/pkg/storage/chunk/cache/cache.go +++ b/pkg/storage/chunk/cache/cache.go @@ -55,8 +55,8 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, description string, f cfg.MemcacheClient.RegisterFlagsWithPrefix(prefix, description, f) cfg.Redis.RegisterFlagsWithPrefix(prefix, description, f) cfg.Fifocache.RegisterFlagsWithPrefix(prefix, description, f) - f.IntVar(&cfg.AsyncCacheWriteBackConcurrency, prefix+"max-async-concurrency", 16, "The maximum number of concurrent asynchronous operations can occur.") - f.IntVar(&cfg.AsyncCacheWriteBackBufferSize, prefix+"max-async-buffer-size", 500, "The maximum number of enqueued asynchronous operations allowed.") + f.IntVar(&cfg.AsyncCacheWriteBackConcurrency, prefix+"max-async-cache-write-back-concurrency", 16, "The maximum number of concurrent asynchronous writeback cache can occur.") + f.IntVar(&cfg.AsyncCacheWriteBackBufferSize, prefix+"max-async-cache-write-back-buffer-size", 500, "The maximum number of enqueued asynchronous writeback cache allowed.") f.DurationVar(&cfg.DefaultValidity, prefix+"default-validity", time.Hour, description+"The default validity of entries for caches unless overridden.") f.BoolVar(&cfg.EnableFifoCache, prefix+"cache.enable-fifocache", false, description+"Enable in-memory cache (auto-enabled for the chunks & query results cache if no other cache is configured).") diff --git a/pkg/storage/chunk/chunk_store_utils.go b/pkg/storage/chunk/chunk_store_utils.go index df58cae627ea..8354f4a446ec 100644 --- a/pkg/storage/chunk/chunk_store_utils.go +++ b/pkg/storage/chunk/chunk_store_utils.go @@ -20,12 +20,11 @@ import ( ) var ( - errAsyncBufferFull = errors.New("the async buffer is full") - reasonAsyncBufferFull = "async-buffer-full" - skipped = promauto.NewCounterVec(prometheus.CounterOpts{ + errAsyncBufferFull = errors.New("the async buffer is full") + skipped = promauto.NewCounter(prometheus.CounterOpts{ Name: "loki_chunk_fetcher_cache_skipped_buffer_full_total", Help: "Total number of operations against cache that have been skipped.", - }, []string{"reason"}) + }) chunkFetcherCacheQueueEnqueue = promauto.NewCounter(prometheus.CounterOpts{ Name: "loki_chunk_fetcher_cache_enqueued_total", Help: "Total number of chunks enqueued to a buffer to be asynchronously written back to the chunk cache.", @@ -138,7 +137,7 @@ func NewChunkFetcher(cacher cache.Cache, cacheStubs bool, schema SchemaConfig, s decodeRequests: make(chan decodeRequest), maxAsyncConcurrency: maxAsyncConcurrency, maxAsyncBufferSize: maxAsyncBufferSize, - stop: make(chan struct{}, 1), + stop: make(chan struct{}), } c.wait.Add(chunkDecodeParallelism) @@ -159,7 +158,7 @@ func NewChunkFetcher(cacher cache.Cache, cacheStubs bool, schema SchemaConfig, s func (c *Fetcher) writeBackCacheAsync(fromStorage []Chunk) error { select { case c.asyncQueue <- fromStorage: - chunkFetcherCacheQueueEnqueue.Inc() + chunkFetcherCacheQueueEnqueue.Add(float64(len(fromStorage))) return nil default: return errAsyncBufferFull @@ -170,7 +169,7 @@ func (c *Fetcher) asyncWriteBackCacheQueueProcessLoop() { for { select { case fromStorage := <-c.asyncQueue: - chunkFetcherCacheQueueDequeue.Inc() + chunkFetcherCacheQueueDequeue.Add(float64(len(fromStorage))) cacheErr := 
c.writeBackCache(context.Background(), fromStorage) if cacheErr != nil { level.Warn(util_log.Logger).Log("msg", "could not write fetched chunks from storage into chunk cache", "err", cacheErr) @@ -226,7 +225,7 @@ func (c *Fetcher) FetchChunks(ctx context.Context, chunks []Chunk, keys []string // Always cache any chunks we did get if cacheErr := c.writeBackCacheAsync(fromStorage); cacheErr != nil { if cacheErr == errAsyncBufferFull { - skipped.WithLabelValues(reasonAsyncBufferFull).Inc() + skipped.Inc() } level.Warn(log).Log("msg", "could not store chunks in chunk cache", "err", cacheErr) }
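For reference, the write-behind pattern this series adds to the Fetcher -- a bounded queue drained by a fixed pool of goroutines, with a non-blocking enqueue that drops the batch (and counts the skip) when the buffer is full, sized by the max-async-cache-write-back-concurrency (default 16) and max-async-cache-write-back-buffer-size (default 500) flags -- can be sketched in isolation as below. This is a simplified, hypothetical illustration rather than the Loki code itself: Chunk, the write callback, and the logging are placeholder stand-ins for the real types, writeBackCache, and the Prometheus counters in the patches.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// Chunk is a placeholder for the real chunk type.
type Chunk struct{ Key string }

var errAsyncBufferFull = errors.New("the async buffer is full")

// asyncWriteBack is a minimal stand-in for the queue the patches embed in Fetcher.
type asyncWriteBack struct {
	queue chan []Chunk
	stop  chan struct{}
	wg    sync.WaitGroup
	write func(context.Context, []Chunk) error // stand-in for Fetcher.writeBackCache
}

func newAsyncWriteBack(concurrency, bufferSize int, write func(context.Context, []Chunk) error) *asyncWriteBack {
	a := &asyncWriteBack{
		queue: make(chan []Chunk, bufferSize),
		stop:  make(chan struct{}),
		write: write,
	}
	// One worker goroutine per unit of configured concurrency, mirroring asyncWriteBackCacheQueueProcessLoop.
	for i := 0; i < concurrency; i++ {
		a.wg.Add(1)
		go a.loop()
	}
	return a
}

// Enqueue never blocks the caller: if the buffer is full the batch is dropped
// and the returned error lets the caller increment a "skipped" counter, as FetchChunks does.
func (a *asyncWriteBack) Enqueue(chunks []Chunk) error {
	select {
	case a.queue <- chunks:
		return nil
	default:
		return errAsyncBufferFull
	}
}

func (a *asyncWriteBack) loop() {
	defer a.wg.Done()
	for {
		select {
		case chunks := <-a.queue:
			if err := a.write(context.Background(), chunks); err != nil {
				fmt.Println("could not write fetched chunks back to cache:", err)
			}
		case <-a.stop:
			// As in the patches, anything still buffered at shutdown is discarded.
			return
		}
	}
}

func (a *asyncWriteBack) Stop() {
	close(a.stop)
	a.wg.Wait()
}

func main() {
	wb := newAsyncWriteBack(4, 8, func(_ context.Context, chunks []Chunk) error {
		time.Sleep(10 * time.Millisecond) // stand-in for the cache Store call
		return nil
	})
	defer wb.Stop()

	if err := wb.Enqueue([]Chunk{{Key: "fake/chunk/1"}}); err != nil {
		fmt.Println("skipped write-back:", err) // errAsyncBufferFull once the queue is saturated
	}
}

Dropping on a full buffer is the point of the change: a slow or unavailable cache can no longer stall the query path. The trade-off, visible in the patches as well, is that skipped batches are simply never written back and queued batches may be discarded when the Fetcher stops.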