Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update WeightedLoadbalanceStrategy to use a Builder #949

Merged
merged 1 commit into from
Oct 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public Builder roundRobinLoadbalanceStrategy() {
* <p>By default, {@link RoundRobinLoadbalanceStrategy} is used.
*/
public Builder weightedLoadbalanceStrategy() {
this.loadbalanceStrategy = new WeightedLoadbalanceStrategy();
this.loadbalanceStrategy = WeightedLoadbalanceStrategy.create();
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@
import io.rsocket.core.RSocketConnector;
import io.rsocket.plugins.RequestInterceptor;
import java.util.List;
import java.util.SplittableRandom;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import reactor.util.annotation.Nullable;
Expand All @@ -37,32 +36,13 @@ public class WeightedLoadbalanceStrategy implements ClientLoadbalanceStrategy {

private static final double EXP_FACTOR = 4.0;

private static final int EFFORT = 5;

final int effort;
final SplittableRandom splittableRandom;
final int maxPairSelectionAttempts;
final Function<RSocket, WeightedStats> weightedStatsResolver;

public WeightedLoadbalanceStrategy() {
this(new DefaultWeightedStatsResolver());
}

public WeightedLoadbalanceStrategy(Function<RSocket, WeightedStats> weightedStatsResolver) {
this(EFFORT, weightedStatsResolver);
}

public WeightedLoadbalanceStrategy(
int effort, Function<RSocket, WeightedStats> weightedStatsResolver) {
this(effort, new SplittableRandom(System.nanoTime()), weightedStatsResolver);
}

public WeightedLoadbalanceStrategy(
int effort,
SplittableRandom splittableRandom,
Function<RSocket, WeightedStats> weightedStatsResolver) {
this.effort = effort;
this.splittableRandom = splittableRandom;
this.weightedStatsResolver = weightedStatsResolver;
private WeightedLoadbalanceStrategy(
int numberOfAttempts, @Nullable Function<RSocket, WeightedStats> resolver) {
this.maxPairSelectionAttempts = numberOfAttempts;
this.weightedStatsResolver = (resolver != null ? resolver : new DefaultWeightedStatsResolver());
}

@Override
Expand All @@ -75,7 +55,7 @@ public void initialize(RSocketConnector connector) {

@Override
public RSocket select(List<RSocket> sockets) {
final int effort = this.effort;
final int numberOfAttepmts = this.maxPairSelectionAttempts;
final int size = sockets.size();

RSocket weightedRSocket;
Expand Down Expand Up @@ -103,7 +83,7 @@ public RSocket select(List<RSocket> sockets) {
RSocket rsc1 = null;
RSocket rsc2 = null;

for (int i = 0; i < effort; i++) {
for (int i = 0; i < numberOfAttepmts; i++) {
int i1 = ThreadLocalRandom.current().nextInt(size);
int i2 = ThreadLocalRandom.current().nextInt(size - 1);

Expand Down Expand Up @@ -168,30 +148,83 @@ private static double calculateFactor(final double u, final double l, final doub
return Math.pow(1 + alpha, EXP_FACTOR);
}

static class DefaultWeightedStatsResolver implements Function<RSocket, WeightedStats> {
/** Create an instance of {@link WeightedLoadbalanceStrategy} with default settings. */
public static WeightedLoadbalanceStrategy create() {
return new Builder().build();
}

/** Return a builder to create a {@link WeightedLoadbalanceStrategy} with. */
public static Builder builder() {
return new Builder();
}

/** Builder for {@link WeightedLoadbalanceStrategy}. */
public static class Builder {

private int maxPairSelectionAttempts = 5;

@Nullable private Function<RSocket, WeightedStats> weightedStatsResolver;

private Builder() {}

/**
* How many times to try to randomly select a pair of RSocket connections with non-zero
* availability. This is applicable when there are more than two connections in the pool. If the
* number of attempts is exceeded, the last selected pair is used.
*
* <p>By default this is set to 5.
*
* @param numberOfAttempts the iteration count
*/
public Builder maxPairSelectionAttempts(int numberOfAttempts) {
this.maxPairSelectionAttempts = numberOfAttempts;
return this;
}

/**
* Configure how the created {@link WeightedLoadbalanceStrategy} should find the stats for a
* given RSocket.
*
* <p>By default {@code WeightedLoadbalanceStrategy} installs a {@code RequestInterceptor} when
* {@link ClientLoadbalanceStrategy#initialize(RSocketConnector)} is called in order to keep
* track of stats.
*
* @param resolver the function to find the stats for an RSocket
*/
public Builder weightedStatsResolver(Function<RSocket, WeightedStats> resolver) {
this.weightedStatsResolver = resolver;
return this;
}

public WeightedLoadbalanceStrategy build() {
return new WeightedLoadbalanceStrategy(
this.maxPairSelectionAttempts, this.weightedStatsResolver);
}
}

private static class DefaultWeightedStatsResolver implements Function<RSocket, WeightedStats> {

final ConcurrentMap<RSocket, WeightedStatsRequestInterceptor> rsocketsInterceptors =
new ConcurrentHashMap<>();
final Map<RSocket, WeightedStats> statsMap = new ConcurrentHashMap<>();

@Override
public WeightedStats apply(RSocket rSocket) {
return rsocketsInterceptors.get(rSocket);
return statsMap.get(rSocket);
}

void init(RSocketConnector connector) {
connector.interceptors(
ir ->
ir.forRequester(
registry ->
registry.forRequester(
(Function<RSocket, ? extends RequestInterceptor>)
rSocket -> {
final WeightedStatsRequestInterceptor interceptor =
new WeightedStatsRequestInterceptor() {
@Override
public void dispose() {
rsocketsInterceptors.remove(rSocket);
statsMap.remove(rSocket);
}
};
rsocketsInterceptors.put(rSocket, interceptor);
statsMap.put(rSocket, interceptor);

return interceptor;
}));
Expand Down