Refactored StickyPartitionAssignmentStrategy and implemented task estimation logic in LoadBasedPartitionAssignmentStrategy #835

Merged · 12 commits · Jun 17, 2021

This file was deleted.

@@ -8,6 +8,7 @@
 import java.util.Map;
 
 import com.linkedin.datastream.server.ClusterThroughputInfo;
+import com.linkedin.datastream.server.DatastreamGroup;
 
 
 /**
@@ -23,6 +24,13 @@ public interface PartitionThroughputProvider {
    */
   ClusterThroughputInfo getThroughputInfo(String clusterName);
 
+  /**
+   * Retrieves per-partition throughput information for the given datastream group
+   * @param datastreamGroup Datastream group
+   * @return Throughput information for the provided datastream group
+   */
+  ClusterThroughputInfo getThroughputInfo(DatastreamGroup datastreamGroup);
+
   /**
    * Retrieves per-partition throughput information for all clusters
    * @return A map, where keys are cluster names and values are throughput information for the cluster
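As a point of reference for reviewers, here is a minimal sketch of an implementation satisfying the new overload, assuming an in-memory map keyed by datastream group name (the class and its backing map are hypothetical and not part of this PR):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import com.linkedin.datastream.server.ClusterThroughputInfo;
import com.linkedin.datastream.server.DatastreamGroup;
import com.linkedin.datastream.server.providers.PartitionThroughputProvider;

/**
 * Hypothetical in-memory provider, shown only to illustrate the new interface method.
 */
public class InMemoryPartitionThroughputProvider implements PartitionThroughputProvider {
  private final Map<String, ClusterThroughputInfo> _infoByKey = new HashMap<>();

  @Override
  public ClusterThroughputInfo getThroughputInfo(String clusterName) {
    // Falls back to an empty info object when nothing is known for the cluster
    return _infoByKey.getOrDefault(clusterName,
        new ClusterThroughputInfo(clusterName, Collections.emptyMap()));
  }

  @Override
  public ClusterThroughputInfo getThroughputInfo(DatastreamGroup datastreamGroup) {
    // The new overload resolves throughput info from the datastream group itself,
    // which is what lets the strategy drop the separate source-cluster resolution step
    return getThroughputInfo(datastreamGroup.getName());
  }

  @Override
  public Map<String, ClusterThroughputInfo> getThroughputInfo() {
    return Collections.unmodifiableMap(_infoByKey);
  }
}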

This file was deleted.

@@ -13,18 +13,21 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.Validate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import com.linkedin.datastream.common.PollUtils;
 import com.linkedin.datastream.common.RetriesExhaustedException;
 import com.linkedin.datastream.common.zk.ZkClient;
 import com.linkedin.datastream.server.ClusterThroughputInfo;
 import com.linkedin.datastream.server.DatastreamGroup;
 import com.linkedin.datastream.server.DatastreamGroupPartitionsMetadata;
-import com.linkedin.datastream.server.DatastreamSourceClusterResolver;
 import com.linkedin.datastream.server.DatastreamTask;
 import com.linkedin.datastream.server.Pair;
 import com.linkedin.datastream.server.providers.PartitionThroughputProvider;
@@ -44,28 +47,28 @@ public class LoadBasedPartitionAssignmentStrategy extends StickyPartitionAssignmentStrategy {
   private static final int TASK_CAPACITY_UTILIZATION_PCT_DEFAULT = 90;
 
   private final PartitionThroughputProvider _throughputProvider;
-  private final DatastreamSourceClusterResolver _sourceClusterResolver;
   private final int _taskCapacityMBps;
   private final int _taskCapacityUtilizationPct;
   private final int _throughputInfoFetchTimeoutMs;
   private final int _throughputInfoFetchRetryPeriodMs;
 
+  // TODO Make these configurable
+  private final boolean _enableThroughputBasedPartitionAssignment = true;
+  private final boolean _enablePartitionNumBasedTaskCountEstimation = true;
 
   /**
    * Creates an instance of {@link LoadBasedPartitionAssignmentStrategy}
    */
   public LoadBasedPartitionAssignmentStrategy(PartitionThroughputProvider throughputProvider,
-      DatastreamSourceClusterResolver sourceClusterResolver, Optional<Integer> maxTasks,
-      Optional<Integer> imbalanceThreshold, Optional<Integer> maxPartitionPerTask, boolean enableElasticTaskAssignment,
-      Optional<Integer> partitionsPerTask, Optional<Integer> partitionFullnessFactorPct,
-      Optional<Integer> taskCapacityMBps, Optional<Integer> taskCapacityUtilizationPct,
-      Optional<Integer> throughputInfoFetchTimeoutMs, Optional<Integer> throughputInfoFetchRetryPeriodMs,
-      Optional<ZkClient> zkClient,
+      Optional<Integer> maxTasks, Optional<Integer> imbalanceThreshold, Optional<Integer> maxPartitionPerTask,
+      boolean enableElasticTaskAssignment, Optional<Integer> partitionsPerTask,
+      Optional<Integer> partitionFullnessFactorPct, Optional<Integer> taskCapacityMBps,
+      Optional<Integer> taskCapacityUtilizationPct, Optional<Integer> throughputInfoFetchTimeoutMs,
+      Optional<Integer> throughputInfoFetchRetryPeriodMs, Optional<ZkClient> zkClient,
       String clusterName) {
     super(maxTasks, imbalanceThreshold, maxPartitionPerTask, enableElasticTaskAssignment, partitionsPerTask,
         partitionFullnessFactorPct, zkClient, clusterName);
     _throughputProvider = throughputProvider;
-    _sourceClusterResolver = sourceClusterResolver;
     _taskCapacityMBps = taskCapacityMBps.orElse(TASK_CAPACITY_MBPS_DEFAULT);
     _taskCapacityUtilizationPct = taskCapacityUtilizationPct.orElse(TASK_CAPACITY_UTILIZATION_PCT_DEFAULT);
     _throughputInfoFetchTimeoutMs = throughputInfoFetchTimeoutMs.orElse(THROUGHPUT_INFO_FETCH_TIMEOUT_MS_DEFAULT);
@@ -77,56 +80,91 @@ public LoadBasedPartitionAssignmentStrategy(PartitionThroughputProvider throughputProvider,
   public Map<String, Set<DatastreamTask>> assignPartitions(Map<String, Set<DatastreamTask>> currentAssignment,
       DatastreamGroupPartitionsMetadata datastreamPartitions) {
     DatastreamGroup datastreamGroup = datastreamPartitions.getDatastreamGroup();
-    String datastreamGroupName = datastreamGroup.getName();
-    Pair<List<String>, Integer> assignedPartitionsAndTaskCount = getAssignedPartitionsAndTaskCountForDatastreamGroup(
-        currentAssignment, datastreamGroupName);
-    List<String> assignedPartitions = assignedPartitionsAndTaskCount.getKey();
 
-    // Do throughput based assignment only initially, when no partitions have been assigned yet
-    if (!assignedPartitions.isEmpty()) {
-      return super.assignPartitions(currentAssignment, datastreamPartitions);
-    }
-
-    Map<String, ClusterThroughputInfo> partitionThroughputInfo;
-    // Attempting to retrieve partition throughput info with a fallback mechanism to StickyPartitionAssignmentStrategy
-    try {
-      partitionThroughputInfo = fetchPartitionThroughputInfo();
-    } catch (RetriesExhaustedException ex) {
-      LOG.warn("Attempts to fetch partition throughput timed out. Falling back to regular partition assignment strategy");
+    // For throughput based partition-assignment to kick in, the following conditions must be met:
+    // (1) Elastic task assignment must be enabled through configuration
+    // (2) Throughput-based task assignment must be enabled through configuration
+    boolean enableElasticTaskAssignment = isElasticTaskAssignmentEnabled(datastreamGroup);
+    if (!enableElasticTaskAssignment || !_enableThroughputBasedPartitionAssignment) {
+      LOG.info("Throughput based elastic task assignment not enabled. Falling back to sticky partition assignment.");
+      LOG.info("enableElasticTaskAssignment: {}, enableThroughputBasedPartitionAssignment {}",
+          enableElasticTaskAssignment, _enableThroughputBasedPartitionAssignment);
       return super.assignPartitions(currentAssignment, datastreamPartitions);
     }
 
+    String datastreamGroupName = datastreamGroup.getName();
+    Pair<List<String>, Integer> assignedPartitionsAndTaskCount = getAssignedPartitionsAndTaskCountForDatastreamGroup(
+        currentAssignment, datastreamGroupName);
+    List<String> assignedPartitions = assignedPartitionsAndTaskCount.getKey();
     int taskCount = assignedPartitionsAndTaskCount.getValue();
     LOG.info("Old partition assignment info, assignment: {}", currentAssignment);
     Validate.isTrue(taskCount > 0, String.format("No tasks found for datastream group %s", datastreamGroup));
     Validate.isTrue(currentAssignment.size() > 0,
-        "Zero tasks assigned. Retry leader partition assignment.");
+        "Zero tasks assigned. Retry leader partition assignment");
 
-    // Resolving cluster name from datastream group
-    String clusterName = _sourceClusterResolver.getSourceCluster(datastreamPartitions.getDatastreamGroup());
-    ClusterThroughputInfo clusterThroughputInfo = partitionThroughputInfo.get(clusterName);
+    // Calculating unassigned partitions
+    List<String> unassignedPartitions = new ArrayList<>(datastreamPartitions.getPartitions());
+    unassignedPartitions.removeAll(assignedPartitions);
 
-    // TODO Get task count estimate and perform elastic task count validation
-    // TODO Get task count estimate based on throughput and pick a winner
-    LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(_taskCapacityMBps, _taskCapacityUtilizationPct);
-    int maxTaskCount = estimator.getTaskCount(clusterThroughputInfo, Collections.emptyList(), Collections.emptyList());
-    LOG.info("Max task count obtained from estimator: {}", maxTaskCount);
+    ClusterThroughputInfo clusterThroughputInfo = new ClusterThroughputInfo(StringUtils.EMPTY, Collections.emptyMap());
+    if (assignedPartitions.isEmpty()) {
+      try {
+        // Attempting to retrieve partition throughput info on initial assignment
+        clusterThroughputInfo = fetchPartitionThroughputInfo(datastreamGroup);
+      } catch (RetriesExhaustedException ex) {
+        LOG.warn("Attempts to fetch partition throughput timed out");
+        LOG.info("Throughput information unavailable during initial assignment. Falling back to sticky partition assignment");
+        return super.assignPartitions(currentAssignment, datastreamPartitions);
+      }
 
-    // TODO Get unassigned partitions
-    // Calculating unassigned partitions
-    List<String> unassignedPartitions = new ArrayList<>();
+      // Task count update happens only on initial assignment (when datastream makes the STOPPED -> READY transition).
+      // The calculation is based on the maximum of:
+      // (1) Tasks already allocated for the datastream
+      // (2) Partition number based estimate, if the appropriate config is enabled
+      // (3) Throughput based task count estimate
+      int numTasksNeeded = taskCount;
+      if (_enablePartitionNumBasedTaskCountEstimation) {
+        numTasksNeeded = getTaskCountEstimateBasedOnNumPartitions(datastreamPartitions, taskCount);
+      }
 
-    // TODO Implement metrics
+      LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(_taskCapacityMBps, _taskCapacityUtilizationPct);
+      numTasksNeeded = Math.max(numTasksNeeded, estimator.getTaskCount(clusterThroughputInfo, assignedPartitions,
+          unassignedPartitions));
+
+      // Task count is validated against max tasks config
+      numTasksNeeded = validateNumTasksAgainstMaxTasks(datastreamPartitions, numTasksNeeded);
+      if (numTasksNeeded > taskCount) {
+        updateNumTasksAndForceTaskCreation(datastreamPartitions, numTasksNeeded, taskCount);
+      }
+    }
+
     // Doing assignment
-    LoadBasedPartitionAssigner partitionAssigner = new LoadBasedPartitionAssigner();
-    return partitionAssigner.assignPartitions(clusterThroughputInfo, currentAssignment,
+    Map<String, Set<DatastreamTask>> newAssignment = doAssignment(clusterThroughputInfo, currentAssignment,
         unassignedPartitions, datastreamPartitions);
+    partitionSanityChecks(newAssignment, datastreamPartitions);
+    return newAssignment;
+  }
+
+  @VisibleForTesting
+  Map<String, Set<DatastreamTask>> doAssignment(ClusterThroughputInfo clusterThroughputInfo,
+      Map<String, Set<DatastreamTask>> currentAssignment, List<String> unassignedPartitions,
+      DatastreamGroupPartitionsMetadata datastreamPartitions) {
+    LoadBasedPartitionAssigner partitionAssigner = new LoadBasedPartitionAssigner();
+    Map<String, Set<DatastreamTask>> assignment = partitionAssigner.assignPartitions(clusterThroughputInfo,
+        currentAssignment, unassignedPartitions, datastreamPartitions);
+    LOG.info("new assignment info, assignment: {}", assignment);
+    return assignment;
   }
 
-  private Map<String, ClusterThroughputInfo> fetchPartitionThroughputInfo() {
+  private ClusterThroughputInfo fetchPartitionThroughputInfo(DatastreamGroup datastreamGroup) {
+    AtomicInteger attemptNum = new AtomicInteger(0);
     return PollUtils.poll(() -> {
       try {
-        return _throughputProvider.getThroughputInfo();
+        return _throughputProvider.getThroughputInfo(datastreamGroup);
       } catch (Exception ex) {
-        // TODO print exception and retry count
-        LOG.warn("Failed to fetch partition throughput info.");
+        attemptNum.set(attemptNum.get() + 1);
+        LOG.warn(String.format("Failed to fetch partition throughput info on attempt %d", attemptNum.get()), ex);
         return null;
       }
     }, Objects::nonNull, _throughputInfoFetchRetryPeriodMs, _throughputInfoFetchTimeoutMs)
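Taken together, the new task estimation logic reduces to a max over three candidate counts followed by validation against the max tasks config. A condensed sketch of that decision, assuming the three estimates are already computed (the helper below is hypothetical; the real steps live in StickyPartitionAssignmentStrategy and LoadBasedTaskCountEstimator, and validateNumTasksAgainstMaxTasks is assumed here to act as a simple cap):

// Hypothetical condensation of the initial-assignment task-count decision
static int decideTaskCount(int allocatedTaskCount, int partitionCountEstimate,
    int throughputEstimate, int maxTasksFromConfig) {
  // (1) Never go below the tasks already allocated for the datastream
  int numTasksNeeded = allocatedTaskCount;
  // (2) Partition number based estimate, when that path is enabled
  numTasksNeeded = Math.max(numTasksNeeded, partitionCountEstimate);
  // (3) Throughput based estimate from LoadBasedTaskCountEstimator
  numTasksNeeded = Math.max(numTasksNeeded, throughputEstimate);
  // Validated against the max tasks config, assumed to behave as a cap
  return Math.min(numTasksNeeded, maxTasksFromConfig);
}

For example, with 4 allocated tasks, a partition-count estimate of 6, a throughput estimate of 9, and a cap of 8, the strategy would settle on 8 tasks and, since 8 > 4, trigger updateNumTasksAndForceTaskCreation.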
@@ -12,8 +12,6 @@
 import org.slf4j.LoggerFactory;
 
 import com.linkedin.datastream.common.zk.ZkClient;
-import com.linkedin.datastream.server.DatastreamSourceClusterResolver;
-import com.linkedin.datastream.server.DummyDatastreamSourceClusterResolver;
 import com.linkedin.datastream.server.api.strategy.AssignmentStrategy;
 import com.linkedin.datastream.server.providers.NoOpPartitionThroughputProvider;
 import com.linkedin.datastream.server.providers.PartitionThroughputProvider;
@@ -40,9 +38,8 @@ public AssignmentStrategy createStrategy(Properties assignmentStrategyProperties
     }
 
     PartitionThroughputProvider provider = constructPartitionThroughputProvider();
-    DatastreamSourceClusterResolver clusterResolver = constructDatastreamSourceClusterResolver();
 
-    return new LoadBasedPartitionAssignmentStrategy(provider, clusterResolver, _config.getMaxTasks(),
+    return new LoadBasedPartitionAssignmentStrategy(provider, _config.getMaxTasks(),
         _config.getImbalanceThreshold(), _config.getMaxPartitions(), enableElasticTaskAssignment,
         _config.getPartitionsPerTask(), _config.getPartitionFullnessThresholdPct(), _config.getTaskCapacityMBps(),
         _config.getTaskCapacityUtilizationPct(), _config.getThroughputInfoFetchTimeoutMs(),
@@ -52,8 +49,4 @@ public AssignmentStrategy createStrategy(Properties assignmentStrategyProperties
   protected PartitionThroughputProvider constructPartitionThroughputProvider() {
     return new NoOpPartitionThroughputProvider();
   }
-
-  protected DatastreamSourceClusterResolver constructDatastreamSourceClusterResolver() {
-    return new DummyDatastreamSourceClusterResolver();
-  }
 }
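Since constructPartitionThroughputProvider stays protected, swapping the NoOpPartitionThroughputProvider default for a real provider remains a one-method override. A sketch under the assumption that the factory class shown above is named LoadBasedPartitionAssignmentStrategyFactory (the subclass and the provider it returns are hypothetical):

// Hypothetical factory subclass wiring in a non-default throughput provider
public class CustomLoadBasedStrategyFactory extends LoadBasedPartitionAssignmentStrategyFactory {
  @Override
  protected PartitionThroughputProvider constructPartitionThroughputProvider() {
    // Return a provider backed by a real metrics source instead of the no-op default
    return new InMemoryPartitionThroughputProvider();
  }
}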