diff --git a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java index f2c6c625ce..2e80433a6a 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java @@ -1232,7 +1232,7 @@ public RedisFuture hscan(KeyValueStreamingChannel channe @Override public RedisFuture hscanNovalues(KeyStreamingChannel channel, K key, ScanCursor scanCursor, - ScanArgs scanArgs) { + ScanArgs scanArgs) { return dispatch(commandBuilder.hscanNoValuesStreaming(channel, key, scanCursor, scanArgs)); } @@ -2001,6 +2001,11 @@ public RedisFuture> spop(K key, long count) { return dispatch(commandBuilder.spop(key, count)); } + @Override + public RedisFuture spublish(K shardChannel, V message) { + return dispatch(commandBuilder.spublish(shardChannel, message)); + } + @Override public RedisFuture srandmember(K key) { return dispatch(commandBuilder.srandmember(key)); diff --git a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java index 47b473495e..734076fe36 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java @@ -2080,6 +2080,11 @@ public Flux spop(K key, long count) { return createDissolvingFlux(() -> commandBuilder.spop(key, count)); } + @Override + public Mono spublish(K shardChannel, V message) { + return createMono(() -> commandBuilder.spublish(shardChannel, message)); + } + @Override public Mono srandmember(K key) { return createMono(() -> commandBuilder.srandmember(key)); diff --git a/src/main/java/io/lettuce/core/RedisCommandBuilder.java b/src/main/java/io/lettuce/core/RedisCommandBuilder.java index 3d3d2be0be..7880540cf7 100644 --- a/src/main/java/io/lettuce/core/RedisCommandBuilder.java +++ b/src/main/java/io/lettuce/core/RedisCommandBuilder.java @@ -2703,6 +2703,13 @@ Command> spop(K key, long count) { return createCommand(SPOP, new ValueSetOutput<>(codec), args); } + Command spublish(K shardChannel, V message) { + LettuceAssert.notNull(shardChannel, "ShardChannel " + MUST_NOT_BE_NULL); + + CommandArgs args = new CommandArgs<>(codec).addKey(shardChannel).addValue(message); + return createCommand(SPUBLISH, new IntegerOutput<>(codec), args); + } + Command srandmember(K key) { notNullKey(key); diff --git a/src/main/java/io/lettuce/core/api/async/BaseRedisAsyncCommands.java b/src/main/java/io/lettuce/core/api/async/BaseRedisAsyncCommands.java index c4eb965a80..76aac4f8ea 100644 --- a/src/main/java/io/lettuce/core/api/async/BaseRedisAsyncCommands.java +++ b/src/main/java/io/lettuce/core/api/async/BaseRedisAsyncCommands.java @@ -102,6 +102,16 @@ public interface BaseRedisAsyncCommands { */ RedisFuture pubsubNumpat(); + /** + * Post a message to a shard channel. + * + * @param shardChannel the shard channel type: key. + * @param message the message type: value. + * @return Long integer-reply the number of clients that received the message. + * @since 7.0 + */ + RedisFuture spublish(K shardChannel, V message); + /** * Echo the given string. * diff --git a/src/main/java/io/lettuce/core/api/reactive/BaseRedisReactiveCommands.java b/src/main/java/io/lettuce/core/api/reactive/BaseRedisReactiveCommands.java index 051340ea99..24d9989efa 100644 --- a/src/main/java/io/lettuce/core/api/reactive/BaseRedisReactiveCommands.java +++ b/src/main/java/io/lettuce/core/api/reactive/BaseRedisReactiveCommands.java @@ -102,6 +102,16 @@ public interface BaseRedisReactiveCommands { */ Mono pubsubNumpat(); + /** + * Post a message to a shard channel. + * + * @param shardChannel the shard channel type: key. + * @param message the message type: value. + * @return Long integer-reply the number of clients that received the message. + * @since 7.0 + */ + Mono spublish(K shardChannel, V message); + /** * Echo the given string. * diff --git a/src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java b/src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java index b0ebe918be..ce942334ed 100644 --- a/src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java +++ b/src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java @@ -101,6 +101,16 @@ public interface BaseRedisCommands { */ Long pubsubNumpat(); + /** + * Post a message to a shard channel. + * + * @param shardChannel the shard channel type: key. + * @param message the message type: value. + * @return Long integer-reply the number of clients that received the message. + * @since 7.0 + */ + Long spublish(K shardChannel, V message); + /** * Echo the given string. * diff --git a/src/main/java/io/lettuce/core/cluster/ClusterPubSubConnectionProvider.java b/src/main/java/io/lettuce/core/cluster/ClusterPubSubConnectionProvider.java index ce5abb67e0..747018ca60 100644 --- a/src/main/java/io/lettuce/core/cluster/ClusterPubSubConnectionProvider.java +++ b/src/main/java/io/lettuce/core/cluster/ClusterPubSubConnectionProvider.java @@ -167,6 +167,11 @@ public void punsubscribed(K pattern, long count) { notifications.punsubscribed(getNode(), pattern, count); } + @Override + public void smessage(K shardChannel, V message) { + notifications.smessage(getNode(), shardChannel, message); + } + @Override public void ssubscribed(K channel, long count) { notifications.ssubscribed(getNode(), channel, count); diff --git a/src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java b/src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java index 5679e04c8c..9288dd8f85 100644 --- a/src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java +++ b/src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java @@ -88,6 +88,9 @@ protected void notifyListeners(PubSubMessage output) { case unsubscribe: multicast.unsubscribed(clusterNode, output.channel(), output.count()); break; + case smessage: + multicast.smessage(clusterNode, output.channel(), output.body()); + break; case ssubscribe: multicast.ssubscribed(clusterNode, output.channel(), output.count()); break; @@ -192,6 +195,12 @@ public void punsubscribed(RedisClusterNode node, K pattern, long count) { clusterListeners.forEach(listener -> listener.punsubscribed(node, pattern, count)); } + @Override + public void smessage(RedisClusterNode node, K shardChannel, V message) { + getListeners().forEach(listener -> listener.smessage(shardChannel, message)); + clusterListeners.forEach(listener -> listener.smessage(node, shardChannel, message)); + } + @Override public void ssubscribed(RedisClusterNode node, K channel, long count) { getListeners().forEach(listener -> listener.ssubscribed(channel, count)); diff --git a/src/main/java/io/lettuce/core/cluster/api/async/BaseNodeSelectionAsyncCommands.java b/src/main/java/io/lettuce/core/cluster/api/async/BaseNodeSelectionAsyncCommands.java index 059ddfba48..dbdc2744f4 100644 --- a/src/main/java/io/lettuce/core/cluster/api/async/BaseNodeSelectionAsyncCommands.java +++ b/src/main/java/io/lettuce/core/cluster/api/async/BaseNodeSelectionAsyncCommands.java @@ -102,6 +102,16 @@ public interface BaseNodeSelectionAsyncCommands { */ AsyncExecutions pubsubNumpat(); + /** + * Post a message to a shard channel. + * + * @param shardChannel the shard channel type: key. + * @param message the message type: value. + * @return Long integer-reply the number of clients that received the message. + * @since 7.0 + */ + AsyncExecutions spublish(K shardChannel, V message); + /** * Echo the given string. * diff --git a/src/main/java/io/lettuce/core/cluster/api/sync/BaseNodeSelectionCommands.java b/src/main/java/io/lettuce/core/cluster/api/sync/BaseNodeSelectionCommands.java index e9072268d4..a1b8922c8d 100644 --- a/src/main/java/io/lettuce/core/cluster/api/sync/BaseNodeSelectionCommands.java +++ b/src/main/java/io/lettuce/core/cluster/api/sync/BaseNodeSelectionCommands.java @@ -97,6 +97,16 @@ public interface BaseNodeSelectionCommands { */ Executions pubsubNumpat(); + /** + * Post a message to a shard channel. + * + * @param shardChannel the shard channel type: key. + * @param message the message type: value. + * @return Long integer-reply the number of clients that received the message. + * @since 7.0 + */ + Executions spublish(K shardChannel, V message); + /** * Echo the given string. * diff --git a/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubAdapter.java b/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubAdapter.java index c38e672ee3..554efa3009 100644 --- a/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubAdapter.java +++ b/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubAdapter.java @@ -42,6 +42,11 @@ public void punsubscribed(RedisClusterNode node, K pattern, long count) { // empty adapter method } + @Override + public void smessage(RedisClusterNode node, K shardChannel, V message) { + // empty adapter method + } + @Override public void ssubscribed(RedisClusterNode node, K channel, long count) { // empty adapter method diff --git a/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubListener.java b/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubListener.java index 650dc233ca..482453df7c 100644 --- a/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubListener.java +++ b/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubListener.java @@ -67,6 +67,18 @@ public interface RedisClusterPubSubListener { */ void punsubscribed(RedisClusterNode node, K pattern, long count); + /** + * Message received from a shard channel subscription. + * + * @param node the {@link RedisClusterNode} from which the {@code message} originates. + * @param shardChannel shard channel. + * @param message Message. + * @since 7.0 + */ + default void smessage(RedisClusterNode node, K shardChannel, V message){ + message(node, shardChannel, message); + } + /** * Subscribed to a shard channel. * diff --git a/src/main/java/io/lettuce/core/protocol/CommandType.java b/src/main/java/io/lettuce/core/protocol/CommandType.java index 7a9d87524d..b92eaf2519 100644 --- a/src/main/java/io/lettuce/core/protocol/CommandType.java +++ b/src/main/java/io/lettuce/core/protocol/CommandType.java @@ -74,7 +74,7 @@ public enum CommandType implements ProtocolKeyword { // Pub/Sub - PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PUBSUB, SSUBSCRIBE, + PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PUBSUB, SSUBSCRIBE, SPUBLISH, // Sets diff --git a/src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java b/src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java index c733435cbc..8c486770ec 100644 --- a/src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java +++ b/src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java @@ -98,6 +98,19 @@ final Command punsubscribe(K... patterns) { return pubSubCommand(PUNSUBSCRIBE, new PubSubOutput<>(codec), patterns); } + Command spublish(K shardChannel, V message) { + CommandArgs args = new CommandArgs<>(codec).addKey(shardChannel).addValue(message); + return createCommand(SPUBLISH, new IntegerOutput<>(codec), args); + } + + @SafeVarargs + final Command ssubscribe(K... shardChannels) { + LettuceAssert.notEmpty(shardChannels, "Shard channels " + MUST_NOT_BE_EMPTY); + + CommandArgs args = new CommandArgs<>(codec).addKeys(shardChannels); + return createCommand(SSUBSCRIBE, new PubSubOutput<>(codec), args); + } + @SafeVarargs final Command subscribe(K... channels) { LettuceAssert.notEmpty(channels, "Channels " + MUST_NOT_BE_EMPTY); @@ -110,14 +123,6 @@ final Command unsubscribe(K... channels) { return pubSubCommand(UNSUBSCRIBE, new PubSubOutput<>(codec), channels); } - @SafeVarargs - final Command ssubscribe(K... shardChannels) { - LettuceAssert.notEmpty(shardChannels, "Shard channels " + MUST_NOT_BE_EMPTY); - - CommandArgs args = new CommandArgs<>(codec).addKeys(shardChannels); - return createCommand(SSUBSCRIBE, new PubSubOutput<>(codec), args); - } - @SafeVarargs final Command pubSubCommand(CommandType type, CommandOutput output, K... keys) { return new Command<>(type, output, new PubSubCommandArgs<>(codec).addKeys(keys)); diff --git a/src/main/java/io/lettuce/core/pubsub/PubSubEndpoint.java b/src/main/java/io/lettuce/core/pubsub/PubSubEndpoint.java index febb31f2c0..6b6b267fe2 100644 --- a/src/main/java/io/lettuce/core/pubsub/PubSubEndpoint.java +++ b/src/main/java/io/lettuce/core/pubsub/PubSubEndpoint.java @@ -263,6 +263,9 @@ protected void notifyListeners(PubSubMessage message) { case unsubscribe: listener.unsubscribed(message.channel(), message.count()); break; + case smessage: + listener.smessage(message.channel(), message.body()); + break; case ssubscribe: listener.ssubscribed(message.channel(), message.count()); break; diff --git a/src/main/java/io/lettuce/core/pubsub/PubSubOutput.java b/src/main/java/io/lettuce/core/pubsub/PubSubOutput.java index 1bcb61b7a2..18c19ccddb 100644 --- a/src/main/java/io/lettuce/core/pubsub/PubSubOutput.java +++ b/src/main/java/io/lettuce/core/pubsub/PubSubOutput.java @@ -38,7 +38,7 @@ public class PubSubOutput extends CommandOutput implements PubSub public enum Type { - message, pmessage, psubscribe, punsubscribe, subscribe, unsubscribe, ssubscribe; + message, pmessage, psubscribe, punsubscribe, subscribe, unsubscribe, ssubscribe, smessage; private final static Set names = new HashSet<>(); @@ -108,6 +108,7 @@ private void handleOutput(ByteBuffer bytes) { pattern = codec.decodeKey(bytes); break; } + case smessage: case message: if (channel == null) { channel = codec.decodeKey(bytes); diff --git a/src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java b/src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java index ef16fed6fb..828a85743f 100644 --- a/src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java +++ b/src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java @@ -59,6 +59,11 @@ public void punsubscribed(K pattern, long count) { // empty adapter method } + @Override + public void smessage(K shardChannel, V message) { + // empty adapter method + } + @Override public void ssubscribed(K shardChannel, long count) { // empty adapter method diff --git a/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java b/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java index f69f24fbba..20515e60fd 100644 --- a/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java +++ b/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java @@ -100,6 +100,11 @@ public RedisFuture> pubsubShardNumsub(K... shardChannels) { return dispatch(commandBuilder.pubsubShardNumsub(shardChannels)); } + @Override + public RedisFuture spublish(K shardChannel, V message) { + return dispatch(commandBuilder.spublish(shardChannel, message)); + } + @Override @SuppressWarnings("unchecked") public RedisFuture ssubscribe(K... channels) { diff --git a/src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java b/src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java index 71b5a6ad13..ccc1f0e1e1 100644 --- a/src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java +++ b/src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java @@ -88,4 +88,15 @@ default void ssubscribed(K shardChannel, long count) { subscribed(shardChannel, count); } + /** + * Message received from a shard channel subscription. + * + * @param shardChannel shard channel. + * @param message Message. + * @since 7.0 + */ + default void smessage(K shardChannel, V message) { + message(shardChannel, message); + } + } diff --git a/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java b/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java index 218fbe9c77..adff1b58fa 100644 --- a/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java +++ b/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java @@ -159,6 +159,11 @@ public Mono> pubsubShardNumsub(K... shardChannels) { return createMono(() -> commandBuilder.pubsubShardNumsub(shardChannels)); } + @Override + public Mono spublish(K shardChannel, V message) { + return createMono(() -> commandBuilder.publish(shardChannel, message)); + } + @Override public Mono ssubscribe(K... shardChannels) { return createFlux(() -> commandBuilder.ssubscribe(shardChannels)).then(); diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommands.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommands.kt index 37ba6c789f..df829da7b5 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommands.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommands.kt @@ -102,6 +102,16 @@ interface BaseRedisCoroutinesCommands { */ suspend fun pubsubNumpat(): Long + /** + * Post a message to a shard channel. + * + * @param shardChannel the shard channel type: key. + * @param message the message type: value. + * @return Long integer-reply the number of clients that received the message. + * @since 7.0 + */ + suspend fun spublish(shardChannel: K, message: V): Long? + /** * Echo the given string. * diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt index 910fae9317..6351169241 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt @@ -60,6 +60,8 @@ internal class BaseRedisCoroutinesCommandsImpl(internal val op override suspend fun pubsubNumpat(): Long = ops.pubsubNumpat().awaitSingle() + override suspend fun spublish(shardChannel: K, message: V): Long? = ops.spublish(shardChannel, message).awaitFirstOrNull() + override suspend fun echo(msg: V): V = ops.echo(msg).awaitSingle() override suspend fun role(): List = ops.role().asFlow().toList() diff --git a/src/main/templates/io/lettuce/core/api/BaseRedisCommands.java b/src/main/templates/io/lettuce/core/api/BaseRedisCommands.java index 40d5165f16..27c5723f0f 100644 --- a/src/main/templates/io/lettuce/core/api/BaseRedisCommands.java +++ b/src/main/templates/io/lettuce/core/api/BaseRedisCommands.java @@ -102,6 +102,16 @@ public interface BaseRedisCommands { */ Long pubsubNumpat(); + /** + * Post a message to a shard channel. + * + * @param shardChannel the shard channel type: key. + * @param message the message type: value. + * @return Long integer-reply the number of clients that received the message. + * @since 7.0 + */ + Long spublish(K shardChannel, V message); + /** * Echo the given string. * diff --git a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java index c4a808fad9..772af41dab 100644 --- a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java @@ -17,11 +17,13 @@ import io.lettuce.core.RedisURI; import io.lettuce.core.TestSupport; import io.lettuce.core.api.sync.RedisCommands; +import io.lettuce.core.cluster.ClusterClientOptions; import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.models.partitions.RedisClusterNode; import io.lettuce.core.cluster.pubsub.api.async.NodeSelectionPubSubAsyncCommands; import io.lettuce.core.cluster.pubsub.api.async.PubSubAsyncNodeSelection; +import io.lettuce.core.cluster.pubsub.api.async.RedisClusterPubSubAsyncCommands; import io.lettuce.core.cluster.pubsub.api.reactive.NodeSelectionPubSubReactiveCommands; import io.lettuce.core.cluster.pubsub.api.reactive.PubSubReactiveNodeSelection; import io.lettuce.core.cluster.pubsub.api.sync.NodeSelectionPubSubCommands; @@ -45,6 +47,8 @@ class RedisClusterPubSubConnectionIntegrationTests extends TestSupport { private final RedisClusterClient clusterClient; + private final RedisClusterClient clusterClientWithNoRedirects; + private final PubSubTestListener connectionListener = new PubSubTestListener(); private final PubSubTestListener nodeListener = new PubSubTestListener(); @@ -57,9 +61,16 @@ class RedisClusterPubSubConnectionIntegrationTests extends TestSupport { String shardChannel = "shard-channel"; + String shardMessage = "shard msg!"; + + String shardTestChannel = "shard-test-channel"; + @Inject - RedisClusterPubSubConnectionIntegrationTests(RedisClusterClient clusterClient) { + RedisClusterPubSubConnectionIntegrationTests(RedisClusterClient clusterClient, RedisClusterClient clusterClient2) { this.clusterClient = clusterClient; + ClusterClientOptions.Builder builder = ClusterClientOptions.builder().maxRedirects(0); + clusterClient2.setOptions(builder.build()); + this.clusterClientWithNoRedirects = clusterClient2; } @BeforeEach @@ -94,15 +105,16 @@ void testRegularClientPubSubChannels() { } @Test + @EnabledOnCommand("SSUBSCRIBE") void testRegularClientPubSubShardChannels() { pubSubConnection.sync().ssubscribe(shardChannel); Integer clusterKeyslot = connection.sync().clusterKeyslot(shardChannel).intValue(); - RedisCommands rightSlot = - connection.sync().nodes(node -> node.getSlots().contains(clusterKeyslot)).commands(0); - RedisCommands wrongSlot = - connection.sync().nodes(node -> !node.getSlots().contains(clusterKeyslot)).commands(0); + RedisCommands rightSlot = connection.sync().nodes(node -> node.getSlots().contains(clusterKeyslot)) + .commands(0); + RedisCommands wrongSlot = connection.sync().nodes(node -> !node.getSlots().contains(clusterKeyslot)) + .commands(0); List channelsOnSubscribedNode = rightSlot.pubsubShardChannels(); assertThat(channelsOnSubscribedNode).hasSize(1); @@ -113,7 +125,7 @@ void testRegularClientPubSubShardChannels() { @Test @EnabledOnCommand("SSUBSCRIBE") - void subscribeToShardChannel(){ + void subscribeToShardChannel() { pubSubConnection.sync().ssubscribe(shardChannel); Wait.untilEquals(1L, connectionListener.getShardCounts()::poll).waitOrTimeout(); @@ -126,13 +138,79 @@ void subscribeToShardChannelViaReplica() { int clusterKeyslot = connection.sync().clusterKeyslot(shardChannel).intValue(); String thisNode = connection.getPartitions().getPartitionBySlot(clusterKeyslot).getNodeId(); - RedisPubSubAsyncCommands replica = - pubSubConnection.async().nodes(node -> thisNode.equals(node.getSlaveOf())).commands(0); + RedisPubSubAsyncCommands replica = pubSubConnection.async() + .nodes(node -> thisNode.equals(node.getSlaveOf())).commands(0); replica.ssubscribe(shardChannel); Wait.untilEquals(shardChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); } + @Test + @EnabledOnCommand("SSUBSCRIBE") + void publishToShardChannel() throws Exception { + pubSubConnection.addListener(connectionListener); + pubSubConnection.async().ssubscribe(shardChannel); + Wait.untilEquals(shardChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); + + pubSubConnection.async().spublish(shardChannel, shardMessage); + Wait.untilEquals(shardChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); + Wait.untilEquals(shardMessage, connectionListener.getMessages()::poll).waitOrTimeout(); + } + + @Test + @EnabledOnCommand("SSUBSCRIBE") + void publishToShardChannelViaDifferentEndpoints() throws Exception { + pubSubConnection.addListener(connectionListener); + pubSubConnection.async().ssubscribe(shardChannel); + Wait.untilEquals(shardChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); + + pubSubConnection.async().ssubscribe(shardTestChannel); + Wait.untilEquals(shardTestChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); + + pubSubConnection.async().spublish(shardChannel, shardMessage); + Wait.untilEquals(shardChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); + Wait.untilEquals(shardMessage, connectionListener.getMessages()::poll).waitOrTimeout(); + + pubSubConnection.async().spublish(shardTestChannel, shardMessage); + Wait.untilEquals(shardTestChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); + Wait.untilEquals(shardMessage, connectionListener.getMessages()::poll).waitOrTimeout(); + } + + @Test + @EnabledOnCommand("SSUBSCRIBE") + void publishToShardChannelViaNewClient() throws Exception { + pubSubConnection.addListener(connectionListener); + pubSubConnection.async().ssubscribe(shardChannel); + + StatefulRedisClusterPubSubConnection newPubsub = clusterClientWithNoRedirects.connectPubSub(); + newPubsub.async().spublish(shardChannel, shardMessage); + Wait.untilEquals(shardChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); + Wait.untilEquals(shardMessage, connectionListener.getMessages()::poll).waitOrTimeout(); + newPubsub.close(); + } + + @Test + @EnabledOnCommand("SSUBSCRIBE") + void publishToShardChannelViaNewClientWithNoRedirects() throws Exception { + pubSubConnection.addListener(connectionListener); + pubSubConnection.async().ssubscribe(shardChannel); + Wait.untilEquals(shardChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); + + pubSubConnection.async().ssubscribe(shardTestChannel); + Wait.untilEquals(shardTestChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); + + RedisClusterPubSubAsyncCommands cmd = clusterClientWithNoRedirects.connectPubSub().async(); + + cmd.spublish(shardChannel, shardMessage); + Wait.untilEquals(shardChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); + Wait.untilEquals(shardMessage, connectionListener.getMessages()::poll).waitOrTimeout(); + + cmd.spublish(shardTestChannel, shardMessage); + Wait.untilEquals(shardTestChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); + Wait.untilEquals(shardMessage, connectionListener.getMessages()::poll).waitOrTimeout(); + cmd.getStatefulConnection().close(); + } + @Test void myIdWorksAfterDisconnect() throws InterruptedException { diff --git a/src/test/java/io/lettuce/core/pubsub/PubSubCommandTest.java b/src/test/java/io/lettuce/core/pubsub/PubSubCommandTest.java index 5fe358def3..c6fd838050 100644 --- a/src/test/java/io/lettuce/core/pubsub/PubSubCommandTest.java +++ b/src/test/java/io/lettuce/core/pubsub/PubSubCommandTest.java @@ -32,6 +32,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.assertj.core.util.Arrays; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -53,6 +54,7 @@ import io.lettuce.test.Wait; import io.lettuce.test.WithPassword; import io.lettuce.test.condition.EnabledOnCommand; +import io.lettuce.test.resource.DefaultRedisClient; import io.lettuce.test.resource.FastShutdown; import io.lettuce.test.resource.TestClientResources; @@ -74,6 +76,8 @@ class PubSubCommandTest extends AbstractRedisClientTest { BlockingQueue channels = listener.getChannels(); + BlockingQueue shardChannels = listener.getShardChannels(); + BlockingQueue patterns = listener.getPatterns(); BlockingQueue messages = listener.getMessages(); @@ -88,6 +92,8 @@ class PubSubCommandTest extends AbstractRedisClientTest { String message = "msg!"; + String shardMessage = "shard msg!"; + @BeforeEach void openPubSubConnection() { try { @@ -221,6 +227,29 @@ void message() throws Exception { assertThat(messages.take()).isEqualTo(message); } + @Test + @EnabledOnCommand("SSUBSCRIBE") + void messageToShardChannel() throws Exception { + pubsub.ssubscribe(shardChannel); + Wait.untilEquals(shardChannel, shardChannels::poll).waitOrTimeout(); + + redis.spublish(shardChannel, shardMessage); + Wait.untilEquals(shardChannel, shardChannels::poll).waitOrTimeout(); + Wait.untilEquals(shardMessage, messages::poll).waitOrTimeout(); + } + + @Test + @EnabledOnCommand("SSUBSCRIBE") + void messageToShardChannelViaNewClient() throws Exception { + pubsub.ssubscribe(shardChannel); + Wait.untilEquals(shardChannel, shardChannels::poll).waitOrTimeout(); + + RedisPubSubAsyncCommands redis = DefaultRedisClient.get().connectPubSub().async(); + redis.spublish(shardChannel, shardMessage); + Wait.untilEquals(shardMessage, messages::poll).waitOrTimeout(); + Wait.untilEquals(shardChannel, shardChannels::poll).waitOrTimeout(); + } + @Test @EnabledOnCommand("ACL") void messageAsPushMessage() throws Exception { @@ -362,40 +391,35 @@ void pubsubNumsub() { @Test @EnabledOnCommand("SPUBLISH") void pubsubShardChannels() { - /// TODO : uncomment after SSUBSCRIBE is implemented - // TestFutures.awaitOrTimeout(pubsub.ssubscribe(channel)); + TestFutures.awaitOrTimeout(pubsub.ssubscribe(shardChannel)); List result = redis.pubsubShardChannels(); - // assertThat(result).contains(channel); + assertThat(result).contains(shardChannel); } @Test @EnabledOnCommand("SPUBLISH") void pubsubMultipleShardChannels() { - /// TODO : uncomment after SSUBSCRIBE is implemented - // TestFutures.awaitOrTimeout(pubsub.ssubscribe(channel, "channel1", "channel3")); + TestFutures.awaitOrTimeout(pubsub.ssubscribe(shardChannel, "channel1", "channel3")); List result = redis.pubsubShardChannels(); - // assertThat(result).contains(channel, "channel1", "channel3"); + assertThat(result).contains(shardChannel, "channel1", "channel3"); } @Test @EnabledOnCommand("SPUBLISH") void pubsubShardChannelsWithArg() { - /// TODO : uncomment after SSUBSCRIBE is implemented - // TestFutures.awaitOrTimeout(pubsub.ssubscribe(channel)); - List result = redis.pubsubShardChannels(pattern); - // assertThat(result).contains(channel); + TestFutures.awaitOrTimeout(pubsub.ssubscribe(shardChannel)); + List result = redis.pubsubShardChannels(shardChannel); + assertThat(result).contains(shardChannel); } @Test @EnabledOnCommand("SPUBLISH") void pubsubShardNumsub() { - // TODO After we have SSUBSCRIBE implement a step to subscribe to a shard channel - // Depends on https://github.com/lettuce-io/lettuce-core/issues/2758 + TestFutures.awaitOrTimeout(pubsub.ssubscribe(shardChannel)); Map result = redis.pubsubShardNumsub(shardChannel); - assertThat(result.getOrDefault(shardChannel, 0L)).isEqualTo(0); - // TODO verify that the channel from step 1 is the one returned by the command + assertThat(result.keySet()).contains(shardChannel); } @Test diff --git a/src/test/java/io/lettuce/core/pubsub/PubSubReactiveTest.java b/src/test/java/io/lettuce/core/pubsub/PubSubReactiveTest.java index 86a96a2562..2c93374fa9 100644 --- a/src/test/java/io/lettuce/core/pubsub/PubSubReactiveTest.java +++ b/src/test/java/io/lettuce/core/pubsub/PubSubReactiveTest.java @@ -56,17 +56,25 @@ class PubSubReactiveTest extends AbstractRedisClientTest implements RedisPubSubListener { private RedisPubSubReactiveCommands pubsub; + private RedisPubSubReactiveCommands pubsub2; private BlockingQueue channels; + private BlockingQueue shardChannels; + private BlockingQueue patterns; + private BlockingQueue messages; + private BlockingQueue counts; private String shardChannel = "shard-channel"; + private String channel = "channel0"; + private String pattern = "channel*"; + private String message = "msg!"; @BeforeEach @@ -262,45 +270,40 @@ void pubsubNumsub() { @Test @EnabledOnCommand("SPUBLISH") void pubsubShardChannels() { - /// TODO : uncomment after SSUBSCRIBE is implemented - // block(pubsub.ssubscribe(channel)); + block(pubsub.ssubscribe(shardChannel)); List result = block(pubsub2.pubsubShardChannels().collectList()); - // assertThat(result).contains(channel); + assertThat(result).contains(shardChannel); } @Test @EnabledOnCommand("SPUBLISH") void pubsubShardMultipleChannels() { - /// TODO : uncomment after SSUBSCRIBE is implemented - // StepVerifier.create(pubsub.ssubscribe(channel, "channel1", "channel3")).verifyComplete(); + StepVerifier.create(pubsub.ssubscribe(shardChannel, "channel1", "channel3")).verifyComplete(); - // StepVerifier.create(pubsub2.pubsubShardChannels().collectList()) - // .consumeNextWith(actual -> assertThat(actual).contains(channel, "channel1", "channel3")).verifyComplete(); + StepVerifier.create(pubsub2.pubsubShardChannels().collectList()) + .consumeNextWith(actual -> assertThat(actual).contains(shardChannel, "channel1", "channel3")).verifyComplete(); } @Test @EnabledOnCommand("SPUBLISH") void pubsubShardChannelsWithArg() { - /// TODO : uncomment after SSUBSCRIBE is implemented - // StepVerifier.create(pubsub.ssubscribe(channel)).verifyComplete(); - // Wait.untilTrue(() -> mono(pubsub2.pubsubShardChannels(pattern).filter(s -> channel.equals(s))) != null).waitOrTimeout(); + StepVerifier.create(pubsub.ssubscribe(shardChannel)).verifyComplete(); + Wait.untilTrue(() -> mono(pubsub2.pubsubShardChannels(shardChannel).filter(s -> shardChannel.equals(s))) != null) + .waitOrTimeout(); - String result = mono(pubsub2.pubsubShardChannels(pattern).filter(s -> channel.equals(s))); - // assertThat(result).isEqualToIgnoringCase(channel); + String result = mono(pubsub2.pubsubShardChannels(shardChannel).filter(s -> shardChannel.equals(s))); + assertThat(result).isEqualToIgnoringCase(shardChannel); } @Test @EnabledOnCommand("SPUBLISH") void pubsubShardNumsub() { - - // TODO After we have SSUBSCRIBE implement a step to subscribe to a shard channel - // Depends on https://github.com/lettuce-io/lettuce-core/issues/2758 + StepVerifier.create(pubsub.ssubscribe(shardChannel)).verifyComplete(); Wait.untilEquals(1, () -> block(pubsub2.pubsubShardNumsub(shardChannel)).size()).waitOrTimeout(); Map result = block(pubsub2.pubsubShardNumsub(shardChannel)); - assertThat(result.getOrDefault(shardChannel, 0L)).isEqualTo(0); - // TODO verify that the channel from step 1 is the one returned by the command + assertThat(result.getOrDefault(shardChannel, 0L)).isEqualTo(1); } @Test @@ -314,6 +317,7 @@ void pubsubNumpat() { Long result = block(pubsub2.pubsubNumpat()); assertThat(result.longValue()).isGreaterThan(0); } + @Test void punsubscribe() throws Exception { @@ -420,6 +424,7 @@ void adapter() throws Exception { final BlockingQueue localCounts = LettuceFactories.newBlockingQueue(); RedisPubSubAdapter adapter = new RedisPubSubAdapter() { + @Override public void subscribed(String channel, long count) { super.subscribed(channel, count); @@ -431,6 +436,7 @@ public void unsubscribed(String channel, long count) { super.unsubscribed(channel, count); localCounts.add(count); } + }; pubsub.getStatefulConnection().addListener(adapter); @@ -519,4 +525,5 @@ T mono(Flux flux) { List all(Flux flux) { return flux.collectList().block(); } + } diff --git a/src/test/java/io/lettuce/core/support/PubSubTestListener.java b/src/test/java/io/lettuce/core/support/PubSubTestListener.java index d89c9a7810..1afcb42766 100644 --- a/src/test/java/io/lettuce/core/support/PubSubTestListener.java +++ b/src/test/java/io/lettuce/core/support/PubSubTestListener.java @@ -30,10 +30,15 @@ public class PubSubTestListener implements RedisPubSubListener { private BlockingQueue channels = LettuceFactories.newBlockingQueue(); + private BlockingQueue patterns = LettuceFactories.newBlockingQueue(); + private BlockingQueue messages = LettuceFactories.newBlockingQueue(); + private BlockingQueue counts = LettuceFactories.newBlockingQueue(); + private BlockingQueue shardChannels = LettuceFactories.newBlockingQueue(); + private BlockingQueue shardCounts = LettuceFactories.newBlockingQueue(); // RedisPubSubListener implementation @@ -51,6 +56,12 @@ public void message(String pattern, String channel, String message) { messages.add(message); } + @Override + public void smessage(String channel, String message) { + shardChannels.add(channel); + messages.add(message); + } + @Override public void subscribed(String channel, long count) { channels.add(channel); @@ -116,4 +127,5 @@ public void clear() { counts.clear(); shardCounts.clear(); } + }