Skip to content

Commit

Permalink
Update coordinator to dedupe task names
Browse files Browse the repository at this point in the history
  • Loading branch information
jogrogan committed May 17, 2022
1 parent 6636d0f commit b926cff
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -753,6 +754,55 @@ public void testValidateNewAssignment() throws Exception {
instance.getDatastreamCache().getZkclient().close();
}

@Test
public void testDedupeNewAssignment() throws Exception {
  // Fix: cluster name previously said "testCoordinationMaxTasksPerInstance" (copy-paste
  // from another test), which mislabeled this test's ZK namespace.
  String testCluster = "testDedupeNewAssignment";
  Properties properties = new Properties();
  int maxTasksPerInstance = 3;
  properties.put(CoordinatorConfig.CONFIG_MAX_DATASTREAM_TASKS_PER_INSTANCE, String.valueOf(maxTasksPerInstance));
  Coordinator instance = createCoordinator(_zkConnectionString, testCluster, properties);

  //
  // simulate assigning:
  // to instance1: [task1, task2, task2]  (task2 repeated within one instance)
  // to instance2: [task1, task3, task4]  (task1 repeated across instances; task3 shares task2's name)
  //
  String connectorType = "connectorType";
  DatastreamTaskImpl task1 = new DatastreamTaskImpl();
  task1.setTaskPrefix("task1");
  task1.setConnectorType(connectorType);

  DatastreamTaskImpl task2 = new DatastreamTaskImpl();
  task2.setTaskPrefix("task2");
  task2.setConnectorType(connectorType);

  // Distinct task object that collides with task2 by name; dedupe is by task name,
  // so this must be dropped even though it is a different instance.
  DatastreamTaskImpl task3 = new DatastreamTaskImpl();
  task3.setTaskPrefix("task2");
  task3.setConnectorType(connectorType);

  DatastreamTaskImpl task4 = new DatastreamTaskImpl();
  task4.setTaskPrefix("task4");
  task4.setConnectorType(connectorType);

  // TreeMap gives a deterministic instance iteration order (instance1 before instance2),
  // which fixes which duplicate "wins" (first occurrence is kept).
  Map<String, List<DatastreamTask>> newAssignmentsByInstance = new TreeMap<>();
  newAssignmentsByInstance.put("instance1", Arrays.asList(task1, task2, task2));
  newAssignmentsByInstance.put("instance2", Arrays.asList(task1, task3, task4));

  Map<String, List<DatastreamTask>> dedupedAssignmentMapping = instance.dedupeAssignments(newAssignmentsByInstance);

  // instance1 keeps the first occurrence of each name: [task1, task2].
  Assert.assertEquals(dedupedAssignmentMapping.get("instance1").size(), 2);
  Assert.assertEquals(dedupedAssignmentMapping.get("instance1").get(0).getDatastreamTaskName(), "task1");
  Assert.assertEquals(dedupedAssignmentMapping.get("instance1").get(1).getDatastreamTaskName(), "task2");
  // instance2 loses task1 (seen on instance1) and task3 (name collides with task2): [task4].
  Assert.assertEquals(dedupedAssignmentMapping.get("instance2").size(), 1);
  Assert.assertEquals(dedupedAssignmentMapping.get("instance2").get(0).getDatastreamTaskName(), "task4");

  // Validation should pass since every instance has at most the allowed number of tasks
  instance.validateNewAssignment(dedupedAssignmentMapping);
  instance.stop();
  instance.getDatastreamCache().getZkclient().close();
}

/**
* testCoordinationWithStickyMulticastStrategyAndMaxTaskLimit is a test to verify that assignment cannot complete
* when the number of tasks per instance exceeds the configured threshold
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1479,6 +1479,22 @@ void validateNewAssignment(Map<String, List<DatastreamTask>> newAssignmentsByIns
}
}

/**
 * Dedupe an assignmentsByInstance mapping which can contain multiple DatastreamTasks with
 * the same task name, keeping only the first occurrence of each name in map iteration order.
 * @param assignmentsByInstance mapping from instance to its assigned tasks
 * @return a new mapping with the same keys whose task lists have duplicate task names removed
 *         (duplicates are detected globally, across all instances)
 */
@VisibleForTesting
Map<String, List<DatastreamTask>> dedupeAssignments(Map<String, List<DatastreamTask>> assignmentsByInstance) {
  // Explicit loop instead of a Collectors.toMap pipeline: the dedupe set is shared
  // mutable state, and the Stream API requires stateless predicates — correctness of
  // the stream form silently depended on sequential encounter order.
  Set<String> uniqueDatastreams = new HashSet<>();
  Map<String, List<DatastreamTask>> dedupedAssignments = new HashMap<>();
  for (Map.Entry<String, List<DatastreamTask>> entry : assignmentsByInstance.entrySet()) {
    // Keep a task only if its name has not been seen on any instance processed so far.
    List<DatastreamTask> uniqueTasks = entry.getValue()
        .stream()
        .filter(task -> uniqueDatastreams.add(task.getDatastreamTaskName()))
        .collect(Collectors.toList());
    dedupedAssignments.put(entry.getKey(), uniqueTasks);
  }
  return dedupedAssignments;
}

private Map<String, List<DatastreamTask>> performAssignment(List<String> liveInstances,
Map<String, Set<DatastreamTask>> previousAssignmentByInstance, List<DatastreamGroup> datastreamGroups) {
Map<String, List<DatastreamTask>> newAssignmentsByInstance = new HashMap<>();
Expand Down Expand Up @@ -1527,9 +1543,11 @@ private Map<String, List<DatastreamTask>> performAssignment(List<String> liveIns
}
}

validateNewAssignment(newAssignmentsByInstance);
Map<String, List<DatastreamTask>> dedupedAssignments = dedupeAssignments(newAssignmentsByInstance);

validateNewAssignment(dedupedAssignments);

return newAssignmentsByInstance;
return dedupedAssignments;
}

void performCleanupOrphanNodes() {
Expand Down

0 comments on commit b926cff

Please sign in to comment.