From 80522b6d6ee7bd198ac3e3c27aaa991a1f92e8c3 Mon Sep 17 00:00:00 2001
From: Vaibhav Maheshwari
Date: Fri, 5 Nov 2021 09:50:16 -0700
Subject: [PATCH] Fix the metrics deregistration in AbstractKafkaConnector when
 multiple stop are called

---
 .../kafka/AbstractKafkaBasedConnectorTask.java    |  9 +++++++++
 .../connectors/kafka/AbstractKafkaConnector.java  | 10 +++-------
 .../kafka/TestAbstractKafkaConnector.java         | 14 +++++++-------
 .../connectors/kafka/TestKafkaConnectorTask.java  |  6 ++++++
 4 files changed, 25 insertions(+), 14 deletions(-)

diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java
index a0e6797ef..a9ecff0b8 100644
--- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java
+++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java
@@ -20,6 +20,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang.StringUtils;
@@ -93,6 +94,7 @@ abstract public class AbstractKafkaBasedConnectorTask implements Runnable, Consu
   protected volatile long _lastPollCompletedTimeMillis = 0;
   protected final CountDownLatch _startedLatch = new CountDownLatch(1);
   protected final CountDownLatch _stoppedLatch = new CountDownLatch(1);
+  private final AtomicBoolean _metricDeregistered = new AtomicBoolean(false);
 
   // config
   protected DatastreamTask _datastreamTask;
@@ -451,6 +453,13 @@ public void stop() {
       _logger.info("Waking up the consumer for task {}", _taskName);
       _consumer.wakeup();
     }
+    if (!_metricDeregistered.getAndSet(true)) {
+      deregisterMetrics();
+    }
+  }
+
+  @VisibleForTesting
+  void deregisterMetrics() {
     _consumerMetrics.deregisterMetrics();
   }
 
diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java
index 71daaf564..1fcc8689b 100644
--- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java
+++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java
@@ -155,7 +155,7 @@ public synchronized void onAssignmentChange(List<DatastreamTask> tasks) {
 
     synchronized (_runningTasks) {
       Set<DatastreamTask> toCancel = new HashSet<>(_runningTasks.keySet());
-      toCancel.removeAll(tasks);
+      tasks.forEach(toCancel::remove);
 
       if (toCancel.size() > 0) {
         // Mark the connector task as stopped so that, in case stopping the task here fails for any reason in
@@ -363,13 +363,12 @@ private Future<DatastreamTask> asyncStopTask(DatastreamTask task, ConnectorTaskE
   private DatastreamTask stopTask(DatastreamTask datastreamTask, ConnectorTaskEntry connectorTaskEntry) {
     try {
       connectorTaskEntry.setPendingStop();
-
       AbstractKafkaBasedConnectorTask connectorTask = connectorTaskEntry.getConnectorTask();
       connectorTask.stop();
       boolean stopped = connectorTask.awaitStop(CANCEL_TASK_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
       if (!stopped) {
         _logger.warn("Connector task for datastream task {} took longer than {} ms to stop. Interrupting the thread.",
-            datastreamTask, CANCEL_TASK_TIMEOUT.toMillis());
+            datastreamTask.getDatastreamTaskName(), CANCEL_TASK_TIMEOUT.toMillis());
         connectorTaskEntry.getThread().interrupt();
         // Check that the thread really got interrupted and log a message if it seems like the thread is still running.
         // Threads which don't check for the interrupt status may land up running forever, and we would like to
@@ -398,10 +397,7 @@ private DatastreamTask stopTask(DatastreamTask datastreamTask, ConnectorTaskEntr
    */
   protected boolean isTaskThreadDead(ConnectorTaskEntry connectorTaskEntry) {
     Thread taskThread = connectorTaskEntry.getThread();
-    if (taskThread == null || !taskThread.isAlive()) {
-      return true;
-    }
-    return false;
+    return taskThread == null || !taskThread.isAlive();
   }
 
   /**
diff --git a/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/TestAbstractKafkaConnector.java b/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/TestAbstractKafkaConnector.java
index c7262960b..17e242b0b 100644
--- a/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/TestAbstractKafkaConnector.java
+++ b/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/TestAbstractKafkaConnector.java
@@ -85,13 +85,13 @@ public void testOnAssignmentChangeReassignment() {
     // With failStopTaskOnce set to true the AbstractKafkaBasedConnectorTask.stop is configured
     // to fail the first time with InterruptedException and pass the second time.
     TestKafkaConnector connector = new TestKafkaConnector(false, props, true);
-    DatastreamTask datastreamTask1 = new DatastreamTaskImpl();
-    ((DatastreamTaskImpl) datastreamTask1).setTaskPrefix("testtask1");
+    DatastreamTaskImpl datastreamTask1 = new DatastreamTaskImpl();
+    datastreamTask1.setTaskPrefix("testtask1");
     connector.onAssignmentChange(Collections.singletonList(datastreamTask1));
     connector.start(null);
 
-    DatastreamTask datastreamTask2 = new DatastreamTaskImpl();
-    ((DatastreamTaskImpl) datastreamTask2).setTaskPrefix("testtask2");
+    DatastreamTaskImpl datastreamTask2 = new DatastreamTaskImpl();
+    datastreamTask2.setTaskPrefix("testtask2");
     // AbstractKafkaBasedConnectorTask stop should fail on this onAssignmentChange call
     connector.onAssignmentChange(Collections.singletonList(datastreamTask2));
     Assert.assertEquals(connector.getTasksToStopCount(), 1);
@@ -115,13 +115,13 @@ public void testOnAssignmentChangeStopTaskFailure() {
     // With failStopTaskOnce set to true the AbstractKafkaBasedConnectorTask.stop is configured
     // to fail the first time with InterruptedException and pass the second time.
     TestKafkaConnector connector = new TestKafkaConnector(false, props, true);
-    DatastreamTask datastreamTask = new DatastreamTaskImpl();
-    ((DatastreamTaskImpl) datastreamTask).setTaskPrefix("testtask1");
+    DatastreamTaskImpl datastreamTask = new DatastreamTaskImpl();
+    datastreamTask.setTaskPrefix("testtask1");
     connector.onAssignmentChange(Collections.singletonList(datastreamTask));
     connector.start(null);
 
     datastreamTask = new DatastreamTaskImpl();
-    ((DatastreamTaskImpl) datastreamTask).setTaskPrefix("testtask2");
+    datastreamTask.setTaskPrefix("testtask2");
     // AbstractKafkaBasedConnectorTask stop should fail on this onAssignmentChange call
     connector.onAssignmentChange(Collections.singletonList(datastreamTask));
     Assert.assertEquals(connector.getTasksToStopCount(), 1);
diff --git a/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/TestKafkaConnectorTask.java b/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/TestKafkaConnectorTask.java
index 37fe61923..0b099c00e 100644
--- a/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/TestKafkaConnectorTask.java
+++ b/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/TestKafkaConnectorTask.java
@@ -374,7 +374,13 @@ public void testRewindWhenSkippingMessage() throws Exception {
         any(Exception.class));
     //Verify that we have call at least seekToLastCheckpoint twice as the skip messages also trigger this
     verify(connectorTask, atLeast(2)).seekToLastCheckpoint(ImmutableSet.of(topicPartition));
+    verify(connectorTask, times(0)).deregisterMetrics();
     connectorTask.stop();
+    // Verify that multiple stop requests do not result in multiple metric de-registration.
+    connectorTask.stop();
+    verify(connectorTask, times(1)).deregisterMetrics();
+    connectorTask.stop();
+    verify(connectorTask, times(1)).deregisterMetrics();
   }
 
   @Test
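
Note: the heart of this change is the AtomicBoolean guard added to AbstractKafkaBasedConnectorTask.stop(). The sketch below is a minimal, self-contained illustration of that idempotent-cleanup pattern; IdempotentStopSketch, MetricsStub and Task are hypothetical stand-ins rather than the real Brooklin classes, and only the getAndSet(true) guard mirrors the patched code.

import java.util.concurrent.atomic.AtomicBoolean;

public class IdempotentStopSketch {

  // Hypothetical stand-in for the connector task's metrics holder; the counter
  // exists only so the example can show how many times de-registration ran.
  static class MetricsStub {
    int deregisterCalls = 0;

    void deregisterMetrics() {
      deregisterCalls++;
      System.out.println("deregisterMetrics() invocation #" + deregisterCalls);
    }
  }

  // Sketch of a task whose stop() may run more than once, e.g. once when the
  // assignment changes and again when the connector itself shuts down.
  static class Task {
    private final MetricsStub _consumerMetrics = new MetricsStub();
    private final AtomicBoolean _metricDeregistered = new AtomicBoolean(false);

    void stop() {
      // ... signal shutdown, wake up the Kafka consumer, etc. ...
      // getAndSet(true) returns false only for the first caller, so metrics are
      // de-registered exactly once no matter how many times stop() is invoked.
      if (!_metricDeregistered.getAndSet(true)) {
        deregisterMetrics();
      }
    }

    void deregisterMetrics() {
      _consumerMetrics.deregisterMetrics();
    }
  }

  public static void main(String[] args) {
    Task task = new Task();
    task.stop();
    task.stop(); // prints nothing further: de-registration already happened
    task.stop();
  }
}

This is the behaviour the new assertions in TestKafkaConnectorTask pin down: deregisterMetrics() is still verified with times(1) after stop() has been called three times.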