Skip to content

Commit

Permalink
Add handling for skipping tasks not emitting watermarks from the beginning
Browse files Browse the repository at this point in the history
  • Loading branch information
Xinyu Liu committed Aug 14, 2024
1 parent a5fd2e9 commit 97fab20
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,15 @@ private final static class WatermarkState {
private final Map<String, Long> timestamps = new HashMap<>();
private final Map<String, Long> lastUpdateTime = new HashMap<>();
private final long watermarkIdleTime;
private final long createTime;
private final LongSupplier systemTimeFunc;
private volatile long watermarkTime = WATERMARK_NOT_EXIST;

/**
 * Initializes the watermark state and records the time at which it was created.
 *
 * @param expectedTotal number of upstream tasks expected to report a watermark
 * @param watermarkIdleTime idle threshold; a value {@code <= 0} means every upstream
 *        task is required when computing the watermark
 * @param systemTimeFunc supplier of the current time; sampled exactly once here to
 *        capture the creation time
 */
WatermarkState(int expectedTotal, long watermarkIdleTime, LongSupplier systemTimeFunc) {
  // Keep the clock and take a single reading as the creation time.
  this.systemTimeFunc = systemTimeFunc;
  this.createTime = this.systemTimeFunc.getAsLong();

  this.expectedTotal = expectedTotal;
  this.watermarkIdleTime = watermarkIdleTime;
}

synchronized void update(long timestamp, String taskName) {
Expand All @@ -77,22 +79,30 @@ synchronized void update(long timestamp, String taskName) {
if (taskName == null) {
// we get watermark either from the source or from the aggregator task
watermarkTime = Math.max(watermarkTime, timestamp);
} else if (timestamps.size() == expectedTotal) {
// For any intermediate streams, the expectedTotal is the upstream task count.
// Check whether we got all the watermarks, and set the watermark to be the min.
} else if (canUpdateWatermark(currentTime)) {
Optional<Long> min;
if (watermarkIdleTime <= 0) {
// All upstream tasks are required in the computation
min = timestamps.values().stream().min(Long::compare);
} else {
// Exclude the tasks that have been idle in watermark emission.
min = timestamps.entrySet().stream()
.filter(t -> currentTime - lastUpdateTime.get(t.getKey()) < watermarkIdleTime)
.map(Map.Entry::getValue).min(Long::compare);
.map(Map.Entry::getValue)
.min(Long::compare);
}
watermarkTime = min.orElse(timestamp);
watermarkTime = Math.max(watermarkTime, min.orElse(timestamp));
}
}

/**
 * Tells whether the watermark may be recomputed at the given time.
 *
 * @param currentTime the current time, compared against this state's creation time
 * @return {@code true} when a watermark has been recorded for every expected upstream
 *         task, or when idle handling is enabled and more than the idle time has
 *         elapsed since this state was created; {@code false} otherwise
 */
private boolean canUpdateWatermark(long currentTime) {
  if (timestamps.size() == expectedTotal) {
    // Watermarks have been received from all upstream tasks.
    return true;
  }
  if (watermarkIdleTime <= 0) {
    // Idle tasks are not tolerated, so keep waiting for the missing ones.
    return false;
  }
  // Idle tasks are tolerated once the initial idle window has passed.
  final long elapsedSinceCreate = currentTime - createTime;
  return elapsedSinceCreate > watermarkIdleTime;
}

/**
 * Returns the current watermark time held by this state.
 *
 * <p>The backing field is volatile and starts out as {@code WATERMARK_NOT_EXIST}, so
 * that value is returned until a watermark has been set.
 *
 * @return the current watermark time, or {@code WATERMARK_NOT_EXIST} if none has been set
 */
long getWatermarkTime() {
return watermarkTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,36 +126,63 @@ public void testIdle() {
WatermarkStates watermarkStates = new WatermarkStates(ssps, producerCounts, new MetricsRegistryMap(),
TASK_WATERMARK_IDLE_MS, new MockSystemTime());

// First wm is marked before the idle time, so it's not counted
WatermarkMessage watermarkMessage = new WatermarkMessage(1L, "task 0");
watermarkStates.update(watermarkMessage, intPartition0);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), WATERMARK_NOT_EXIST);
assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST);

// Watermark is computed based on "task 1" since "task 0" passes the idle time
watermarkMessage = new WatermarkMessage(5L, "task 1");
watermarkStates.update(watermarkMessage, intPartition0);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 5L);
assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST);

// Watermark is computed based on "task 0" since the time already passes the idle threshold
watermarkMessage = new WatermarkMessage(6L, "task 0");
watermarkStates.update(watermarkMessage, intPartition1);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition1), WATERMARK_NOT_EXIST);
assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition1), 6L);
assertEquals(watermarkStates.getWatermark(intermediate), 5L);

// watermark from task 1 on int p1 to 4
watermarkMessage = new WatermarkMessage(10L, "task 1");
// Watermark from "task 1" is less than current watermark, ignore
watermarkMessage = new WatermarkMessage(2L, "task 1");
watermarkStates.update(watermarkMessage, intPartition1);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition1), 6L);
// verify we got a watermark 3 (min) for int stream
// verify we got a watermark (min) for int stream
assertEquals(watermarkStates.getWatermark(intermediate), 5L);

// Watermark from "task 0" is updated, but less than current watermark
watermarkMessage = new WatermarkMessage(3L, "task 0");
watermarkStates.update(watermarkMessage, intPartition0);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 5L);
assertEquals(watermarkStates.getWatermark(intermediate), 5L);

// Watermark is computed this time due to advance in "task 0"
watermarkMessage = new WatermarkMessage(7L, "task 0");
watermarkStates.update(watermarkMessage, intPartition0);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 5L);
assertEquals(watermarkStates.getWatermark(intermediate), 5L);

// Watermark is computed this time due to advance in "task 1"
watermarkMessage = new WatermarkMessage(10L, "task 1");
watermarkStates.update(watermarkMessage, intPartition0);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 7L);
assertEquals(watermarkStates.getWatermark(intermediate), 6L);
}

static class MockSystemTime implements LongSupplier {
boolean firstTime = true;
class MockSystemTime implements LongSupplier {
int createTime = 0;
boolean firstWatermark = true;

@Override
public long getAsLong() {
if (firstTime) {
firstTime = false;
if (createTime < ssps.size()) {
createTime++;
return System.currentTimeMillis() - TASK_WATERMARK_IDLE_MS;
}

if (firstWatermark) {
firstWatermark = false;
// Make the first task idle
return System.currentTimeMillis() - TASK_WATERMARK_IDLE_MS;
} else {
Expand Down

0 comments on commit 97fab20

Please sign in to comment.