Skip to content

Commit

Permalink
Minor improvements for assignment tokens feature (#941)
Browse files Browse the repository at this point in the history
* Minor improvements for assignment tokens feature

* Remved unnecessary empty line

* Fixed checkstyle violation
  • Loading branch information
jzakaryan authored May 31, 2023
1 parent 6f59d2a commit 2238d82
Showing 1 changed file with 17 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -971,12 +971,16 @@ void maybeClaimAssignmentTokensForStoppingStreams(List<DatastreamTask> newAssign
map(DatastreamTask::getId).collect(Collectors.toSet());

// TODO Evaluate whether we need to optimize here and make this call for each datastream
if (PollUtils.poll(() -> connectorTasksHaveStopped(connector, stoppingDatastreamTasks),
_config.getTaskStopCheckRetryPeriodMs(), _config.getTaskStopCheckTimeoutMs())) {
_adapter.claimAssignmentTokensForDatastreams(stoppingStreams, _adapter.getInstanceName(), false);
} else {
_log.warn("Connector {} failed to stop its tasks in {}ms. No assignment tokens will be claimed",
connector, _config.getTaskStopCheckTimeoutMs());
try {
if (PollUtils.poll(() -> connectorTasksHaveStopped(connector, stoppingDatastreamTasks),
_config.getTaskStopCheckRetryPeriodMs(), _config.getTaskStopCheckTimeoutMs())) {
_adapter.claimAssignmentTokensForDatastreams(stoppingStreams, _adapter.getInstanceName(), false);
} else {
_log.warn("Connector {} failed to stop its tasks in {}ms. No assignment tokens will be claimed",
connector, _config.getTaskStopCheckTimeoutMs());
}
} catch (Exception ex) {
_log.error("Failed to claim assignment tokens for stopping streams:", ex);
}
} else {
_log.info("No streams have been inferred as stopping for connector {} and no assignment tokens will be claimed",
Expand Down Expand Up @@ -1470,7 +1474,7 @@ private void handleLeaderDoAssignment(boolean isNewlyElectedLeader) {
// assignment and do remove and add zNodes accordingly. In the case of ZooKeeper failure (when
// it failed to create or delete zNodes), we will do our best to continue the current process
// and schedule a retry. The retry should be able to diff the remaining ZooKeeper work
if (_config.getEnableAssignmentTokens()) {
if (_config.getEnableAssignmentTokens() && !stoppingDatastreamGroups.isEmpty()) {
_adapter.updateAllAssignmentsAndIssueTokens(newAssignmentsByInstance, stoppingDatastreamGroups);
try {
_tokenClaimExecutor.submit(() -> waitForStopToPropagateAndMarkDatastreamsStopped(stoppingDatastreamGroups,
Expand Down Expand Up @@ -1524,7 +1528,10 @@ private void scheduleLeaderDoAssignmentRetry(boolean isNewlyElectedLeader) {
@VisibleForTesting
void waitForStopToPropagateAndMarkDatastreamsStopped(List<DatastreamGroup> stoppingDatastreamGroups,
boolean isNewlyElectedLeader) {
_log.info("waitForStopToPropagateAndMarkDatastreamsStopped started in thread {}", Thread.currentThread().getName());
List<String> streamNames = stoppingDatastreamGroups.stream().map(DatastreamGroup::getName).collect(
Collectors.toList());
_log.info("waitForStopToPropagateAndMarkDatastreamsStopped started in thread {} for streams {}",
Thread.currentThread().getName(), streamNames);
// Poll the zookeeper to ensure that hosts claimed assignment tokens for stopping streams
Set<String> failedStreams = Collections.emptySet();
if (_config.getEnableAssignmentTokens() &&
Expand All @@ -1548,7 +1555,6 @@ void waitForStopToPropagateAndMarkDatastreamsStopped(List<DatastreamGroup> stopp
_config.getStopPropagationTimeoutMs(), failedStreams, hosts);
}
revokeUnclaimedAssignmentTokens(unclaimedTokens, stoppingDatastreamGroups);
_log.info("waitForStopToPropagateAndMarkDatastreamsStopped stopped in thread {}", Thread.currentThread().getName());
}

// TODO Explore if the STOPPING -> STOPPED transition can be converted into an event type and scheduled in the event queue
Expand All @@ -1557,7 +1563,7 @@ void waitForStopToPropagateAndMarkDatastreamsStopped(List<DatastreamGroup> stopp
_config.getMarkDatastreamsStoppedRetryPeriodMs(), _config.getMarkDatastreamsStoppedTimeoutMs())) {
_log.error("Failed to mark streams STOPPED within {}ms. Giving up.", _config.getMarkDatastreamsStoppedTimeoutMs());
}
_log.info("Executing waitForStopToPropagateAndMarkDatastreamsStopped in thread {}", Thread.currentThread().getName());
_log.info("waitForStopToPropagateAndMarkDatastreamsStopped finished in thread {}", Thread.currentThread().getName());
}

private boolean markDatastreamsStopped(List<DatastreamGroup> stoppingDatastreamGroups, Set<String> failedStreams) {
Expand All @@ -1573,6 +1579,7 @@ private boolean markDatastreamsStopped(List<DatastreamGroup> stoppingDatastreamG
if (stoppingStreams.contains(datastream.getName()) &&
(forceStop || !failedStreams.contains(datastream.getName()))) {
datastream.setStatus(DatastreamStatus.STOPPED);
_log.info("Transitioned datastream {} to STOPPED state", datastream.getName());
if (!_adapter.updateDatastream(datastream)) {
_log.error("Failed to update datastream: {} to stopped state", datastream.getName());
success = false;
Expand Down

0 comments on commit 2238d82

Please sign in to comment.