Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added log messages to debug assignPartitions #951

Merged
merged 2 commits into from
Aug 7, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,12 @@ public LoadBasedPartitionAssigner(int defaultPartitionBytesInKBRate, int default
public Map<String, Set<DatastreamTask>> assignPartitions(
ClusterThroughputInfo throughputInfo, Map<String, Set<DatastreamTask>> currentAssignment,
List<String> unassignedPartitions, DatastreamGroupPartitionsMetadata partitionMetadata, int maxPartitionsPerTask) {
LOG.info("START: assignPartitions");
jzakaryan marked this conversation as resolved.
Show resolved Hide resolved
String datastreamGroupName = partitionMetadata.getDatastreamGroup().getName();
Map<String, PartitionThroughputInfo> partitionInfoMap = new HashMap<>(throughputInfo.getPartitionInfoMap());
Set<String> tasksWithChangedPartition = new HashSet<>();

LOG.info("Filtering out tasks for the current datastream {}", datastreamGroupName);
jzakaryan marked this conversation as resolved.
Show resolved Hide resolved
// filter out all the tasks for the current datastream group, and retain assignments in a map
Map<String, Set<String>> newPartitionAssignmentMap = new HashMap<>();
currentAssignment.values().forEach(tasks ->
Expand All @@ -103,6 +105,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(
validatePartitionCountAndThrow(datastreamGroupName, numTasks, numPartitions, maxPartitionsPerTask);

// sort the current assignment's tasks on total throughput
LOG.info("Extracting throughput info for partitions which have the throughput data (recognized partitions)");
Map<String, Integer> taskThroughputMap = new HashMap<>();
PartitionThroughputInfo defaultPartitionInfo = new PartitionThroughputInfo(_defaultPartitionBytesInKBRate,
_defaultPartitionMsgsInRate, "");
Expand Down Expand Up @@ -137,13 +140,15 @@ public Map<String, Set<DatastreamTask>> assignPartitions(
}
}

LOG.info("Sorting recognized partitions on byte rate");
jzakaryan marked this conversation as resolved.
Show resolved Hide resolved
// sort the unassigned partitions that have throughput info by byte rate, ascending (heaviest partition last)
recognizedPartitions.sort((p1, p2) -> {
Integer p1KBRate = partitionInfoMap.get(p1).getBytesInKBRate();
Integer p2KBRate = partitionInfoMap.get(p2).getBytesInKBRate();
return p1KBRate.compareTo(p2KBRate);
});

LOG.info("Building a priority min queue with tasks based on throughput");
// build a priority queue of tasks ordered by throughput (lightest task first);
// only tasks that can still accommodate more partitions are added to the queue
List<String> tasks = newPartitionAssignmentMap.keySet().stream()
Expand All @@ -152,6 +157,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(
PriorityQueue<String> taskQueue = new PriorityQueue<>(Comparator.comparing(taskThroughputMap::get));
taskQueue.addAll(tasks);

LOG.info("Assigning partitions to the tasks from the priority queue");
// assign partitions with throughput info one by one, by putting the heaviest partition in the lightest task
while (recognizedPartitions.size() > 0 && taskQueue.size() > 0) {
String heaviestPartition = recognizedPartitions.remove(recognizedPartitions.size() - 1);
Expand All @@ -168,6 +174,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(
}

// assign unrecognized partitions with round-robin
LOG.info("Assigning unrecognized partitions with round-robin");
Map<String, Integer> unrecognizedPartitionCountPerTask = new HashMap<>();
Collections.shuffle(unrecognizedPartitions);
int index = 0;
Expand All @@ -181,6 +188,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(
}

// build the new assignment using the new partitions for the affected datastream's tasks
LOG.info("Finishing building new assignment");
Map<String, Set<DatastreamTask>> newAssignments = currentAssignment.entrySet().stream()
.collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue().stream()
.map(task -> {
Expand Down Expand Up @@ -208,6 +216,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(
LOG.info("Assignment stats for {}. Min partitions across tasks: {}, max partitions across tasks: {}", taskPrefix,
stats.getMin(), stats.getMax());

LOG.info("END: assignPartitions");
return newAssignments;
}

Expand Down