Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for SPUBLISH #2838

Merged
merged 4 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1232,7 +1232,7 @@ public RedisFuture<StreamScanCursor> hscan(KeyValueStreamingChannel<K, V> channe

@Override
public RedisFuture<StreamScanCursor> hscanNovalues(KeyStreamingChannel<K> channel, K key, ScanCursor scanCursor,
ScanArgs scanArgs) {
ScanArgs scanArgs) {
tishun marked this conversation as resolved.
Show resolved Hide resolved
return dispatch(commandBuilder.hscanNoValuesStreaming(channel, key, scanCursor, scanArgs));
}

Expand Down Expand Up @@ -2001,6 +2001,11 @@ public RedisFuture<Set<V>> spop(K key, long count) {
return dispatch(commandBuilder.spop(key, count));
}

@Override
public RedisFuture<Long> spublish(K shardChannel, V message) {
return dispatch(commandBuilder.spublish(shardChannel, message));
}

@Override
public RedisFuture<V> srandmember(K key) {
return dispatch(commandBuilder.srandmember(key));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2080,6 +2080,11 @@ public Flux<V> spop(K key, long count) {
return createDissolvingFlux(() -> commandBuilder.spop(key, count));
}

@Override
public Mono<Long> spublish(K shardChannel, V message) {
return createMono(() -> commandBuilder.spublish(shardChannel, message));
}

@Override
public Mono<V> srandmember(K key) {
return createMono(() -> commandBuilder.srandmember(key));
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/io/lettuce/core/RedisCommandBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -2703,6 +2703,13 @@ Command<K, V, Set<V>> spop(K key, long count) {
return createCommand(SPOP, new ValueSetOutput<>(codec), args);
}

Command<K, V, Long> spublish(K shardChannel, V message) {
LettuceAssert.notNull(shardChannel, "ShardChannel " + MUST_NOT_BE_NULL);

CommandArgs<K, V> args = new CommandArgs<>(codec).addKey(shardChannel).addValue(message);
return createCommand(SPUBLISH, new IntegerOutput<>(codec), args);
}

Command<K, V, V> srandmember(K key) {
notNullKey(key);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ public interface BaseRedisAsyncCommands<K, V> {
*/
RedisFuture<Long> pubsubNumpat();

/**
* Post a message to a shard channel.
*
* @param shardChannel the shard channel type: key.
* @param message the message type: value.
* @return Long integer-reply the number of clients that received the message.
* @since 7.0
*/
RedisFuture<Long> spublish(K shardChannel, V message);

/**
* Echo the given string.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ public interface BaseRedisReactiveCommands<K, V> {
*/
Mono<Long> pubsubNumpat();

/**
* Post a message to a shard channel.
*
* @param shardChannel the shard channel type: key.
* @param message the message type: value.
* @return Long integer-reply the number of clients that received the message.
* @since 7.0
*/
Mono<Long> spublish(K shardChannel, V message);

/**
* Echo the given string.
*
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,16 @@ public interface BaseRedisCommands<K, V> {
*/
Long pubsubNumpat();

/**
* Post a message to a shard channel.
*
* @param shardChannel the shard channel type: key.
* @param message the message type: value.
* @return Long integer-reply the number of clients that received the message.
* @since 7.0
*/
Long spublish(K shardChannel, V message);

/**
* Echo the given string.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ public void punsubscribed(K pattern, long count) {
notifications.punsubscribed(getNode(), pattern, count);
}

@Override
public void smessage(K shardChannel, V message) {
notifications.smessage(getNode(), shardChannel, message);
}

@Override
public void ssubscribed(K channel, long count) {
notifications.ssubscribed(getNode(), channel, count);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ protected void notifyListeners(PubSubMessage<K, V> output) {
case unsubscribe:
multicast.unsubscribed(clusterNode, output.channel(), output.count());
break;
case smessage:
multicast.smessage(clusterNode, output.channel(), output.body());
break;
case ssubscribe:
multicast.ssubscribed(clusterNode, output.channel(), output.count());
break;
Expand Down Expand Up @@ -192,6 +195,12 @@ public void punsubscribed(RedisClusterNode node, K pattern, long count) {
clusterListeners.forEach(listener -> listener.punsubscribed(node, pattern, count));
}

@Override
public void smessage(RedisClusterNode node, K shardChannel, V message) {
getListeners().forEach(listener -> listener.smessage(shardChannel, message));
clusterListeners.forEach(listener -> listener.smessage(node, shardChannel, message));
}

@Override
public void ssubscribed(RedisClusterNode node, K channel, long count) {
getListeners().forEach(listener -> listener.ssubscribed(channel, count));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ public interface BaseNodeSelectionAsyncCommands<K, V> {
*/
AsyncExecutions<Long> pubsubNumpat();

/**
* Post a message to a shard channel.
*
* @param shardChannel the shard channel type: key.
* @param message the message type: value.
* @return Long integer-reply the number of clients that received the message.
* @since 7.0
*/
AsyncExecutions<Long> spublish(K shardChannel, V message);

/**
* Echo the given string.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,16 @@ public interface BaseNodeSelectionCommands<K, V> {
*/
Executions<Long> pubsubNumpat();

/**
* Post a message to a shard channel.
*
* @param shardChannel the shard channel type: key.
* @param message the message type: value.
* @return Long integer-reply the number of clients that received the message.
* @since 7.0
*/
Executions<Long> spublish(K shardChannel, V message);

/**
* Echo the given string.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public void punsubscribed(RedisClusterNode node, K pattern, long count) {
// empty adapter method
}

@Override
public void smessage(RedisClusterNode node, K shardChannel, V message) {
// empty adapter method
}

@Override
public void ssubscribed(RedisClusterNode node, K channel, long count) {
// empty adapter method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ public interface RedisClusterPubSubListener<K, V> {
*/
void punsubscribed(RedisClusterNode node, K pattern, long count);

/**
* Message received from a shard channel subscription.
*
* @param node the {@link RedisClusterNode} from which the {@code message} originates.
* @param shardChannel shard channel.
* @param message Message.
* @since 7.0
*/
default void smessage(RedisClusterNode node, K shardChannel, V message){
message(node, shardChannel, message);
}

/**
* Subscribed to a shard channel.
*
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/lettuce/core/protocol/CommandType.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public enum CommandType implements ProtocolKeyword {

// Pub/Sub

PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PUBSUB, SSUBSCRIBE,
PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PUBSUB, SSUBSCRIBE, SPUBLISH,

// Sets

Expand Down
21 changes: 13 additions & 8 deletions src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,19 @@ final Command<K, V, V> punsubscribe(K... patterns) {
return pubSubCommand(PUNSUBSCRIBE, new PubSubOutput<>(codec), patterns);
}

Command<K, V, Long> spublish(K shardChannel, V message) {
CommandArgs<K, V> args = new CommandArgs<>(codec).addKey(shardChannel).addValue(message);
return createCommand(SPUBLISH, new IntegerOutput<>(codec), args);
}

@SafeVarargs
final Command<K, V, V> ssubscribe(K... shardChannels) {
LettuceAssert.notEmpty(shardChannels, "Shard channels " + MUST_NOT_BE_EMPTY);

CommandArgs<K, V> args = new CommandArgs<>(codec).addKeys(shardChannels);
return createCommand(SSUBSCRIBE, new PubSubOutput<>(codec), args);
}

@SafeVarargs
final Command<K, V, V> subscribe(K... channels) {
LettuceAssert.notEmpty(channels, "Channels " + MUST_NOT_BE_EMPTY);
Expand All @@ -110,14 +123,6 @@ final Command<K, V, V> unsubscribe(K... channels) {
return pubSubCommand(UNSUBSCRIBE, new PubSubOutput<>(codec), channels);
}

@SafeVarargs
final Command<K, V, V> ssubscribe(K... shardChannels) {
LettuceAssert.notEmpty(shardChannels, "Shard channels " + MUST_NOT_BE_EMPTY);

CommandArgs<K, V> args = new CommandArgs<>(codec).addKeys(shardChannels);
return createCommand(SSUBSCRIBE, new PubSubOutput<>(codec), args);
}

@SafeVarargs
final <T> Command<K, V, T> pubSubCommand(CommandType type, CommandOutput<K, V, T> output, K... keys) {
return new Command<>(type, output, new PubSubCommandArgs<>(codec).addKeys(keys));
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/io/lettuce/core/pubsub/PubSubEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,9 @@ protected void notifyListeners(PubSubMessage<K, V> message) {
case unsubscribe:
listener.unsubscribed(message.channel(), message.count());
break;
case smessage:
listener.smessage(message.channel(), message.body());
break;
case ssubscribe:
listener.ssubscribed(message.channel(), message.count());
break;
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/lettuce/core/pubsub/PubSubOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class PubSubOutput<K, V> extends CommandOutput<K, V, V> implements PubSub

public enum Type {

message, pmessage, psubscribe, punsubscribe, subscribe, unsubscribe, ssubscribe;
message, pmessage, psubscribe, punsubscribe, subscribe, unsubscribe, ssubscribe, smessage;

private final static Set<String> names = new HashSet<>();

Expand Down Expand Up @@ -108,6 +108,7 @@ private void handleOutput(ByteBuffer bytes) {
pattern = codec.decodeKey(bytes);
break;
}
case smessage:
case message:
if (channel == null) {
channel = codec.decodeKey(bytes);
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ public void punsubscribed(K pattern, long count) {
// empty adapter method
}

@Override
public void smessage(K shardChannel, V message) {
// empty adapter method
}

@Override
public void ssubscribed(K shardChannel, long count) {
// empty adapter method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ public RedisFuture<Map<K, Long>> pubsubShardNumsub(K... shardChannels) {
return dispatch(commandBuilder.pubsubShardNumsub(shardChannels));
}

@Override
public RedisFuture<Long> spublish(K shardChannel, V message) {
return dispatch(commandBuilder.spublish(shardChannel, message));
}

@Override
@SuppressWarnings("unchecked")
public RedisFuture<Void> ssubscribe(K... channels) {
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,15 @@ default void ssubscribed(K shardChannel, long count) {
subscribed(shardChannel, count);
}

/**
* Message received from a shard channel subscription.
*
* @param shardChannel shard channel.
* @param message Message.
* @since 7.0
*/
default void smessage(K shardChannel, V message) {
message(shardChannel, message);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ public Mono<Map<K, Long>> pubsubShardNumsub(K... shardChannels) {
return createMono(() -> commandBuilder.pubsubShardNumsub(shardChannels));
}

@Override
public Mono<Long> spublish(K shardChannel, V message) {
return createMono(() -> commandBuilder.publish(shardChannel, message));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should point to spublish, not publish

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

created a bug issue: #2971

}

@Override
public Mono<Void> ssubscribe(K... shardChannels) {
return createFlux(() -> commandBuilder.ssubscribe(shardChannels)).then();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ interface BaseRedisCoroutinesCommands<K : Any, V : Any> {
*/
suspend fun pubsubNumpat(): Long

/**
* Post a message to a shard channel.
*
* @param shardChannel the shard channel type: key.
* @param message the message type: value.
* @return Long integer-reply the number of clients that received the message.
* @since 7.0
*/
suspend fun spublish(shardChannel: K, message: V): Long?

/**
* Echo the given string.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ internal class BaseRedisCoroutinesCommandsImpl<K : Any, V : Any>(internal val op

override suspend fun pubsubNumpat(): Long = ops.pubsubNumpat().awaitSingle()

override suspend fun spublish(shardChannel: K, message: V): Long? = ops.spublish(shardChannel, message).awaitFirstOrNull()

override suspend fun echo(msg: V): V = ops.echo(msg).awaitSingle()

override suspend fun role(): List<Any> = ops.role().asFlow().toList()
Expand Down
10 changes: 10 additions & 0 deletions src/main/templates/io/lettuce/core/api/BaseRedisCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ public interface BaseRedisCommands<K, V> {
*/
Long pubsubNumpat();

/**
* Post a message to a shard channel.
*
* @param shardChannel the shard channel type: key.
* @param message the message type: value.
* @return Long integer-reply the number of clients that received the message.
* @since 7.0
*/
Long spublish(K shardChannel, V message);

/**
* Echo the given string.
*
Expand Down
Loading
Loading