From 9c55b2722ae5fd87969e0e7752d8a95ec23caf78 Mon Sep 17 00:00:00 2001 From: Dmitry Shmulevich Date: Mon, 14 Sep 2020 07:00:38 -0700 Subject: [PATCH] Added support for Redis Cluster and Redis Sentinel (#2961) * Added support for Redis Cluster and Redis Sentinel (#2959) Signed-off-by: Dmitry Shmulevich * addressed comments Signed-off-by: Dmitry Shmulevich * fixed 'make doc' Signed-off-by: Dmitry Shmulevich * fixed 'make lint' Signed-off-by: Dmitry Shmulevich * updated Changelog Signed-off-by: Dmitry Shmulevich * updated Changelog Signed-off-by: Dmitry Shmulevich * updated go.mod Signed-off-by: Dmitry Shmulevich * addressed comments Signed-off-by: Dmitry Shmulevich * removed deprecated flags in redis config Signed-off-by: Dmitry Shmulevich * updated modules Signed-off-by: Dmitry Shmulevich * updated modules Signed-off-by: Dmitry Shmulevich * added warning when Redis sentinel returns unexpected master info Signed-off-by: Dmitry Shmulevich * fixed 'make lint' Signed-off-by: Dmitry Shmulevich * updated unit test Signed-off-by: Dmitry Shmulevich * added master group name to Redis Sentinel config Signed-off-by: Dmitry Shmulevich * updated config validation Signed-off-by: Dmitry Shmulevich * use redis universal client Signed-off-by: Dmitry Shmulevich * updated dependencies Signed-off-by: Dmitry Shmulevich * remove obsolete interface Signed-off-by: Dmitry Shmulevich * addressed comments Signed-off-by: Dmitry Shmulevich * add Redis DB index selection Signed-off-by: Dmitry Shmulevich * updated CHANGELOG Signed-off-by: Dmitry Shmulevich * Fixed CHANGELOG Signed-off-by: Marco Pracucci Co-authored-by: Marco Pracucci --- cache/cache.go | 2 +- cache/redis_cache.go | 130 ++++-------------------------------- cache/redis_cache_test.go | 61 ++++++----------- cache/redis_client.go | 137 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 169 insertions(+), 161 deletions(-) create mode 100644 cache/redis_client.go diff --git a/cache/cache.go b/cache/cache.go index dbbc6b2e8c4f..b6144dbf8fea 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -99,7 +99,7 @@ func New(cfg Config, reg prometheus.Registerer, logger log.Logger) (Cache, error cfg.Redis.Expiration = cfg.DefaultValidity } cacheName := cfg.Prefix + "redis" - cache := NewRedisCache(cfg.Redis, cacheName, nil, logger) + cache := NewRedisCache(cacheName, NewRedisClient(&cfg.Redis), logger) caches = append(caches, NewBackground(cacheName, cfg.Background, Instrument(cacheName, cache, reg), reg)) } diff --git a/cache/redis_cache.go b/cache/redis_cache.go index 382290e30ba8..5887bd84eedf 100644 --- a/cache/redis_cache.go +++ b/cache/redis_cache.go @@ -2,102 +2,37 @@ package cache import ( "context" - "flag" - "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - "github.com/gomodule/redigo/redis" "github.com/cortexproject/cortex/pkg/util" - "github.com/cortexproject/cortex/pkg/util/flagext" ) // RedisCache type caches chunks in redis type RedisCache struct { - name string - expiration int - timeout time.Duration - pool *redis.Pool - logger log.Logger -} - -// RedisConfig defines how a RedisCache should be constructed. -type RedisConfig struct { - Endpoint string `yaml:"endpoint"` - Timeout time.Duration `yaml:"timeout"` - Expiration time.Duration `yaml:"expiration"` - MaxIdleConns int `yaml:"max_idle_conns"` - MaxActiveConns int `yaml:"max_active_conns"` - Password flagext.Secret `yaml:"password"` - EnableTLS bool `yaml:"enable_tls"` - IdleTimeout time.Duration `yaml:"idle_timeout"` - WaitOnPoolExhaustion bool `yaml:"wait_on_pool_exhaustion"` - MaxConnLifetime time.Duration `yaml:"max_conn_lifetime"` -} - -// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet -func (cfg *RedisConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { - f.StringVar(&cfg.Endpoint, prefix+"redis.endpoint", "", description+"Redis service endpoint to use when caching chunks. If empty, no redis will be used.") - f.DurationVar(&cfg.Timeout, prefix+"redis.timeout", 100*time.Millisecond, description+"Maximum time to wait before giving up on redis requests.") - f.DurationVar(&cfg.Expiration, prefix+"redis.expiration", 0, description+"How long keys stay in the redis.") - f.IntVar(&cfg.MaxIdleConns, prefix+"redis.max-idle-conns", 80, description+"Maximum number of idle connections in pool.") - f.IntVar(&cfg.MaxActiveConns, prefix+"redis.max-active-conns", 0, description+"Maximum number of active connections in pool.") - f.Var(&cfg.Password, prefix+"redis.password", description+"Password to use when connecting to redis.") - f.BoolVar(&cfg.EnableTLS, prefix+"redis.enable-tls", false, description+"Enables connecting to redis with TLS.") - f.DurationVar(&cfg.IdleTimeout, prefix+"redis.idle-timeout", 0, description+"Close connections after remaining idle for this duration. If the value is zero, then idle connections are not closed.") - f.BoolVar(&cfg.WaitOnPoolExhaustion, prefix+"redis.wait-on-pool-exhaustion", false, description+"Enables waiting if there are no idle connections. If the value is false and the pool is at the max_active_conns limit, the pool will return a connection with ErrPoolExhausted error and not wait for idle connections.") - f.DurationVar(&cfg.MaxConnLifetime, prefix+"redis.max-conn-lifetime", 0, description+"Close connections older than this duration. If the value is zero, then the pool does not close connections based on age.") + name string + redis *RedisClient + logger log.Logger } // NewRedisCache creates a new RedisCache -func NewRedisCache(cfg RedisConfig, name string, pool *redis.Pool, logger log.Logger) *RedisCache { +func NewRedisCache(name string, redisClient *RedisClient, logger log.Logger) *RedisCache { util.WarnExperimentalUse("Redis cache") - // pool != nil only in unit tests - if pool == nil { - pool = &redis.Pool{ - Dial: func() (redis.Conn, error) { - options := make([]redis.DialOption, 0, 2) - if cfg.EnableTLS { - options = append(options, redis.DialUseTLS(true)) - } - if cfg.Password.Value != "" { - options = append(options, redis.DialPassword(cfg.Password.Value)) - } - - c, err := redis.Dial("tcp", cfg.Endpoint, options...) - if err != nil { - return nil, err - } - return c, err - }, - MaxIdle: cfg.MaxIdleConns, - MaxActive: cfg.MaxActiveConns, - IdleTimeout: cfg.IdleTimeout, - Wait: cfg.WaitOnPoolExhaustion, - MaxConnLifetime: cfg.MaxConnLifetime, - } - } - cache := &RedisCache{ - expiration: int(cfg.Expiration.Seconds()), - timeout: cfg.Timeout, - name: name, - pool: pool, - logger: logger, + name: name, + redis: redisClient, + logger: logger, } - - if err := cache.ping(context.Background()); err != nil { - level.Error(logger).Log("msg", "error connecting to redis", "endpoint", cfg.Endpoint, "err", err) + if err := cache.redis.Ping(context.Background()); err != nil { + level.Error(logger).Log("msg", "error connecting to redis", "name", name, "err", err) } - return cache } // Fetch gets keys from the cache. The keys that are found must be in the order of the keys requested. func (c *RedisCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string) { - data, err := c.mget(ctx, keys) - + data, err := c.redis.MGet(ctx, keys) if err != nil { level.Error(c.logger).Log("msg", "failed to get from redis", "name", c.name, "err", err) missed = make([]string, len(keys)) @@ -117,7 +52,7 @@ func (c *RedisCache) Fetch(ctx context.Context, keys []string) (found []string, // Store stores the key in the cache. func (c *RedisCache) Store(ctx context.Context, keys []string, bufs [][]byte) { - err := c.mset(ctx, keys, bufs, c.expiration) + err := c.redis.MSet(ctx, keys, bufs) if err != nil { level.Error(c.logger).Log("msg", "failed to put to redis", "name", c.name, "err", err) } @@ -125,46 +60,5 @@ func (c *RedisCache) Store(ctx context.Context, keys []string, bufs [][]byte) { // Stop stops the redis client. func (c *RedisCache) Stop() { - _ = c.pool.Close() -} - -// mset adds key-value pairs to the cache. -func (c *RedisCache) mset(_ context.Context, keys []string, bufs [][]byte, ttl int) error { - conn := c.pool.Get() - defer conn.Close() - - if err := conn.Send("MULTI"); err != nil { - return err - } - for i := range keys { - if err := conn.Send("SETEX", keys[i], ttl, bufs[i]); err != nil { - return err - } - } - _, err := redis.DoWithTimeout(conn, c.timeout, "EXEC") - return err -} - -// mget retrieves values from the cache. -func (c *RedisCache) mget(_ context.Context, keys []string) ([][]byte, error) { - intf := make([]interface{}, len(keys)) - for i, key := range keys { - intf[i] = key - } - - conn := c.pool.Get() - defer conn.Close() - - return redis.ByteSlices(redis.DoWithTimeout(conn, c.timeout, "MGET", intf...)) -} - -func (c *RedisCache) ping(_ context.Context) error { - conn := c.pool.Get() - defer conn.Close() - - pong, err := redis.DoWithTimeout(conn, c.timeout, "PING") - if err == nil { - _, err = redis.String(pong, err) - } - return err + _ = c.redis.Close() } diff --git a/cache/redis_cache_test.go b/cache/redis_cache_test.go index 1511c11e96bb..acccf96e067a 100644 --- a/cache/redis_cache_test.go +++ b/cache/redis_cache_test.go @@ -1,28 +1,20 @@ -package cache_test +package cache import ( "context" "testing" "time" + "github.com/alicebob/miniredis" "github.com/go-kit/kit/log" - "github.com/gomodule/redigo/redis" - "github.com/rafaeljusto/redigomock" + "github.com/go-redis/redis/v8" "github.com/stretchr/testify/require" - - "github.com/cortexproject/cortex/pkg/chunk/cache" ) func TestRedisCache(t *testing.T) { - cfg := cache.RedisConfig{ - Timeout: 10 * time.Millisecond, - } - - conn := redigomock.NewConn() - conn.Clear() - pool := &redis.Pool{Dial: func() (redis.Conn, error) { - return conn, nil - }, MaxIdle: 10} + c, err := mockRedisCache() + require.Nil(t, err) + defer c.redis.Close() keys := []string{"key1", "key2", "key3"} bufs := [][]byte{[]byte("data1"), []byte("data2"), []byte("data3")} @@ -32,29 +24,8 @@ func TestRedisCache(t *testing.T) { nHit := len(keys) require.Len(t, bufs, nHit) - // mock Redis Store - mockRedisStore(conn, keys, bufs) - - //mock cache hit - keyIntf := make([]interface{}, nHit) - bufIntf := make([]interface{}, nHit) - - for i := 0; i < nHit; i++ { - keyIntf[i] = keys[i] - bufIntf[i] = bufs[i] - } - conn.Command("MGET", keyIntf...).Expect(bufIntf) - - // mock cache miss nMiss := len(miss) - missIntf := make([]interface{}, nMiss) - for i, s := range miss { - missIntf[i] = s - } - conn.Command("MGET", missIntf...).ExpectError(nil) - // mock the cache - c := cache.NewRedisCache(cfg, "mock", pool, log.NewNopLogger()) ctx := context.Background() c.Store(ctx, keys, bufs) @@ -79,12 +50,18 @@ func TestRedisCache(t *testing.T) { } } -func mockRedisStore(conn *redigomock.Conn, keys []string, bufs [][]byte) { - conn.Command("MULTI") - ret := []interface{}{} - for i := range keys { - conn.Command("SETEX", keys[i], 0, bufs[i]) - ret = append(ret, "OK") +func mockRedisCache() (*RedisCache, error) { + redisServer, err := miniredis.Run() + if err != nil { + return nil, err + + } + redisClient := &RedisClient{ + expiration: time.Minute, + timeout: 100 * time.Millisecond, + rdb: redis.NewUniversalClient(&redis.UniversalOptions{ + Addrs: []string{redisServer.Addr()}, + }), } - conn.Command("EXEC").Expect(ret) + return NewRedisCache("mock", redisClient, log.NewNopLogger()), nil } diff --git a/cache/redis_client.go b/cache/redis_client.go new file mode 100644 index 000000000000..df4ad5aadb3c --- /dev/null +++ b/cache/redis_client.go @@ -0,0 +1,137 @@ +package cache + +import ( + "context" + "crypto/tls" + "flag" + "fmt" + "strings" + "time" + "unsafe" + + "github.com/cortexproject/cortex/pkg/util/flagext" + + "github.com/go-redis/redis/v8" +) + +// RedisConfig defines how a RedisCache should be constructed. +type RedisConfig struct { + Endpoint string `yaml:"endpoint"` + MasterName string `yaml:"master_name"` + Timeout time.Duration `yaml:"timeout"` + Expiration time.Duration `yaml:"expiration"` + DB int `yaml:"db"` + PoolSize int `yaml:"pool_size"` + Password flagext.Secret `yaml:"password"` + EnableTLS bool `yaml:"enable_tls"` + IdleTimeout time.Duration `yaml:"idle_timeout"` + MaxConnAge time.Duration `yaml:"max_connection_age"` +} + +// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet +func (cfg *RedisConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { + f.StringVar(&cfg.Endpoint, prefix+"redis.endpoint", "", description+"Redis Server endpoint to use for caching. A comma-separated list of endpoints for Redis Cluster or Redis Sentinel. If empty, no redis will be used.") + f.StringVar(&cfg.MasterName, prefix+"redis.master-name", "", description+"Redis Sentinel master name. An empty string for Redis Server or Redis Cluster.") + f.DurationVar(&cfg.Timeout, prefix+"redis.timeout", 100*time.Millisecond, description+"Maximum time to wait before giving up on redis requests.") + f.DurationVar(&cfg.Expiration, prefix+"redis.expiration", 0, description+"How long keys stay in the redis.") + f.IntVar(&cfg.DB, prefix+"redis.db", 0, description+"Database index.") + f.IntVar(&cfg.PoolSize, prefix+"redis.pool-size", 0, description+"Maximum number of connections in the pool.") + f.Var(&cfg.Password, prefix+"redis.password", description+"Password to use when connecting to redis.") + f.BoolVar(&cfg.EnableTLS, prefix+"redis.enable-tls", false, description+"Enables connecting to redis with TLS.") + f.DurationVar(&cfg.IdleTimeout, prefix+"redis.idle-timeout", 0, description+"Close connections after remaining idle for this duration. If the value is zero, then idle connections are not closed.") + f.DurationVar(&cfg.MaxConnAge, prefix+"redis.max-connection-age", 0, description+"Close connections older than this duration. If the value is zero, then the pool does not close connections based on age.") +} + +type RedisClient struct { + expiration time.Duration + timeout time.Duration + rdb redis.UniversalClient +} + +// NewRedisClient creates Redis client +func NewRedisClient(cfg *RedisConfig) *RedisClient { + opt := &redis.UniversalOptions{ + Addrs: strings.Split(cfg.Endpoint, ","), + MasterName: cfg.MasterName, + Password: cfg.Password.Value, + DB: cfg.DB, + PoolSize: cfg.PoolSize, + IdleTimeout: cfg.IdleTimeout, + MaxConnAge: cfg.MaxConnAge, + } + if cfg.EnableTLS { + opt.TLSConfig = &tls.Config{} + } + return &RedisClient{ + expiration: cfg.Expiration, + timeout: cfg.Timeout, + rdb: redis.NewUniversalClient(opt), + } +} + +func (c *RedisClient) Ping(ctx context.Context) error { + var cancel context.CancelFunc + if c.timeout > 0 { + ctx, cancel = context.WithTimeout(ctx, c.timeout) + defer cancel() + } + + pong, err := c.rdb.Ping(ctx).Result() + if err != nil { + return err + } + if pong != "PONG" { + return fmt.Errorf("redis: Unexpected PING response %q", pong) + } + return nil +} + +func (c *RedisClient) MSet(ctx context.Context, keys []string, values [][]byte) error { + var cancel context.CancelFunc + if c.timeout > 0 { + ctx, cancel = context.WithTimeout(ctx, c.timeout) + defer cancel() + } + + pipe := c.rdb.TxPipeline() + for i := range keys { + pipe.Set(ctx, keys[i], values[i], c.expiration) + } + _, err := pipe.Exec(ctx) + return err +} + +func (c *RedisClient) MGet(ctx context.Context, keys []string) ([][]byte, error) { + var cancel context.CancelFunc + if c.timeout > 0 { + ctx, cancel = context.WithTimeout(ctx, c.timeout) + defer cancel() + } + + cmd := c.rdb.MGet(ctx, keys...) + if err := cmd.Err(); err != nil { + return nil, err + } + + ret := make([][]byte, len(keys)) + for i, val := range cmd.Val() { + if val != nil { + ret[i] = StringToBytes(val.(string)) + } + } + return ret, nil +} + +func (c *RedisClient) Close() error { + return c.rdb.Close() +} + +// StringToBytes converts string to byte slice. (copied from vendor/github.com/go-redis/redis/v8/internal/util/unsafe.go) +func StringToBytes(s string) []byte { + return *(*[]byte)(unsafe.Pointer( + &struct { + string + Cap int + }{s, len(s)}, + )) +}