diff --git a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java index 73f9ccee5..f3e37084e 100644 --- a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java +++ b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java @@ -3617,8 +3617,7 @@ public void testThroughputViolatingTopicsHandlingForSingleDatastream() throws Ex Properties properties = new Properties(); properties.put(CoordinatorConfig.CONFIG_ENABLE_THROUGHPUT_VIOLATING_TOPICS_HANDLING, Boolean.TRUE.toString()); Coordinator coordinator = createCoordinator(_zkConnectionString, testCluster, properties); - String numThroughputViolatingTopicsMetric = String.format("%s.%s.%s", Coordinator.class.getSimpleName(), - coordinator.getNumThroughputViolatingTopicsMetricName(), streamName); + String numThroughputViolatingTopicsMetric = getNumThroughputViolatingTopicsMetric(streamName); TestHookConnector connector1 = new TestHookConnector("connector1", connectorType); coordinator.addConnector(connectorType, connector1, new BroadcastStrategy(Optional.empty()), false, new SourceBasedDeduper(), null); @@ -3731,8 +3730,7 @@ public void testThroughputViolatingTopicsHandlingForSingleDatastreamOnCreate() t Properties properties = new Properties(); properties.put(CoordinatorConfig.CONFIG_ENABLE_THROUGHPUT_VIOLATING_TOPICS_HANDLING, Boolean.TRUE.toString()); Coordinator coordinator = createCoordinator(_zkConnectionString, testCluster, properties); - String numThroughputViolatingTopicsMetric = String.format("%s.%s.%s", Coordinator.class.getSimpleName(), - coordinator.getNumThroughputViolatingTopicsMetricName(), streamName); + String numThroughputViolatingTopicsMetric = getNumThroughputViolatingTopicsMetric(streamName); TestHookConnector connector1 = new TestHookConnector("connector1", connectorType); coordinator.addConnector(connectorType, connector1, new BroadcastStrategy(Optional.empty()), false, new SourceBasedDeduper(), null); @@ -3778,11 +3776,9 @@ public void testThroughputViolatingTopicsHandlingForMultipleDatastreams() throws properties.put(CoordinatorConfig.CONFIG_ENABLE_THROUGHPUT_VIOLATING_TOPICS_HANDLING, Boolean.TRUE.toString()); Coordinator coordinator = createCoordinator(_zkConnectionString, testCluster, properties); String numThroughputViolatingTopicsMetricForFirstDatastream = - String.format("%s.%s.%s", Coordinator.class.getSimpleName(), - coordinator.getNumThroughputViolatingTopicsMetricName(), streamName1); + getNumThroughputViolatingTopicsMetric(streamName1); String numThroughputViolatingTopicsMetricForSecondDatastream = - String.format("%s.%s.%s", Coordinator.class.getSimpleName(), - coordinator.getNumThroughputViolatingTopicsMetricName(), streamName2); + getNumThroughputViolatingTopicsMetric(streamName2); TestHookConnector connector1 = new TestHookConnector("connector1", connectorType); coordinator.addConnector(connectorType, connector1, new BroadcastStrategy(Optional.empty()), false, new SourceBasedDeduper(), null); @@ -3903,8 +3899,7 @@ public void testThroughputViolatingTopicsHandlingForSingleDatastreamOnCreateWith Properties properties = new Properties(); properties.put(CoordinatorConfig.CONFIG_ENABLE_THROUGHPUT_VIOLATING_TOPICS_HANDLING, Boolean.TRUE.toString()); Coordinator coordinator = createCoordinator(_zkConnectionString, testCluster, properties); - String numThroughputViolatingTopicsMetric = String.format("%s.%s.%s", Coordinator.class.getSimpleName(), - coordinator.getNumThroughputViolatingTopicsMetricName(), streamName); + String numThroughputViolatingTopicsMetric = getNumThroughputViolatingTopicsMetric(streamName); TestHookConnector connector1 = new TestHookConnector("connector1", connectorType); coordinator.addConnector(connectorType, connector1, new BroadcastStrategy(Optional.empty()), false, new SourceBasedDeduper(), null); @@ -3951,6 +3946,12 @@ private BooleanSupplier validateIfViolatingTopicsAreReflectedInServer(Datastream && requestedThroughputViolatingTopics.containsAll(fetchedViolatingTopicsFromStore); } + // Formats the numThroughputViolatingTopics Metric String + private String getNumThroughputViolatingTopicsMetric(String key) { + return String.format("%s.%s.%s", Coordinator.class.getSimpleName(), key, + Coordinator.getNumThroughputViolatingTopicsMetricName()); + } + // helper method: assert that within a timeout value, the connector are assigned the specific // tasks with the specified names. private void assertConnectorAssignment(TestHookConnector connector, long timeoutMs, String... datastreamNames) diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java index 13ab90941..980e96812 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java @@ -526,9 +526,8 @@ private void populateThroughputViolatingTopicsMap(List datastre _log.info("For datastream {}, Successfully reported throughput violating topics : {}", datastream.getName(), violatingTopics); } - _metrics.registerOrSetGauge( - String.format("%s.%s", CoordinatorMetrics.NUM_THROUGHPUT_VIOLATING_TOPICS_PER_DATASTREAM, - datastream.getName()), () -> violatingTopics.length); + _metrics.registerOrSetKeyedGauge(datastream.getName(), + CoordinatorMetrics.NUM_THROUGHPUT_VIOLATING_TOPICS_PER_DATASTREAM, () -> violatingTopics.length); })); } finally { _throughputViolatingTopicsMapWriteLock.unlock(); @@ -2321,7 +2320,7 @@ CoordinatorConfig getConfig() { } @VisibleForTesting - String getNumThroughputViolatingTopicsMetricName() { + static String getNumThroughputViolatingTopicsMetricName() { return CoordinatorMetrics.NUM_THROUGHPUT_VIOLATING_TOPICS_PER_DATASTREAM; } @@ -2478,6 +2477,10 @@ private void registerGaugeMetrics() { .put(ZK_SESSION_EXPIRED, () -> _coordinator.isZkSessionExpired() ? 1 : 0) .build(); gaugeMetrics.forEach(this::registerGauge); + + // For dynamic datastream prefixed gauge metric reporting num throughput violating topics + _metricInfos.add(new BrooklinGaugeInfo(_coordinator.buildMetricName(MODULE, + MetricsAware.KEY_REGEX + NUM_THROUGHPUT_VIOLATING_TOPICS_PER_DATASTREAM))); } private void registerCounterMetrics() { @@ -2497,10 +2500,9 @@ private void registerGauge(String metricName, Supplier valueSupplier) { } // registers a new gauge or updates the supplier for the gauge if it already exists - private void registerOrSetGauge(String metricName, Supplier valueSupplier) { - _dynamicMetricsManager.setGauge(_dynamicMetricsManager.registerGauge(MODULE, metricName, valueSupplier), + private void registerOrSetKeyedGauge(String key, String metricName, Supplier valueSupplier) { + _dynamicMetricsManager.setGauge(_dynamicMetricsManager.registerGauge(MODULE, key, metricName, valueSupplier), valueSupplier); - _metricInfos.add(new BrooklinGaugeInfo(_coordinator.buildMetricName(MODULE, metricName))); } private void registerCounter(Counter metric) { diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java b/datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java index e528bb7f1..c1aeeaae5 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java @@ -619,6 +619,10 @@ public static List getMetricInfos() { Arrays.asList(BrooklinHistogramInfo.PERCENTILE_50, BrooklinHistogramInfo.PERCENTILE_99, BrooklinHistogramInfo.PERCENTILE_999)))); metrics.add(new BrooklinHistogramInfo(METRICS_PREFIX + EVENTS_SEND_LATENCY_MS_STRING)); + metrics.add(new BrooklinHistogramInfo(METRICS_PREFIX + THROUGHPUT_VIOLATING_EVENTS_LATENCY_MS_STRING, Optional.of( + Arrays.asList(BrooklinHistogramInfo.PERCENTILE_50, BrooklinHistogramInfo.PERCENTILE_99, + BrooklinHistogramInfo.PERCENTILE_999)))); + metrics.add(new BrooklinHistogramInfo(METRICS_PREFIX + THROUGHPUT_VIOLATING_EVENTS_SEND_LATENCY_MS_STRING)); metrics.add(new BrooklinHistogramInfo(METRICS_PREFIX + FLUSH_LATENCY_MS_STRING)); return Collections.unmodifiableList(metrics); diff --git a/gradle/maven.gradle b/gradle/maven.gradle index 91465b20e..e6f8f7eed 100644 --- a/gradle/maven.gradle +++ b/gradle/maven.gradle @@ -1,5 +1,5 @@ allprojects { - version = "5.3.2-SNAPSHOT" + version = "5.3.5-SNAPSHOT" } subprojects {