From 0f1900debcc23d8240c2458e1e9582bdeeee50e6 Mon Sep 17 00:00:00 2001 From: Shrinand Thakkar Date: Wed, 31 May 2023 16:17:26 -0700 Subject: [PATCH 1/3] Fix Metric Registration Issue for Throughput Violations Reporting --- .../datastream/server/TestCoordinator.java | 10 +++++----- .../linkedin/datastream/server/Coordinator.java | 14 ++++++++------ .../linkedin/datastream/server/EventProducer.java | 4 ++++ gradle/maven.gradle | 2 +- 4 files changed, 18 insertions(+), 12 deletions(-) diff --git a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java index 73f9ccee5..4ce748675 100644 --- a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java +++ b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java @@ -3618,7 +3618,7 @@ public void testThroughputViolatingTopicsHandlingForSingleDatastream() throws Ex properties.put(CoordinatorConfig.CONFIG_ENABLE_THROUGHPUT_VIOLATING_TOPICS_HANDLING, Boolean.TRUE.toString()); Coordinator coordinator = createCoordinator(_zkConnectionString, testCluster, properties); String numThroughputViolatingTopicsMetric = String.format("%s.%s.%s", Coordinator.class.getSimpleName(), - coordinator.getNumThroughputViolatingTopicsMetricName(), streamName); + streamName, coordinator.getNumThroughputViolatingTopicsMetricName()); TestHookConnector connector1 = new TestHookConnector("connector1", connectorType); coordinator.addConnector(connectorType, connector1, new BroadcastStrategy(Optional.empty()), false, new SourceBasedDeduper(), null); @@ -3732,7 +3732,7 @@ public void testThroughputViolatingTopicsHandlingForSingleDatastreamOnCreate() t properties.put(CoordinatorConfig.CONFIG_ENABLE_THROUGHPUT_VIOLATING_TOPICS_HANDLING, Boolean.TRUE.toString()); Coordinator coordinator = createCoordinator(_zkConnectionString, testCluster, properties); String numThroughputViolatingTopicsMetric = String.format("%s.%s.%s", Coordinator.class.getSimpleName(), - coordinator.getNumThroughputViolatingTopicsMetricName(), streamName); + streamName, coordinator.getNumThroughputViolatingTopicsMetricName()); TestHookConnector connector1 = new TestHookConnector("connector1", connectorType); coordinator.addConnector(connectorType, connector1, new BroadcastStrategy(Optional.empty()), false, new SourceBasedDeduper(), null); @@ -3779,10 +3779,10 @@ public void testThroughputViolatingTopicsHandlingForMultipleDatastreams() throws Coordinator coordinator = createCoordinator(_zkConnectionString, testCluster, properties); String numThroughputViolatingTopicsMetricForFirstDatastream = String.format("%s.%s.%s", Coordinator.class.getSimpleName(), - coordinator.getNumThroughputViolatingTopicsMetricName(), streamName1); + streamName1, coordinator.getNumThroughputViolatingTopicsMetricName()); String numThroughputViolatingTopicsMetricForSecondDatastream = String.format("%s.%s.%s", Coordinator.class.getSimpleName(), - coordinator.getNumThroughputViolatingTopicsMetricName(), streamName2); + streamName2, coordinator.getNumThroughputViolatingTopicsMetricName()); TestHookConnector connector1 = new TestHookConnector("connector1", connectorType); coordinator.addConnector(connectorType, connector1, new BroadcastStrategy(Optional.empty()), false, new SourceBasedDeduper(), null); @@ -3904,7 +3904,7 @@ public void testThroughputViolatingTopicsHandlingForSingleDatastreamOnCreateWith properties.put(CoordinatorConfig.CONFIG_ENABLE_THROUGHPUT_VIOLATING_TOPICS_HANDLING, Boolean.TRUE.toString()); Coordinator coordinator = createCoordinator(_zkConnectionString, testCluster, properties); String numThroughputViolatingTopicsMetric = String.format("%s.%s.%s", Coordinator.class.getSimpleName(), - coordinator.getNumThroughputViolatingTopicsMetricName(), streamName); + streamName, coordinator.getNumThroughputViolatingTopicsMetricName()); TestHookConnector connector1 = new TestHookConnector("connector1", connectorType); coordinator.addConnector(connectorType, connector1, new BroadcastStrategy(Optional.empty()), false, new SourceBasedDeduper(), null); diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java index 13ab90941..2845e708e 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java @@ -526,9 +526,8 @@ private void populateThroughputViolatingTopicsMap(List datastre _log.info("For datastream {}, Successfully reported throughput violating topics : {}", datastream.getName(), violatingTopics); } - _metrics.registerOrSetGauge( - String.format("%s.%s", CoordinatorMetrics.NUM_THROUGHPUT_VIOLATING_TOPICS_PER_DATASTREAM, - datastream.getName()), () -> violatingTopics.length); + _metrics.registerOrSetKeyedGauge(datastream.getName(), + CoordinatorMetrics.NUM_THROUGHPUT_VIOLATING_TOPICS_PER_DATASTREAM, () -> violatingTopics.length); })); } finally { _throughputViolatingTopicsMapWriteLock.unlock(); @@ -2478,6 +2477,10 @@ private void registerGaugeMetrics() { .put(ZK_SESSION_EXPIRED, () -> _coordinator.isZkSessionExpired() ? 1 : 0) .build(); gaugeMetrics.forEach(this::registerGauge); + + // For dynamic datastream prefixed gauge metric reporting num throughput violating topics + _metricInfos.add(new BrooklinGaugeInfo(_coordinator.buildMetricName(MODULE, + MetricsAware.KEY_REGEX + NUM_THROUGHPUT_VIOLATING_TOPICS_PER_DATASTREAM))); } private void registerCounterMetrics() { @@ -2497,10 +2500,9 @@ private void registerGauge(String metricName, Supplier valueSupplier) { } // registers a new gauge or updates the supplier for the gauge if it already exists - private void registerOrSetGauge(String metricName, Supplier valueSupplier) { - _dynamicMetricsManager.setGauge(_dynamicMetricsManager.registerGauge(MODULE, metricName, valueSupplier), + private void registerOrSetKeyedGauge(String key, String metricName, Supplier valueSupplier) { + _dynamicMetricsManager.setGauge(_dynamicMetricsManager.registerGauge(MODULE, key, metricName, valueSupplier), valueSupplier); - _metricInfos.add(new BrooklinGaugeInfo(_coordinator.buildMetricName(MODULE, metricName))); } private void registerCounter(Counter metric) { diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java b/datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java index e528bb7f1..c1aeeaae5 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java @@ -619,6 +619,10 @@ public static List getMetricInfos() { Arrays.asList(BrooklinHistogramInfo.PERCENTILE_50, BrooklinHistogramInfo.PERCENTILE_99, BrooklinHistogramInfo.PERCENTILE_999)))); metrics.add(new BrooklinHistogramInfo(METRICS_PREFIX + EVENTS_SEND_LATENCY_MS_STRING)); + metrics.add(new BrooklinHistogramInfo(METRICS_PREFIX + THROUGHPUT_VIOLATING_EVENTS_LATENCY_MS_STRING, Optional.of( + Arrays.asList(BrooklinHistogramInfo.PERCENTILE_50, BrooklinHistogramInfo.PERCENTILE_99, + BrooklinHistogramInfo.PERCENTILE_999)))); + metrics.add(new BrooklinHistogramInfo(METRICS_PREFIX + THROUGHPUT_VIOLATING_EVENTS_SEND_LATENCY_MS_STRING)); metrics.add(new BrooklinHistogramInfo(METRICS_PREFIX + FLUSH_LATENCY_MS_STRING)); return Collections.unmodifiableList(metrics); diff --git a/gradle/maven.gradle b/gradle/maven.gradle index 91465b20e..e6f8f7eed 100644 --- a/gradle/maven.gradle +++ b/gradle/maven.gradle @@ -1,5 +1,5 @@ allprojects { - version = "5.3.2-SNAPSHOT" + version = "5.3.5-SNAPSHOT" } subprojects { From bca82756a3174959bcefd0b68039780b5844f77c Mon Sep 17 00:00:00 2001 From: Shrinand Thakkar Date: Wed, 31 May 2023 16:47:32 -0700 Subject: [PATCH 2/3] Converted the metric formatting logic to a function in the tests --- .../datastream/server/TestCoordinator.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java index 4ce748675..17d522092 100644 --- a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java +++ b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java @@ -3617,7 +3617,7 @@ public void testThroughputViolatingTopicsHandlingForSingleDatastream() throws Ex Properties properties = new Properties(); properties.put(CoordinatorConfig.CONFIG_ENABLE_THROUGHPUT_VIOLATING_TOPICS_HANDLING, Boolean.TRUE.toString()); Coordinator coordinator = createCoordinator(_zkConnectionString, testCluster, properties); - String numThroughputViolatingTopicsMetric = String.format("%s.%s.%s", Coordinator.class.getSimpleName(), + String numThroughputViolatingTopicsMetric = getNumThroughputViolatingTopicsMetric(Coordinator.class.getSimpleName(), streamName, coordinator.getNumThroughputViolatingTopicsMetricName()); TestHookConnector connector1 = new TestHookConnector("connector1", connectorType); coordinator.addConnector(connectorType, connector1, new BroadcastStrategy(Optional.empty()), false, @@ -3731,7 +3731,7 @@ public void testThroughputViolatingTopicsHandlingForSingleDatastreamOnCreate() t Properties properties = new Properties(); properties.put(CoordinatorConfig.CONFIG_ENABLE_THROUGHPUT_VIOLATING_TOPICS_HANDLING, Boolean.TRUE.toString()); Coordinator coordinator = createCoordinator(_zkConnectionString, testCluster, properties); - String numThroughputViolatingTopicsMetric = String.format("%s.%s.%s", Coordinator.class.getSimpleName(), + String numThroughputViolatingTopicsMetric = getNumThroughputViolatingTopicsMetric(Coordinator.class.getSimpleName(), streamName, coordinator.getNumThroughputViolatingTopicsMetricName()); TestHookConnector connector1 = new TestHookConnector("connector1", connectorType); coordinator.addConnector(connectorType, connector1, new BroadcastStrategy(Optional.empty()), false, @@ -3778,10 +3778,10 @@ public void testThroughputViolatingTopicsHandlingForMultipleDatastreams() throws properties.put(CoordinatorConfig.CONFIG_ENABLE_THROUGHPUT_VIOLATING_TOPICS_HANDLING, Boolean.TRUE.toString()); Coordinator coordinator = createCoordinator(_zkConnectionString, testCluster, properties); String numThroughputViolatingTopicsMetricForFirstDatastream = - String.format("%s.%s.%s", Coordinator.class.getSimpleName(), + getNumThroughputViolatingTopicsMetric(Coordinator.class.getSimpleName(), streamName1, coordinator.getNumThroughputViolatingTopicsMetricName()); String numThroughputViolatingTopicsMetricForSecondDatastream = - String.format("%s.%s.%s", Coordinator.class.getSimpleName(), + getNumThroughputViolatingTopicsMetric(Coordinator.class.getSimpleName(), streamName2, coordinator.getNumThroughputViolatingTopicsMetricName()); TestHookConnector connector1 = new TestHookConnector("connector1", connectorType); coordinator.addConnector(connectorType, connector1, new BroadcastStrategy(Optional.empty()), false, @@ -3903,7 +3903,7 @@ public void testThroughputViolatingTopicsHandlingForSingleDatastreamOnCreateWith Properties properties = new Properties(); properties.put(CoordinatorConfig.CONFIG_ENABLE_THROUGHPUT_VIOLATING_TOPICS_HANDLING, Boolean.TRUE.toString()); Coordinator coordinator = createCoordinator(_zkConnectionString, testCluster, properties); - String numThroughputViolatingTopicsMetric = String.format("%s.%s.%s", Coordinator.class.getSimpleName(), + String numThroughputViolatingTopicsMetric = getNumThroughputViolatingTopicsMetric(Coordinator.class.getSimpleName(), streamName, coordinator.getNumThroughputViolatingTopicsMetricName()); TestHookConnector connector1 = new TestHookConnector("connector1", connectorType); coordinator.addConnector(connectorType, connector1, new BroadcastStrategy(Optional.empty()), false, @@ -3951,6 +3951,11 @@ private BooleanSupplier validateIfViolatingTopicsAreReflectedInServer(Datastream && requestedThroughputViolatingTopics.containsAll(fetchedViolatingTopicsFromStore); } + // Formats the numThroughputViolatingTopics Metric String + private String getNumThroughputViolatingTopicsMetric(String className, String key, String metricName) { + return String.format("%s.%s.%s", className, key, metricName); + } + // helper method: assert that within a timeout value, the connector are assigned the specific // tasks with the specified names. private void assertConnectorAssignment(TestHookConnector connector, long timeoutMs, String... datastreamNames) From efcd92264647a514d8b648f6e554e934b7e2dff8 Mon Sep 17 00:00:00 2001 From: Shrinand Thakkar Date: Wed, 31 May 2023 17:07:56 -0700 Subject: [PATCH 3/3] Make the metric provider helper function more specific --- .../datastream/server/TestCoordinator.java | 20 ++++++++----------- .../datastream/server/Coordinator.java | 2 +- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java index 17d522092..f3e37084e 100644 --- a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java +++ b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java @@ -3617,8 +3617,7 @@ public void testThroughputViolatingTopicsHandlingForSingleDatastream() throws Ex Properties properties = new Properties(); properties.put(CoordinatorConfig.CONFIG_ENABLE_THROUGHPUT_VIOLATING_TOPICS_HANDLING, Boolean.TRUE.toString()); Coordinator coordinator = createCoordinator(_zkConnectionString, testCluster, properties); - String numThroughputViolatingTopicsMetric = getNumThroughputViolatingTopicsMetric(Coordinator.class.getSimpleName(), - streamName, coordinator.getNumThroughputViolatingTopicsMetricName()); + String numThroughputViolatingTopicsMetric = getNumThroughputViolatingTopicsMetric(streamName); TestHookConnector connector1 = new TestHookConnector("connector1", connectorType); coordinator.addConnector(connectorType, connector1, new BroadcastStrategy(Optional.empty()), false, new SourceBasedDeduper(), null); @@ -3731,8 +3730,7 @@ public void testThroughputViolatingTopicsHandlingForSingleDatastreamOnCreate() t Properties properties = new Properties(); properties.put(CoordinatorConfig.CONFIG_ENABLE_THROUGHPUT_VIOLATING_TOPICS_HANDLING, Boolean.TRUE.toString()); Coordinator coordinator = createCoordinator(_zkConnectionString, testCluster, properties); - String numThroughputViolatingTopicsMetric = getNumThroughputViolatingTopicsMetric(Coordinator.class.getSimpleName(), - streamName, coordinator.getNumThroughputViolatingTopicsMetricName()); + String numThroughputViolatingTopicsMetric = getNumThroughputViolatingTopicsMetric(streamName); TestHookConnector connector1 = new TestHookConnector("connector1", connectorType); coordinator.addConnector(connectorType, connector1, new BroadcastStrategy(Optional.empty()), false, new SourceBasedDeduper(), null); @@ -3778,11 +3776,9 @@ public void testThroughputViolatingTopicsHandlingForMultipleDatastreams() throws properties.put(CoordinatorConfig.CONFIG_ENABLE_THROUGHPUT_VIOLATING_TOPICS_HANDLING, Boolean.TRUE.toString()); Coordinator coordinator = createCoordinator(_zkConnectionString, testCluster, properties); String numThroughputViolatingTopicsMetricForFirstDatastream = - getNumThroughputViolatingTopicsMetric(Coordinator.class.getSimpleName(), - streamName1, coordinator.getNumThroughputViolatingTopicsMetricName()); + getNumThroughputViolatingTopicsMetric(streamName1); String numThroughputViolatingTopicsMetricForSecondDatastream = - getNumThroughputViolatingTopicsMetric(Coordinator.class.getSimpleName(), - streamName2, coordinator.getNumThroughputViolatingTopicsMetricName()); + getNumThroughputViolatingTopicsMetric(streamName2); TestHookConnector connector1 = new TestHookConnector("connector1", connectorType); coordinator.addConnector(connectorType, connector1, new BroadcastStrategy(Optional.empty()), false, new SourceBasedDeduper(), null); @@ -3903,8 +3899,7 @@ public void testThroughputViolatingTopicsHandlingForSingleDatastreamOnCreateWith Properties properties = new Properties(); properties.put(CoordinatorConfig.CONFIG_ENABLE_THROUGHPUT_VIOLATING_TOPICS_HANDLING, Boolean.TRUE.toString()); Coordinator coordinator = createCoordinator(_zkConnectionString, testCluster, properties); - String numThroughputViolatingTopicsMetric = getNumThroughputViolatingTopicsMetric(Coordinator.class.getSimpleName(), - streamName, coordinator.getNumThroughputViolatingTopicsMetricName()); + String numThroughputViolatingTopicsMetric = getNumThroughputViolatingTopicsMetric(streamName); TestHookConnector connector1 = new TestHookConnector("connector1", connectorType); coordinator.addConnector(connectorType, connector1, new BroadcastStrategy(Optional.empty()), false, new SourceBasedDeduper(), null); @@ -3952,8 +3947,9 @@ private BooleanSupplier validateIfViolatingTopicsAreReflectedInServer(Datastream } // Formats the numThroughputViolatingTopics Metric String - private String getNumThroughputViolatingTopicsMetric(String className, String key, String metricName) { - return String.format("%s.%s.%s", className, key, metricName); + private String getNumThroughputViolatingTopicsMetric(String key) { + return String.format("%s.%s.%s", Coordinator.class.getSimpleName(), key, + Coordinator.getNumThroughputViolatingTopicsMetricName()); } // helper method: assert that within a timeout value, the connector are assigned the specific diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java index 2845e708e..980e96812 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java @@ -2320,7 +2320,7 @@ CoordinatorConfig getConfig() { } @VisibleForTesting - String getNumThroughputViolatingTopicsMetricName() { + static String getNumThroughputViolatingTopicsMetricName() { return CoordinatorMetrics.NUM_THROUGHPUT_VIOLATING_TOPICS_PER_DATASTREAM; }