diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java index 95bf8ecb6..13ab90941 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java @@ -971,12 +971,16 @@ void maybeClaimAssignmentTokensForStoppingStreams(List newAssign map(DatastreamTask::getId).collect(Collectors.toSet()); // TODO Evaluate whether we need to optimize here and make this call for each datastream - if (PollUtils.poll(() -> connectorTasksHaveStopped(connector, stoppingDatastreamTasks), - _config.getTaskStopCheckRetryPeriodMs(), _config.getTaskStopCheckTimeoutMs())) { - _adapter.claimAssignmentTokensForDatastreams(stoppingStreams, _adapter.getInstanceName(), false); - } else { - _log.warn("Connector {} failed to stop its tasks in {}ms. No assignment tokens will be claimed", - connector, _config.getTaskStopCheckTimeoutMs()); + try { + if (PollUtils.poll(() -> connectorTasksHaveStopped(connector, stoppingDatastreamTasks), + _config.getTaskStopCheckRetryPeriodMs(), _config.getTaskStopCheckTimeoutMs())) { + _adapter.claimAssignmentTokensForDatastreams(stoppingStreams, _adapter.getInstanceName(), false); + } else { + _log.warn("Connector {} failed to stop its tasks in {}ms. No assignment tokens will be claimed", + connector, _config.getTaskStopCheckTimeoutMs()); + } + } catch (Exception ex) { + _log.error("Failed to claim assignment tokens for stopping streams:", ex); } } else { _log.info("No streams have been inferred as stopping for connector {} and no assignment tokens will be claimed", @@ -1470,7 +1474,7 @@ private void handleLeaderDoAssignment(boolean isNewlyElectedLeader) { // assignment and do remove and add zNodes accordingly. In the case of ZooKeeper failure (when // it failed to create or delete zNodes), we will do our best to continue the current process // and schedule a retry. The retry should be able to diff the remaining ZooKeeper work - if (_config.getEnableAssignmentTokens()) { + if (_config.getEnableAssignmentTokens() && !stoppingDatastreamGroups.isEmpty()) { _adapter.updateAllAssignmentsAndIssueTokens(newAssignmentsByInstance, stoppingDatastreamGroups); try { _tokenClaimExecutor.submit(() -> waitForStopToPropagateAndMarkDatastreamsStopped(stoppingDatastreamGroups, @@ -1524,7 +1528,10 @@ private void scheduleLeaderDoAssignmentRetry(boolean isNewlyElectedLeader) { @VisibleForTesting void waitForStopToPropagateAndMarkDatastreamsStopped(List stoppingDatastreamGroups, boolean isNewlyElectedLeader) { - _log.info("waitForStopToPropagateAndMarkDatastreamsStopped started in thread {}", Thread.currentThread().getName()); + List streamNames = stoppingDatastreamGroups.stream().map(DatastreamGroup::getName).collect( + Collectors.toList()); + _log.info("waitForStopToPropagateAndMarkDatastreamsStopped started in thread {} for streams {}", + Thread.currentThread().getName(), streamNames); // Poll the zookeeper to ensure that hosts claimed assignment tokens for stopping streams Set failedStreams = Collections.emptySet(); if (_config.getEnableAssignmentTokens() && @@ -1548,7 +1555,6 @@ void waitForStopToPropagateAndMarkDatastreamsStopped(List stopp _config.getStopPropagationTimeoutMs(), failedStreams, hosts); } revokeUnclaimedAssignmentTokens(unclaimedTokens, stoppingDatastreamGroups); - _log.info("waitForStopToPropagateAndMarkDatastreamsStopped stopped in thread {}", Thread.currentThread().getName()); } // TODO Explore if the STOPPING -> STOPPED transition can be converted into an event type and scheduled in the event queue @@ -1557,7 +1563,7 @@ void waitForStopToPropagateAndMarkDatastreamsStopped(List stopp _config.getMarkDatastreamsStoppedRetryPeriodMs(), _config.getMarkDatastreamsStoppedTimeoutMs())) { _log.error("Failed to mark streams STOPPED within {}ms. Giving up.", _config.getMarkDatastreamsStoppedTimeoutMs()); } - _log.info("Executing waitForStopToPropagateAndMarkDatastreamsStopped in thread {}", Thread.currentThread().getName()); + _log.info("waitForStopToPropagateAndMarkDatastreamsStopped finished in thread {}", Thread.currentThread().getName()); } private boolean markDatastreamsStopped(List stoppingDatastreamGroups, Set failedStreams) { @@ -1573,6 +1579,7 @@ private boolean markDatastreamsStopped(List stoppingDatastreamG if (stoppingStreams.contains(datastream.getName()) && (forceStop || !failedStreams.contains(datastream.getName()))) { datastream.setStatus(DatastreamStatus.STOPPED); + _log.info("Transitioned datastream {} to STOPPED state", datastream.getName()); if (!_adapter.updateDatastream(datastream)) { _log.error("Failed to update datastream: {} to stopped state", datastream.getName()); success = false;