Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Datastream stop transition redesign #842

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@
"name" : "force",
"type" : "boolean",
"default" : "false",
"doc" : "whether or not to resume all datastreams within the given datastream's group"
"doc" : "whether or not to stop all datastreams within the given datastream's group"
} ]
} ]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ record Datastream {
PAUSED
DELETING
STOPPED
STOPPING
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
"type" : {
"type" : "enum",
"name" : "DatastreamStatus",
"symbols" : [ "INITIALIZING", "READY", "PAUSED", "DELETING", "STOPPED" ]
"symbols" : [ "INITIALIZING", "READY", "PAUSED", "DELETING", "STOPPED", "STOPPING" ]
},
"doc" : "Status of the datastream",
"symbolDocs" : {
Expand Down Expand Up @@ -187,7 +187,7 @@
"name" : "force",
"type" : "boolean",
"default" : "false",
"doc" : "whether or not to resume all datastreams within the given datastream's group"
"doc" : "whether or not to stop all datastreams within the given datastream's group"
} ]
} ]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public class DatastreamServer {
private final Map<String, String> _bootstrapConnectors;

private Coordinator _coordinator;
private Properties _properties;
private DatastreamStore _datastreamStore;
private DatastreamJettyStandaloneLauncher _jettyLauncher;
private JmxReporter _jmxReporter;
Expand Down Expand Up @@ -130,7 +131,8 @@ public class DatastreamServer {
public DatastreamServer(Properties properties) throws DatastreamException {
LOG.info("Start to initialize DatastreamServer. Properties: " + properties);
LOG.info("Creating coordinator.");
VerifiableProperties verifiableProperties = new VerifiableProperties(properties);
_properties = properties;
VerifiableProperties verifiableProperties = new VerifiableProperties(_properties);

HashSet<String> connectorTypes = new HashSet<>(verifiableProperties.getStringList(CONFIG_CONNECTOR_NAMES,
Collections.emptyList()));
Expand All @@ -148,7 +150,7 @@ public DatastreamServer(Properties properties) throws DatastreamException {
throw new DatastreamRuntimeException(errorMessage);
}

CoordinatorConfig coordinatorConfig = new CoordinatorConfig(properties);
CoordinatorConfig coordinatorConfig = new CoordinatorConfig(_properties);

LOG.info("Setting up DMS endpoint server.");
ZkClient zkClient = new ZkClient(coordinatorConfig.getZkAddress(), coordinatorConfig.getZkSessionTimeout(),
Expand Down Expand Up @@ -226,6 +228,10 @@ public DatastreamStore getDatastreamStore() {
return _datastreamStore;
}

public Properties getProperties() {
return _properties;
}

public int getHttpPort() {
return _httpPort;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
Expand All @@ -37,6 +38,7 @@
import com.linkedin.datastream.common.DatastreamStatus;
import com.linkedin.datastream.common.DatastreamUtils;
import com.linkedin.datastream.common.JsonUtils;
import com.linkedin.datastream.common.PollUtils;
import com.linkedin.datastream.common.RestliUtils;
import com.linkedin.datastream.metrics.BrooklinGaugeInfo;
import com.linkedin.datastream.metrics.BrooklinMeterInfo;
Expand All @@ -57,6 +59,7 @@
import com.linkedin.restli.server.PagingContext;
import com.linkedin.restli.server.PathKeys;
import com.linkedin.restli.server.ResourceLevel;
import com.linkedin.restli.server.RestLiServiceException;
import com.linkedin.restli.server.UpdateResponse;
import com.linkedin.restli.server.annotations.Action;
import com.linkedin.restli.server.annotations.ActionParam;
Expand All @@ -70,7 +73,6 @@

import static com.linkedin.datastream.common.DatastreamMetadataConstants.NUM_TASKS;


/**
* Resources classes are used by rest.li to process corresponding HTTP request.
* Note that rest.li will instantiate an object each time it processes a request.
Expand All @@ -97,12 +99,20 @@ public class DatastreamResources extends CollectionResourceTemplate<String, Data
private static final String CREATE_CALL_LATENCY_MS_STRING = "createCallLatencyMs";
private static final String DELETE_CALL_LATENCY_MS_STRING = "deleteCallLatencyMs";

// To support retries on the request timeouts
public static final String CONFIG_STOP_TRANSITION_TIMEOUT_MS = "stopTransitionTimeoutMs";
public static final String CONFIG_STOP_TRANSITION_RETRY_PERIOD_MS = "stopTransitionRetryPeriodMs";
private static final Long STOP_TRANSITION_TIMEOUT_MS_DEFAULT = Duration.ofMillis(60000).toMillis();
private static final Long STOP_TRANSITION_RETRY_PERIOD_MS_DEFAULT = Duration.ofMillis(1000).toMillis();

private final DatastreamStore _store;
private final Coordinator _coordinator;
private final ErrorLogger _errorLogger;

private final DynamicMetricsManager _dynamicMetricsManager;

private final Duration _stopTransitionTimeoutMs;
private final Duration _stopTransitionRetryPeriodMs;

/**
* Constructor for DatastreamResources
* @param datastreamServer the datastream server
Expand All @@ -117,13 +127,46 @@ public DatastreamResources(DatastreamServer datastreamServer) {
* @param coordinator the server coordinator
*/
public DatastreamResources(DatastreamStore store, Coordinator coordinator) {
this(store, coordinator, null);
}

/**
* Constructor for DatastreamResources
* @param store the datastream store
* @param coordinator the server coordinator
* @param properties the properties of datastream server
*/
public DatastreamResources(DatastreamStore store, Coordinator coordinator, Properties properties) {
_store = store;
_coordinator = coordinator;
_errorLogger = new ErrorLogger(LOG, _coordinator.getInstanceName());

_dynamicMetricsManager = DynamicMetricsManager.getInstance();
_dynamicMetricsManager.registerGauge(CLASS_NAME, CREATE_CALL_LATENCY_MS_STRING, CREATE_CALL_LATENCY_MS_SUPPLIER);
_dynamicMetricsManager.registerGauge(CLASS_NAME, DELETE_CALL_LATENCY_MS_STRING, DELETE_CALL_LATENCY_MS_SUPPLIER);

// fetching configs from properties if present otherwise using default configs
long stopTransitionTimeoutMs = Objects.nonNull(properties) ? Long.parseLong(
(String) properties.getOrDefault(CONFIG_STOP_TRANSITION_TIMEOUT_MS,
String.valueOf(STOP_TRANSITION_TIMEOUT_MS_DEFAULT))) : STOP_TRANSITION_TIMEOUT_MS_DEFAULT;

long stopTransitionRetryPeriodMs = Objects.nonNull(properties) ? Long.parseLong(
(String) properties.getOrDefault(CONFIG_STOP_TRANSITION_RETRY_PERIOD_MS,
String.valueOf(STOP_TRANSITION_RETRY_PERIOD_MS_DEFAULT))) : STOP_TRANSITION_RETRY_PERIOD_MS_DEFAULT;

// validating the fetched configs otherwise falling back to using default configs
if (stopTransitionRetryPeriodMs > 0 && stopTransitionTimeoutMs > stopTransitionRetryPeriodMs) {
_stopTransitionTimeoutMs = Duration.ofMillis(stopTransitionTimeoutMs);
_stopTransitionRetryPeriodMs = Duration.ofMillis(stopTransitionRetryPeriodMs);
} else {
LOG.warn("Illegal configurations provided, stopTransitionTimeoutMs={} stopTransitionRetryPeriodMs={}. Falling back to using default configurations",
stopTransitionTimeoutMs, stopTransitionRetryPeriodMs);
_stopTransitionTimeoutMs = Duration.ofMillis(STOP_TRANSITION_TIMEOUT_MS_DEFAULT);
_stopTransitionRetryPeriodMs = Duration.ofMillis(STOP_TRANSITION_RETRY_PERIOD_MS_DEFAULT);
}

LOG.info("Datastream resources, created with stopTransitionRetryPeriodMs={} stopTransitionTimeoutMs={}",
_stopTransitionRetryPeriodMs, _stopTransitionTimeoutMs);
}

/**
Expand Down Expand Up @@ -388,7 +431,7 @@ public ActionResult<Void> movePartitions(@PathKeysParam PathKeys pathKeys,
/**
* Stop a datastream
* @param pathKeys resource key containing the datastream name
* @param force whether or not to resume all datastreams within the given datastream's group
* @param force whether or not to stop all datastreams within the given datastream's group
* @return result HTTP status
*/
@Action(name = "stop", resourceLevel = ResourceLevel.ENTITY)
Expand All @@ -404,30 +447,49 @@ public ActionResult<Void> stop(@PathKeysParam PathKeys pathKeys,
"Datastream to stopped does not exist: " + datastreamName);
}

if (!DatastreamStatus.READY.equals(datastream.getStatus()) && !DatastreamStatus.PAUSED.equals(datastream.getStatus())) {
if (DatastreamStatus.STOPPED.equals(datastream.getStatus())) {
LOG.info("Datastream {} is already in STOPPED state", datastreamName);
return new ActionResult<>(HttpStatus.S_200_OK);
}

if (!DatastreamStatus.READY.equals(datastream.getStatus()) && !DatastreamStatus.PAUSED.equals(
datastream.getStatus()) && !DatastreamStatus.STOPPING.equals(datastream.getStatus())) {
_errorLogger.logAndThrowRestLiServiceException(HttpStatus.S_405_METHOD_NOT_ALLOWED,
"Can only pause a datastream in READY/PAUSED state: " + datastreamName);
String.format("Datastream %s is in %s state. Can only stop a datastream in READY/PAUSED state", datastreamName, datastream.getStatus()));
}

List<Datastream> datastreamsToStop =
force ? getGroupedDatastreams(datastream) : Collections.singletonList(datastream);
LOG.info("Stop datastreams {}", datastreamsToStop);
for (Datastream d : datastreamsToStop) {
try {
if (DatastreamStatus.READY.equals(datastream.getStatus()) || DatastreamStatus.PAUSED.equals(datastream.getStatus())) {
d.setStatus(DatastreamStatus.STOPPED);
if (DatastreamStatus.READY.equals(d.getStatus()) || DatastreamStatus.PAUSED.equals(d.getStatus())) {
d.setStatus(DatastreamStatus.STOPPING);
_store.updateDatastream(d.getName(), d, true);
_store.deleteDatastreamNumTasks(d.getName());
} else if (DatastreamStatus.STOPPING.equals(d.getStatus())) {
// this check helps in preventing any datastream from being stuck in STOPPING state indefinitely
LOG.warn("Datastream {} is already in {} state. Notifying leader to initiate transition", d,
d.getStatus());
_store.updateDatastream(d.getName(), d, true);
_store.deleteDatastreamNumTasks(d.getName());
} else {
LOG.warn("Cannot stop datastream {}, as it is not in READY/PAUSED state. State: {}", d, datastream.getStatus());
LOG.warn("Cannot stop datastream {}, as it is not in READY/PAUSED state. State: {}", d, d.getStatus());
}
} catch (DatastreamException e) {
_errorLogger.logAndThrowRestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR,
"Could not update datastream to STOPPED state: " + d.getName(), e);
"Could not update datastream to STOPPING state: " + d.getName(), e);
}
}

LOG.info("Completed request for stopping datastream {}", datastream);
// polls until the leader transitions the state of the datastream to STOPPED state
PollUtils.poll(() -> datastreamsToStop.stream()
.allMatch(ds -> _store.getDatastream(ds.getName()).getStatus().equals(DatastreamStatus.STOPPED)),
allStopped -> allStopped, _stopTransitionRetryPeriodMs.toMillis(), _stopTransitionTimeoutMs.toMillis())
.orElseThrow(() -> new RestLiServiceException(HttpStatus.S_408_REQUEST_TIMEOUT,
String.format("Stop request timed out for datastream: %s", datastreamName)));

LOG.info("Completed request for stopping datastream {}", _store.getDatastream(datastream.getName()));

return new ActionResult<>(HttpStatus.S_200_OK);
}
Expand Down
Loading