Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Stats to DatastreamTaskImpl #855

Merged
merged 5 commits into from
Oct 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,24 @@ public class DatastreamTaskImpl implements DatastreamTask {
// TODO: Investigate the requirement to populate both _partition and _partitionV2 and cleanup if required.
private List<String> _partitionsV2;

private List<String> _dependencies;
private final List<String> _dependencies;

/**
 * Returns the in-memory partition-assignment stats for this task as a JSON string.
 * Defaults to the empty string when no stats have been recorded.
 * Excluded from task JSON serialization; stats are persisted separately
 * (see {@link #getStatsFromZK()}).
 */
@JsonIgnore
public String getStats() {
return _stats;
}

/**
 * Reads this task's persisted stats JSON from its state store under the
 * "stats" key (presumably the ZooKeeper-backed task state written by
 * {@code saveState("stats", ...)} — confirm against ZkAdapter).
 * Excluded from task JSON serialization.
 */
@JsonIgnore
public String getStatsFromZK() {
return getState("stats");
}

/**
 * Sets the in-memory partition-assignment stats JSON for this task.
 * Excluded from task JSON serialization.
 *
 * @param stats JSON-serialized stats string
 */
@JsonIgnore
public void setStats(String stats) {
_stats = stats;
}

// JSON-serialized partition-assignment stats; empty string means "no stats recorded".
private String _stats = "";

// Status to indicate if instance has hooked up and process this object
private ZkAdapter _zkAdapter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package com.linkedin.datastream.server.assignment;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
Expand All @@ -19,12 +20,14 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;

import com.linkedin.datastream.common.DatastreamRuntimeException;
import com.linkedin.datastream.common.JsonUtils;
import com.linkedin.datastream.metrics.BrooklinGaugeInfo;
import com.linkedin.datastream.metrics.BrooklinMetricInfo;
import com.linkedin.datastream.metrics.DynamicMetricsManager;
Expand Down Expand Up @@ -138,6 +141,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(
}

// assign unrecognized partitions with round-robin
Map<String, Integer> unrecognizedPartitionCountPerTask = new HashMap<>();
Collections.shuffle(unrecognizedPartitions);
int index = 0;
for (String partition : unrecognizedPartitions) {
Expand All @@ -146,6 +150,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(
newPartitions.get(currentTask).add(partition);
tasksWithChangedPartition.add(currentTask);
index = (index + 1) % tasks.size();
unrecognizedPartitionCountPerTask.put(currentTask, unrecognizedPartitionCountPerTask.getOrDefault(currentTask, 0) + 1);
}

AtomicInteger minPartitionsAcrossTasks = new AtomicInteger(Integer.MAX_VALUE);
Expand All @@ -158,13 +163,16 @@ public Map<String, Set<DatastreamTask>> assignPartitions(
.map(task -> {
int partitionCount = newPartitions.containsKey(task.getId()) ? newPartitions.get(task.getId()).size() :
task.getPartitionsV2().size();

minPartitionsAcrossTasks.set(Math.min(minPartitionsAcrossTasks.get(), partitionCount));
maxPartitionsAcrossTasks.set(Math.max(maxPartitionsAcrossTasks.get(), partitionCount));
if (tasksWithChangedPartition.contains(task.getId())) {
return new DatastreamTaskImpl((DatastreamTaskImpl) task, newPartitions.get(task.getId()));
DatastreamTaskImpl newTask = new DatastreamTaskImpl((DatastreamTaskImpl) task, newPartitions.get(task.getId()));
saveStats(partitionInfoMap, taskThroughputMap, unrecognizedPartitionCountPerTask, task, partitionCount, newTask);
return newTask;
}
return task;
}).collect(Collectors.toSet());
return task;
}).collect(Collectors.toSet());
newAssignments.put(instance, newTasks);
});

Expand All @@ -179,6 +187,26 @@ public Map<String, Set<DatastreamTask>> assignPartitions(
return newAssignments;
}

private void saveStats(Map<String, PartitionThroughputInfo> partitionInfoMap, Map<String, Integer> taskThroughputMap,
Map<String, Integer> unrecognizedPartitionCountPerTask, DatastreamTask task, int partitionCount,
DatastreamTaskImpl newTask) {
PartitionAssignmentStatPerTask stat = PartitionAssignmentStatPerTask.fromJson(((DatastreamTaskImpl) task).getStats());
if (partitionInfoMap.isEmpty()) {
stat.isThroughputRateLatest = false;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to have a timestamp field here instead of having the latest flag, so that we get a sense of the last partition throughput distribution more accurately?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can add timestamp. We still need the latest flag, because not all the partition assignments will use Throughput based balancing.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will address it separately.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gotcha! thanks

} else {
stat.throughputRate = taskThroughputMap.get(task.getId());
stat.isThroughputRateLatest = true;
}
stat.totalPartitions = partitionCount;
// ignores the partitions removed. This value will be approximate.
stat.partitionsWithUnknownThroughput += unrecognizedPartitionCountPerTask.getOrDefault(task.getId(), 0);
try {
newTask.setStats(stat.toJson());
} catch (IOException e) {
LOG.error("Exception while saving the stats to Json for task {}", task.getId(), e);
}
}

