diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssignmentStrategy.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssignmentStrategy.java index f9173cdbf..39534f6f3 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssignmentStrategy.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssignmentStrategy.java @@ -182,7 +182,8 @@ private ClusterThroughputInfo fetchPartitionThroughputInfo(DatastreamGroup datas */ @Override public List getMetricInfos() { - List metricInfos = new ArrayList<>(); + List baseStrategyMetricInfos = super.getMetricInfos(); + List metricInfos = new ArrayList<>(baseStrategyMetricInfos); metricInfos.add(new BrooklinMeterInfo(CLASS_NAME + "." + THROUGHPUT_INFO_FETCH_RATE)); metricInfos.addAll(_assigner.getMetricInfos()); return Collections.unmodifiableList(metricInfos); diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestLoadBasedPartitionAssignmentStrategy.java b/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestLoadBasedPartitionAssignmentStrategy.java index 003ae297f..8685c4357 100644 --- a/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestLoadBasedPartitionAssignmentStrategy.java +++ b/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestLoadBasedPartitionAssignmentStrategy.java @@ -14,6 +14,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.Predicate; import org.apache.commons.lang3.StringUtils; import org.mockito.Mockito; @@ -173,6 +174,12 @@ public void fallbackToBaseClassWhenThroughputFetchFailsTest() { Mockito.verify(mockProvider, atLeastOnce()).getThroughputInfo(any(DatastreamGroup.class)); Mockito.verify(strategy, never()).doAssignment(anyObject(), anyObject(), anyObject(), anyObject()); Assert.assertNotNull(newAssignment); + + // Verify that metrics in both StickyPartitionAssignmentStrategy and LoadBasedPartitionAssignmentStrategy are + // properly registered + Predicate metricFilter = s -> s.startsWith(LoadBasedPartitionAssignmentStrategy.class.getSimpleName()) || + s.startsWith(StickyPartitionAssignmentStrategy.class.getSimpleName()); + MetricsTestUtils.verifyMetrics(strategy, DynamicMetricsManager.getInstance(), metricFilter); } @Test