Skip to content

Commit

Permalink
Support Server-assisted Client-side Caching (#3757)
Browse files Browse the repository at this point in the history
* Initial support for client-side caching (#3658)

* Support for client-side caching - phase 2 (#3673)

* Code re-use?

* Stop forcing to read push notifications before checking cache and remove BCAST

* Rename variable

* Remove ensureFillSafe()

* Refactor peeking and reading push notifications

* Cleanup comments

* Fix transaction failure tests using mock (#3683)

Now we have to mock Protocol#read(RedisInputStream, ClientSideCache) instead of Protocol#read(RedisInputStream).

* Support client-side caching from UnifiedJedis (#3691)

* Support client side caching from UnifiedJedis

* Support client side caching as a separate parameter

* format imports

* Support CSC in sentinel mode

* undo change

* Client-side caching by hashing command arguments (#3700)

* Support TTL in client side caching (using Caffeine library)

* Also Guava cache

* format pom.xml

* Client-side caching by command arguments

TODO: Compute hash code.

* send keys

* todo comment for clean-up

* rename method to invalidate

* Client-side caching by hashing command arguments

* Hash command arguments for CaffeineCSC using OpenHFT hashing

* Clean-up keyHashes map

* added javadoc

* rename method

* remove lock

* descriptive name

* descriptive names and fix

* common default values in base class

* Cover Redis commands for client side caching (#3702)

* Support Client-side caching through URI/URL (#3703)

* Support Client-side caching through URI/URL

* check idx of '=' sign

* nicer exception

* edit/fix condition

* rename param

* Throw IllegalArgumentException at all such cases

* Test GuavaCSC and CaffeineCSC (#3742)

* Support white-list and black-list commands and keys (#3755)

* Create csc package

* Create csc.util package

* Create a config interface for client-side caching

* Default isCacheable

* Config to WhiteList/BlackList commands and String keys

* Create csc test package(s)

* Test white-list/black-list commands and keys

* Merge fix

* Remove csc.util package

* Fix javadoc links

* Added ClientSideCacheable interface and removed ClientSideCacheConfig interface

* Format imports

* Re-create csc.util package

* Rename to allow/deny instead of white/black

* Introduce interface(s) for hashing CommandObject (#3743)

* Client-side cache related naming changes (#3758)

Changes:
1. CommandLongHashing is renamed to CommandLongHasher.
2. Expanded the names of GuavaCSC (GuavaClientSideCache) and CaffeineCSC (CaffeineClientSideCache).

* Reformat clientSideCache variable names (#3761)

* Format tabs in pom.xml

* Use Experimental annotation

* Fix client side cache tests (#3799)

Due to redis/redis#13167 

* Fix JedisClusterClientSideCacheTest

* Fix JedisSentineledClientSideCacheTest

* Remove openhft hashing from source dependency (#3800)

* Test different functionalities of client side cache (#3828)

* Test JedisURIHelper#getClientSideCache(URI) (#3835)

* Merge fix: after introducing EndpointConfig in #3836

* Tweak maximumSize test in CaffeineClientSideCacheTest

* Little more tweak maximumSize test in CaffeineClientSideCacheTest

* Fix incompatibilities with the latest RedisStack (#3855)

* Fix tests

- Skip Graph tests
- Fix JSON RESP3 test

* JSON.GET behaves identically on RESP2 and RESP3

* Revert "Fix incompatibilities with the latest RedisStack (#3855)"

This reverts commit 6b9d338.

* [TEMPORARY] [TEST] Use redis-stack-server:7.4.0-rc1 image for testing

* Support RediSearch DIALECT 5 (#3831)

- [x] Avoid escaping at query time
- [ ] Alias for tag fields (EXACT)
- [x] Avoid repeating for numeral equality
- [x] New dialect (5)

* Support FLOAT16 and BFLOAT16 VecSim storage types (#3849)

* Test: INTERSECTS and DIJOINT conditions support in GeoSearch (#3862)

* Support IGNORE and other optional arguments for timeseries commands (#3860)

* Re-implement TS.ADD command with optional arguments
* Implement TS.INCRBY and TS.DECRBY commands with optional arguments
* Support IGNORE argument for TS.[ CREATE | ALTER | ADD | INCRBY | DECRBY] commands

---

* Cover optional arguments for timeseries commands
   - Re-implement TS.ADD command with optional arguments
   - Implement TS.INCRBY and TS.DECRBY commands with optional arguments

* Introduce EncodingFormat enum for <COMPRESSED|UNCOMPRESSED>

* Support IGNORE option
   and rename to TSIncrOrDecrByParams

* Polish #3860: Separate params for TS.INCRBY and TS.DECRBY (#3863)

* Support indexing of MISSING and EMPTY values (#3866)

* Little tweak maximumSize test in CaffeineClientSideCacheTest

* Inject ClientSideCacheable via set method (#3882)

* Use CommandObject(s) as cache-key (#3875)

and remove hashing of CommandObject(s).

* #3886 merge fix

* Revert "[TEMPORARY] [TEST] Use redis-stack-server:7.4.0-rc1 image for testing"

This reverts commit 92c09f3.

* More tweak maximumSize test in CaffeineClientSideCacheTest

This reverts and modifies commit 3534996.

* Remove client side cache support through uri/url (#3892)

This partially reverts #3703 and #3835

* Bump com.google.guava:guava from 33.0.0-jre to 33.2.1-jre (#3893)

* Prepare client side caching - design 2 (#3889)

* Separate CacheConnection

* Introduce CacheKey and CacheEntry

* Little tweak maximumSize test in CaffeineClientSideCacheTest

* Remove resetting timeout; we'll PING instead

* Refactor Client-Side Caching implementation (#3900)

* adding a DataProvider to access connection from cache

* resolve keys from commandarguments

* clean up in unifiiedjedis and add csc test with ssl

* - fix readtimeout exception with sockets for consuming invalidations pending in buffer
- apply a default list of cacheable commands to DefaultClientSideCacheable
- fix failing unit tests with cacheable / non-cacheable keys
- remove formatting changes

* - add serialization for cache instances
- add unit test with UnifiedJedis
- add benchmark for CSC execution
- clean unused imports

* - added 'Cache' interface and 'DefaultCache' implementation in regard to design doc
- added 'EvictionPolicy' interface and LRU implementation
- move cache object validation and cache control stuf from 'ClientSideCache' into 'CacheConnection'
- make guava and caffeine caches experimental

* - added SSLSocketWrapper and plug it to use 'available'
- handle exceptions properly
- fix some issues with unit tests

* implementing thread safety

* - fix eviction issue and add related test
- fix consuming invalidation messages on a response read
- introduce cachestats
- fix potential issue with cacheKeysRelatedtoRedisKey cleanup
- tests for sequential access, concurrent acces and maxsize

* - renmae abstract cache class
- add test case for returning new instance of cache object

* - change order of execution in sequential acces test

* -  flush the cache on any disconnect
- replace LRU policy references with EvictionPolicy interface
-  add some constructor overloads to enable custom eviction policies on cache

* fix testcache

* fix javadoc issue

* - fix multithreaded eviction policy issue
- update guava and caffeine implementations according to abstract cache

* Jedis test plan coverage for CSC (#3918)

* initial changes

* cover tests for JedisPooled and functionality

* fix javadoc

* cover new tests for JedisCluster and JedisSentineled

* Fix CSC allow-and-deny-list and rename Cacheable interface

* Tag CommandArguments#getKeys() as Internal

* cover lruEvictionTest

* Address code reviews and more updates

* fix format and more minor changes

* format Connection

* modify WeakReference usage

* Use ExecutorService.shutdownNow() in tests (#3922)

* Use ExecutorService.shutdownNow()

* More ExecutorService.shutdownNow() and other changes

* [minor change] Avoid creating same CacheKey twice

* Support caching null values (#3939)

* caching null results

* add more assertion

* Adding CacheConfig (#3919)

* add cacheconfig

* remove empty file

* -modify constructors with cache as public
- trim guava caffeine

* remove cachetype

* - add getCache to UnifiedJedis
- add builder method to CacheConfig

* add evictionpolicy to cacheconfig

* - unifiedjedis constructor with cacheconfig
-  wrap IOException on protocol read error

* fix merge issue

---------

Co-authored-by: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com>

* Polish "Adding CacheConfig"

Polish #3919 - address some pending change requests

- Swap contructor placements
- Fix grammar in exception message

* Adding Cache class to CacheConfig (#3942)

* adding cacheclass to cacheconfig

* - add cachefactory test

* - revert connection ctors to public
- udpate some tests with UnifiedJedis.getCache
- add ping to flaky tests

* remove unnecessary anonymous types

* change ctor access modifiers

* fix test name

* make cachefactory methods static

* removing pings  due to still flaky with inv messages

* - drop CustomCache in tests and use TestCache
- check null cacheable issue with defaultcache
- support both ctors in custom cache classes regarding to value of cacheconfig.cacheable

* remove unncessary maxsize

* - remove inline anonymious

* Server version check for CSC activation  (#3954)

* checking server version for CSC

* fix format change

* fix noauth hello exception in integration tests

* fix version check

* remove redundant check

* remove unused imports

* 'toString' for Version

* rename to RedisVersion

* moving RedisVersion package

---------

Co-authored-by: Igor Malinovskiy <u.glide@gmail.com>
Co-authored-by: atakavci <58048133+atakavci@users.noreply.github.com>
  • Loading branch information
3 people authored Sep 27, 2024
1 parent d22df4b commit 5442642
Show file tree
Hide file tree
Showing 59 changed files with 3,491 additions and 170 deletions.
3 changes: 3 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@
<version>2.11.0</version>
</dependency>

<!-- Optional dependencies -->

<!-- UNIX socket connection support -->
<dependency>
<groupId>com.kohlschutter.junixsocket</groupId>
Expand All @@ -90,6 +92,7 @@
<version>1.20.0</version>
<scope>test</scope>
</dependency>

<!-- test -->
<dependency>
<groupId>junit</groupId>
Expand Down
27 changes: 27 additions & 0 deletions src/main/java/redis/clients/jedis/CommandArguments.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.annots.Internal;
import redis.clients.jedis.args.Rawable;
import redis.clients.jedis.args.RawableFactory;
import redis.clients.jedis.commands.ProtocolCommand;
Expand All @@ -17,6 +20,8 @@ public class CommandArguments implements Iterable<Rawable> {
private CommandKeyArgumentPreProcessor keyPreProc = null;
private final ArrayList<Rawable> args;

private List<Object> keys;

private boolean blocking;

private CommandArguments() {
Expand All @@ -26,6 +31,8 @@ private CommandArguments() {
public CommandArguments(ProtocolCommand command) {
args = new ArrayList<>();
args.add(command);

keys = Collections.emptyList();
}

public ProtocolCommand getCommand() {
Expand Down Expand Up @@ -127,9 +134,24 @@ public CommandArguments key(Object key) {
throw new IllegalArgumentException("\"" + key.toString() + "\" is not a valid argument.");
}

addKeyInKeys(key);

return this;
}

private void addKeyInKeys(Object key) {
if (keys.isEmpty()) {
keys = Collections.singletonList(key);
} else if (keys.size() == 1) {
List oldKeys = keys;
keys = new ArrayList();
keys.addAll(oldKeys);
keys.add(key);
} else {
keys.add(key);
}
}

public final CommandArguments keys(Object... keys) {
Arrays.stream(keys).forEach(this::key);
return this;
Expand Down Expand Up @@ -178,6 +200,11 @@ public Iterator<Rawable> iterator() {
return args.iterator();
}

@Internal
public List<Object> getKeys() {
return keys;
}

public boolean isBlocking() {
return blocking;
}
Expand Down
38 changes: 38 additions & 0 deletions src/main/java/redis/clients/jedis/CommandObject.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package redis.clients.jedis;

import java.util.Iterator;
import redis.clients.jedis.args.Rawable;

public class CommandObject<T> {

private final CommandArguments arguments;
Expand All @@ -17,4 +20,39 @@ public CommandArguments getArguments() {
public Builder<T> getBuilder() {
return builder;
}

@Override
public int hashCode() {
int hashCode = 1;
for (Rawable e : arguments) {
hashCode = 31 * hashCode + e.hashCode();
}
hashCode = 31 * hashCode + builder.hashCode();
return hashCode;
}

@Override
public boolean equals(Object o) {
if (o == this) {
return true;
}
if (!(o instanceof CommandObject)) {
return false;
}

Iterator<Rawable> e1 = arguments.iterator();
Iterator<Rawable> e2 = ((CommandObject) o).arguments.iterator();
while (e1.hasNext() && e2.hasNext()) {
Rawable o1 = e1.next();
Rawable o2 = e2.next();
if (!(o1 == null ? o2 == null : o1.equals(o2))) {
return false;
}
}
if (e1.hasNext() || e2.hasNext()) {
return false;
}

return builder == ((CommandObject) o).builder;
}
}
126 changes: 80 additions & 46 deletions src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

import redis.clients.jedis.Protocol.Command;
Expand All @@ -31,7 +32,7 @@
public class Connection implements Closeable {

private ConnectionPool memberOf;
private RedisProtocol protocol;
protected RedisProtocol protocol;
private final JedisSocketFactory socketFactory;
private Socket socket;
private RedisOutputStream outputStream;
Expand All @@ -41,6 +42,8 @@ public class Connection implements Closeable {
private boolean broken = false;
private boolean strValActive;
private String strVal;
protected String server;
protected String version;

public Connection() {
this(Protocol.DEFAULT_HOST, Protocol.DEFAULT_PORT);
Expand All @@ -55,9 +58,7 @@ public Connection(final HostAndPort hostAndPort) {
}

public Connection(final HostAndPort hostAndPort, final JedisClientConfig clientConfig) {
this(new DefaultJedisSocketFactory(hostAndPort, clientConfig));
this.infiniteSoTimeout = clientConfig.getBlockingSocketTimeoutMillis();
initializeFromClientConfig(clientConfig);
this(new DefaultJedisSocketFactory(hostAndPort, clientConfig), clientConfig);
}

public Connection(final JedisSocketFactory socketFactory) {
Expand Down Expand Up @@ -373,16 +374,40 @@ protected void flush() {
}
}

@Experimental
protected Object protocolRead(RedisInputStream is) {
return Protocol.read(is);
}

@Experimental
protected void protocolReadPushes(RedisInputStream is) {
}

protected Object readProtocolWithCheckingBroken() {
if (broken) {
throw new JedisConnectionException("Attempting to read from a broken connection.");
}

try {
return Protocol.read(inputStream);
// Object read = Protocol.read(inputStream);
// System.out.println(redis.clients.jedis.util.SafeEncoder.encodeObject(read));
// return read;
return protocolRead(inputStream);
} catch (JedisConnectionException exc) {
broken = true;
throw exc;
}
}

protected void readPushesWithCheckingBroken() {
if (broken) {
throw new JedisConnectionException("Attempting to read from a broken connection.");
}

try {
if (inputStream.available() > 0) {
protocolReadPushes(inputStream);
}
} catch (IOException e) {
broken = true;
throw new JedisConnectionException("Failed to check buffer on connection.", e);
} catch (JedisConnectionException exc) {
setBroken();
throw exc;
Expand All @@ -404,6 +429,7 @@ public List<Object> getMany(final int count) {

/**
* Check if the client name libname, libver, characters are legal
*
* @param info the name
* @return Returns true if legal, false throws exception
* @throws JedisException if characters illegal
Expand All @@ -419,7 +445,7 @@ private static boolean validateClientInfo(String info) {
return true;
}

private void initializeFromClientConfig(final JedisClientConfig config) {
protected void initializeFromClientConfig(final JedisClientConfig config) {
try {
connect();

Expand All @@ -430,12 +456,12 @@ private void initializeFromClientConfig(final JedisClientConfig config) {
final RedisCredentialsProvider redisCredentialsProvider = (RedisCredentialsProvider) credentialsProvider;
try {
redisCredentialsProvider.prepare();
helloOrAuth(protocol, redisCredentialsProvider.get());
helloAndAuth(protocol, redisCredentialsProvider.get());
} finally {
redisCredentialsProvider.cleanUp();
}
} else {
helloOrAuth(protocol, credentialsProvider != null ? credentialsProvider.get()
helloAndAuth(protocol, credentialsProvider != null ? credentialsProvider.get()
: new DefaultRedisCredentials(config.getUser(), config.getPassword()));
}

Expand All @@ -447,7 +473,9 @@ private void initializeFromClientConfig(final JedisClientConfig config) {
}

ClientSetInfoConfig setInfoConfig = config.getClientSetInfoConfig();
if (setInfoConfig == null) setInfoConfig = ClientSetInfoConfig.DEFAULT;
if (setInfoConfig == null) {
setInfoConfig = ClientSetInfoConfig.DEFAULT;
}

if (!setInfoConfig.isDisabled()) {
String libName = JedisMetaInfo.getArtifactId();
Expand Down Expand Up @@ -492,50 +520,56 @@ private void initializeFromClientConfig(final JedisClientConfig config) {
}
}

private void helloOrAuth(final RedisProtocol protocol, final RedisCredentials credentials) {

if (credentials == null || credentials.getPassword() == null) {
if (protocol != null) {
sendCommand(Command.HELLO, encode(protocol.version()));
getOne();
private void helloAndAuth(final RedisProtocol protocol, final RedisCredentials credentials) {
Map<String, Object> helloResult = null;
if (protocol != null && credentials != null && credentials.getUser() != null) {
byte[] rawPass = encodeToBytes(credentials.getPassword());
try {
helloResult = hello(encode(protocol.version()), Keyword.AUTH.getRaw(), encode(credentials.getUser()), rawPass);
} finally {
Arrays.fill(rawPass, (byte) 0); // clear sensitive data
}
return;
} else {
auth(credentials);
helloResult = protocol == null ? null : hello(encode(protocol.version()));
}
if (helloResult != null) {
server = (String) helloResult.get("server");
version = (String) helloResult.get("version");
}

// Source: https://stackoverflow.com/a/9670279/4021802
ByteBuffer passBuf = Protocol.CHARSET.encode(CharBuffer.wrap(credentials.getPassword()));
byte[] rawPass = Arrays.copyOfRange(passBuf.array(), passBuf.position(), passBuf.limit());
Arrays.fill(passBuf.array(), (byte) 0); // clear sensitive data
// clearing 'char[] credentials.getPassword()' should be
// handled in RedisCredentialsProvider.cleanUp()
}

private void auth(RedisCredentials credentials) {
if (credentials == null || credentials.getPassword() == null) {
return;
}
byte[] rawPass = encodeToBytes(credentials.getPassword());
try {
/// actual HELLO or AUTH -->
if (protocol != null) {
if (credentials.getUser() != null) {
sendCommand(Command.HELLO, encode(protocol.version()),
Keyword.AUTH.getRaw(), encode(credentials.getUser()), rawPass);
getOne(); // Map
} else {
sendCommand(Command.AUTH, rawPass);
getStatusCodeReply(); // OK
sendCommand(Command.HELLO, encode(protocol.version()));
getOne(); // Map
}
} else { // protocol == null
if (credentials.getUser() != null) {
sendCommand(Command.AUTH, encode(credentials.getUser()), rawPass);
} else {
sendCommand(Command.AUTH, rawPass);
}
getStatusCodeReply(); // OK
if (credentials.getUser() == null) {
sendCommand(Command.AUTH, rawPass);
} else {
sendCommand(Command.AUTH, encode(credentials.getUser()), rawPass);
}
/// <-- actual HELLO or AUTH
} finally {

Arrays.fill(rawPass, (byte) 0); // clear sensitive data
}
getStatusCodeReply();
}

// clearing 'char[] credentials.getPassword()' should be
// handled in RedisCredentialsProvider.cleanUp()
protected Map<String, Object> hello(byte[]... args) {
sendCommand(Command.HELLO, args);
return BuilderFactory.ENCODED_OBJECT_MAP.build(getOne());
}

protected byte[] encodeToBytes(char[] chars) {
// Source: https://stackoverflow.com/a/9670279/4021802
ByteBuffer passBuf = Protocol.CHARSET.encode(CharBuffer.wrap(chars));
byte[] rawPass = Arrays.copyOfRange(passBuf.array(), passBuf.position(), passBuf.limit());
Arrays.fill(passBuf.array(), (byte) 0); // clear sensitive data
return rawPass;
}

public String select(final int index) {
Expand Down
Loading

0 comments on commit 5442642

Please sign in to comment.