From 20937a0a6d57a9e81612228801ea7541dc5c28c0 Mon Sep 17 00:00:00 2001 From: Vaibhav Maheshwari Date: Wed, 24 Nov 2021 12:29:22 -0800 Subject: [PATCH 1/2] Use topic level throughput information when partition level information is unavailable --- .../server/assignment/LoadBasedPartitionAssigner.java | 7 ++++++- .../assignment/LoadBasedTaskCountEstimator.java | 7 +++++-- .../server/TestLoadBasedTaskCountEstimator.java | 11 +++++++++++ .../src/test/resources/partitionThroughput.json | 4 ++++ 4 files changed, 26 insertions(+), 3 deletions(-) diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java index 37dfb2206..dd24626bc 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java @@ -95,7 +95,12 @@ public Map> assignPartitions( LoadBasedPartitionAssignmentStrategyConfig.DEFAULT_PARTITION_MESSAGES_IN_RATE, ""); newPartitions.forEach((task, partitions) -> { int totalThroughput = partitions.stream() - .mapToInt(p -> partitionInfoMap.getOrDefault(p, defaultPartitionInfo).getBytesInKBRate()) + .mapToInt(p -> { + // default partition will be -1. + String wildcardTopicPartition = p.substring(0, p.lastIndexOf('-')) + '-' + "-1"; + PartitionThroughputInfo defaultValue = partitionInfoMap.getOrDefault(wildcardTopicPartition, defaultPartitionInfo); + return partitionInfoMap.getOrDefault(p, defaultValue).getBytesInKBRate(); + }) .sum(); taskThroughputMap.put(task, totalThroughput); }); diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedTaskCountEstimator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedTaskCountEstimator.java index d31d423bc..8d05e0c47 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedTaskCountEstimator.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedTaskCountEstimator.java @@ -65,8 +65,11 @@ public int getTaskCount(ClusterThroughputInfo throughputInfo, List assig LoadBasedPartitionAssignmentStrategyConfig.DEFAULT_PARTITION_MESSAGES_IN_RATE, ""); // total throughput in KB/sec int totalThroughput = allPartitions.stream() - .map(p -> throughputMap.getOrDefault(p, defaultThroughputInfo)) - .mapToInt(PartitionThroughputInfo::getBytesInKBRate) + .mapToInt(p -> { + String wildcardTopicPartition = p.substring(0, p.lastIndexOf('-')) + "-" + "-1"; + PartitionThroughputInfo defaultValue = throughputMap.getOrDefault(wildcardTopicPartition, defaultThroughputInfo); + return throughputMap.getOrDefault(p, defaultValue).getBytesInKBRate(); + }) .sum(); LOG.info("Total throughput in all {} partitions: {}KB/sec", allPartitions.size(), totalThroughput); diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/TestLoadBasedTaskCountEstimator.java b/datastream-server/src/test/java/com/linkedin/datastream/server/TestLoadBasedTaskCountEstimator.java index 3e8b3ab44..5d4333235 100644 --- a/datastream-server/src/test/java/com/linkedin/datastream/server/TestLoadBasedTaskCountEstimator.java +++ b/datastream-server/src/test/java/com/linkedin/datastream/server/TestLoadBasedTaskCountEstimator.java @@ -97,4 +97,15 @@ public void partitionsHaveDefaultWeightTest() { int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions); Assert.assertTrue(taskCount > 0); } + + @Test + public void throughputTaskEstimatorWithTopicLevelInformation() { + ClusterThroughputInfo throughputInfo = _provider.getThroughputInfo("fruit"); + List assignedPartitions = Collections.emptyList(); + List unassignedPartitions = Arrays.asList("apple-0", "apple-1", "apple-2", "banana-0"); + LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS, + TASK_CAPACITY_UTILIZATION_PCT); + int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions); + Assert.assertEquals(taskCount, 4); + } } diff --git a/datastream-server/src/test/resources/partitionThroughput.json b/datastream-server/src/test/resources/partitionThroughput.json index 52bb1813f..aacc8fe06 100644 --- a/datastream-server/src/test/resources/partitionThroughput.json +++ b/datastream-server/src/test/resources/partitionThroughput.json @@ -227,6 +227,10 @@ "donut" : { "BostonCreme-1" : "bytesInKB: 5000, msgIn:200", "BostonCreme-2" : "bytesInKB: 5000, msgIn:200" + }, + "fruit" : { + "apple--1" : "bytesInKB: 10000, msgIn:300", + "apple-2" : "bytesInKb: 8000, msgIn:200" } } } From 0f18f93096907af070b472515ccc89d7d71cce54 Mon Sep 17 00:00:00 2001 From: Vaibhav Maheshwari Date: Mon, 29 Nov 2021 09:51:24 -0800 Subject: [PATCH 2/2] Refactor code --- .../server/assignment/LoadBasedPartitionAssigner.java | 9 ++++++--- .../server/assignment/LoadBasedTaskCountEstimator.java | 8 ++++++-- .../src/test/resources/partitionThroughput.json | 2 +- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java index dd24626bc..9a0f9be8f 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java @@ -96,9 +96,12 @@ public Map> assignPartitions( newPartitions.forEach((task, partitions) -> { int totalThroughput = partitions.stream() .mapToInt(p -> { - // default partition will be -1. - String wildcardTopicPartition = p.substring(0, p.lastIndexOf('-')) + '-' + "-1"; - PartitionThroughputInfo defaultValue = partitionInfoMap.getOrDefault(wildcardTopicPartition, defaultPartitionInfo); + int index = p.lastIndexOf('-'); + String topic = p; + if (index > -1) { + topic = p.substring(0, index); + } + PartitionThroughputInfo defaultValue = partitionInfoMap.getOrDefault(topic, defaultPartitionInfo); return partitionInfoMap.getOrDefault(p, defaultValue).getBytesInKBRate(); }) .sum(); diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedTaskCountEstimator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedTaskCountEstimator.java index 8d05e0c47..0279ef1c3 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedTaskCountEstimator.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedTaskCountEstimator.java @@ -66,8 +66,12 @@ public int getTaskCount(ClusterThroughputInfo throughputInfo, List assig // total throughput in KB/sec int totalThroughput = allPartitions.stream() .mapToInt(p -> { - String wildcardTopicPartition = p.substring(0, p.lastIndexOf('-')) + "-" + "-1"; - PartitionThroughputInfo defaultValue = throughputMap.getOrDefault(wildcardTopicPartition, defaultThroughputInfo); + int index = p.lastIndexOf('-'); + String topic = p; + if (index > -1) { + topic = p.substring(0, index); + } + PartitionThroughputInfo defaultValue = throughputMap.getOrDefault(topic, defaultThroughputInfo); return throughputMap.getOrDefault(p, defaultValue).getBytesInKBRate(); }) .sum(); diff --git a/datastream-server/src/test/resources/partitionThroughput.json b/datastream-server/src/test/resources/partitionThroughput.json index aacc8fe06..a90a4c7e9 100644 --- a/datastream-server/src/test/resources/partitionThroughput.json +++ b/datastream-server/src/test/resources/partitionThroughput.json @@ -229,7 +229,7 @@ "BostonCreme-2" : "bytesInKB: 5000, msgIn:200" }, "fruit" : { - "apple--1" : "bytesInKB: 10000, msgIn:300", + "apple" : "bytesInKB: 10000, msgIn:300", "apple-2" : "bytesInKb: 8000, msgIn:200" } }