Skip to content

Commit

Permalink
Rebase and address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Vaibhav Maheshwari committed Jun 18, 2021
1 parent 5a6c8c7 commit e6ebe87
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
*/
public class StickyMulticastStrategyFactory implements AssignmentStrategyFactory {
public static final String CFG_IMBALANCE_THRESHOLD = "imbalanceThreshold";
public static final Integer DEFAULT_IMBALANCE_THRESHOLD = 1;
public static final int DEFAULT_IMBALANCE_THRESHOLD = 1;

@Override
public AssignmentStrategy createStrategy(Properties assignmentStrategyProperties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void fallbackToBaseClassWhenThroughputFetchFailsTest() {
int taskCapacityUtilizationPct = 90;
int throughputInfoFetchTimeoutMs = 1000;
int throughputInfoFetchRetryPeriodMs = 200;
ZkClient zkClient = null;
ZkClient zkClient = _zkClient;

LoadBasedPartitionAssignmentStrategy strategy = Mockito.spy(new LoadBasedPartitionAssignmentStrategy(mockProvider,
maxTasks, imbalanceThreshold, maxPartitionPerTask, enableElasticTaskAssignment, partitionsPerTask,
Expand Down Expand Up @@ -158,7 +158,7 @@ public void doesntFetchPartitionInfoOnIncrementalAssignmentTest() {
int taskCapacityUtilizationPct = 90;
int throughputInfoFetchTimeoutMs = 1000;
int throughputInfoFetchRetryPeriodMs = 200;
ZkClient zkClient = null;
ZkClient zkClient = _zkClient;

LoadBasedPartitionAssignmentStrategy strategy = new LoadBasedPartitionAssignmentStrategy(mockProvider,
maxTasks, imbalanceThreshold, maxPartitionPerTask, enableElasticTaskAssignment, partitionsPerTask,
Expand Down Expand Up @@ -201,7 +201,7 @@ public void updatesNumTasksAndThrowsExceptionWhenNoSufficientTasksTest() {
int taskCapacityUtilizationPct = 90;
int throughputInfoFetchTimeoutMs = 1000;
int throughputInfoFetchRetryPeriodMs = 200;
ZkClient zkClient = null;
ZkClient zkClient = _zkClient;

LoadBasedPartitionAssignmentStrategy strategy = new LoadBasedPartitionAssignmentStrategy(mockProvider,
maxTasks, imbalanceThreshold, maxPartitionPerTask, enableElasticTaskAssignment, partitionsPerTask,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ public class TestPartitionAssignmentStrategyConfig {
private static final String CFG_CLUSTER_NAME_VALUE = "dummyCluster";
private static final String INVALID_INTEGER_VALUE = "-1";


@Test
public void configValuesCorrectlyAssignedTest() {
Properties props = new Properties();
Expand Down Expand Up @@ -75,7 +74,7 @@ public void configValuesRevertedToEmptyWhenInvalidTest() {

PartitionAssignmentStrategyConfig config = new PartitionAssignmentStrategyConfig(props);
Assert.assertEquals(config.getMaxTasks(), Optional.empty());
Assert.assertEquals(config.getImbalanceThreshold(), (int) DEFAULT_IMBALANCE_THRESHOLD);
Assert.assertEquals(config.getImbalanceThreshold(), DEFAULT_IMBALANCE_THRESHOLD);
Assert.assertEquals(config.getMaxPartitions(), Integer.MAX_VALUE);
Assert.assertEquals(config.getPartitionsPerTask(), DEFAULT_PARTITIONS_PER_TASK);
}
Expand All @@ -84,7 +83,7 @@ public void configValuesRevertedToEmptyWhenInvalidTest() {
public void configValuesSetToDefaultWhenNotProvidedTest() {
PartitionAssignmentStrategyConfig config = new PartitionAssignmentStrategyConfig(new Properties());
Assert.assertEquals(config.getMaxTasks(), Optional.empty());
Assert.assertEquals(config.getImbalanceThreshold(), (int) DEFAULT_IMBALANCE_THRESHOLD);
Assert.assertEquals(config.getImbalanceThreshold(), DEFAULT_IMBALANCE_THRESHOLD);
Assert.assertEquals(config.getMaxPartitions(), Integer.MAX_VALUE);
Assert.assertEquals(config.getPartitionsPerTask(), DEFAULT_PARTITIONS_PER_TASK);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand Down Expand Up @@ -288,7 +287,7 @@ public void testSameTaskIsNotAssignedToMoreThanOneInstance() {
String[] instances = new String[]{"instance1", "instance2", "instance3"};
int numDatastreams = 5;
List<DatastreamGroup> datastreams = generateDatastreams("ds", numDatastreams);
StickyMulticastStrategy strategy = createStickyMulticastStrategyObject(Optional.empty());
StickyMulticastStrategy strategy = new StickyMulticastStrategy(Optional.empty());
Map<String, Set<DatastreamTask>> assignment =
strategy.assign(datastreams, Arrays.asList(instances), new HashMap<>());
// Copying the assignment to simulate the scenario where two instances have the same task,
Expand All @@ -302,16 +301,11 @@ public void testSameTaskIsNotAssignedToMoreThanOneInstance() {
Assert.assertEquals(newAssignmentTasks.size(), instances.length * numDatastreams);
}

@NotNull
private StickyMulticastStrategy createStickyMulticastStrategyObject(Optional<Integer> maxTasks) {
return new StickyMulticastStrategy(maxTasks);
}

@Test
public void testRemoveDatastreamTasksWhenDatastreamIsDeleted() {
List<String> instances = Arrays.asList("instance1", "instance2", "instance3");
List<DatastreamGroup> datastreams = generateDatastreams("ds", 5);
StickyMulticastStrategy strategy = createStickyMulticastStrategyObject(Optional.empty());
StickyMulticastStrategy strategy = new StickyMulticastStrategy(Optional.empty());
Map<String, Set<DatastreamTask>> assignment = strategy.assign(datastreams, instances, new HashMap<>());

datastreams.remove(0);
Expand All @@ -328,7 +322,7 @@ public void testRemoveDatastreamTasksWhenDatastreamIsDeleted() {
// test with strategy where dsTaskLimitPerInstance is greater than 1
int maxTasksConfig = 12;
datastreams = generateDatastreams("ds", 5);
strategy = createStickyMulticastStrategyObject(Optional.of(maxTasksConfig));
strategy = new StickyMulticastStrategy(Optional.of(maxTasksConfig));
assignment = strategy.assign(datastreams, instances, new HashMap<>());

datastreams.remove(0);
Expand All @@ -348,7 +342,7 @@ public void testRemoveDatastreamTasksWhenDatastreamIsDeleted() {
public void testCreateNewTasksOnlyForNewDatastreamWhenDatastreamIsCreated() {
List<String> instances = Arrays.asList("instance1", "instance2", "instance3");
List<DatastreamGroup> datastreams = generateDatastreams("ds", 5);
StickyMulticastStrategy strategy = createStickyMulticastStrategyObject(Optional.empty());
StickyMulticastStrategy strategy = new StickyMulticastStrategy(Optional.empty());
Map<String, Set<DatastreamTask>> assignment = strategy.assign(datastreams, instances, new HashMap<>());

List<DatastreamGroup> newDatastreams = new ArrayList<>(datastreams);
Expand All @@ -369,7 +363,7 @@ public void testCreateNewTasksOnlyForNewDatastreamWhenDatastreamIsCreated() {
// test with strategy where dsTaskLimitPerInstance is greater than 1
int maxTasksConfig = 12;
datastreams = generateDatastreams("ds", 5);
strategy = createStickyMulticastStrategyObject(Optional.of(maxTasksConfig));
strategy = new StickyMulticastStrategy(Optional.of(maxTasksConfig));
assignment = strategy.assign(datastreams, instances, new HashMap<>());

newDatastreams = new ArrayList<>(datastreams);
Expand All @@ -394,7 +388,7 @@ public void testCreateNewTasksOnlyForNewInstanceWhenInstanceIsAdded() {
List<String> instances = Arrays.asList("instance1", "instance2", "instance3");
String instance4 = "instance4";
List<DatastreamGroup> datastreams = generateDatastreams("ds", 5);
StickyMulticastStrategy strategy = createStickyMulticastStrategyObject(Optional.empty());
StickyMulticastStrategy strategy = new StickyMulticastStrategy(Optional.empty());
Map<String, Set<DatastreamTask>> assignment = strategy.assign(datastreams, instances, new HashMap<>());
List<String> newInstances = new ArrayList<>(instances);
newInstances.add(instance4);
Expand All @@ -417,7 +411,7 @@ public void testStickyFairDistributionWhenNewInstanceIsAdded() {
String instance4 = "instance4";
List<DatastreamGroup> datastreams = generateDatastreams("ds", 5);
int maxTasksConfig = 12;
StickyMulticastStrategy strategy = createStickyMulticastStrategyObject(Optional.of(maxTasksConfig));
StickyMulticastStrategy strategy = new StickyMulticastStrategy(Optional.of(maxTasksConfig));
Map<String, Set<DatastreamTask>> assignment = strategy.assign(datastreams, instances, new HashMap<>());
List<String> newInstances = new ArrayList<>(instances);
newInstances.add(instance4);
Expand Down Expand Up @@ -484,7 +478,7 @@ public void testTriggerRebalanceWhenDeletingDatastream() {
public void testExtraTasksAreNotAssignedDuringReassignment() {
String[] instances = new String[]{"instance1"};
List<DatastreamGroup> datastreams = generateDatastreams("ds", 5);
StickyMulticastStrategy strategy = createStickyMulticastStrategyObject(Optional.of(4));
StickyMulticastStrategy strategy = new StickyMulticastStrategy(Optional.of(4));
Map<String, Set<DatastreamTask>> assignment1 =
strategy.assign(datastreams, Arrays.asList(instances), new HashMap<>());
Map<String, Set<DatastreamTask>> assignment2 =
Expand Down

0 comments on commit e6ebe87

Please sign in to comment.