diff --git a/pkg/storage/chunk/cache/cache.go b/pkg/storage/chunk/cache/cache.go
index 7a66097b8d38..64a1a722bda9 100644
--- a/pkg/storage/chunk/cache/cache.go
+++ b/pkg/storage/chunk/cache/cache.go
@@ -41,6 +41,11 @@ type Config struct {
 
 	// For tests to inject specific implementations.
 	Cache Cache `yaml:"-"`
+
+	// AsyncCacheWriteBackConcurrency specifies the number of goroutines to use when asynchronously writing chunks fetched from the store to the chunk cache.
+	AsyncCacheWriteBackConcurrency int `yaml:"async_cache_write_back_concurrency"`
+	// AsyncCacheWriteBackBufferSize specifies the maximum number of fetched chunks to buffer for writing back to the chunk cache.
+	AsyncCacheWriteBackBufferSize int `yaml:"async_cache_write_back_buffer_size"`
 }
 
 // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
@@ -50,6 +55,8 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, description string, f
 	cfg.MemcacheClient.RegisterFlagsWithPrefix(prefix, description, f)
 	cfg.Redis.RegisterFlagsWithPrefix(prefix, description, f)
 	cfg.Fifocache.RegisterFlagsWithPrefix(prefix, description, f)
+	f.IntVar(&cfg.AsyncCacheWriteBackConcurrency, prefix+"max-async-cache-write-back-concurrency", 16, "The maximum number of concurrent asynchronous cache write-back operations that can occur.")
+	f.IntVar(&cfg.AsyncCacheWriteBackBufferSize, prefix+"max-async-cache-write-back-buffer-size", 500, "The maximum number of enqueued asynchronous cache write-back operations allowed.")
 	f.DurationVar(&cfg.DefaultValidity, prefix+"default-validity", time.Hour, description+"The default validity of entries for caches unless overridden.")
 	f.BoolVar(&cfg.EnableFifoCache, prefix+"cache.enable-fifocache", false, description+"Enable in-memory cache (auto-enabled for the chunks & query results cache if no other cache is configured).")
diff --git a/pkg/storage/chunk/cache/cache_test.go b/pkg/storage/chunk/cache/cache_test.go
index 5267a66dce4f..3e9a5cb9271b 100644
--- a/pkg/storage/chunk/cache/cache_test.go
+++ b/pkg/storage/chunk/cache/cache_test.go
@@ -124,7 +124,7 @@ func testChunkFetcher(t *testing.T, c cache.Cache, keys []string, chunks []chunk
 		},
 	}
 
-	fetcher, err := chunk.NewChunkFetcher(c, false, s, nil)
+	fetcher, err := chunk.NewChunkFetcher(c, false, s, nil, 10, 100)
 	require.NoError(t, err)
 	defer fetcher.Stop()
diff --git a/pkg/storage/chunk/chunk_store.go b/pkg/storage/chunk/chunk_store.go
index f9aa983ce828..8b691ab6885f 100644
--- a/pkg/storage/chunk/chunk_store.go
+++ b/pkg/storage/chunk/chunk_store.go
@@ -99,7 +99,7 @@ type baseStore struct {
 }
 
 func newBaseStore(cfg StoreConfig, scfg SchemaConfig, schema BaseSchema, index IndexClient, chunks Client, limits StoreLimits, chunksCache cache.Cache) (baseStore, error) {
-	fetcher, err := NewChunkFetcher(chunksCache, cfg.chunkCacheStubs, scfg, chunks)
+	fetcher, err := NewChunkFetcher(chunksCache, cfg.chunkCacheStubs, scfg, chunks, cfg.ChunkCacheConfig.AsyncCacheWriteBackConcurrency, cfg.ChunkCacheConfig.AsyncCacheWriteBackBufferSize)
 	if err != nil {
 		return baseStore{}, err
 	}
diff --git a/pkg/storage/chunk/chunk_store_utils.go b/pkg/storage/chunk/chunk_store_utils.go
index 392941d78e53..8354f4a446ec 100644
--- a/pkg/storage/chunk/chunk_store_utils.go
+++ b/pkg/storage/chunk/chunk_store_utils.go
@@ -5,6 +5,9 @@ import (
 	"sync"
 
 	"github.com/go-kit/log/level"
+	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/promql"
@@ -16,6 +19,22 @@ import (
 	"github.com/grafana/loki/pkg/storage/chunk/cache"
 )
 
+var (
+	errAsyncBufferFull = errors.New("the async buffer is full")
+	skipped            = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "loki_chunk_fetcher_cache_skipped_buffer_full_total",
+		Help: "Total number of operations against cache that have been skipped.",
+	})
+	chunkFetcherCacheQueueEnqueue = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "loki_chunk_fetcher_cache_enqueued_total",
+		Help: "Total number of chunks enqueued to a buffer to be asynchronously written back to the chunk cache.",
+	})
+	chunkFetcherCacheQueueDequeue = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "loki_chunk_fetcher_cache_dequeued_total",
+		Help: "Total number of chunks asynchronously dequeued from a buffer and written back to the chunk cache.",
+	})
+)
+
 const chunkDecodeParallelism = 16
 
 func filterChunksByTime(from, through model.Time, chunks []Chunk) []Chunk {
@@ -89,6 +108,12 @@ type Fetcher struct {
 
 	wait           sync.WaitGroup
 	decodeRequests chan decodeRequest
+
+	maxAsyncConcurrency int
+	maxAsyncBufferSize  int
+
+	asyncQueue chan []Chunk
+	stop       chan struct{}
 }
 
 type decodeRequest struct {
@@ -103,13 +128,16 @@ type decodeResponse struct {
 }
 
 // NewChunkFetcher makes a new ChunkFetcher.
-func NewChunkFetcher(cacher cache.Cache, cacheStubs bool, schema SchemaConfig, storage Client) (*Fetcher, error) {
+func NewChunkFetcher(cacher cache.Cache, cacheStubs bool, schema SchemaConfig, storage Client, maxAsyncConcurrency int, maxAsyncBufferSize int) (*Fetcher, error) {
 	c := &Fetcher{
-		schema:         schema,
-		storage:        storage,
-		cache:          cacher,
-		cacheStubs:     cacheStubs,
-		decodeRequests: make(chan decodeRequest),
+		schema:              schema,
+		storage:             storage,
+		cache:               cacher,
+		cacheStubs:          cacheStubs,
+		decodeRequests:      make(chan decodeRequest),
+		maxAsyncConcurrency: maxAsyncConcurrency,
+		maxAsyncBufferSize:  maxAsyncBufferSize,
+		stop:                make(chan struct{}),
 	}
 
 	c.wait.Add(chunkDecodeParallelism)
@@ -117,14 +145,47 @@ func NewChunkFetcher(cacher cache.Cache, cacheStubs bool, schema SchemaConfig, s
 		go c.worker()
 	}
 
+	// Start a number of goroutines, processing async operations, equal
+	// to the max concurrency we have.
+	c.asyncQueue = make(chan []Chunk, c.maxAsyncBufferSize)
+	for i := 0; i < c.maxAsyncConcurrency; i++ {
+		go c.asyncWriteBackCacheQueueProcessLoop()
+	}
+
 	return c, nil
 }
 
+func (c *Fetcher) writeBackCacheAsync(fromStorage []Chunk) error {
+	select {
+	case c.asyncQueue <- fromStorage:
+		chunkFetcherCacheQueueEnqueue.Add(float64(len(fromStorage)))
+		return nil
+	default:
+		return errAsyncBufferFull
+	}
+}
+
+func (c *Fetcher) asyncWriteBackCacheQueueProcessLoop() {
+	for {
+		select {
+		case fromStorage := <-c.asyncQueue:
+			chunkFetcherCacheQueueDequeue.Add(float64(len(fromStorage)))
+			cacheErr := c.writeBackCache(context.Background(), fromStorage)
+			if cacheErr != nil {
+				level.Warn(util_log.Logger).Log("msg", "could not write fetched chunks from storage into chunk cache", "err", cacheErr)
+			}
+		case <-c.stop:
+			return
+		}
+	}
+}
+
 // Stop the ChunkFetcher.
 func (c *Fetcher) Stop() {
 	close(c.decodeRequests)
 	c.wait.Wait()
 	c.cache.Stop()
+	close(c.stop)
 }
 
 func (c *Fetcher) worker() {
@@ -162,7 +223,10 @@ func (c *Fetcher) FetchChunks(ctx context.Context, chunks []Chunk, keys []string
 	}
 
 	// Always cache any chunks we did get
-	if cacheErr := c.writeBackCache(ctx, fromStorage); cacheErr != nil {
+	if cacheErr := c.writeBackCacheAsync(fromStorage); cacheErr != nil {
+		if cacheErr == errAsyncBufferFull {
+			skipped.Inc()
+		}
 		level.Warn(log).Log("msg", "could not store chunks in chunk cache", "err", cacheErr)
 	}
 
diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go
index 50a4e34e86c9..8c3545abf521 100644
--- a/pkg/storage/util_test.go
+++ b/pkg/storage/util_test.go
@@ -210,7 +210,7 @@ func (m *mockChunkStore) GetChunkRefs(ctx context.Context, userID string, from,
 		panic(err)
 	}
 
-	f, err := chunk.NewChunkFetcher(cache, false, m.schemas, m.client)
+	f, err := chunk.NewChunkFetcher(cache, false, m.schemas, m.client, 10, 100)
 	if err != nil {
 		panic(err)
 	}
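For reviewers skimming the diff, the heart of the change is a bounded, non-blocking write-back queue: producers enqueue with a `select`/`default` so a full buffer sheds load (and bumps a "skipped" counter) instead of blocking the query path, while a fixed pool of goroutines drains the queue until a stop channel closes. The standalone sketch below mirrors that pattern outside Loki; the names (`asyncWriter`, `enqueue`, `processLoop`) are hypothetical and it uses `[]string` batches in place of `[]Chunk` and a `Printf` in place of `writeBackCache`:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var errBufferFull = errors.New("the async buffer is full")

// asyncWriter mirrors the Fetcher's new fields: a buffered channel as the
// bounded queue, a stop channel, and a WaitGroup to join workers on Stop.
type asyncWriter struct {
	queue chan []string // stands in for chan []Chunk
	stop  chan struct{}
	wg    sync.WaitGroup
}

func newAsyncWriter(concurrency, bufferSize int) *asyncWriter {
	w := &asyncWriter{
		queue: make(chan []string, bufferSize),
		stop:  make(chan struct{}),
	}
	for i := 0; i < concurrency; i++ {
		w.wg.Add(1)
		go w.processLoop()
	}
	return w
}

// enqueue is non-blocking: when the buffer is full it returns an error so the
// caller can count a skipped write-back rather than stall the read path.
func (w *asyncWriter) enqueue(batch []string) error {
	select {
	case w.queue <- batch:
		return nil
	default:
		return errBufferFull
	}
}

func (w *asyncWriter) processLoop() {
	defer w.wg.Done()
	for {
		select {
		case batch := <-w.queue:
			// Stand-in for writeBackCache: simulate a slow cache write.
			time.Sleep(10 * time.Millisecond)
			fmt.Printf("wrote %d items back to cache\n", len(batch))
		case <-w.stop:
			return
		}
	}
}

func (w *asyncWriter) Stop() {
	close(w.stop)
	w.wg.Wait()
}

func main() {
	w := newAsyncWriter(2, 4) // the diff defaults to 16 and 500; scaled down here
	for i := 0; i < 10; i++ {
		if err := w.enqueue([]string{"chunk-a", "chunk-b"}); err != nil {
			fmt.Println("skipped:", err) // where skipped.Inc() fires in the diff
		}
	}
	time.Sleep(100 * time.Millisecond)
	w.Stop()
}
```

Note that, as in the diff, closing the stop channel may leave enqueued batches undrained; write-back is deliberately best-effort, since the chunks have already been served from storage.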