Skip to content

Commit

Permalink
Fix Metric Registration Issue for Throughput Violations Reporting (#942)
Browse files Browse the repository at this point in the history
* Fix Metric Registration Issue for Throughput Violations Reporting

* Converted the metric formatting logic to a function in the tests

* Make the metric provider helper function more specific

---------

Co-authored-by: Shrinand Thakkar <sthakkar@sthakkar-mn2.linkedin.biz>
  • Loading branch information
shrinandthakkar and Shrinand Thakkar authored Jun 1, 2023
1 parent 2238d82 commit f5d99f6
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3617,8 +3617,7 @@ public void testThroughputViolatingTopicsHandlingForSingleDatastream() throws Ex
Properties properties = new Properties();
properties.put(CoordinatorConfig.CONFIG_ENABLE_THROUGHPUT_VIOLATING_TOPICS_HANDLING, Boolean.TRUE.toString());
Coordinator coordinator = createCoordinator(_zkConnectionString, testCluster, properties);
String numThroughputViolatingTopicsMetric = String.format("%s.%s.%s", Coordinator.class.getSimpleName(),
coordinator.getNumThroughputViolatingTopicsMetricName(), streamName);
String numThroughputViolatingTopicsMetric = getNumThroughputViolatingTopicsMetric(streamName);
TestHookConnector connector1 = new TestHookConnector("connector1", connectorType);
coordinator.addConnector(connectorType, connector1, new BroadcastStrategy(Optional.empty()), false,
new SourceBasedDeduper(), null);
Expand Down Expand Up @@ -3731,8 +3730,7 @@ public void testThroughputViolatingTopicsHandlingForSingleDatastreamOnCreate() t
Properties properties = new Properties();
properties.put(CoordinatorConfig.CONFIG_ENABLE_THROUGHPUT_VIOLATING_TOPICS_HANDLING, Boolean.TRUE.toString());
Coordinator coordinator = createCoordinator(_zkConnectionString, testCluster, properties);
String numThroughputViolatingTopicsMetric = String.format("%s.%s.%s", Coordinator.class.getSimpleName(),
coordinator.getNumThroughputViolatingTopicsMetricName(), streamName);
String numThroughputViolatingTopicsMetric = getNumThroughputViolatingTopicsMetric(streamName);
TestHookConnector connector1 = new TestHookConnector("connector1", connectorType);
coordinator.addConnector(connectorType, connector1, new BroadcastStrategy(Optional.empty()), false,
new SourceBasedDeduper(), null);
Expand Down Expand Up @@ -3778,11 +3776,9 @@ public void testThroughputViolatingTopicsHandlingForMultipleDatastreams() throws
properties.put(CoordinatorConfig.CONFIG_ENABLE_THROUGHPUT_VIOLATING_TOPICS_HANDLING, Boolean.TRUE.toString());
Coordinator coordinator = createCoordinator(_zkConnectionString, testCluster, properties);
String numThroughputViolatingTopicsMetricForFirstDatastream =
String.format("%s.%s.%s", Coordinator.class.getSimpleName(),
coordinator.getNumThroughputViolatingTopicsMetricName(), streamName1);
getNumThroughputViolatingTopicsMetric(streamName1);
String numThroughputViolatingTopicsMetricForSecondDatastream =
String.format("%s.%s.%s", Coordinator.class.getSimpleName(),
coordinator.getNumThroughputViolatingTopicsMetricName(), streamName2);
getNumThroughputViolatingTopicsMetric(streamName2);
TestHookConnector connector1 = new TestHookConnector("connector1", connectorType);
coordinator.addConnector(connectorType, connector1, new BroadcastStrategy(Optional.empty()), false,
new SourceBasedDeduper(), null);
Expand Down Expand Up @@ -3903,8 +3899,7 @@ public void testThroughputViolatingTopicsHandlingForSingleDatastreamOnCreateWith
Properties properties = new Properties();
properties.put(CoordinatorConfig.CONFIG_ENABLE_THROUGHPUT_VIOLATING_TOPICS_HANDLING, Boolean.TRUE.toString());
Coordinator coordinator = createCoordinator(_zkConnectionString, testCluster, properties);
String numThroughputViolatingTopicsMetric = String.format("%s.%s.%s", Coordinator.class.getSimpleName(),
coordinator.getNumThroughputViolatingTopicsMetricName(), streamName);
String numThroughputViolatingTopicsMetric = getNumThroughputViolatingTopicsMetric(streamName);
TestHookConnector connector1 = new TestHookConnector("connector1", connectorType);
coordinator.addConnector(connectorType, connector1, new BroadcastStrategy(Optional.empty()), false,
new SourceBasedDeduper(), null);
Expand Down Expand Up @@ -3951,6 +3946,12 @@ private BooleanSupplier validateIfViolatingTopicsAreReflectedInServer(Datastream
&& requestedThroughputViolatingTopics.containsAll(fetchedViolatingTopicsFromStore);
}

// Formats the numThroughputViolatingTopics Metric String
private String getNumThroughputViolatingTopicsMetric(String key) {
return String.format("%s.%s.%s", Coordinator.class.getSimpleName(), key,
Coordinator.getNumThroughputViolatingTopicsMetricName());
}

// helper method: assert that within a timeout value, the connector are assigned the specific
// tasks with the specified names.
private void assertConnectorAssignment(TestHookConnector connector, long timeoutMs, String... datastreamNames)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,9 +526,8 @@ private void populateThroughputViolatingTopicsMap(List<DatastreamGroup> datastre
_log.info("For datastream {}, Successfully reported throughput violating topics : {}", datastream.getName(),
violatingTopics);
}
_metrics.registerOrSetGauge(
String.format("%s.%s", CoordinatorMetrics.NUM_THROUGHPUT_VIOLATING_TOPICS_PER_DATASTREAM,
datastream.getName()), () -> violatingTopics.length);
_metrics.registerOrSetKeyedGauge(datastream.getName(),
CoordinatorMetrics.NUM_THROUGHPUT_VIOLATING_TOPICS_PER_DATASTREAM, () -> violatingTopics.length);
}));
} finally {
_throughputViolatingTopicsMapWriteLock.unlock();
Expand Down Expand Up @@ -2321,7 +2320,7 @@ CoordinatorConfig getConfig() {
}

@VisibleForTesting
String getNumThroughputViolatingTopicsMetricName() {
static String getNumThroughputViolatingTopicsMetricName() {
return CoordinatorMetrics.NUM_THROUGHPUT_VIOLATING_TOPICS_PER_DATASTREAM;
}

Expand Down Expand Up @@ -2478,6 +2477,10 @@ private void registerGaugeMetrics() {
.put(ZK_SESSION_EXPIRED, () -> _coordinator.isZkSessionExpired() ? 1 : 0)
.build();
gaugeMetrics.forEach(this::registerGauge);

// For dynamic datastream prefixed gauge metric reporting num throughput violating topics
_metricInfos.add(new BrooklinGaugeInfo(_coordinator.buildMetricName(MODULE,
MetricsAware.KEY_REGEX + NUM_THROUGHPUT_VIOLATING_TOPICS_PER_DATASTREAM)));
}

private void registerCounterMetrics() {
Expand All @@ -2497,10 +2500,9 @@ private void registerGauge(String metricName, Supplier<?> valueSupplier) {
}

// registers a new gauge or updates the supplier for the gauge if it already exists
private <T> void registerOrSetGauge(String metricName, Supplier<T> valueSupplier) {
_dynamicMetricsManager.setGauge(_dynamicMetricsManager.registerGauge(MODULE, metricName, valueSupplier),
private <T> void registerOrSetKeyedGauge(String key, String metricName, Supplier<T> valueSupplier) {
_dynamicMetricsManager.setGauge(_dynamicMetricsManager.registerGauge(MODULE, key, metricName, valueSupplier),
valueSupplier);
_metricInfos.add(new BrooklinGaugeInfo(_coordinator.buildMetricName(MODULE, metricName)));
}

private void registerCounter(Counter metric) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,10 @@ public static List<BrooklinMetricInfo> getMetricInfos() {
Arrays.asList(BrooklinHistogramInfo.PERCENTILE_50, BrooklinHistogramInfo.PERCENTILE_99,
BrooklinHistogramInfo.PERCENTILE_999))));
metrics.add(new BrooklinHistogramInfo(METRICS_PREFIX + EVENTS_SEND_LATENCY_MS_STRING));
metrics.add(new BrooklinHistogramInfo(METRICS_PREFIX + THROUGHPUT_VIOLATING_EVENTS_LATENCY_MS_STRING, Optional.of(
Arrays.asList(BrooklinHistogramInfo.PERCENTILE_50, BrooklinHistogramInfo.PERCENTILE_99,
BrooklinHistogramInfo.PERCENTILE_999))));
metrics.add(new BrooklinHistogramInfo(METRICS_PREFIX + THROUGHPUT_VIOLATING_EVENTS_SEND_LATENCY_MS_STRING));
metrics.add(new BrooklinHistogramInfo(METRICS_PREFIX + FLUSH_LATENCY_MS_STRING));

return Collections.unmodifiableList(metrics);
Expand Down
2 changes: 1 addition & 1 deletion gradle/maven.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
allprojects {
version = "5.3.2-SNAPSHOT"
version = "5.3.5-SNAPSHOT"
}

subprojects {
Expand Down

0 comments on commit f5d99f6

Please sign in to comment.