From 1d4002576f59ae5a702186614cacee2f8c308cd6 Mon Sep 17 00:00:00 2001 From: Jhora Zakaryan Date: Thu, 3 Aug 2023 14:33:45 -0700 Subject: [PATCH 1/2] Added log messages to debug assignPartitions --- .../server/assignment/LoadBasedPartitionAssigner.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java index 3f27a5e9e..ba8460317 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java @@ -80,10 +80,12 @@ public LoadBasedPartitionAssigner(int defaultPartitionBytesInKBRate, int default public Map> assignPartitions( ClusterThroughputInfo throughputInfo, Map> currentAssignment, List unassignedPartitions, DatastreamGroupPartitionsMetadata partitionMetadata, int maxPartitionsPerTask) { + LOG.info("START: assignPartitions"); String datastreamGroupName = partitionMetadata.getDatastreamGroup().getName(); Map partitionInfoMap = new HashMap<>(throughputInfo.getPartitionInfoMap()); Set tasksWithChangedPartition = new HashSet<>(); + LOG.info("Filtering out tasks for the current datastream {}", datastreamGroupName); // filter out all the tasks for the current datastream group, and retain assignments in a map Map> newPartitionAssignmentMap = new HashMap<>(); currentAssignment.values().forEach(tasks -> @@ -103,6 +105,7 @@ public Map> assignPartitions( validatePartitionCountAndThrow(datastreamGroupName, numTasks, numPartitions, maxPartitionsPerTask); // sort the current assignment's tasks on total throughput + LOG.info("Extracting throughput info for partitions which have the throughput data (recognized partitions)"); Map taskThroughputMap = new HashMap<>(); PartitionThroughputInfo defaultPartitionInfo = new PartitionThroughputInfo(_defaultPartitionBytesInKBRate, _defaultPartitionMsgsInRate, ""); @@ -137,6 +140,7 @@ public Map> assignPartitions( } } + LOG.info("Sorting recognized partitions on byte rate"); // sort unassigned partitions with throughput info on throughput recognizedPartitions.sort((p1, p2) -> { Integer p1KBRate = partitionInfoMap.get(p1).getBytesInKBRate(); @@ -144,6 +148,7 @@ public Map> assignPartitions( return p1KBRate.compareTo(p2KBRate); }); + LOG.info("Building a priority min queue with tasks based on throughput"); // build a priority queue of tasks based on throughput // only add tasks that can accommodate more partitions in the queue List tasks = newPartitionAssignmentMap.keySet().stream() @@ -152,6 +157,7 @@ public Map> assignPartitions( PriorityQueue taskQueue = new PriorityQueue<>(Comparator.comparing(taskThroughputMap::get)); taskQueue.addAll(tasks); + LOG.info("Assigning partitions to the tasks from the priority queue"); // assign partitions with throughput info one by one, by putting the heaviest partition in the lightest task while (recognizedPartitions.size() > 0 && taskQueue.size() > 0) { String heaviestPartition = recognizedPartitions.remove(recognizedPartitions.size() - 1); @@ -168,6 +174,7 @@ public Map> assignPartitions( } // assign unrecognized partitions with round-robin + LOG.info("Assigning unrecognized partitions with round-robin"); Map unrecognizedPartitionCountPerTask = new HashMap<>(); Collections.shuffle(unrecognizedPartitions); int index = 0; @@ -181,6 +188,7 @@ public Map> assignPartitions( } // build the new assignment using the new partitions for the affected datastream's tasks + LOG.info("Finishing building new assignment"); Map> newAssignments = currentAssignment.entrySet().stream() .collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue().stream() .map(task -> { @@ -208,6 +216,7 @@ public Map> assignPartitions( LOG.info("Assignment stats for {}. Min partitions across tasks: {}, max partitions across tasks: {}", taskPrefix, stats.getMin(), stats.getMax()); + LOG.info("END: assignPartitions"); return newAssignments; } From a360db4fb791cf9c1e4aad3f556bba4dbc57615c Mon Sep 17 00:00:00 2001 From: Jhora Zakaryan Date: Fri, 4 Aug 2023 10:49:42 -0700 Subject: [PATCH 2/2] Minor improvements --- .../server/assignment/LoadBasedPartitionAssigner.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java index ba8460317..8feea662f 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java @@ -80,12 +80,11 @@ public LoadBasedPartitionAssigner(int defaultPartitionBytesInKBRate, int default public Map> assignPartitions( ClusterThroughputInfo throughputInfo, Map> currentAssignment, List unassignedPartitions, DatastreamGroupPartitionsMetadata partitionMetadata, int maxPartitionsPerTask) { - LOG.info("START: assignPartitions"); String datastreamGroupName = partitionMetadata.getDatastreamGroup().getName(); + LOG.info("START: assignPartitions for datasteam={}", datastreamGroupName); Map partitionInfoMap = new HashMap<>(throughputInfo.getPartitionInfoMap()); Set tasksWithChangedPartition = new HashSet<>(); - LOG.info("Filtering out tasks for the current datastream {}", datastreamGroupName); // filter out all the tasks for the current datastream group, and retain assignments in a map Map> newPartitionAssignmentMap = new HashMap<>(); currentAssignment.values().forEach(tasks -> @@ -216,7 +215,7 @@ public Map> assignPartitions( LOG.info("Assignment stats for {}. Min partitions across tasks: {}, max partitions across tasks: {}", taskPrefix, stats.getMin(), stats.getMax()); - LOG.info("END: assignPartitions"); + LOG.info("END: assignPartitions for datastream={}", datastreamGroupName); return newAssignments; }