diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java index 37dfb2206..9a0f9be8f 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java @@ -95,7 +95,15 @@ public Map> assignPartitions( LoadBasedPartitionAssignmentStrategyConfig.DEFAULT_PARTITION_MESSAGES_IN_RATE, ""); newPartitions.forEach((task, partitions) -> { int totalThroughput = partitions.stream() - .mapToInt(p -> partitionInfoMap.getOrDefault(p, defaultPartitionInfo).getBytesInKBRate()) + .mapToInt(p -> { + int index = p.lastIndexOf('-'); + String topic = p; + if (index > -1) { + topic = p.substring(0, index); + } + PartitionThroughputInfo defaultValue = partitionInfoMap.getOrDefault(topic, defaultPartitionInfo); + return partitionInfoMap.getOrDefault(p, defaultValue).getBytesInKBRate(); + }) .sum(); taskThroughputMap.put(task, totalThroughput); }); diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedTaskCountEstimator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedTaskCountEstimator.java index d31d423bc..0279ef1c3 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedTaskCountEstimator.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedTaskCountEstimator.java @@ -65,8 +65,15 @@ public int getTaskCount(ClusterThroughputInfo throughputInfo, List assig LoadBasedPartitionAssignmentStrategyConfig.DEFAULT_PARTITION_MESSAGES_IN_RATE, ""); // total throughput in KB/sec int totalThroughput = allPartitions.stream() - .map(p -> throughputMap.getOrDefault(p, defaultThroughputInfo)) - .mapToInt(PartitionThroughputInfo::getBytesInKBRate) + .mapToInt(p -> { + int index = p.lastIndexOf('-'); + String topic = p; + if (index > -1) { + topic = p.substring(0, index); + } + PartitionThroughputInfo defaultValue = throughputMap.getOrDefault(topic, defaultThroughputInfo); + return throughputMap.getOrDefault(p, defaultValue).getBytesInKBRate(); + }) .sum(); LOG.info("Total throughput in all {} partitions: {}KB/sec", allPartitions.size(), totalThroughput); diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/TestLoadBasedTaskCountEstimator.java b/datastream-server/src/test/java/com/linkedin/datastream/server/TestLoadBasedTaskCountEstimator.java index 3e8b3ab44..5d4333235 100644 --- a/datastream-server/src/test/java/com/linkedin/datastream/server/TestLoadBasedTaskCountEstimator.java +++ b/datastream-server/src/test/java/com/linkedin/datastream/server/TestLoadBasedTaskCountEstimator.java @@ -97,4 +97,15 @@ public void partitionsHaveDefaultWeightTest() { int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions); Assert.assertTrue(taskCount > 0); } + + @Test + public void throughputTaskEstimatorWithTopicLevelInformation() { + ClusterThroughputInfo throughputInfo = _provider.getThroughputInfo("fruit"); + List assignedPartitions = Collections.emptyList(); + List unassignedPartitions = Arrays.asList("apple-0", "apple-1", "apple-2", "banana-0"); + LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS, + TASK_CAPACITY_UTILIZATION_PCT); + int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions); + Assert.assertEquals(taskCount, 4); + } } diff --git a/datastream-server/src/test/resources/partitionThroughput.json b/datastream-server/src/test/resources/partitionThroughput.json index 52bb1813f..a90a4c7e9 100644 --- a/datastream-server/src/test/resources/partitionThroughput.json +++ b/datastream-server/src/test/resources/partitionThroughput.json @@ -227,6 +227,10 @@ "donut" : { "BostonCreme-1" : "bytesInKB: 5000, msgIn:200", "BostonCreme-2" : "bytesInKB: 5000, msgIn:200" + }, + "fruit" : { + "apple" : "bytesInKB: 10000, msgIn:300", + "apple-2" : "bytesInKb: 8000, msgIn:200" } } }