diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java
index 3f27a5e9e..8feea662f 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java
@@ -81,6 +81,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(
       ClusterThroughputInfo throughputInfo, Map<String, Set<DatastreamTask>> currentAssignment,
       List<String> unassignedPartitions, DatastreamGroupPartitionsMetadata partitionMetadata, int maxPartitionsPerTask) {
     String datastreamGroupName = partitionMetadata.getDatastreamGroup().getName();
+    LOG.info("START: assignPartitions for datastream={}", datastreamGroupName);
     Map<String, PartitionThroughputInfo> partitionInfoMap = new HashMap<>(throughputInfo.getPartitionInfoMap());
     Set<String> tasksWithChangedPartition = new HashSet<>();

@@ -103,6 +104,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(
     validatePartitionCountAndThrow(datastreamGroupName, numTasks, numPartitions, maxPartitionsPerTask);

     // sort the current assignment's tasks on total throughput
+    LOG.info("Extracting throughput info for partitions which have the throughput data (recognized partitions)");
     Map<String, Integer> taskThroughputMap = new HashMap<>();
     PartitionThroughputInfo defaultPartitionInfo = new PartitionThroughputInfo(_defaultPartitionBytesInKBRate,
         _defaultPartitionMsgsInRate, "");
@@ -137,6 +139,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(
       }
     }

+    LOG.info("Sorting recognized partitions on byte rate");
     // sort unassigned partitions with throughput info on throughput
     recognizedPartitions.sort((p1, p2) -> {
       Integer p1KBRate = partitionInfoMap.get(p1).getBytesInKBRate();
@@ -144,6 +147,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(
       return p1KBRate.compareTo(p2KBRate);
     });

+    LOG.info("Building a priority min queue with tasks based on throughput");
     // build a priority queue of tasks based on throughput
     // only add tasks that can accommodate more partitions in the queue
     List<String> tasks = newPartitionAssignmentMap.keySet().stream()
@@ -152,6 +156,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(
     PriorityQueue<String> taskQueue = new PriorityQueue<>(Comparator.comparing(taskThroughputMap::get));
     taskQueue.addAll(tasks);

+    LOG.info("Assigning partitions to the tasks from the priority queue");
     // assign partitions with throughput info one by one, by putting the heaviest partition in the lightest task
     while (recognizedPartitions.size() > 0 && taskQueue.size() > 0) {
       String heaviestPartition = recognizedPartitions.remove(recognizedPartitions.size() - 1);
@@ -168,6 +173,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(
     }

     // assign unrecognized partitions with round-robin
+    LOG.info("Assigning unrecognized partitions with round-robin");
     Map<String, Integer> unrecognizedPartitionCountPerTask = new HashMap<>();
     Collections.shuffle(unrecognizedPartitions);
     int index = 0;
@@ -181,6 +187,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(
     }

     // build the new assignment using the new partitions for the affected datastream's tasks
+    LOG.info("Finishing building new assignment");
     Map<String, Set<DatastreamTask>> newAssignments = currentAssignment.entrySet().stream()
         .collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue().stream()
             .map(task -> {
@@ -208,6 +215,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(
     LOG.info("Assignment stats for {}. Min partitions across tasks: {}, max partitions across tasks: {}", taskPrefix,
         stats.getMin(), stats.getMax());

+    LOG.info("END: assignPartitions for datastream={}", datastreamGroupName);
     return newAssignments;
   }
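
For context on the steps these log statements bracket, below is a minimal, self-contained sketch of the greedy strategy the method follows: partitions are sorted by byte rate, and the heaviest remaining partition is handed to the task currently carrying the least load, tracked with a min priority queue. This is only an illustration under assumed names and types; GreedyAssignmentSketch, assignGreedy, and the plain Map/List parameters are hypothetical and not part of the LoadBasedPartitionAssigner API.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Illustrative sketch only: assigns the heaviest partition to the lightest task,
// mirroring the min-priority-queue step that the new LOG.info lines surround.
public class GreedyAssignmentSketch {
  public static Map<String, List<String>> assignGreedy(
      Map<String, Integer> partitionKBRate, List<String> taskNames) {
    Map<String, List<String>> assignment = new HashMap<>();
    Map<String, Integer> taskLoad = new HashMap<>();
    for (String task : taskNames) {
      assignment.put(task, new ArrayList<>());
      taskLoad.put(task, 0);
    }

    // Partitions sorted by byte rate ascending; the heaviest is taken from the tail.
    List<String> partitions = new ArrayList<>(partitionKBRate.keySet());
    partitions.sort(Comparator.comparing(partitionKBRate::get));

    // Min-heap of tasks keyed by their accumulated byte rate.
    PriorityQueue<String> taskQueue = new PriorityQueue<>(Comparator.comparing(taskLoad::get));
    taskQueue.addAll(taskNames);

    while (!partitions.isEmpty() && !taskQueue.isEmpty()) {
      String heaviestPartition = partitions.remove(partitions.size() - 1);
      String lightestTask = taskQueue.poll();
      assignment.get(lightestTask).add(heaviestPartition);
      taskLoad.put(lightestTask, taskLoad.get(lightestTask) + partitionKBRate.get(heaviestPartition));
      taskQueue.add(lightestTask); // re-insert so the updated load is considered in the next round
    }
    return assignment;
  }
}

Polling a task before updating its load and only then re-inserting it keeps the PriorityQueue ordering valid while the key changes, which is the same pattern the patched method uses with taskThroughputMap.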