diff --git a/metrics-core/src/main/java/com/codahale/metrics/InstrumentedExecutorService.java b/metrics-core/src/main/java/com/codahale/metrics/InstrumentedExecutorService.java index 96035150b2..8cb14d0274 100644 --- a/metrics-core/src/main/java/com/codahale/metrics/InstrumentedExecutorService.java +++ b/metrics-core/src/main/java/com/codahale/metrics/InstrumentedExecutorService.java @@ -25,6 +25,8 @@ public class InstrumentedExecutorService implements ExecutorService { private static final AtomicLong NAME_COUNTER = new AtomicLong(); private final ExecutorService delegate; + private final MetricRegistry registry; + private final String name; private final Meter submitted; private final Counter running; private final Meter completed; @@ -50,12 +52,18 @@ public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry regi */ public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry registry, String name) { this.delegate = delegate; + this.registry = registry; + this.name = name; this.submitted = registry.meter(MetricRegistry.name(name, "submitted")); this.running = registry.counter(MetricRegistry.name(name, "running")); this.completed = registry.meter(MetricRegistry.name(name, "completed")); this.idle = registry.timer(MetricRegistry.name(name, "idle")); this.duration = registry.timer(MetricRegistry.name(name, "duration")); + registerInternalMetrics(); + } + + private void registerInternalMetrics() { if (delegate instanceof ThreadPoolExecutor) { ThreadPoolExecutor executor = (ThreadPoolExecutor) delegate; registry.registerGauge(MetricRegistry.name(name, "pool.size"), @@ -86,6 +94,23 @@ public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry regi } } + private void removeInternalMetrics() { + if (delegate instanceof ThreadPoolExecutor) { + registry.remove(MetricRegistry.name(name, "pool.size")); + registry.remove(MetricRegistry.name(name, "pool.core")); + registry.remove(MetricRegistry.name(name, "pool.max")); + registry.remove(MetricRegistry.name(name, "tasks.active")); + registry.remove(MetricRegistry.name(name, "tasks.completed")); + registry.remove(MetricRegistry.name(name, "tasks.queued")); + registry.remove(MetricRegistry.name(name, "tasks.capacity")); + } else if (delegate instanceof ForkJoinPool) { + registry.remove(MetricRegistry.name(name, "tasks.stolen")); + registry.remove(MetricRegistry.name(name, "tasks.queued")); + registry.remove(MetricRegistry.name(name, "threads.active")); + registry.remove(MetricRegistry.name(name, "threads.running")); + } + } + /** * {@inheritDoc} */ @@ -173,11 +198,14 @@ private Collection> instrument(Collection shutdownNow() { - return delegate.shutdownNow(); + List remainingTasks = delegate.shutdownNow(); + removeInternalMetrics(); + return remainingTasks; } @Override diff --git a/metrics-core/src/test/java/com/codahale/metrics/InstrumentedExecutorServiceTest.java b/metrics-core/src/test/java/com/codahale/metrics/InstrumentedExecutorServiceTest.java index 76e026b0b6..a280784f4a 100644 --- a/metrics-core/src/test/java/com/codahale/metrics/InstrumentedExecutorServiceTest.java +++ b/metrics-core/src/test/java/com/codahale/metrics/InstrumentedExecutorServiceTest.java @@ -10,7 +10,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -167,9 +166,22 @@ public void reportsTasksInformationForThreadPoolExecutor() throws Exception { assertThat(poolSize.getValue()).isEqualTo(1); } + @Test + public void removesMetricsAfterShutdownForThreadPoolExecutor() { + executor = new ThreadPoolExecutor(4, 16, + 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(32)); + instrumentedExecutorService = new InstrumentedExecutorService(executor, registry, "stp"); + + assertThat(registry.getMetrics()).containsKeys("stp.pool.size", "stp.pool.core", "stp.pool.max", "stp.tasks.active", "stp.tasks.completed", "stp.tasks.queued", "stp.tasks.capacity"); + + instrumentedExecutorService.shutdown(); + + assertThat(registry.getMetrics()).doesNotContainKeys("stp.pool.size", "stp.pool.core", "stp.pool.max", "stp.tasks.active", "stp.tasks.completed", "stp.tasks.queued", "stp.tasks.capacity"); + } + @Test @SuppressWarnings("unchecked") - public void reportsTasksInformationForForkJoinPool() throws Exception { + public void reportsTasksInformationForForkJoinPool() { executor = Executors.newWorkStealingPool(4); instrumentedExecutorService = new InstrumentedExecutorService(executor, registry, "fjp"); submitted = registry.meter("fjp.submitted"); @@ -215,4 +227,16 @@ public void reportsTasksInformationForForkJoinPool() throws Exception { assertThat(idle.getCount()).isEqualTo(1); assertThat(idle.getSnapshot().size()).isEqualTo(1); } + + @Test + public void removesMetricsAfterShutdownForForkJoinPool() { + executor = Executors.newWorkStealingPool(4); + instrumentedExecutorService = new InstrumentedExecutorService(executor, registry, "sfjp"); + + assertThat(registry.getMetrics()).containsKeys("sfjp.tasks.stolen", "sfjp.tasks.queued", "sfjp.threads.active", "sfjp.threads.running"); + + instrumentedExecutorService.shutdown(); + + assertThat(registry.getMetrics()).doesNotContainKeys("sfjp.tasks.stolen", "sfjp.tasks.queued", "sfjp.threads.active", "sfjp.threads.running"); + } }