From 59bc9a24d514095e6de706a6b0790996dcbdbfd3 Mon Sep 17 00:00:00 2001 From: Vaibhav Maheshwari Date: Tue, 19 Oct 2021 00:46:09 -0700 Subject: [PATCH 1/3] Add Stats to DatastreamTaskImpl --- .../datastream/server/DatastreamTaskImpl.java | 19 +++- .../LoadBasedPartitionAssigner.java | 89 ++++++++++++++++++- .../datastream/server/zk/ZkAdapter.java | 6 ++ .../TestLoadBasedPartitionAssigner.java | 4 + 4 files changed, 114 insertions(+), 4 deletions(-) diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/DatastreamTaskImpl.java b/datastream-server/src/main/java/com/linkedin/datastream/server/DatastreamTaskImpl.java index 8708ea66d..9ff6a9135 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/DatastreamTaskImpl.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/DatastreamTaskImpl.java @@ -81,7 +81,24 @@ public class DatastreamTaskImpl implements DatastreamTask { // TODO: Investigate the requirement to populate both _partition and _partitionV2 and cleanup if required. private List _partitionsV2; - private List _dependencies; + private final List _dependencies; + + @JsonIgnore + public String getStats() { + return _stats; + } + + @JsonIgnore + public String getStatsFromZK() { + return getState("stats"); + } + + @JsonIgnore + public void setStats(String stats) { + _stats = stats; + } + + private String _stats = ""; // Status to indicate if instance has hooked up and process this object private ZkAdapter _zkAdapter; diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java index e7ae68562..54346d813 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java @@ -5,6 +5,7 @@ */ package com.linkedin.datastream.server.assignment; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -19,12 +20,14 @@ import java.util.function.Supplier; import java.util.stream.Collectors; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.linkedin.datastream.common.DatastreamRuntimeException; +import com.linkedin.datastream.common.JsonUtils; import com.linkedin.datastream.metrics.BrooklinGaugeInfo; import com.linkedin.datastream.metrics.BrooklinMetricInfo; import com.linkedin.datastream.metrics.DynamicMetricsManager; @@ -158,13 +161,33 @@ public Map> assignPartitions( .map(task -> { int partitionCount = newPartitions.containsKey(task.getId()) ? newPartitions.get(task.getId()).size() : task.getPartitionsV2().size(); + minPartitionsAcrossTasks.set(Math.min(minPartitionsAcrossTasks.get(), partitionCount)); maxPartitionsAcrossTasks.set(Math.max(maxPartitionsAcrossTasks.get(), partitionCount)); if (tasksWithChangedPartition.contains(task.getId())) { - return new DatastreamTaskImpl((DatastreamTaskImpl) task, newPartitions.get(task.getId())); + DatastreamTaskImpl newTask = new DatastreamTaskImpl((DatastreamTaskImpl) task, newPartitions.get(task.getId())); + /*String stats = String.format("{\'throughput\':%d, \'numPartitions\':%d, \'partitionsWithUnknownThroughputRate\': %d}", + throughputRate, partitionCount, partitionCount); + task.setStats(stats);//saveState("throughputMetadata", throughputRate.toString()); */ + PartitionAssignmentStatPerTask stat = PartitionAssignmentStatPerTask.fromJson(((DatastreamTaskImpl) task).getStats()); + if (partitionInfoMap.isEmpty()) { + stat.isThroughputRateLatest = false; + } else { + stat.throughputRate = taskThroughputMap.get(task.getId()); + stat.isThroughputRateLatest = true; + } + stat.totalPartitions = partitionCount; + //TODO: Update this metric. + stat.partitionsWithUnknownThroughput = 0; + try { + newTask.setStats(stat.toJson()); + } catch (IOException e) { + LOG.error("Exception while saving the stats to Json", e); + } + return newTask; } - return task; - }).collect(Collectors.toSet()); + return task; + }).collect(Collectors.toSet()); newAssignments.put(instance, newTasks); }); @@ -246,6 +269,66 @@ void unregisterMetricsForDatastream(String datastream) { DYNAMIC_METRICS_MANAGER.unregisterMetric(CLASS_NAME, datastream, MAX_PARTITIONS_ACROSS_TASKS); } + private static class PartitionAssignmentStatPerTask { + private int throughputRate; + private int totalPartitions; + private int partitionsWithUnknownThroughput; + private boolean isThroughputRateLatest; + + //getters and setters required for fromJson and toJson + public int getThroughputRate() { + return throughputRate; + } + + public void setThroughputRate(int throughputRate) { + this.throughputRate = throughputRate; + } + + public int getTotalPartitions() { + return totalPartitions; + } + + public void setTotalPartitions(int totalPartitions) { + this.totalPartitions = totalPartitions; + } + + public int getPartitionsWithUnknownThroughput() { + return partitionsWithUnknownThroughput; + } + + public void setPartitionsWithUnknownThroughput(int partitionsWithUnknownThroughput) { + this.partitionsWithUnknownThroughput = partitionsWithUnknownThroughput; + } + + public boolean getIsThroughputRateLatest() { + return isThroughputRateLatest; + } + + public void setIsThroughputRateLatest(boolean isThroughputRateLatest) { + this.isThroughputRateLatest = isThroughputRateLatest; + } + + /** + * Construct PartitionAssignmentStatPerTask from json string + * @param json JSON string of the PartitionAssignmentStatPerTask + */ + public static PartitionAssignmentStatPerTask fromJson(String json) { + PartitionAssignmentStatPerTask stat = new PartitionAssignmentStatPerTask(); + if (StringUtils.isNotEmpty(json)) { + stat = JsonUtils.fromJson(json, PartitionAssignmentStatPerTask.class); + } + LOG.info("Loaded existing PartitionAssignmentStatPerTask: {}", stat); + return stat; + } + + /** + * Get PartitionAssignmentStatPerTask serialized as JSON + */ + public String toJson() throws IOException { + return JsonUtils.toJson(this); + } + + } /** * Encapsulates assignment metrics for a single datastream group */ diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java b/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java index a4c165952..cd3916ea2 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java @@ -33,6 +33,7 @@ import org.I0Itec.zkclient.IZkStateListener; import org.I0Itec.zkclient.exception.ZkException; import org.I0Itec.zkclient.exception.ZkNoNodeException; +import org.apache.commons.lang3.StringUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.Watcher; import org.slf4j.Logger; @@ -681,6 +682,11 @@ private void addTaskNodes(String instance, DatastreamTaskImpl task) { KeyBuilder.datastreamTaskState(_cluster, task.getConnectorType(), task.getDatastreamTaskName()); _zkclient.ensurePath(taskStatePath); + // save the task stats. + if (!StringUtils.isEmpty(task.getStats())) { + task.saveState("stats", task.getStats()); + } + String instancePath = KeyBuilder.instanceAssignment(_cluster, instance, name); String json = ""; try { diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestLoadBasedPartitionAssigner.java b/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestLoadBasedPartitionAssigner.java index 9811c487b..428ae2790 100644 --- a/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestLoadBasedPartitionAssigner.java +++ b/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestLoadBasedPartitionAssigner.java @@ -87,6 +87,10 @@ public void assignFromScratchTest() { Assert.assertTrue(assignedPartitions.contains("P1")); Assert.assertTrue(assignedPartitions.contains("P2")); Assert.assertTrue(assignedPartitions.contains("P3")); + + System.out.println(((DatastreamTaskImpl) task1).getStats()); + System.out.println(((DatastreamTaskImpl) task2).getStats()); + System.out.println(((DatastreamTaskImpl) task3).getStats()); } @Test From 77d46357cef21c3c37e96ea6d25237bbc7db209b Mon Sep 17 00:00:00 2001 From: Vaibhav Maheshwari Date: Tue, 19 Oct 2021 01:31:57 -0700 Subject: [PATCH 2/3] Add tests --- .../LoadBasedPartitionAssigner.java | 43 +++++++++++-------- .../TestLoadBasedPartitionAssigner.java | 24 +++++++++-- 2 files changed, 45 insertions(+), 22 deletions(-) diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java index 54346d813..001ec6d1b 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java @@ -141,6 +141,7 @@ public Map> assignPartitions( } // assign unrecognized partitions with round-robin + Map unrecognizedPartitionCountPerTask = new HashMap<>(); Collections.shuffle(unrecognizedPartitions); int index = 0; for (String partition : unrecognizedPartitions) { @@ -149,6 +150,7 @@ public Map> assignPartitions( newPartitions.get(currentTask).add(partition); tasksWithChangedPartition.add(currentTask); index = (index + 1) % tasks.size(); + unrecognizedPartitionCountPerTask.put(currentTask, unrecognizedPartitionCountPerTask.getOrDefault(currentTask, 0) + 1); } AtomicInteger minPartitionsAcrossTasks = new AtomicInteger(Integer.MAX_VALUE); @@ -166,24 +168,7 @@ public Map> assignPartitions( maxPartitionsAcrossTasks.set(Math.max(maxPartitionsAcrossTasks.get(), partitionCount)); if (tasksWithChangedPartition.contains(task.getId())) { DatastreamTaskImpl newTask = new DatastreamTaskImpl((DatastreamTaskImpl) task, newPartitions.get(task.getId())); - /*String stats = String.format("{\'throughput\':%d, \'numPartitions\':%d, \'partitionsWithUnknownThroughputRate\': %d}", - throughputRate, partitionCount, partitionCount); - task.setStats(stats);//saveState("throughputMetadata", throughputRate.toString()); */ - PartitionAssignmentStatPerTask stat = PartitionAssignmentStatPerTask.fromJson(((DatastreamTaskImpl) task).getStats()); - if (partitionInfoMap.isEmpty()) { - stat.isThroughputRateLatest = false; - } else { - stat.throughputRate = taskThroughputMap.get(task.getId()); - stat.isThroughputRateLatest = true; - } - stat.totalPartitions = partitionCount; - //TODO: Update this metric. - stat.partitionsWithUnknownThroughput = 0; - try { - newTask.setStats(stat.toJson()); - } catch (IOException e) { - LOG.error("Exception while saving the stats to Json", e); - } + saveStats(partitionInfoMap, taskThroughputMap, unrecognizedPartitionCountPerTask, task, partitionCount, newTask); return newTask; } return task; @@ -202,6 +187,26 @@ public Map> assignPartitions( return newAssignments; } + private void saveStats(Map partitionInfoMap, Map taskThroughputMap, + Map unrecognizedPartitionCountPerTask, DatastreamTask task, int partitionCount, + DatastreamTaskImpl newTask) { + PartitionAssignmentStatPerTask stat = PartitionAssignmentStatPerTask.fromJson(((DatastreamTaskImpl) task).getStats()); + if (partitionInfoMap.isEmpty()) { + stat.isThroughputRateLatest = false; + } else { + stat.throughputRate = taskThroughputMap.get(task.getId()); + stat.isThroughputRateLatest = true; + } + stat.totalPartitions = partitionCount; + // ignores the partitions removed. This value will be approximate. + stat.partitionsWithUnknownThroughput += unrecognizedPartitionCountPerTask.getOrDefault(task.getId(), 0); + try { + newTask.setStats(stat.toJson()); + } catch (IOException e) { + LOG.error("Exception while saving the stats to Json for task {}", task.getId(), e); + } + } + private void validatePartitionCountAndThrow(String datastream, int numTasks, int numPartitions, int maxPartitionsPerTask) { // conversion to long to avoid integer overflow @@ -269,7 +274,7 @@ void unregisterMetricsForDatastream(String datastream) { DYNAMIC_METRICS_MANAGER.unregisterMetric(CLASS_NAME, datastream, MAX_PARTITIONS_ACROSS_TASKS); } - private static class PartitionAssignmentStatPerTask { + static class PartitionAssignmentStatPerTask { private int throughputRate; private int totalPartitions; private int partitionsWithUnknownThroughput; diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestLoadBasedPartitionAssigner.java b/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestLoadBasedPartitionAssigner.java index 428ae2790..3cffb75e9 100644 --- a/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestLoadBasedPartitionAssigner.java +++ b/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestLoadBasedPartitionAssigner.java @@ -88,9 +88,12 @@ public void assignFromScratchTest() { Assert.assertTrue(assignedPartitions.contains("P2")); Assert.assertTrue(assignedPartitions.contains("P3")); - System.out.println(((DatastreamTaskImpl) task1).getStats()); - System.out.println(((DatastreamTaskImpl) task2).getStats()); - System.out.println(((DatastreamTaskImpl) task3).getStats()); + String stats = (((DatastreamTaskImpl) task1).getStats()); + LoadBasedPartitionAssigner.PartitionAssignmentStatPerTask statObj = LoadBasedPartitionAssigner.PartitionAssignmentStatPerTask.fromJson(stats); + Assert.assertTrue(statObj.getIsThroughputRateLatest()); + Assert.assertEquals(statObj.getTotalPartitions(), 1); + Assert.assertEquals(statObj.getPartitionsWithUnknownThroughput(), 0); + Assert.assertEquals(statObj.getThroughputRate(), 5); } @Test @@ -138,6 +141,13 @@ public void newAssignmentRetainsTasksFromOtherDatastreamsTest() { DatastreamTask newTask = (DatastreamTask) allTasks.toArray()[0]; Assert.assertEquals(newTask.getPartitionsV2().size(), 1); Assert.assertTrue(newTask.getPartitionsV2().contains("P3")); + + String stats = (((DatastreamTaskImpl) newTask).getStats()); + LoadBasedPartitionAssigner.PartitionAssignmentStatPerTask statObj = LoadBasedPartitionAssigner.PartitionAssignmentStatPerTask.fromJson(stats); + Assert.assertTrue(statObj.getIsThroughputRateLatest()); + Assert.assertEquals(statObj.getTotalPartitions(), 1); + Assert.assertEquals(statObj.getPartitionsWithUnknownThroughput(), 0); + Assert.assertEquals(statObj.getThroughputRate(), 5); } @Test @@ -166,6 +176,14 @@ public void assignmentDistributesPartitionsWhenThroughputInfoIsMissingTest() { // verify that tasks got 2 partition each Assert.assertEquals(task1.getPartitionsV2().size(), 2); Assert.assertEquals(task2.getPartitionsV2().size(), 2); + + + String stats = (((DatastreamTaskImpl) task1).getStats()); + LoadBasedPartitionAssigner.PartitionAssignmentStatPerTask statObj = LoadBasedPartitionAssigner.PartitionAssignmentStatPerTask.fromJson(stats); + Assert.assertFalse(statObj.getIsThroughputRateLatest()); + Assert.assertEquals(statObj.getTotalPartitions(), 2); + Assert.assertEquals(statObj.getPartitionsWithUnknownThroughput(), 2); + Assert.assertEquals(statObj.getThroughputRate(), 0); } @Test From 3e9f174fc8465c8280429a67c2aed12fd95664f8 Mon Sep 17 00:00:00 2001 From: Vaibhav Maheshwari Date: Thu, 21 Oct 2021 16:00:51 -0700 Subject: [PATCH 3/3] Address comments --- .../datastream/server/assignment/LoadBasedPartitionAssigner.java | 1 - .../server/assignment/TestLoadBasedPartitionAssigner.java | 1 - 2 files changed, 2 deletions(-) diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java index 001ec6d1b..37dfb2206 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java @@ -332,7 +332,6 @@ public static PartitionAssignmentStatPerTask fromJson(String json) { public String toJson() throws IOException { return JsonUtils.toJson(this); } - } /** * Encapsulates assignment metrics for a single datastream group diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestLoadBasedPartitionAssigner.java b/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestLoadBasedPartitionAssigner.java index 3cffb75e9..0b662016b 100644 --- a/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestLoadBasedPartitionAssigner.java +++ b/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestLoadBasedPartitionAssigner.java @@ -177,7 +177,6 @@ public void assignmentDistributesPartitionsWhenThroughputInfoIsMissingTest() { Assert.assertEquals(task1.getPartitionsV2().size(), 2); Assert.assertEquals(task2.getPartitionsV2().size(), 2); - String stats = (((DatastreamTaskImpl) task1).getStats()); LoadBasedPartitionAssigner.PartitionAssignmentStatPerTask statObj = LoadBasedPartitionAssigner.PartitionAssignmentStatPerTask.fromJson(stats); Assert.assertFalse(statObj.getIsThroughputRateLatest());