diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/MemoryLimitCalculator.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/MemoryLimitCalculator.java
new file mode 100644
index 00000000000..31db9e0508a
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/MemoryLimitCalculator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.common.threadpool;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link java.lang.Runtime#freeMemory()} technology is used to calculate the
+ * memory limit by using the percentage of the current maximum available memory,
+ * which can be used with {@link MemoryLimiter}.
+ *
+ * @see MemoryLimiter
+ * @see MemoryLimitCalculator
+ */
+public class MemoryLimitCalculator {
+
+ private static volatile long maxAvailable;
+
+ private static final ScheduledExecutorService SCHEDULER = Executors.newSingleThreadScheduledExecutor();
+
+ static {
+ // immediately refresh when this class is loaded to prevent maxAvailable from being 0
+ refresh();
+ // check every 50 ms to improve performance
+ SCHEDULER.scheduleWithFixedDelay(MemoryLimitCalculator::refresh, 50, 50, TimeUnit.MILLISECONDS);
+ Runtime.getRuntime().addShutdownHook(new Thread(SCHEDULER::shutdown));
+ }
+
+ private static void refresh() {
+ maxAvailable = Runtime.getRuntime().freeMemory();
+ }
+
+ /**
+ * Get the maximum available memory of the current JVM.
+ *
+ * @return maximum available memory
+ */
+ public static long maxAvailable() {
+ return maxAvailable;
+ }
+
+ /**
+ * Take the current JVM's maximum available memory
+ * as a percentage of the result as the limit.
+ *
+ * @param percentage percentage
+ * @return available memory
+ */
+ public static long calculate(final float percentage) {
+ if (percentage <= 0 || percentage > 1) {
+ throw new IllegalArgumentException();
+ }
+ return (long) (maxAvailable() * percentage);
+ }
+
+ /**
+ * By default, it takes 80% of the maximum available memory of the current JVM.
+ *
+ * @return available memory
+ */
+ public static long defaultLimit() {
+ return (long) (maxAvailable() * 0.8);
+ }
+}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/MemoryLimiter.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/MemoryLimiter.java
index d1db0d16eb8..b26e37eb27f 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/MemoryLimiter.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/MemoryLimiter.java
@@ -122,7 +122,8 @@ public boolean acquire(Object e) {
return false;
}
memory.add(objectSize);
- if (sum < memoryLimit) {
+ // see https://github.com/apache/incubator-shenyu/pull/3356
+ if (memory.sum() < memoryLimit) {
notLimited.signal();
}
} finally {
@@ -140,13 +141,13 @@ public void acquireInterruptibly(Object e) throws InterruptedException {
}
acquireLock.lockInterruptibly();
try {
- final long sum = memory.sum();
final long objectSize = inst.getObjectSize(e);
- while (sum + objectSize >= memoryLimit) {
+ // see https://github.com/apache/incubator-shenyu/pull/3335
+ while (memory.sum() + objectSize >= memoryLimit) {
notLimited.await();
}
memory.add(objectSize);
- if (sum < memoryLimit) {
+ if (memory.sum() < memoryLimit) {
notLimited.signal();
}
} finally {
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/MemorySafeLinkedBlockingQueue.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/MemorySafeLinkedBlockingQueue.java
new file mode 100644
index 00000000000..aae67b618c8
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/MemorySafeLinkedBlockingQueue.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.common.threadpool;
+
+import java.util.Collection;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Can completely solve the OOM problem caused by {@link java.util.concurrent.LinkedBlockingQueue},
+ * does not depend on {@link java.lang.instrument.Instrumentation} and is easier to use than
+ * {@link MemoryLimitedLinkedBlockingQueue}.
+ *
+ * @see MemorySafeLinkedBlockingQueue
+ */
+public class MemorySafeLinkedBlockingQueue extends LinkedBlockingQueue {
+
+ private static final long serialVersionUID = 8032578371739960142L;
+
+ public static int THE_256_MB = 256 * 1024 * 1024;
+
+ private int maxFreeMemory;
+
+ public MemorySafeLinkedBlockingQueue() {
+ this(THE_256_MB);
+ }
+
+ public MemorySafeLinkedBlockingQueue(final int maxFreeMemory) {
+ super(Integer.MAX_VALUE);
+ this.maxFreeMemory = maxFreeMemory;
+ }
+
+ public MemorySafeLinkedBlockingQueue(final Collection extends E> c,
+ final int maxFreeMemory) {
+ super(c);
+ this.maxFreeMemory = maxFreeMemory;
+ }
+
+ /**
+ * set the max free memory.
+ *
+ * @param maxFreeMemory the max free memory
+ */
+ public void setMaxFreeMemory(final int maxFreeMemory) {
+ this.maxFreeMemory = maxFreeMemory;
+ }
+
+ /**
+ * get the max free memory.
+ *
+ * @return the max free memory limit
+ */
+ public int getMaxFreeMemory() {
+ return maxFreeMemory;
+ }
+
+ /**
+ * determine if there is any remaining free memory.
+ *
+ * @return true if has free memory
+ */
+ public boolean hasRemainedMemory() {
+ return MemoryLimitCalculator.maxAvailable() > maxFreeMemory;
+ }
+
+ @Override
+ public void put(final E e) throws InterruptedException {
+ if (hasRemainedMemory()) {
+ super.put(e);
+ }
+ }
+
+ @Override
+ public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
+ return hasRemainedMemory() && super.offer(e, timeout, unit);
+ }
+
+ @Override
+ public boolean offer(final E e) {
+ return hasRemainedMemory() && super.offer(e);
+ }
+}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/cached/CachedThreadPool.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/cached/CachedThreadPool.java
index eb14fba089c..b053bd9337a 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/cached/CachedThreadPool.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/cached/CachedThreadPool.java
@@ -18,6 +18,7 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
+import org.apache.dubbo.common.threadpool.MemorySafeLinkedBlockingQueue;
import org.apache.dubbo.common.threadpool.ThreadPool;
import org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport;
@@ -54,7 +55,7 @@ public Executor getExecutor(URL url) {
int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue() :
- (queues < 0 ? new LinkedBlockingQueue()
+ (queues < 0 ? new MemorySafeLinkedBlockingQueue()
: new LinkedBlockingQueue(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/fixed/FixedThreadPool.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/fixed/FixedThreadPool.java
index 71ee07449a1..de1fb7ee683 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/fixed/FixedThreadPool.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/fixed/FixedThreadPool.java
@@ -18,6 +18,7 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
+import org.apache.dubbo.common.threadpool.MemorySafeLinkedBlockingQueue;
import org.apache.dubbo.common.threadpool.ThreadPool;
import org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport;
@@ -48,7 +49,7 @@ public Executor getExecutor(URL url) {
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue() :
- (queues < 0 ? new LinkedBlockingQueue()
+ (queues < 0 ? new MemorySafeLinkedBlockingQueue()
: new LinkedBlockingQueue(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/limited/LimitedThreadPool.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/limited/LimitedThreadPool.java
index 25e30039004..f0203e6676b 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/limited/LimitedThreadPool.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/limited/LimitedThreadPool.java
@@ -19,6 +19,7 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
+import org.apache.dubbo.common.threadpool.MemorySafeLinkedBlockingQueue;
import org.apache.dubbo.common.threadpool.ThreadPool;
import org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport;
@@ -51,7 +52,7 @@ public Executor getExecutor(URL url) {
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue() :
- (queues < 0 ? new LinkedBlockingQueue()
+ (queues < 0 ? new MemorySafeLinkedBlockingQueue()
: new LinkedBlockingQueue(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/MemorySafeLinkedBlockingQueueTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/MemorySafeLinkedBlockingQueueTest.java
new file mode 100644
index 00000000000..dd7f095b887
--- /dev/null
+++ b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/MemorySafeLinkedBlockingQueueTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.common.threadpool;
+
+import net.bytebuddy.agent.ByteBuddyAgent;
+import org.junit.jupiter.api.Test;
+
+import java.lang.instrument.Instrumentation;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+public class MemorySafeLinkedBlockingQueueTest {
+ @Test
+ public void test() throws Exception {
+ ByteBuddyAgent.install();
+ final Instrumentation instrumentation = ByteBuddyAgent.getInstrumentation();
+ final long objectSize = instrumentation.getObjectSize((Runnable) () -> {
+ });
+ int maxFreeMemory = (int) MemoryLimitCalculator.maxAvailable();
+ MemorySafeLinkedBlockingQueue queue = new MemorySafeLinkedBlockingQueue<>(maxFreeMemory);
+ // all memory is reserved for JVM, so it will fail here
+ assertThat(queue.offer(() -> {
+ }), is(false));
+
+ // maxFreeMemory-objectSize Byte memory is reserved for the JVM, so this will succeed
+ queue.setMaxFreeMemory((int) (MemoryLimitCalculator.maxAvailable() - objectSize));
+ assertThat(queue.offer(() -> {
+ }), is(true));
+ }
+}