private void validatePartitionCountAndThrow(String datastream, int numTasks, int numPartitions,
int maxPartitionsPerTask) {
// conversion to long to avoid integer overflow
Expand Down Expand Up @@ -246,6 +274,65 @@ void unregisterMetricsForDatastream(String datastream) {
DYNAMIC_METRICS_MANAGER.unregisterMetric(CLASS_NAME, datastream, MAX_PARTITIONS_ACROSS_TASKS);
}

/**
 * Per-task partition-assignment statistics (partition counts and estimated
 * throughput rate), serialized to/from JSON so they can be stored on the task.
 */
static class PartitionAssignmentStatPerTask {
private int throughputRate;
private int totalPartitions;
private int partitionsWithUnknownThroughput;
// False when the last assignment ran without throughput info, i.e. the
// carried-over throughputRate may be stale.
private boolean isThroughputRateLatest;

// Getters and setters required for fromJson and toJson.
public int getThroughputRate() {
return throughputRate;
}

public void setThroughputRate(int throughputRate) {
this.throughputRate = throughputRate;
}

public int getTotalPartitions() {
return totalPartitions;
}

public void setTotalPartitions(int totalPartitions) {
this.totalPartitions = totalPartitions;
}

public int getPartitionsWithUnknownThroughput() {
return partitionsWithUnknownThroughput;
}

public void setPartitionsWithUnknownThroughput(int partitionsWithUnknownThroughput) {
this.partitionsWithUnknownThroughput = partitionsWithUnknownThroughput;
}

public boolean getIsThroughputRateLatest() {
return isThroughputRateLatest;
}

public void setIsThroughputRateLatest(boolean isThroughputRateLatest) {
this.isThroughputRateLatest = isThroughputRateLatest;
}

/**
 * Construct PartitionAssignmentStatPerTask from json string
 * @param json JSON string of the PartitionAssignmentStatPerTask
 */
public static PartitionAssignmentStatPerTask fromJson(String json) {
PartitionAssignmentStatPerTask stat = new PartitionAssignmentStatPerTask();
if (StringUtils.isNotEmpty(json)) {
stat = JsonUtils.fromJson(json, PartitionAssignmentStatPerTask.class);
}
LOG.info("Loaded existing PartitionAssignmentStatPerTask: {}", stat);
return stat;
}

/**
 * Get PartitionAssignmentStatPerTask serialized as JSON
 */
public String toJson() throws IOException {
return JsonUtils.toJson(this);
}

// Without this override, the LOG.info in fromJson would print only the
// default Object identity hash, making the log line useless.
@Override
public String toString() {
return "PartitionAssignmentStatPerTask{throughputRate=" + throughputRate
+ ", totalPartitions=" + totalPartitions
+ ", partitionsWithUnknownThroughput=" + partitionsWithUnknownThroughput
+ ", isThroughputRateLatest=" + isThroughputRateLatest + '}';
}
}
/**
* Encapsulates assignment metrics for a single datastream group
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
Expand Down Expand Up @@ -681,6 +682,11 @@ private void addTaskNodes(String instance, DatastreamTaskImpl task) {
KeyBuilder.datastreamTaskState(_cluster, task.getConnectorType(), task.getDatastreamTaskName());
_zkclient.ensurePath(taskStatePath);

// save the task stats.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update the task node's directory structure in the method description above. This is a new subdirectory "stats" under the task, correct?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, "stats" directory will be inside "state" directory and will be conditional.

if (!StringUtils.isEmpty(task.getStats())) {
task.saveState("stats", task.getStats());
}

String instancePath = KeyBuilder.instanceAssignment(_cluster, instance, name);
String json = "";
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ public void assignFromScratchTest() {
Assert.assertTrue(assignedPartitions.contains("P1"));
Assert.assertTrue(assignedPartitions.contains("P2"));
Assert.assertTrue(assignedPartitions.contains("P3"));

String stats = (((DatastreamTaskImpl) task1).getStats());
LoadBasedPartitionAssigner.PartitionAssignmentStatPerTask statObj = LoadBasedPartitionAssigner.PartitionAssignmentStatPerTask.fromJson(stats);
Assert.assertTrue(statObj.getIsThroughputRateLatest());
Assert.assertEquals(statObj.getTotalPartitions(), 1);
Assert.assertEquals(statObj.getPartitionsWithUnknownThroughput(), 0);
Assert.assertEquals(statObj.getThroughputRate(), 5);
}

@Test
Expand Down Expand Up @@ -134,6 +141,13 @@ public void newAssignmentRetainsTasksFromOtherDatastreamsTest() {
DatastreamTask newTask = (DatastreamTask) allTasks.toArray()[0];
Assert.assertEquals(newTask.getPartitionsV2().size(), 1);
Assert.assertTrue(newTask.getPartitionsV2().contains("P3"));

String stats = (((DatastreamTaskImpl) newTask).getStats());
LoadBasedPartitionAssigner.PartitionAssignmentStatPerTask statObj = LoadBasedPartitionAssigner.PartitionAssignmentStatPerTask.fromJson(stats);
Assert.assertTrue(statObj.getIsThroughputRateLatest());
Assert.assertEquals(statObj.getTotalPartitions(), 1);
Assert.assertEquals(statObj.getPartitionsWithUnknownThroughput(), 0);
Assert.assertEquals(statObj.getThroughputRate(), 5);
}

@Test
Expand Down Expand Up @@ -162,6 +176,13 @@ public void assignmentDistributesPartitionsWhenThroughputInfoIsMissingTest() {
// verify that tasks got 2 partition each
Assert.assertEquals(task1.getPartitionsV2().size(), 2);
Assert.assertEquals(task2.getPartitionsV2().size(), 2);

String stats = (((DatastreamTaskImpl) task1).getStats());
LoadBasedPartitionAssigner.PartitionAssignmentStatPerTask statObj = LoadBasedPartitionAssigner.PartitionAssignmentStatPerTask.fromJson(stats);
Assert.assertFalse(statObj.getIsThroughputRateLatest());
Assert.assertEquals(statObj.getTotalPartitions(), 2);
Assert.assertEquals(statObj.getPartitionsWithUnknownThroughput(), 2);
Assert.assertEquals(statObj.getThroughputRate(), 0);
}

@Test
Expand Down