Skip to content

Commit

Permalink
Added configs for fetching throughput info (#834)
Browse files Browse the repository at this point in the history
  • Loading branch information
jzakaryan authored Jun 9, 2021
1 parent 873a25d commit 90703bd
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@
* Performs partition assignment based on partition throughput information
*/
public class LoadBasedPartitionAssigner {
// TODO: move these to config class
private static final Integer DEFAULT_KB_RATE = 5;
private static final Integer DEFAULT_MESSAGE_RATE = 5;

/**
* Performs partition assignment based on partition throughput information.
* <p>
Expand Down Expand Up @@ -61,7 +57,9 @@ public Map<String, Set<DatastreamTask>> assignPartitions(ClusterThroughputInfo t

// sort the current assignment's tasks on total throughput
Map<String, Integer> taskThroughputMap = new HashMap<>();
PartitionThroughputInfo defaultPartitionInfo = new PartitionThroughputInfo(DEFAULT_KB_RATE, DEFAULT_MESSAGE_RATE, "");
PartitionThroughputInfo defaultPartitionInfo = new PartitionThroughputInfo(
PartitionAssignmentStrategyConfig.PARTITION_BYTES_IN_KB_RATE_DEFAULT,
PartitionAssignmentStrategyConfig.PARTITION_MESSAGES_IN_RATE_DEFAULT, "");
newPartitions.forEach((task, partitions) -> {
int totalThroughput = partitions.stream()
.mapToInt(p -> partitionInfoMap.getOrDefault(p, defaultPartitionInfo).getBytesInKBRate())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@
public class LoadBasedPartitionAssignmentStrategy extends StickyPartitionAssignmentStrategy {
private static final Logger LOG = LoggerFactory.getLogger(LoadBasedPartitionAssignmentStrategy.class.getName());

// TODO Make these constants configurable
private static final long THROUGHPUT_INFO_FETCH_TIMEOUT_MS_DEFAULT = Duration.ofSeconds(10).toMillis();
private static final long THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS_DEFAULT = Duration.ofSeconds(1).toMillis();
private static final int THROUGHPUT_INFO_FETCH_TIMEOUT_MS_DEFAULT = (int) Duration.ofSeconds(10).toMillis();
private static final int THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS_DEFAULT = (int) Duration.ofSeconds(1).toMillis();

private static final int TASK_CAPACITY_MBPS_DEFAULT = 4;
private static final int TASK_CAPACITY_UTILIZATION_PCT_DEFAULT = 90;
Expand All @@ -48,6 +47,8 @@ public class LoadBasedPartitionAssignmentStrategy extends StickyPartitionAssignm
private final DatastreamSourceClusterResolver _sourceClusterResolver;
private final int _taskCapacityMBps;
private final int _taskCapacityUtilizationPct;
private final int _throughputInfoFetchTimeoutMs;
private final int _throughputInfoFetchRetryPeriodMs;


/**
Expand All @@ -57,14 +58,19 @@ public LoadBasedPartitionAssignmentStrategy(PartitionThroughputProvider throughp
DatastreamSourceClusterResolver sourceClusterResolver, Optional<Integer> maxTasks,
Optional<Integer> imbalanceThreshold, Optional<Integer> maxPartitionPerTask, boolean enableElasticTaskAssignment,
Optional<Integer> partitionsPerTask, Optional<Integer> partitionFullnessFactorPct,
Optional<Integer> taskCapacityMBps, Optional<Integer> taskCapacityUtilizationPct, Optional<ZkClient> zkClient,
Optional<Integer> taskCapacityMBps, Optional<Integer> taskCapacityUtilizationPct,
Optional<Integer> throughputInfoFetchTimeoutMs, Optional<Integer> throughputInfoFetchRetryPeriodMs,
Optional<ZkClient> zkClient,
String clusterName) {
super(maxTasks, imbalanceThreshold, maxPartitionPerTask, enableElasticTaskAssignment, partitionsPerTask,
partitionFullnessFactorPct, zkClient, clusterName);
_throughputProvider = throughputProvider;
_sourceClusterResolver = sourceClusterResolver;
_taskCapacityMBps = taskCapacityMBps.orElse(TASK_CAPACITY_MBPS_DEFAULT);
_taskCapacityUtilizationPct = taskCapacityUtilizationPct.orElse(TASK_CAPACITY_UTILIZATION_PCT_DEFAULT);
_throughputInfoFetchTimeoutMs = throughputInfoFetchTimeoutMs.orElse(THROUGHPUT_INFO_FETCH_TIMEOUT_MS_DEFAULT);
_throughputInfoFetchRetryPeriodMs = throughputInfoFetchRetryPeriodMs.
orElse(THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS_DEFAULT);
}

@Override
Expand Down Expand Up @@ -123,7 +129,7 @@ private Map<String, ClusterThroughputInfo> fetchPartitionThroughputInfo() {
LOG.warn("Failed to fetch partition throughput info.");
return null;
}
}, Objects::nonNull, THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS_DEFAULT, THROUGHPUT_INFO_FETCH_TIMEOUT_MS_DEFAULT)
}, Objects::nonNull, _throughputInfoFetchRetryPeriodMs, _throughputInfoFetchTimeoutMs)
.orElseThrow(RetriesExhaustedException::new);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public AssignmentStrategy createStrategy(Properties assignmentStrategyProperties
return new LoadBasedPartitionAssignmentStrategy(provider, clusterResolver, _config.getMaxTasks(),
_config.getImbalanceThreshold(), _config.getMaxPartitions(), enableElasticTaskAssignment,
_config.getPartitionsPerTask(), _config.getPartitionFullnessThresholdPct(), _config.getTaskCapacityMBps(),
_config.getTaskCapacityUtilizationPct(), zkClient, _config.getCluster());
_config.getTaskCapacityUtilizationPct(), _config.getThroughputInfoFetchTimeoutMs(),
_config.getThroughputInfoFetchRetryPeriodMs(), zkClient, _config.getCluster());
}

protected PartitionThroughputProvider constructPartitionThroughputProvider() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@
*/
public class LoadBasedTaskCountEstimator {
private static final Logger LOG = LoggerFactory.getLogger(LoadBasedTaskCountEstimator.class.getName());
// TODO Move these to config class
private static final int BYTES_IN_KB_RATE_DEFAULT = 5;
private static final int MESSAGES_IN_RATE_DEFAULT = 5;

private final int _taskCapacityMBps;
private final int _taskCapacityUtilizationPct;

Expand Down Expand Up @@ -64,8 +60,9 @@ public int getTaskCount(ClusterThroughputInfo throughputInfo, List<String> assig
Set<String> allPartitions = new HashSet<>(assignedPartitions);
allPartitions.addAll(unassignedPartitions);

PartitionThroughputInfo defaultThroughputInfo = new PartitionThroughputInfo(BYTES_IN_KB_RATE_DEFAULT,
MESSAGES_IN_RATE_DEFAULT, "");
PartitionThroughputInfo defaultThroughputInfo = new PartitionThroughputInfo(
PartitionAssignmentStrategyConfig.PARTITION_BYTES_IN_KB_RATE_DEFAULT,
PartitionAssignmentStrategyConfig.PARTITION_MESSAGES_IN_RATE_DEFAULT, "");
// total throughput in KB/sec
int totalThroughput = allPartitions.stream()
.map(p -> throughputMap.getOrDefault(p, defaultThroughputInfo))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public final class PartitionAssignmentStrategyConfig {
public static final String CFG_MAX_PARTITION_PER_TASK = "maxPartitionsPerTask";
public static final String CFG_PARTITIONS_PER_TASK = "partitionsPerTask";
public static final String CFG_PARTITION_FULLNESS_THRESHOLD_PCT = "partitionFullnessThresholdPct";
public static final String CFG_THROUGHPUT_INFO_FETCH_TIMEOUT_MS = "throughputInfoFetchTimeoutMs";
public static final String CFG_THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS = "throughputInfoFetchRetryPeriodMs";
public static final String CFG_TASK_CAPACITY_MBPS = "taskCapacityMBps";
public static final String CFG_TASK_CAPACITY_UTILIZATION_PCT = "taskCapacityUtilizationPct";
public static final String CFG_ENABLE_ELASTIC_TASK_ASSIGNMENT = "enableElasticTaskAssignment";
Expand All @@ -30,6 +32,9 @@ public final class PartitionAssignmentStrategyConfig {
public static final String CFG_ZK_SESSION_TIMEOUT = "zkSessionTimeout";
public static final String CFG_ZK_CONNECTION_TIMEOUT = "zkConnectionTimeout";

public static final int PARTITION_BYTES_IN_KB_RATE_DEFAULT = 5;
public static final int PARTITION_MESSAGES_IN_RATE_DEFAULT = 5;

public static final boolean DEFAULT_ENABLE_ELASTIC_TASK_ASSIGNMENT = false;

private final Properties _config;
Expand All @@ -40,6 +45,8 @@ public final class PartitionAssignmentStrategyConfig {
private final Optional<Integer> _partitionFullnessThresholdPct;
private final Optional<Integer> _taskCapacityMBps;
private final Optional<Integer> _taskCapacityUtilizationPct;
private final Optional<Integer> _throughputInfoFetchTimeoutMs;
private final Optional<Integer> _throughputInfoFetchRetryPeriodMs;
private final String _cluster;
private final String _zkAddress;
private final int _zkSessionTimeout;
Expand All @@ -60,6 +67,8 @@ public PartitionAssignmentStrategyConfig(Properties config) {
int cfgPartitionFullnessThresholdPct = props.getIntInRange(CFG_PARTITION_FULLNESS_THRESHOLD_PCT, 0, 0, 100);
int cfgTaskCapacityMBps = props.getInt(CFG_TASK_CAPACITY_MBPS, 0);
int cfgTaskCapacityUtilizationPct = props.getIntInRange(CFG_TASK_CAPACITY_UTILIZATION_PCT, 0, 0, 100);
int cfgThroughputInfoFetchTimeoutMs = props.getInt(CFG_THROUGHPUT_INFO_FETCH_TIMEOUT_MS, 0);
int cfgThroughputInfoFetchRetryPeriodMs = props.getInt(CFG_THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS, 0);

// Set to Optional.empty() if the value is 0
_maxTasks = cfgMaxTasks > 0 ? Optional.of(cfgMaxTasks) : Optional.empty();
Expand All @@ -74,6 +83,10 @@ public PartitionAssignmentStrategyConfig(Properties config) {
_taskCapacityMBps = cfgTaskCapacityMBps > 0 ? Optional.of(cfgTaskCapacityMBps) : Optional.empty();
_taskCapacityUtilizationPct = cfgTaskCapacityUtilizationPct > 0 ? Optional.of(cfgTaskCapacityUtilizationPct) :
Optional.empty();
_throughputInfoFetchTimeoutMs = cfgThroughputInfoFetchTimeoutMs > 0 ?
Optional.of(cfgThroughputInfoFetchTimeoutMs) : Optional.empty();
_throughputInfoFetchRetryPeriodMs = cfgThroughputInfoFetchRetryPeriodMs > 0 ?
Optional.of(cfgThroughputInfoFetchRetryPeriodMs) : Optional.empty();
_cluster = props.getString(CFG_CLUSTER_NAME, null);
_zkAddress = props.getString(CFG_ZK_ADDRESS, null);
_zkSessionTimeout = props.getInt(CFG_ZK_SESSION_TIMEOUT, ZkClient.DEFAULT_SESSION_TIMEOUT);
Expand Down Expand Up @@ -136,6 +149,22 @@ public Optional<Integer> getTaskCapacityUtilizationPct() {
return _taskCapacityUtilizationPct;
}

/**
* Gets throughput info fetch timeout in milliseconds
* @return Throughput info fetch timeout in milliseconds
*/
public Optional<Integer> getThroughputInfoFetchTimeoutMs() {
return _throughputInfoFetchTimeoutMs;
}

/**
* Gets the throughput info fetch retry period in milliseconds
* @return Throughput info fetch retry period in milliseconds
*/
public Optional<Integer> getThroughputInfoFetchRetryPeriodMs() {
return _throughputInfoFetchRetryPeriodMs;
}

/**
* Gets cluster
* @return Cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ public class TestPartitionAssignmentStrategyConfig {
private static final String CFG_MAX_TASKS_VALUE = "20";
private static final String CFG_PARTITIONS_PER_TASK_VALUE = "15";
private static final String CFG_PARTITIONS_FULLNESS_THRESHOLD_PCT_VALUE = "75";
private static final String CFG_TASK_CAPACITY_MBPS_VALUE = "5";
private static final String CFG_TASK_CAPACITY_UTILIZATION_PCT_VALUE = "82";
private static final String CFG_THROUGHPUT_INFO_FETCH_TIMEOUT_MS_VALUE = "1100";
private static final String CFG_THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS_VALUE = "1200";
private static final String CFG_ENABLE_ELASTIC_TASK_ASSIGNMENT_VALUE = "true";
private static final String CFG_ZK_ADDRESS_VALUE = "dummyZk";
private static final String CFG_ZK_SESSION_TIMEOUT_VALUE = "1000";
Expand All @@ -42,6 +46,13 @@ public void configValuesCorrectlyAssignedTest() {
props.setProperty(PartitionAssignmentStrategyConfig.CFG_PARTITIONS_PER_TASK, CFG_PARTITIONS_PER_TASK_VALUE);
props.setProperty(PartitionAssignmentStrategyConfig.CFG_PARTITION_FULLNESS_THRESHOLD_PCT,
CFG_PARTITIONS_FULLNESS_THRESHOLD_PCT_VALUE);
props.setProperty(PartitionAssignmentStrategyConfig.CFG_TASK_CAPACITY_MBPS, CFG_TASK_CAPACITY_MBPS_VALUE);
props.setProperty(PartitionAssignmentStrategyConfig.CFG_TASK_CAPACITY_UTILIZATION_PCT,
CFG_TASK_CAPACITY_UTILIZATION_PCT_VALUE);
props.setProperty(PartitionAssignmentStrategyConfig.CFG_THROUGHPUT_INFO_FETCH_TIMEOUT_MS,
CFG_THROUGHPUT_INFO_FETCH_TIMEOUT_MS_VALUE);
props.setProperty(PartitionAssignmentStrategyConfig.CFG_THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS,
CFG_THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS_VALUE);
props.setProperty(PartitionAssignmentStrategyConfig.CFG_ENABLE_ELASTIC_TASK_ASSIGNMENT,
CFG_ENABLE_ELASTIC_TASK_ASSIGNMENT_VALUE);
props.setProperty(PartitionAssignmentStrategyConfig.CFG_ZK_ADDRESS, CFG_ZK_ADDRESS_VALUE);
Expand All @@ -56,6 +67,13 @@ public void configValuesCorrectlyAssignedTest() {
Assert.assertEquals(config.getMaxPartitions(), Optional.of(Integer.parseInt(CFG_MAX_PARTITION_PER_TASK_VALUE)));
Assert.assertEquals(config.getPartitionFullnessThresholdPct(),
Optional.of(Integer.parseInt(CFG_PARTITIONS_FULLNESS_THRESHOLD_PCT_VALUE)));
Assert.assertEquals(config.getTaskCapacityMBps(), Optional.of(Integer.parseInt(CFG_TASK_CAPACITY_MBPS_VALUE)));
Assert.assertEquals(config.getTaskCapacityUtilizationPct(),
Optional.of(Integer.parseInt(CFG_TASK_CAPACITY_UTILIZATION_PCT_VALUE)));
Assert.assertEquals(config.getThroughputInfoFetchTimeoutMs(),
Optional.of(Integer.parseInt(CFG_THROUGHPUT_INFO_FETCH_TIMEOUT_MS_VALUE)));
Assert.assertEquals(config.getThroughputInfoFetchRetryPeriodMs(),
Optional.of(Integer.parseInt(CFG_THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS_VALUE)));
Assert.assertEquals(config.isElasticTaskAssignmentEnabled(),
Boolean.parseBoolean(CFG_ENABLE_ELASTIC_TASK_ASSIGNMENT_VALUE));
Assert.assertEquals(config.getZkAddress(), CFG_ZK_ADDRESS_VALUE);
Expand Down

0 comments on commit 90703bd

Please sign in to comment.