Skip to content

Commit

Permalink
Refactor Optional parameters in the constructor of strategy (linkedin…
Browse files Browse the repository at this point in the history
…#836)

There are lots of Optional fields added to the constructor of the strategy. Most of these fields are not really used as Optional. Refactoring the code to make it simpler and easier to maintain. We will make the fields Optional in future on need basis.
  • Loading branch information
vmaheshw authored and Vaibhav Maheshwari committed Mar 1, 2022
1 parent 462f1f4 commit 902d210
Show file tree
Hide file tree
Showing 16 changed files with 349 additions and 307 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.stream.Collectors;

import org.apache.commons.lang3.Validate;
import org.jetbrains.annotations.NotNull;
import org.mockito.Mockito;
import org.mockito.invocation.Invocation;
import org.slf4j.Logger;
Expand Down Expand Up @@ -88,6 +89,7 @@
import static com.linkedin.datastream.common.DatastreamMetadataConstants.CREATION_MS;
import static com.linkedin.datastream.common.DatastreamMetadataConstants.SYSTEM_DESTINATION_PREFIX;
import static com.linkedin.datastream.common.DatastreamMetadataConstants.TTL_MS;
import static com.linkedin.datastream.server.assignment.StickyMulticastStrategyFactory.DEFAULT_IMBALANCE_THRESHOLD;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.anyObject;
Expand Down Expand Up @@ -568,19 +570,19 @@ public void testCoordinationWithStickyMulticastStrategy() throws Exception {

TestHookConnector connector1 = new TestHookConnector("connector1", testConnectorType);
//Question why the multicast strategy is within one coordinator rather than shared between list of coordinators
instance1.addConnector(testConnectorType, connector1, new StickyMulticastStrategy(Optional.of(4), Optional.of(2)), false,
instance1.addConnector(testConnectorType, connector1, new StickyMulticastStrategy(Optional.of(4), 2), false,
new SourceBasedDeduper(), null);
instance1.start();

Coordinator instance2 = createCoordinator(_zkConnectionString, testCluster);
TestHookConnector connector2 = new TestHookConnector("connector2", testConnectorType);
instance2.addConnector(testConnectorType, connector2, new StickyMulticastStrategy(Optional.of(4), Optional.of(2)), false,
instance2.addConnector(testConnectorType, connector2, new StickyMulticastStrategy(Optional.of(4), 2), false,
new SourceBasedDeduper(), null);
instance2.start();

Coordinator instance3 = createCoordinator(_zkConnectionString, testCluster);
TestHookConnector connector3 = new TestHookConnector("connector3", testConnectorType);
instance3.addConnector(testConnectorType, connector3, new StickyMulticastStrategy(Optional.of(4), Optional.of(2)), false,
instance3.addConnector(testConnectorType, connector3, new StickyMulticastStrategy(Optional.of(4), 2), false,
new SourceBasedDeduper(), null);
instance3.start();

Expand Down Expand Up @@ -638,7 +640,7 @@ public void testCoordinationWithStickyMulticastStrategy() throws Exception {
// now add another instance, make sure it's getting rebalanced
Coordinator instance4 = createCoordinator(_zkConnectionString, testCluster);
TestHookConnector connector4 = new TestHookConnector("connector4", testConnectorType);
instance4.addConnector(testConnectorType, connector4, new StickyMulticastStrategy(Optional.of(4), Optional.of(2)), false,
instance4.addConnector(testConnectorType, connector4, new StickyMulticastStrategy(Optional.of(4), 2), false,
new SourceBasedDeduper(), null);
instance4.start();

Expand Down Expand Up @@ -719,19 +721,19 @@ public void testCoordinationWithStickyMulticastStrategyAndMaxTaskLimit() throws
Coordinator instance1 = createCoordinator(_zkConnectionString, testCluster, properties);

TestHookConnector connector1 = new TestHookConnector("connector1", testConnectorType);
instance1.addConnector(testConnectorType, connector1, new StickyMulticastStrategy(Optional.of(4), Optional.of(2)), false,
instance1.addConnector(testConnectorType, connector1, new StickyMulticastStrategy(Optional.of(4), 2), false,
new SourceBasedDeduper(), null);
instance1.start();

Coordinator instance2 = createCoordinator(_zkConnectionString, testCluster);
TestHookConnector connector2 = new TestHookConnector("connector2", testConnectorType);
instance2.addConnector(testConnectorType, connector2, new StickyMulticastStrategy(Optional.of(4), Optional.of(2)), false,
instance2.addConnector(testConnectorType, connector2, new StickyMulticastStrategy(Optional.of(4), 2), false,
new SourceBasedDeduper(), null);
instance2.start();

Coordinator instance3 = createCoordinator(_zkConnectionString, testCluster);
TestHookConnector connector3 = new TestHookConnector("connector3", testConnectorType);
instance3.addConnector(testConnectorType, connector3, new StickyMulticastStrategy(Optional.of(4), Optional.of(2)), false,
instance3.addConnector(testConnectorType, connector3, new StickyMulticastStrategy(Optional.of(4), 2), false,
new SourceBasedDeduper(), null);
instance3.start();

Expand All @@ -754,7 +756,7 @@ public void testCoordinationWithStickyMulticastStrategyAndMaxTaskLimit() throws
// now add another instance, make sure it's getting rebalanced
Coordinator instance4 = createCoordinator(_zkConnectionString, testCluster);
TestHookConnector connector4 = new TestHookConnector("connector4", testConnectorType);
instance4.addConnector(testConnectorType, connector4, new StickyMulticastStrategy(Optional.of(4), Optional.of(2)), false,
instance4.addConnector(testConnectorType, connector4, new StickyMulticastStrategy(Optional.of(4), 2), false,
new SourceBasedDeduper(), null);
instance4.start();

Expand Down Expand Up @@ -796,21 +798,20 @@ public void testCoordinationWithPartitionAssignment() throws Exception {
TestHookConnector connector1 = createConnectorWithPartitionListener("connector1", testConnectorType, partitions, initialDelays);

//Question why the multicast strategy is within one coordinator rather than shared between list of coordinators
instance1.addConnector(testConnectorType, connector1, new StickyPartitionAssignmentStrategy(Optional.of(4),
Optional.of(2), Optional.empty(), testCluster), false, new SourceBasedDeduper(), null);
instance1.addConnector(testConnectorType, connector1, new StickyPartitionAssignmentStrategy(Optional.of(4), 2,
Integer.MAX_VALUE, testCluster), false, new SourceBasedDeduper(), null);
instance1.start();

Coordinator instance2 = createCoordinator(_zkConnectionString, testCluster);
TestHookConnector connector2 = createConnectorWithPartitionListener("connector2", testConnectorType, partitions, initialDelays);
instance2.addConnector(testConnectorType, connector2, new StickyPartitionAssignmentStrategy(Optional.of(4),
Optional.of(2), Optional.empty(), testCluster), false, new SourceBasedDeduper(), null);
instance2.addConnector(testConnectorType, connector2, new StickyPartitionAssignmentStrategy(Optional.of(4), 2,
Integer.MAX_VALUE, testCluster), false, new SourceBasedDeduper(), null);
instance2.start();

Coordinator instance3 = createCoordinator(_zkConnectionString, testCluster);
TestHookConnector connector3 = createConnectorWithPartitionListener("connector3", testConnectorType, partitions, initialDelays);
instance3.addConnector(testConnectorType, connector3, new StickyPartitionAssignmentStrategy(Optional.of(4),
Optional.of(2), Optional.empty(), testCluster), false,
new SourceBasedDeduper(), null);
instance3.addConnector(testConnectorType, connector3, new StickyPartitionAssignmentStrategy(Optional.of(4), 2,
Integer.MAX_VALUE, testCluster), false, new SourceBasedDeduper(), null);
instance3.start();

List<TestHookConnector> connectors = new ArrayList<>();
Expand Down Expand Up @@ -948,24 +949,20 @@ public void testCoordinationWithElasticTaskAssignmentPartitionAssignment() throw

int partitionsPerTask = 4;
int fullnessFactorPct = 50;
instance1.addConnector(testConnectorType, connector1, new StickyPartitionAssignmentStrategy(Optional.empty(),
Optional.empty(), Optional.empty(), true, Optional.of(partitionsPerTask),
Optional.of(fullnessFactorPct), Optional.of(zkClient), testCluster), false, new SourceBasedDeduper(), null);
instance1.addConnector(testConnectorType, connector1, createStrategy(testCluster, zkClient, partitionsPerTask, fullnessFactorPct),
false, new SourceBasedDeduper(), null);
instance1.start();

Coordinator instance2 = createCoordinator(_zkConnectionString, testCluster);
TestHookConnector connector2 = createConnectorWithPartitionListener("connector2", testConnectorType, partitions, initialDelays);
instance2.addConnector(testConnectorType, connector2, new StickyPartitionAssignmentStrategy(Optional.empty(),
Optional.empty(), Optional.empty(), true, Optional.of(partitionsPerTask),
Optional.of(fullnessFactorPct), Optional.of(zkClient), testCluster), false, new SourceBasedDeduper(), null);
instance2.addConnector(testConnectorType, connector2, createStrategy(testCluster, zkClient, partitionsPerTask, fullnessFactorPct),
false, new SourceBasedDeduper(), null);
instance2.start();

Coordinator instance3 = createCoordinator(_zkConnectionString, testCluster);
TestHookConnector connector3 = createConnectorWithPartitionListener("connector3", testConnectorType, partitions, initialDelays);
instance3.addConnector(testConnectorType, connector3, new StickyPartitionAssignmentStrategy(Optional.empty(),
Optional.empty(), Optional.empty(), true, Optional.of(partitionsPerTask),
Optional.of(fullnessFactorPct), Optional.of(zkClient), testCluster), false,
new SourceBasedDeduper(), null);
instance3.addConnector(testConnectorType, connector3, createStrategy(testCluster, zkClient, partitionsPerTask, fullnessFactorPct),
false, new SourceBasedDeduper(), null);
instance3.start();

List<TestHookConnector> connectors = new ArrayList<>();
Expand Down Expand Up @@ -1005,6 +1002,13 @@ public void testCoordinationWithElasticTaskAssignmentPartitionAssignment() throw
instance3.getDatastreamCache().getZkclient().close();
}

@NotNull
private StickyPartitionAssignmentStrategy createStrategy(String testCluster, ZkClient zkClient, int partitionsPerTask,
int fullnessFactorPct) {
return new StickyPartitionAssignmentStrategy(Optional.empty(), DEFAULT_IMBALANCE_THRESHOLD, Integer.MAX_VALUE, true,
partitionsPerTask, fullnessFactorPct, zkClient, testCluster);
}

/**
* Test Datastream create with BYOT where destination is in use by another datastream
*/
Expand Down Expand Up @@ -1203,7 +1207,7 @@ public void testValidatePartitionAssignmentSupported() throws Exception {
ZkClient zkClient = new ZkClient(_zkConnectionString);

coordinator.addConnector(connectorType1, connector1, new StickyPartitionAssignmentStrategy(Optional.of(4),
Optional.empty(), Optional.empty(), testCluster), false,
DEFAULT_IMBALANCE_THRESHOLD, Integer.MAX_VALUE, testCluster), false,
new SourceBasedDeduper(), null);
coordinator.addConnector(connectorType2, connector2, new BroadcastStrategy(Optional.empty()), false,
new SourceBasedDeduper(), null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ public Map<String, Set<DatastreamTask>> assignPartitions(ClusterThroughputInfo t
// sort the current assignment's tasks on total throughput
Map<String, Integer> taskThroughputMap = new HashMap<>();
PartitionThroughputInfo defaultPartitionInfo = new PartitionThroughputInfo(
PartitionAssignmentStrategyConfig.PARTITION_BYTES_IN_KB_RATE_DEFAULT,
PartitionAssignmentStrategyConfig.PARTITION_MESSAGES_IN_RATE_DEFAULT, "");
LoadBasedPartitionAssignmentStrategyConfig.DEFAULT_PARTITION_BYTES_IN_KB_RATE,
LoadBasedPartitionAssignmentStrategyConfig.DEFAULT_PARTITION_MESSAGES_IN_RATE, "");
newPartitions.forEach((task, partitions) -> {
int totalThroughput = partitions.stream()
.mapToInt(p -> partitionInfoMap.getOrDefault(p, defaultPartitionInfo).getBytesInKBRate())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/
package com.linkedin.datastream.server.assignment;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -40,12 +39,6 @@
public class LoadBasedPartitionAssignmentStrategy extends StickyPartitionAssignmentStrategy {
private static final Logger LOG = LoggerFactory.getLogger(LoadBasedPartitionAssignmentStrategy.class.getName());

private static final int THROUGHPUT_INFO_FETCH_TIMEOUT_MS_DEFAULT = (int) Duration.ofSeconds(10).toMillis();
private static final int THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS_DEFAULT = (int) Duration.ofSeconds(1).toMillis();

private static final int TASK_CAPACITY_MBPS_DEFAULT = 4;
private static final int TASK_CAPACITY_UTILIZATION_PCT_DEFAULT = 90;

private final PartitionThroughputProvider _throughputProvider;
private final int _taskCapacityMBps;
private final int _taskCapacityUtilizationPct;
Expand All @@ -59,21 +52,17 @@ public class LoadBasedPartitionAssignmentStrategy extends StickyPartitionAssignm
/**
* Creates an instance of {@link LoadBasedPartitionAssignmentStrategy}
*/
public LoadBasedPartitionAssignmentStrategy(PartitionThroughputProvider throughputProvider,
Optional<Integer> maxTasks, Optional<Integer> imbalanceThreshold, Optional<Integer> maxPartitionPerTask,
boolean enableElasticTaskAssignment, Optional<Integer> partitionsPerTask,
Optional<Integer> partitionFullnessFactorPct, Optional<Integer> taskCapacityMBps,
Optional<Integer> taskCapacityUtilizationPct, Optional<Integer> throughputInfoFetchTimeoutMs,
Optional<Integer> throughputInfoFetchRetryPeriodMs, Optional<ZkClient> zkClient,
String clusterName) {
public LoadBasedPartitionAssignmentStrategy(PartitionThroughputProvider throughputProvider, Optional<Integer> maxTasks,
int imbalanceThreshold, int maxPartitionPerTask, boolean enableElasticTaskAssignment, int partitionsPerTask,
int partitionFullnessFactorPct, int taskCapacityMBps, int taskCapacityUtilizationPct,
int throughputInfoFetchTimeoutMs, int throughputInfoFetchRetryPeriodMs, ZkClient zkClient, String clusterName) {
super(maxTasks, imbalanceThreshold, maxPartitionPerTask, enableElasticTaskAssignment, partitionsPerTask,
partitionFullnessFactorPct, zkClient, clusterName);
_throughputProvider = throughputProvider;
_taskCapacityMBps = taskCapacityMBps.orElse(TASK_CAPACITY_MBPS_DEFAULT);
_taskCapacityUtilizationPct = taskCapacityUtilizationPct.orElse(TASK_CAPACITY_UTILIZATION_PCT_DEFAULT);
_throughputInfoFetchTimeoutMs = throughputInfoFetchTimeoutMs.orElse(THROUGHPUT_INFO_FETCH_TIMEOUT_MS_DEFAULT);
_throughputInfoFetchRetryPeriodMs = throughputInfoFetchRetryPeriodMs.
orElse(THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS_DEFAULT);
_taskCapacityMBps = taskCapacityMBps;
_taskCapacityUtilizationPct = taskCapacityUtilizationPct;
_throughputInfoFetchTimeoutMs = throughputInfoFetchTimeoutMs;
_throughputInfoFetchRetryPeriodMs = throughputInfoFetchRetryPeriodMs;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/**
* Copyright 2021 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
* See the NOTICE file in the project root for additional information regarding copyright ownership.
*/
package com.linkedin.datastream.server.assignment;

import java.time.Duration;
import java.util.Properties;

import com.linkedin.datastream.common.VerifiableProperties;

/**
* Configuration properties for {@link LoadBasedPartitionAssignmentStrategy} and its extensions
*/
public class LoadBasedPartitionAssignmentStrategyConfig extends PartitionAssignmentStrategyConfig {

public static final int DEFAULT_PARTITION_BYTES_IN_KB_RATE = 5;
public static final int DEFAULT_PARTITION_MESSAGES_IN_RATE = 5;

private static final int DEFAULT_THROUGHPUT_INFO_FETCH_TIMEOUT_MS = (int) Duration.ofSeconds(10).toMillis();
private static final int DEFAULT_THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS = (int) Duration.ofSeconds(1).toMillis();
private static final int DEFAULT_TASK_CAPACITY_MBPS = 4;
private static final int DEFAULT_TASK_CAPACITY_UTILIZATION_PCT = 90;

private final int _taskCapacityMBps;
private final int _taskCapacityUtilizationPct;
private final int _throughputInfoFetchTimeoutMs;
private final int _throughputInfoFetchRetryPeriodMs;
/**
* Creates an instance of {@link LoadBasedPartitionAssignmentStrategyConfig}
* @param config Config properties
*/
public LoadBasedPartitionAssignmentStrategyConfig(Properties config) {
super(config);
VerifiableProperties props = new VerifiableProperties(config);
_taskCapacityMBps = props.getInt(CFG_TASK_CAPACITY_MBPS, DEFAULT_TASK_CAPACITY_MBPS);
_taskCapacityUtilizationPct = props.getIntInRange(CFG_TASK_CAPACITY_UTILIZATION_PCT, DEFAULT_TASK_CAPACITY_UTILIZATION_PCT, 0, 100);
_throughputInfoFetchTimeoutMs = props.getInt(CFG_THROUGHPUT_INFO_FETCH_TIMEOUT_MS, DEFAULT_THROUGHPUT_INFO_FETCH_TIMEOUT_MS);
_throughputInfoFetchRetryPeriodMs = props.getInt(CFG_THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS, DEFAULT_THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS);
}

/**
* Gets task capacity measured in MB/sec
* @return Task capacity in MB/sec
*/
public int getTaskCapacityMBps() {
return _taskCapacityMBps;
}

/**
* Gets task capacity utilization percentage
* @return Task capacity utilization percentage
*/
public int getTaskCapacityUtilizationPct() {
return _taskCapacityUtilizationPct;
}

/**
* Gets throughput info fetch timeout in milliseconds
* @return Throughput info fetch timeout in milliseconds
*/
public int getThroughputInfoFetchTimeoutMs() {
return _throughputInfoFetchTimeoutMs;
}

/**
* Gets the throughput info fetch retry period in milliseconds
* @return Throughput info fetch retry period in milliseconds
*/
public int getThroughputInfoFetchRetryPeriodMs() {
return _throughputInfoFetchRetryPeriodMs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/
package com.linkedin.datastream.server.assignment;

import java.util.Optional;
import java.util.Properties;

import org.slf4j.Logger;
Expand All @@ -25,11 +24,11 @@ public class LoadBasedPartitionAssignmentStrategyFactory extends StickyPartition

@Override
public AssignmentStrategy createStrategy(Properties assignmentStrategyProperties) {
_config = new PartitionAssignmentStrategyConfig(assignmentStrategyProperties);

_config = new LoadBasedPartitionAssignmentStrategyConfig(assignmentStrategyProperties);
LoadBasedPartitionAssignmentStrategyConfig config = (LoadBasedPartitionAssignmentStrategyConfig) _config;
boolean enableElasticTaskAssignment = _config.isElasticTaskAssignmentEnabled();
// Create the zookeeper client
Optional<ZkClient> zkClient = Optional.empty();
ZkClient zkClient = null;
try {
zkClient = constructZooKeeperClient();
} catch (IllegalStateException ex) {
Expand All @@ -41,9 +40,9 @@ public AssignmentStrategy createStrategy(Properties assignmentStrategyProperties

return new LoadBasedPartitionAssignmentStrategy(provider, _config.getMaxTasks(),
_config.getImbalanceThreshold(), _config.getMaxPartitions(), enableElasticTaskAssignment,
_config.getPartitionsPerTask(), _config.getPartitionFullnessThresholdPct(), _config.getTaskCapacityMBps(),
_config.getTaskCapacityUtilizationPct(), _config.getThroughputInfoFetchTimeoutMs(),
_config.getThroughputInfoFetchRetryPeriodMs(), zkClient, _config.getCluster());
_config.getPartitionsPerTask(), _config.getPartitionFullnessThresholdPct(), config.getTaskCapacityMBps(),
config.getTaskCapacityUtilizationPct(), config.getThroughputInfoFetchTimeoutMs(),
config.getThroughputInfoFetchRetryPeriodMs(), zkClient, _config.getCluster());
}

protected PartitionThroughputProvider constructPartitionThroughputProvider() {
Expand Down
Loading

0 comments on commit 902d210

Please sign in to comment.