Skip to content

Commit

Permalink
Refine command outputs to capture whether a segment has been received…
Browse files Browse the repository at this point in the history
… instead of relying on the deserialized value state #2498
  • Loading branch information
mp911de committed Sep 1, 2023
1 parent b48f75a commit 91b3b1f
Show file tree
Hide file tree
Showing 21 changed files with 152 additions and 43 deletions.
12 changes: 11 additions & 1 deletion src/main/java/io/lettuce/core/output/ClaimedMessagesOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,12 @@ public class ClaimedMessagesOutput<K, V> extends CommandOutput<K, V, ClaimedMess

private String id;

private boolean hasId;

private K key;

private boolean hasKey;

private Map<K, V> body;

private boolean bodyReceived;
Expand Down Expand Up @@ -75,8 +79,9 @@ public void set(ByteBuffer bytes) {
return;
}

if (key == null) {
if (!hasKey) {
bodyReceived = true;
hasKey = true;

if (bytes == null) {
return;
Expand All @@ -92,6 +97,7 @@ public void set(ByteBuffer bytes) {

body.put(key, bytes == null ? null : codec.decodeValue(bytes));
key = null;
hasKey = false;
}

@Override
Expand All @@ -101,15 +107,19 @@ public void complete(int depth) {
messages.add(new StreamMessage<>(stream, id, body == null ? Collections.emptyMap() : body));
bodyReceived = false;
key = null;
hasKey = false;
body = null;
id = null;
hasId = false;
}

if (depth == 2 && justId) {
messages.add(new StreamMessage<>(stream, id, null));
key = null;
hasKey = false;
body = null;
id = null;
hasId = false;
}

if (depth == 0) {
Expand Down
13 changes: 10 additions & 3 deletions src/main/java/io/lettuce/core/output/GenericMapOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
*/
public class GenericMapOutput<K, V> extends CommandOutput<K, V, Map<K, Object>> {

boolean hasKey;
private K key;

public GenericMapOutput(RedisCodec<K, V> codec) {
Expand All @@ -41,14 +42,16 @@ public GenericMapOutput(RedisCodec<K, V> codec) {
@Override
public void set(ByteBuffer bytes) {

if (key == null) {
if (!hasKey) {
key = (bytes == null) ? null : codec.decodeKey(bytes);
hasKey = true;
return;
}

Object value = (bytes == null) ? null : codec.decodeValue(bytes);
output.put(key, value);
key = null;
hasKey = false;
}

@Override
Expand All @@ -60,27 +63,31 @@ public void setBigNumber(ByteBuffer bytes) {
@SuppressWarnings("unchecked")
public void set(long integer) {

if (key == null) {
if (!hasKey) {
key = (K) Long.valueOf(integer);
hasKey = true;
return;
}

V value = (V) Long.valueOf(integer);
output.put(key, value);
key = null;
hasKey = false;
}

@Override
public void set(double number) {

if (key == null) {
if (!hasKey) {
key = (K) Double.valueOf(number);
hasKey = true;
return;
}

Object value = Double.valueOf(number);
output.put(key, value);
key = null;
hasKey = false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.lettuce.core.output;

import static java.lang.Double.parseDouble;
import static java.lang.Double.*;

import java.nio.ByteBuffer;
import java.util.Collections;
Expand All @@ -35,6 +35,8 @@ public class GeoCoordinatesListOutput<K, V> extends CommandOutput<K, V, List<Geo

private Double x;

boolean hasX;

private boolean initialized;

private Subscriber<GeoCoordinates> subscriber;
Expand All @@ -59,13 +61,15 @@ public void set(ByteBuffer bytes) {
@Override
public void set(double number) {

if (x == null) {
if (!hasX) {
x = number;
hasX = true;
return;
}

subscriber.onNext(output, new GeoCoordinates(x, number));
x = null;
hasX = false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.lettuce.core.output;

import static java.lang.Double.parseDouble;
import static java.lang.Double.*;

import java.nio.ByteBuffer;
import java.util.Collections;
Expand All @@ -34,6 +34,8 @@
public class GeoCoordinatesValueListOutput<K, V> extends CommandOutput<K, V, List<Value<GeoCoordinates>>>
implements StreamingOutput<Value<GeoCoordinates>> {

boolean hasX;

private Double x;

private boolean initialized;
Expand All @@ -60,13 +62,15 @@ public void set(ByteBuffer bytes) {
@Override
public void set(double number) {

if (x == null) {
if (!hasX) {
x = number;
hasX = true;
return;
}

subscriber.onNext(output, Value.fromNullable(new GeoCoordinates(x, number)));
x = null;
hasX = false;
}

@Override
Expand Down
29 changes: 21 additions & 8 deletions src/main/java/io/lettuce/core/output/GeoWithinListOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.lettuce.core.output;

import static java.lang.Double.parseDouble;
import static java.lang.Double.*;

import java.nio.ByteBuffer;
import java.util.List;
Expand All @@ -35,19 +35,25 @@ public class GeoWithinListOutput<K, V> extends CommandOutput<K, V, List<GeoWithi

private V member;

private boolean hasMember;

private Double distance;

private boolean hasDistance;

private Long geohash;

private GeoCoordinates coordinates;

private Double x;

private boolean withDistance;
private boolean hasX;

private final boolean withDistance;

private boolean withHash;
private final boolean withHash;

private boolean withCoordinates;
private final boolean withCoordinates;

private Subscriber<GeoWithin<V>> subscriber;

Expand All @@ -62,8 +68,9 @@ public GeoWithinListOutput(RedisCodec<K, V> codec, boolean withDistance, boolean
@Override
public void set(long integer) {

if (member == null) {
if (!hasMember) {
member = (V) (Long) integer;
hasMember = true;
return;
}

Expand All @@ -75,8 +82,9 @@ public void set(long integer) {
@Override
public void set(ByteBuffer bytes) {

if (member == null) {
if (!hasMember) {
member = codec.decodeValue(bytes);
hasMember = true;
return;
}

Expand All @@ -88,15 +96,17 @@ public void set(ByteBuffer bytes) {
public void set(double number) {

if (withDistance) {
if (distance == null) {
if (!hasDistance) {
distance = number;
hasDistance = true;
return;
}
}

if (withCoordinates) {
if (x == null) {
if (!hasX) {
x = number;
hasX = true;
return;
}

Expand All @@ -111,10 +121,13 @@ public void complete(int depth) {
subscriber.onNext(output, new GeoWithin<V>(member, distance, geohash, coordinates));

member = null;
hasMember = false;
distance = null;
hasDistance = false;
geohash = null;
coordinates = null;
x = null;
hasX = false;
}
}

Expand Down
6 changes: 5 additions & 1 deletion src/main/java/io/lettuce/core/output/KeyValueListOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public class KeyValueListOutput<K, V> extends CommandOutput<K, V, List<KeyValue<

private K key;

private boolean hasKey;

public KeyValueListOutput(RedisCodec<K, V> codec) {
super(codec, Collections.emptyList());
setSubscriber(ListSubscriber.instance());
Expand All @@ -62,13 +64,15 @@ public KeyValueListOutput(RedisCodec<K, V> codec, Iterable<K> keys) {
public void set(ByteBuffer bytes) {

if (keys == null) {
if (key == null) {
if (!hasKey) {
key = codec.decodeKey(bytes);
hasKey = true;
return;
}

K key = this.key;
this.key = null;
this.hasKey = false;
subscriber.onNext(output, KeyValue.fromNullable(key, bytes == null ? null : codec.decodeValue(bytes)));

} else {
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/io/lettuce/core/output/KeyValueOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class KeyValueOutput<K, V> extends CommandOutput<K, V, KeyValue<K, V>> {

private K key;

private boolean hasKey;

public KeyValueOutput(RedisCodec<K, V> codec) {
super(codec, null);
}
Expand All @@ -40,8 +42,9 @@ public KeyValueOutput(RedisCodec<K, V> codec) {
public void set(ByteBuffer bytes) {

if (bytes != null) {
if (key == null) {
if (!hasKey) {
key = codec.decodeKey(bytes);
hasKey = true;
} else {
V value = codec.decodeValue(bytes);
output = KeyValue.fromNullable(key, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class KeyValueScanStreamingOutput<K, V> extends ScanOutput<K, V, StreamSc

private K key;

private boolean hasKey;

private KeyValueStreamingChannel<K, V> channel;

public KeyValueScanStreamingOutput(RedisCodec<K, V> codec, KeyValueStreamingChannel<K, V> channel) {
Expand All @@ -42,8 +44,9 @@ public KeyValueScanStreamingOutput(RedisCodec<K, V> codec, KeyValueStreamingChan
@Override
protected void setOutput(ByteBuffer bytes) {

if (key == null) {
if (!hasKey) {
key = codec.decodeKey(bytes);
hasKey = true;
return;
}

Expand All @@ -52,6 +55,7 @@ protected void setOutput(ByteBuffer bytes) {
channel.onKeyValue(key, value);
output.setCount(output.getCount() + 1);
key = null;
hasKey = false;
}

}
Loading

0 comments on commit 91b3b1f

Please sign in to comment.