Skip to content

Commit

Permalink
Refactor WeightedLoadbalanceStrategy to use a Builder
Browse files Browse the repository at this point in the history
Signed-off-by: Rossen Stoyanchev <rstoyanchev@vmware.com>
  • Loading branch information
rstoyanchev committed Oct 26, 2020
1 parent e57621c commit be1207b
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public Builder roundRobinLoadbalanceStrategy() {
* <p>By default, {@link RoundRobinLoadbalanceStrategy} is used.
*/
public Builder weightedLoadbalanceStrategy() {
this.loadbalanceStrategy = new WeightedLoadbalanceStrategy();
this.loadbalanceStrategy = WeightedLoadbalanceStrategy.create();
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@
import io.rsocket.core.RSocketConnector;
import io.rsocket.plugins.RequestInterceptor;
import java.util.List;
import java.util.SplittableRandom;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import reactor.util.annotation.Nullable;
Expand All @@ -37,32 +36,13 @@ public class WeightedLoadbalanceStrategy implements ClientLoadbalanceStrategy {

private static final double EXP_FACTOR = 4.0;

private static final int EFFORT = 5;

final int effort;
final SplittableRandom splittableRandom;
final int maxPairSelectionAttempts;
final Function<RSocket, WeightedStats> weightedStatsResolver;

public WeightedLoadbalanceStrategy() {
this(new DefaultWeightedStatsResolver());
}

public WeightedLoadbalanceStrategy(Function<RSocket, WeightedStats> weightedStatsResolver) {
this(EFFORT, weightedStatsResolver);
}

public WeightedLoadbalanceStrategy(
int effort, Function<RSocket, WeightedStats> weightedStatsResolver) {
this(effort, new SplittableRandom(System.nanoTime()), weightedStatsResolver);
}

public WeightedLoadbalanceStrategy(
int effort,
SplittableRandom splittableRandom,
Function<RSocket, WeightedStats> weightedStatsResolver) {
this.effort = effort;
this.splittableRandom = splittableRandom;
this.weightedStatsResolver = weightedStatsResolver;
private WeightedLoadbalanceStrategy(
int numberOfAttempts, @Nullable Function<RSocket, WeightedStats> resolver) {
this.maxPairSelectionAttempts = numberOfAttempts;
this.weightedStatsResolver = (resolver != null ? resolver : new DefaultWeightedStatsResolver());
}

@Override
Expand All @@ -75,7 +55,7 @@ public void initialize(RSocketConnector connector) {

@Override
public RSocket select(List<RSocket> sockets) {
final int effort = this.effort;
final int numberOfAttepmts = this.maxPairSelectionAttempts;
final int size = sockets.size();

RSocket weightedRSocket;
Expand Down Expand Up @@ -103,7 +83,7 @@ public RSocket select(List<RSocket> sockets) {
RSocket rsc1 = null;
RSocket rsc2 = null;

for (int i = 0; i < effort; i++) {
for (int i = 0; i < numberOfAttepmts; i++) {
int i1 = ThreadLocalRandom.current().nextInt(size);
int i2 = ThreadLocalRandom.current().nextInt(size - 1);

Expand Down Expand Up @@ -168,30 +148,83 @@ private static double calculateFactor(final double u, final double l, final doub
return Math.pow(1 + alpha, EXP_FACTOR);
}

static class DefaultWeightedStatsResolver implements Function<RSocket, WeightedStats> {
/** Create an instance of {@link WeightedLoadbalanceStrategy} with default settings. */
public static WeightedLoadbalanceStrategy create() {
return new Builder().build();
}

/** Return a builder to create a {@link WeightedLoadbalanceStrategy} with. */
public static Builder builder() {
return new Builder();
}

/** Builder for {@link WeightedLoadbalanceStrategy}. */
public static class Builder {

private int maxPairSelectionAttempts = 5;

@Nullable private Function<RSocket, WeightedStats> weightedStatsResolver;

private Builder() {}

/**
* How many times to try to randomly select a pair of RSocket connections with non-zero
* availability. This is applicable when there are more than two connections in the pool. If the
* number of attempts is exceeded, the last selected pair is used.
*
* <p>By default this is set to 5.
*
* @param numberOfAttempts the iteration count
*/
public Builder maxPairSelectionAttempts(int numberOfAttempts) {
this.maxPairSelectionAttempts = numberOfAttempts;
return this;
}

/**
* Configure how the created {@link WeightedLoadbalanceStrategy} should find the stats for a
* given RSocket.
*
* <p>By default {@code WeightedLoadbalanceStrategy} installs a {@code RequestInterceptor} when
* {@link ClientLoadbalanceStrategy#initialize(RSocketConnector)} is called in order to keep
* track of stats.
*
* @param resolver the function to find the stats for an RSocket
*/
public Builder weightedStatsResolver(Function<RSocket, WeightedStats> resolver) {
this.weightedStatsResolver = resolver;
return this;
}

public WeightedLoadbalanceStrategy build() {
return new WeightedLoadbalanceStrategy(
this.maxPairSelectionAttempts, this.weightedStatsResolver);
}
}

private static class DefaultWeightedStatsResolver implements Function<RSocket, WeightedStats> {

final ConcurrentMap<RSocket, WeightedStatsRequestInterceptor> rsocketsInterceptors =
new ConcurrentHashMap<>();
final Map<RSocket, WeightedStats> statsMap = new ConcurrentHashMap<>();

@Override
public WeightedStats apply(RSocket rSocket) {
return rsocketsInterceptors.get(rSocket);
return statsMap.get(rSocket);
}

void init(RSocketConnector connector) {
connector.interceptors(
ir ->
ir.forRequester(
registry ->
registry.forRequester(
(Function<RSocket, ? extends RequestInterceptor>)
rSocket -> {
final WeightedStatsRequestInterceptor interceptor =
new WeightedStatsRequestInterceptor() {
@Override
public void dispose() {
rsocketsInterceptors.remove(rSocket);
statsMap.remove(rSocket);
}
};
rsocketsInterceptors.put(rSocket, interceptor);
statsMap.put(rSocket, interceptor);

return interceptor;
}));
Expand Down

0 comments on commit be1207b

Please sign in to comment.