Skip to content

Commit

Permalink
Added support for Redis Cluster and Redis Sentinel (grafana#2961)
Browse files Browse the repository at this point in the history
* Added support for Redis Cluster and Redis Sentinel (grafana#2959)

Signed-off-by: Dmitry Shmulevich <dmitry.shmulevich@sysdig.com>

* addressed comments

Signed-off-by: Dmitry Shmulevich <dmitry.shmulevich@sysdig.com>

* fixed 'make doc'

Signed-off-by: Dmitry Shmulevich <dmitry.shmulevich@sysdig.com>

* fixed 'make lint'

Signed-off-by: Dmitry Shmulevich <dmitry.shmulevich@sysdig.com>

* updated Changelog

Signed-off-by: Dmitry Shmulevich <dmitry.shmulevich@sysdig.com>

* updated Changelog

Signed-off-by: Dmitry Shmulevich <dmitry.shmulevich@sysdig.com>

* updated go.mod

Signed-off-by: Dmitry Shmulevich <dmitry.shmulevich@sysdig.com>

* addressed comments

Signed-off-by: Dmitry Shmulevich <dmitry.shmulevich@sysdig.com>

* removed deprecated flags in redis config

Signed-off-by: Dmitry Shmulevich <dmitry.shmulevich@sysdig.com>

* updated modules

Signed-off-by: Dmitry Shmulevich <dmitry.shmulevich@sysdig.com>

* updated modules

Signed-off-by: Dmitry Shmulevich <dmitry.shmulevich@sysdig.com>

* added warning when Redis sentinel returns unexpected master info

Signed-off-by: Dmitry Shmulevich <dmitry.shmulevich@sysdig.com>

* fixed 'make lint'

Signed-off-by: Dmitry Shmulevich <dmitry.shmulevich@sysdig.com>

* updated unit test

Signed-off-by: Dmitry Shmulevich <dmitry.shmulevich@sysdig.com>

* added master group name to Redis Sentinel config

Signed-off-by: Dmitry Shmulevich <dmitry.shmulevich@sysdig.com>

* updated config validation

Signed-off-by: Dmitry Shmulevich <dmitry.shmulevich@sysdig.com>

* use redis universal client

Signed-off-by: Dmitry Shmulevich <dmitry.shmulevich@sysdig.com>

* updated dependencies

Signed-off-by: Dmitry Shmulevich <dmitry.shmulevich@sysdig.com>

* remove obsolete interface

Signed-off-by: Dmitry Shmulevich <dmitry.shmulevich@sysdig.com>

* addressed comments

Signed-off-by: Dmitry Shmulevich <dmitry.shmulevich@sysdig.com>

* add Redis DB index selection

Signed-off-by: Dmitry Shmulevich <dmitry.shmulevich@sysdig.com>

* updated CHANGELOG

Signed-off-by: Dmitry Shmulevich <dmitry.shmulevich@sysdig.com>

* Fixed CHANGELOG

Signed-off-by: Marco Pracucci <marco@pracucci.com>

Co-authored-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
Dmitry Shmulevich and pracucci authored Sep 14, 2020
1 parent 11f5407 commit 9c55b27
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 161 deletions.
2 changes: 1 addition & 1 deletion cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func New(cfg Config, reg prometheus.Registerer, logger log.Logger) (Cache, error
cfg.Redis.Expiration = cfg.DefaultValidity
}
cacheName := cfg.Prefix + "redis"
cache := NewRedisCache(cfg.Redis, cacheName, nil, logger)
cache := NewRedisCache(cacheName, NewRedisClient(&cfg.Redis), logger)
caches = append(caches, NewBackground(cacheName, cfg.Background, Instrument(cacheName, cache, reg), reg))
}

Expand Down
130 changes: 12 additions & 118 deletions cache/redis_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,102 +2,37 @@ package cache

import (
"context"
"flag"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gomodule/redigo/redis"

"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
)

// RedisCache type caches chunks in redis
type RedisCache struct {
name string
expiration int
timeout time.Duration
pool *redis.Pool
logger log.Logger
}

// RedisConfig defines how a RedisCache should be constructed.
type RedisConfig struct {
Endpoint string `yaml:"endpoint"`
Timeout time.Duration `yaml:"timeout"`
Expiration time.Duration `yaml:"expiration"`
MaxIdleConns int `yaml:"max_idle_conns"`
MaxActiveConns int `yaml:"max_active_conns"`
Password flagext.Secret `yaml:"password"`
EnableTLS bool `yaml:"enable_tls"`
IdleTimeout time.Duration `yaml:"idle_timeout"`
WaitOnPoolExhaustion bool `yaml:"wait_on_pool_exhaustion"`
MaxConnLifetime time.Duration `yaml:"max_conn_lifetime"`
}

// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
func (cfg *RedisConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) {
f.StringVar(&cfg.Endpoint, prefix+"redis.endpoint", "", description+"Redis service endpoint to use when caching chunks. If empty, no redis will be used.")
f.DurationVar(&cfg.Timeout, prefix+"redis.timeout", 100*time.Millisecond, description+"Maximum time to wait before giving up on redis requests.")
f.DurationVar(&cfg.Expiration, prefix+"redis.expiration", 0, description+"How long keys stay in the redis.")
f.IntVar(&cfg.MaxIdleConns, prefix+"redis.max-idle-conns", 80, description+"Maximum number of idle connections in pool.")
f.IntVar(&cfg.MaxActiveConns, prefix+"redis.max-active-conns", 0, description+"Maximum number of active connections in pool.")
f.Var(&cfg.Password, prefix+"redis.password", description+"Password to use when connecting to redis.")
f.BoolVar(&cfg.EnableTLS, prefix+"redis.enable-tls", false, description+"Enables connecting to redis with TLS.")
f.DurationVar(&cfg.IdleTimeout, prefix+"redis.idle-timeout", 0, description+"Close connections after remaining idle for this duration. If the value is zero, then idle connections are not closed.")
f.BoolVar(&cfg.WaitOnPoolExhaustion, prefix+"redis.wait-on-pool-exhaustion", false, description+"Enables waiting if there are no idle connections. If the value is false and the pool is at the max_active_conns limit, the pool will return a connection with ErrPoolExhausted error and not wait for idle connections.")
f.DurationVar(&cfg.MaxConnLifetime, prefix+"redis.max-conn-lifetime", 0, description+"Close connections older than this duration. If the value is zero, then the pool does not close connections based on age.")
name string
redis *RedisClient
logger log.Logger
}

// NewRedisCache creates a new RedisCache
func NewRedisCache(cfg RedisConfig, name string, pool *redis.Pool, logger log.Logger) *RedisCache {
func NewRedisCache(name string, redisClient *RedisClient, logger log.Logger) *RedisCache {
util.WarnExperimentalUse("Redis cache")
// pool != nil only in unit tests
if pool == nil {
pool = &redis.Pool{
Dial: func() (redis.Conn, error) {
options := make([]redis.DialOption, 0, 2)
if cfg.EnableTLS {
options = append(options, redis.DialUseTLS(true))
}
if cfg.Password.Value != "" {
options = append(options, redis.DialPassword(cfg.Password.Value))
}

c, err := redis.Dial("tcp", cfg.Endpoint, options...)
if err != nil {
return nil, err
}
return c, err
},
MaxIdle: cfg.MaxIdleConns,
MaxActive: cfg.MaxActiveConns,
IdleTimeout: cfg.IdleTimeout,
Wait: cfg.WaitOnPoolExhaustion,
MaxConnLifetime: cfg.MaxConnLifetime,
}
}

cache := &RedisCache{
expiration: int(cfg.Expiration.Seconds()),
timeout: cfg.Timeout,
name: name,
pool: pool,
logger: logger,
name: name,
redis: redisClient,
logger: logger,
}

if err := cache.ping(context.Background()); err != nil {
level.Error(logger).Log("msg", "error connecting to redis", "endpoint", cfg.Endpoint, "err", err)
if err := cache.redis.Ping(context.Background()); err != nil {
level.Error(logger).Log("msg", "error connecting to redis", "name", name, "err", err)
}

return cache
}

// Fetch gets keys from the cache. The keys that are found must be in the order of the keys requested.
func (c *RedisCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string) {
data, err := c.mget(ctx, keys)

data, err := c.redis.MGet(ctx, keys)
if err != nil {
level.Error(c.logger).Log("msg", "failed to get from redis", "name", c.name, "err", err)
missed = make([]string, len(keys))
Expand All @@ -117,54 +52,13 @@ func (c *RedisCache) Fetch(ctx context.Context, keys []string) (found []string,

// Store stores the key in the cache.
func (c *RedisCache) Store(ctx context.Context, keys []string, bufs [][]byte) {
err := c.mset(ctx, keys, bufs, c.expiration)
err := c.redis.MSet(ctx, keys, bufs)
if err != nil {
level.Error(c.logger).Log("msg", "failed to put to redis", "name", c.name, "err", err)
}
}

// Stop stops the redis client.
func (c *RedisCache) Stop() {
_ = c.pool.Close()
}

// mset adds key-value pairs to the cache.
func (c *RedisCache) mset(_ context.Context, keys []string, bufs [][]byte, ttl int) error {
conn := c.pool.Get()
defer conn.Close()

if err := conn.Send("MULTI"); err != nil {
return err
}
for i := range keys {
if err := conn.Send("SETEX", keys[i], ttl, bufs[i]); err != nil {
return err
}
}
_, err := redis.DoWithTimeout(conn, c.timeout, "EXEC")
return err
}

// mget retrieves values from the cache.
func (c *RedisCache) mget(_ context.Context, keys []string) ([][]byte, error) {
intf := make([]interface{}, len(keys))
for i, key := range keys {
intf[i] = key
}

conn := c.pool.Get()
defer conn.Close()

return redis.ByteSlices(redis.DoWithTimeout(conn, c.timeout, "MGET", intf...))
}

func (c *RedisCache) ping(_ context.Context) error {
conn := c.pool.Get()
defer conn.Close()

pong, err := redis.DoWithTimeout(conn, c.timeout, "PING")
if err == nil {
_, err = redis.String(pong, err)
}
return err
_ = c.redis.Close()
}
61 changes: 19 additions & 42 deletions cache/redis_cache_test.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,20 @@
package cache_test
package cache

import (
"context"
"testing"
"time"

"github.com/alicebob/miniredis"
"github.com/go-kit/kit/log"
"github.com/gomodule/redigo/redis"
"github.com/rafaeljusto/redigomock"
"github.com/go-redis/redis/v8"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/pkg/chunk/cache"
)

func TestRedisCache(t *testing.T) {
cfg := cache.RedisConfig{
Timeout: 10 * time.Millisecond,
}

conn := redigomock.NewConn()
conn.Clear()
pool := &redis.Pool{Dial: func() (redis.Conn, error) {
return conn, nil
}, MaxIdle: 10}
c, err := mockRedisCache()
require.Nil(t, err)
defer c.redis.Close()

keys := []string{"key1", "key2", "key3"}
bufs := [][]byte{[]byte("data1"), []byte("data2"), []byte("data3")}
Expand All @@ -32,29 +24,8 @@ func TestRedisCache(t *testing.T) {
nHit := len(keys)
require.Len(t, bufs, nHit)

// mock Redis Store
mockRedisStore(conn, keys, bufs)

//mock cache hit
keyIntf := make([]interface{}, nHit)
bufIntf := make([]interface{}, nHit)

for i := 0; i < nHit; i++ {
keyIntf[i] = keys[i]
bufIntf[i] = bufs[i]
}
conn.Command("MGET", keyIntf...).Expect(bufIntf)

// mock cache miss
nMiss := len(miss)
missIntf := make([]interface{}, nMiss)
for i, s := range miss {
missIntf[i] = s
}
conn.Command("MGET", missIntf...).ExpectError(nil)

// mock the cache
c := cache.NewRedisCache(cfg, "mock", pool, log.NewNopLogger())
ctx := context.Background()

c.Store(ctx, keys, bufs)
Expand All @@ -79,12 +50,18 @@ func TestRedisCache(t *testing.T) {
}
}

func mockRedisStore(conn *redigomock.Conn, keys []string, bufs [][]byte) {
conn.Command("MULTI")
ret := []interface{}{}
for i := range keys {
conn.Command("SETEX", keys[i], 0, bufs[i])
ret = append(ret, "OK")
func mockRedisCache() (*RedisCache, error) {
redisServer, err := miniredis.Run()
if err != nil {
return nil, err

}
redisClient := &RedisClient{
expiration: time.Minute,
timeout: 100 * time.Millisecond,
rdb: redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: []string{redisServer.Addr()},
}),
}
conn.Command("EXEC").Expect(ret)
return NewRedisCache("mock", redisClient, log.NewNopLogger()), nil
}
Loading

0 comments on commit 9c55b27

Please sign in to comment.