Fix the metrics deregistration in AbstractKafkaConnector when stop is called multiple times #865

Merged: 5 commits, Nov 5, 2021
@@ -20,6 +20,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang.StringUtils;
@@ -93,6 +94,7 @@ abstract public class AbstractKafkaBasedConnectorTask implements Runnable, Consu
protected volatile long _lastPollCompletedTimeMillis = 0;
protected final CountDownLatch _startedLatch = new CountDownLatch(1);
protected final CountDownLatch _stoppedLatch = new CountDownLatch(1);
+ private final AtomicBoolean _metricDeregistered = new AtomicBoolean(false);

// config
protected DatastreamTask _datastreamTask;
@@ -451,6 +453,13 @@ public void stop() {
_logger.info("Waking up the consumer for task {}", _taskName);
_consumer.wakeup();
}
+ if (!_metricDeregistered.getAndSet(true)) {
+   deregisterMetrics();
+ }
}

+ @VisibleForTesting
+ void deregisterMetrics() {
+   _consumerMetrics.deregisterMetrics();
+ }

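The guard added above hinges on `AtomicBoolean.getAndSet(true)` returning the previous value atomically: no matter how many callers race into `stop()`, only one of them observes `false` and performs the deregistration. Below is a minimal, self-contained sketch of that pattern; the class and field names are illustrative stand-ins, not the connector's real API.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative stand-in for a connector task whose metric cleanup must run at most once.
public class IdempotentStopDemo {
  private final AtomicBoolean metricsDeregistered = new AtomicBoolean(false);
  private final AtomicInteger deregistrations = new AtomicInteger(0);

  void stop() {
    // getAndSet(true) returns the previous value, so it yields false for exactly
    // one caller; only that caller runs the cleanup, all later calls are no-ops.
    if (!metricsDeregistered.getAndSet(true)) {
      deregisterMetrics();
    }
  }

  void deregisterMetrics() {
    deregistrations.incrementAndGet();
  }

  public static void main(String[] args) throws InterruptedException {
    IdempotentStopDemo task = new IdempotentStopDemo();
    Thread[] callers = new Thread[4];
    for (int i = 0; i < callers.length; i++) {
      callers[i] = new Thread(task::stop);
      callers[i].start();
    }
    for (Thread caller : callers) {
      caller.join();
    }
    // Prints 1: four concurrent stop() calls, one deregistration.
    System.out.println(task.deregistrations.get());
  }
}
```

Running the sketch prints `1`, showing the cleanup executed once even though four threads invoked `stop()`.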
@@ -155,7 +155,7 @@ public synchronized void onAssignmentChange(List<DatastreamTask> tasks) {

synchronized (_runningTasks) {
Set<DatastreamTask> toCancel = new HashSet<>(_runningTasks.keySet());
- toCancel.removeAll(tasks);
+ tasks.forEach(toCancel::remove);

if (toCancel.size() > 0) {
// Mark the connector task as stopped so that, in case stopping the task here fails for any reason in
@@ -363,13 +363,12 @@ private Future<DatastreamTask> asyncStopTask(DatastreamTask task, ConnectorTaskE
private DatastreamTask stopTask(DatastreamTask datastreamTask, ConnectorTaskEntry connectorTaskEntry) {
try {
connectorTaskEntry.setPendingStop();

AbstractKafkaBasedConnectorTask connectorTask = connectorTaskEntry.getConnectorTask();
connectorTask.stop();
boolean stopped = connectorTask.awaitStop(CANCEL_TASK_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
if (!stopped) {
_logger.warn("Connector task for datastream task {} took longer than {} ms to stop. Interrupting the thread.",
- datastreamTask, CANCEL_TASK_TIMEOUT.toMillis());
+ datastreamTask.getDatastreamTaskName(), CANCEL_TASK_TIMEOUT.toMillis());
connectorTaskEntry.getThread().interrupt();
// Check that the thread really got interrupted and log a message if it seems like the thread is still running.
// Threads which don't check for the interrupt status may land up running forever, and we would like to
@@ -398,10 +397,7 @@ private DatastreamTask stopTask(DatastreamTask datastreamTask, ConnectorTaskEntr
*/
protected boolean isTaskThreadDead(ConnectorTaskEntry connectorTaskEntry) {
Thread taskThread = connectorTaskEntry.getThread();
- if (taskThread == null || !taskThread.isAlive()) {
-   return true;
- }
- return false;
+ return taskThread == null || !taskThread.isAlive();
}

/**
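One note on the `toCancel.removeAll(tasks)` to `tasks.forEach(toCancel::remove)` change above: the diff does not state the motivation, but `AbstractSet.removeAll` picks its strategy by size. When the set is not larger than the argument collection, it iterates the set and calls `contains` on the argument, which for a `List` is a linear scan per element (quadratic overall). Removing each list element from the set directly always uses the set's hash lookup and produces the same resulting set. A small sketch of the equivalent operation, using plain strings as hypothetical task names:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CancelSetDemo {
  public static void main(String[] args) {
    // Hypothetical task names standing in for DatastreamTask instances.
    Set<String> running = new HashSet<>(Arrays.asList("task-1", "task-2", "task-3"));
    List<String> newAssignment = Arrays.asList("task-2", "task-3", "task-4");

    // Anything currently running that is absent from the new assignment must be cancelled.
    Set<String> toCancel = new HashSet<>(running);
    // Same result as toCancel.removeAll(newAssignment), but every removal is a
    // hash lookup on the set rather than a potential List.contains scan.
    newAssignment.forEach(toCancel::remove);

    System.out.println(toCancel); // prints [task-1]
  }
}
```

For the handful of tasks a connector instance typically owns the difference is negligible; it mainly matters as assignments grow.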
@@ -85,13 +85,13 @@ public void testOnAssignmentChangeReassignment() {
// With failStopTaskOnce set to true the AbstractKafkaBasedConnectorTask.stop is configured
// to fail the first time with InterruptedException and pass the second time.
TestKafkaConnector connector = new TestKafkaConnector(false, props, true);
- DatastreamTask datastreamTask1 = new DatastreamTaskImpl();
- ((DatastreamTaskImpl) datastreamTask1).setTaskPrefix("testtask1");
+ DatastreamTaskImpl datastreamTask1 = new DatastreamTaskImpl();
+ datastreamTask1.setTaskPrefix("testtask1");
connector.onAssignmentChange(Collections.singletonList(datastreamTask1));
connector.start(null);

- DatastreamTask datastreamTask2 = new DatastreamTaskImpl();
- ((DatastreamTaskImpl) datastreamTask2).setTaskPrefix("testtask2");
+ DatastreamTaskImpl datastreamTask2 = new DatastreamTaskImpl();
+ datastreamTask2.setTaskPrefix("testtask2");
// AbstractKafkaBasedConnectorTask stop should fail on this onAssignmentChange call
connector.onAssignmentChange(Collections.singletonList(datastreamTask2));
Assert.assertEquals(connector.getTasksToStopCount(), 1);
@@ -115,13 +115,13 @@ public void testOnAssignmentChangeStopTaskFailure() {
// With failStopTaskOnce set to true the AbstractKafkaBasedConnectorTask.stop is configured
// to fail the first time with InterruptedException and pass the second time.
TestKafkaConnector connector = new TestKafkaConnector(false, props, true);
- DatastreamTask datastreamTask = new DatastreamTaskImpl();
- ((DatastreamTaskImpl) datastreamTask).setTaskPrefix("testtask1");
+ DatastreamTaskImpl datastreamTask = new DatastreamTaskImpl();
+ datastreamTask.setTaskPrefix("testtask1");
connector.onAssignmentChange(Collections.singletonList(datastreamTask));
connector.start(null);

datastreamTask = new DatastreamTaskImpl();
- ((DatastreamTaskImpl) datastreamTask).setTaskPrefix("testtask2");
+ datastreamTask.setTaskPrefix("testtask2");
// AbstractKafkaBasedConnectorTask stop should fail on this onAssignmentChange call
connector.onAssignmentChange(Collections.singletonList(datastreamTask));
Assert.assertEquals(connector.getTasksToStopCount(), 1);
@@ -374,7 +374,13 @@ public void testRewindWhenSkippingMessage() throws Exception {
any(Exception.class));
// Verify that we call seekToLastCheckpoint at least twice, as skipping messages also triggers it
verify(connectorTask, atLeast(2)).seekToLastCheckpoint(ImmutableSet.of(topicPartition));
+ verify(connectorTask, times(0)).deregisterMetrics();
connectorTask.stop();
+ // Verify that multiple stop requests do not result in multiple metric de-registrations.
+ connectorTask.stop();
+ verify(connectorTask, times(1)).deregisterMetrics();
+ connectorTask.stop();
+ verify(connectorTask, times(1)).deregisterMetrics();
}

@Test
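The assertions added above use a Mockito spy together with `times(n)` to pin down exactly how many times `deregisterMetrics()` ran across repeated `stop()` calls. The sketch below reproduces that verification pattern in isolation; `DemoTask` is an invented placeholder, not the connector task class, and the example assumes `mockito-core` is on the classpath. It relies on the fact that, for non-final methods, a Mockito spy also intercepts self-invocations made inside the real `stop()` implementation, which is what makes the `verify` on the internally called method possible.

```java
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.concurrent.atomic.AtomicBoolean;

public class StopVerificationDemo {

  // Invented task class used only for this sketch; not the connector's real class.
  public static class DemoTask {
    private final AtomicBoolean deregistered = new AtomicBoolean(false);

    public void stop() {
      if (!deregistered.getAndSet(true)) {
        deregisterMetrics();
      }
    }

    public void deregisterMetrics() {
      // a real task would unregister its consumer metrics here
    }
  }

  public static void main(String[] args) {
    DemoTask task = spy(new DemoTask());

    // Nothing has been deregistered before the first stop().
    verify(task, times(0)).deregisterMetrics();

    task.stop();
    task.stop();
    task.stop();

    // Three stop() calls, but the guard lets deregisterMetrics() run exactly once.
    verify(task, times(1)).deregisterMetrics();
    System.out.println("deregisterMetrics() was invoked exactly once");
  }
}
```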