Skip to content

Commit

Permalink
Clean up InstrumentedExecutorService metrics after shutdown (#3202)
Browse files Browse the repository at this point in the history
Fixes #2920
  • Loading branch information
the-thing authored Oct 3, 2024
1 parent 1ad3f16 commit 90fa922
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public class InstrumentedExecutorService implements ExecutorService {
private static final AtomicLong NAME_COUNTER = new AtomicLong();

private final ExecutorService delegate;
private final MetricRegistry registry;
private final String name;
private final Meter submitted;
private final Counter running;
private final Meter completed;
Expand All @@ -50,12 +52,18 @@ public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry regi
*/
public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry registry, String name) {
this.delegate = delegate;
this.registry = registry;
this.name = name;
this.submitted = registry.meter(MetricRegistry.name(name, "submitted"));
this.running = registry.counter(MetricRegistry.name(name, "running"));
this.completed = registry.meter(MetricRegistry.name(name, "completed"));
this.idle = registry.timer(MetricRegistry.name(name, "idle"));
this.duration = registry.timer(MetricRegistry.name(name, "duration"));

registerInternalMetrics();
}

private void registerInternalMetrics() {
if (delegate instanceof ThreadPoolExecutor) {
ThreadPoolExecutor executor = (ThreadPoolExecutor) delegate;
registry.registerGauge(MetricRegistry.name(name, "pool.size"),
Expand Down Expand Up @@ -86,6 +94,23 @@ public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry regi
}
}

private void removeInternalMetrics() {
if (delegate instanceof ThreadPoolExecutor) {
registry.remove(MetricRegistry.name(name, "pool.size"));
registry.remove(MetricRegistry.name(name, "pool.core"));
registry.remove(MetricRegistry.name(name, "pool.max"));
registry.remove(MetricRegistry.name(name, "tasks.active"));
registry.remove(MetricRegistry.name(name, "tasks.completed"));
registry.remove(MetricRegistry.name(name, "tasks.queued"));
registry.remove(MetricRegistry.name(name, "tasks.capacity"));
} else if (delegate instanceof ForkJoinPool) {
registry.remove(MetricRegistry.name(name, "tasks.stolen"));
registry.remove(MetricRegistry.name(name, "tasks.queued"));
registry.remove(MetricRegistry.name(name, "threads.active"));
registry.remove(MetricRegistry.name(name, "threads.running"));
}
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -173,11 +198,14 @@ private <T> Collection<? extends Callable<T>> instrument(Collection<? extends Ca
@Override
public void shutdown() {
delegate.shutdown();
removeInternalMetrics();
}

@Override
public List<Runnable> shutdownNow() {
return delegate.shutdownNow();
List<Runnable> remainingTasks = delegate.shutdownNow();
removeInternalMetrics();
return remainingTasks;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -167,9 +166,22 @@ public void reportsTasksInformationForThreadPoolExecutor() throws Exception {
assertThat(poolSize.getValue()).isEqualTo(1);
}

@Test
public void removesMetricsAfterShutdownForThreadPoolExecutor() {
executor = new ThreadPoolExecutor(4, 16,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(32));
instrumentedExecutorService = new InstrumentedExecutorService(executor, registry, "stp");

assertThat(registry.getMetrics()).containsKeys("stp.pool.size", "stp.pool.core", "stp.pool.max", "stp.tasks.active", "stp.tasks.completed", "stp.tasks.queued", "stp.tasks.capacity");

instrumentedExecutorService.shutdown();

assertThat(registry.getMetrics()).doesNotContainKeys("stp.pool.size", "stp.pool.core", "stp.pool.max", "stp.tasks.active", "stp.tasks.completed", "stp.tasks.queued", "stp.tasks.capacity");
}

@Test
@SuppressWarnings("unchecked")
public void reportsTasksInformationForForkJoinPool() throws Exception {
public void reportsTasksInformationForForkJoinPool() {
executor = Executors.newWorkStealingPool(4);
instrumentedExecutorService = new InstrumentedExecutorService(executor, registry, "fjp");
submitted = registry.meter("fjp.submitted");
Expand Down Expand Up @@ -215,4 +227,16 @@ public void reportsTasksInformationForForkJoinPool() throws Exception {
assertThat(idle.getCount()).isEqualTo(1);
assertThat(idle.getSnapshot().size()).isEqualTo(1);
}

@Test
public void removesMetricsAfterShutdownForForkJoinPool() {
executor = Executors.newWorkStealingPool(4);
instrumentedExecutorService = new InstrumentedExecutorService(executor, registry, "sfjp");

assertThat(registry.getMetrics()).containsKeys("sfjp.tasks.stolen", "sfjp.tasks.queued", "sfjp.threads.active", "sfjp.threads.running");

instrumentedExecutorService.shutdown();

assertThat(registry.getMetrics()).doesNotContainKeys("sfjp.tasks.stolen", "sfjp.tasks.queued", "sfjp.threads.active", "sfjp.threads.running");
}
}

0 comments on commit 90fa922

Please sign in to comment.