From b5fd13d458ba25ab08579a5b5149c0a1d80ea83f Mon Sep 17 00:00:00 2001 From: codingprh Date: Fri, 10 Jun 2022 09:57:04 +0800 Subject: [PATCH 1/4] feat: add MemorySafeLinkedBlockingQueue --- .../threadpool/MemoryLimitCalculator.java | 81 ++++++ .../rpc/common/threadpool/MemoryLimiter.java | 270 ++++++++++++++++++ .../MemorySafeLinkedBlockingQueue.java | 96 +++++++ .../rpc/common/utils/ThreadPoolUtils.java | 4 +- .../MemorySafeLinkedBlockingQueueTest.java | 30 ++ 5 files changed, 480 insertions(+), 1 deletion(-) create mode 100644 core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimitCalculator.java create mode 100644 core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimiter.java create mode 100644 core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemorySafeLinkedBlockingQueue.java create mode 100644 core/common/src/test/java/com/alipay/sofa/rpc/common/threadpool/MemorySafeLinkedBlockingQueueTest.java diff --git a/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimitCalculator.java b/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimitCalculator.java new file mode 100644 index 000000000..b188cdcfb --- /dev/null +++ b/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimitCalculator.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alipay.sofa.rpc.common.threadpool; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * {@link java.lang.Runtime#freeMemory()} technology is used to calculate the + * memory limit by using the percentage of the current maximum available memory, + * which can be used with {@link MemoryLimiter}. + * + * @see MemoryLimiter + * @see MemoryLimitCalculator + */ +public class MemoryLimitCalculator { + + private static volatile long maxAvailable; + + private static final ScheduledExecutorService SCHEDULER = Executors.newSingleThreadScheduledExecutor(); + + static { + // immediately refresh when this class is loaded to prevent maxAvailable from being 0 + refresh(); + // check every 50 ms to improve performance + SCHEDULER.scheduleWithFixedDelay(MemoryLimitCalculator::refresh, 50, 50, TimeUnit.MILLISECONDS); + Runtime.getRuntime().addShutdownHook(new Thread(SCHEDULER::shutdown)); + } + + private static void refresh() { + maxAvailable = Runtime.getRuntime().freeMemory(); + } + + /** + * Get the maximum available memory of the current JVM. + * + * @return maximum available memory + */ + public static long maxAvailable() { + return maxAvailable; + } + + /** + * Take the current JVM's maximum available memory + * as a percentage of the result as the limit. + * + * @param percentage percentage + * @return available memory + */ + public static long calculate(final float percentage) { + if (percentage <= 0 || percentage > 1) { + throw new IllegalArgumentException(); + } + return (long) (maxAvailable() * percentage); + } + + /** + * By default, it takes 80% of the maximum available memory of the current JVM. + * + * @return available memory + */ + public static long defaultLimit() { + return (long) (maxAvailable() * 0.8); + } +} \ No newline at end of file diff --git a/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimiter.java b/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimiter.java new file mode 100644 index 000000000..e54c7e706 --- /dev/null +++ b/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimiter.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alipay.sofa.rpc.common.threadpool; + +import java.lang.instrument.Instrumentation; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * memory limiter. + */ +public class MemoryLimiter { + + private final Instrumentation inst; + + private long memoryLimit; + + private final LongAdder memory = new LongAdder(); + + private final ReentrantLock acquireLock = new ReentrantLock(); + + private final Condition notLimited = acquireLock.newCondition(); + + private final ReentrantLock releaseLock = new ReentrantLock(); + + private final Condition notEmpty = releaseLock.newCondition(); + + public MemoryLimiter(Instrumentation inst) { + this(Integer.MAX_VALUE, inst); + } + + public MemoryLimiter(long memoryLimit, Instrumentation inst) { + if (memoryLimit <= 0) { + throw new IllegalArgumentException(); + } + this.memoryLimit = memoryLimit; + this.inst = inst; + } + + public void setMemoryLimit(long memoryLimit) { + if (memoryLimit <= 0) { + throw new IllegalArgumentException(); + } + this.memoryLimit = memoryLimit; + } + + public long getMemoryLimit() { + return memoryLimit; + } + + public long getCurrentMemory() { + return memory.sum(); + } + + public long getCurrentRemainMemory() { + return getMemoryLimit() - getCurrentMemory(); + } + + private void signalNotEmpty() { + releaseLock.lock(); + try { + notEmpty.signal(); + } finally { + releaseLock.unlock(); + } + } + + private void signalNotLimited() { + acquireLock.lock(); + try { + notLimited.signal(); + } finally { + acquireLock.unlock(); + } + } + + /** + * Locks to prevent both acquires and releases. + */ + private void fullyLock() { + acquireLock.lock(); + releaseLock.lock(); + } + + /** + * Unlocks to allow both acquires and releases. + */ + private void fullyUnlock() { + releaseLock.unlock(); + acquireLock.unlock(); + } + + public boolean acquire(Object e) { + if (e == null) { + throw new NullPointerException(); + } + if (memory.sum() >= memoryLimit) { + return false; + } + acquireLock.lock(); + try { + final long sum = memory.sum(); + final long objectSize = inst.getObjectSize(e); + if (sum + objectSize >= memoryLimit) { + return false; + } + memory.add(objectSize); + // see https://github.com/apache/incubator-shenyu/pull/3356 + if (memory.sum() < memoryLimit) { + notLimited.signal(); + } + } finally { + acquireLock.unlock(); + } + if (memory.sum() > 0) { + signalNotEmpty(); + } + return true; + } + + public void acquireInterruptibly(Object e) throws InterruptedException { + if (e == null) { + throw new NullPointerException(); + } + acquireLock.lockInterruptibly(); + try { + final long objectSize = inst.getObjectSize(e); + // see https://github.com/apache/incubator-shenyu/pull/3335 + while (memory.sum() + objectSize >= memoryLimit) { + notLimited.await(); + } + memory.add(objectSize); + if (memory.sum() < memoryLimit) { + notLimited.signal(); + } + } finally { + acquireLock.unlock(); + } + if (memory.sum() > 0) { + signalNotEmpty(); + } + } + + public boolean acquire(Object e, long timeout, TimeUnit unit) throws InterruptedException { + if (e == null) { + throw new NullPointerException(); + } + long nanos = unit.toNanos(timeout); + acquireLock.lockInterruptibly(); + try { + final long objectSize = inst.getObjectSize(e); + while (memory.sum() + objectSize >= memoryLimit) { + if (nanos <= 0) { + return false; + } + nanos = notLimited.awaitNanos(nanos); + } + memory.add(objectSize); + if (memory.sum() < memoryLimit) { + notLimited.signal(); + } + } finally { + acquireLock.unlock(); + } + if (memory.sum() > 0) { + signalNotEmpty(); + } + return true; + } + + public void release(Object e) { + if (null == e) { + return; + } + if (memory.sum() == 0) { + return; + } + releaseLock.lock(); + try { + final long objectSize = inst.getObjectSize(e); + if (memory.sum() > 0) { + memory.add(-objectSize); + if (memory.sum() > 0) { + notEmpty.signal(); + } + } + } finally { + releaseLock.unlock(); + } + if (memory.sum() < memoryLimit) { + signalNotLimited(); + } + } + + public void releaseInterruptibly(Object e) throws InterruptedException { + if (null == e) { + return; + } + releaseLock.lockInterruptibly(); + try { + final long objectSize = inst.getObjectSize(e); + while (memory.sum() == 0) { + notEmpty.await(); + } + memory.add(-objectSize); + if (memory.sum() > 0) { + notEmpty.signal(); + } + } finally { + releaseLock.unlock(); + } + if (memory.sum() < memoryLimit) { + signalNotLimited(); + } + } + + public void releaseInterruptibly(Object e, long timeout, TimeUnit unit) throws InterruptedException { + if (null == e) { + return; + } + long nanos = unit.toNanos(timeout); + releaseLock.lockInterruptibly(); + try { + final long objectSize = inst.getObjectSize(e); + while (memory.sum() == 0) { + if (nanos <= 0) { + return; + } + nanos = notEmpty.awaitNanos(nanos); + } + memory.add(-objectSize); + if (memory.sum() > 0) { + notEmpty.signal(); + } + } finally { + releaseLock.unlock(); + } + if (memory.sum() < memoryLimit) { + signalNotLimited(); + } + } + + public void clear() { + fullyLock(); + try { + if (memory.sumThenReset() < memoryLimit) { + notLimited.signal(); + } + } finally { + fullyUnlock(); + } + } +} \ No newline at end of file diff --git a/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemorySafeLinkedBlockingQueue.java b/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemorySafeLinkedBlockingQueue.java new file mode 100644 index 000000000..369757883 --- /dev/null +++ b/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemorySafeLinkedBlockingQueue.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alipay.sofa.rpc.common.threadpool; + +import java.util.Collection; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * Can completely solve the OOM problem caused by {@link java.util.concurrent.LinkedBlockingQueue}, + * does not depend on {@link java.lang.instrument.Instrumentation} + * + * @see MemorySafeLinkedBlockingQueue + */ +public class MemorySafeLinkedBlockingQueue extends LinkedBlockingQueue { + + private static final long serialVersionUID = 8032578371739960142L; + + public static int THE_256_MB = 256 * 1024 * 1024; + + private int maxFreeMemory; + + public MemorySafeLinkedBlockingQueue() { + this(THE_256_MB); + } + + public MemorySafeLinkedBlockingQueue(final int maxFreeMemory) { + super(Integer.MAX_VALUE); + this.maxFreeMemory = maxFreeMemory; + } + + public MemorySafeLinkedBlockingQueue(final Collection c, + final int maxFreeMemory) { + super(c); + this.maxFreeMemory = maxFreeMemory; + } + + /** + * set the max free memory. + * + * @param maxFreeMemory the max free memory + */ + public void setMaxFreeMemory(final int maxFreeMemory) { + this.maxFreeMemory = maxFreeMemory; + } + + /** + * get the max free memory. + * + * @return the max free memory limit + */ + public int getMaxFreeMemory() { + return maxFreeMemory; + } + + /** + * determine if there is any remaining free memory. + * + * @return true if has free memory + */ + public boolean hasRemainedMemory() { + return MemoryLimitCalculator.maxAvailable() > maxFreeMemory; + } + + @Override + public void put(final E e) throws InterruptedException { + if (hasRemainedMemory()) { + super.put(e); + } + } + + @Override + public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException { + return hasRemainedMemory() && super.offer(e, timeout, unit); + } + + @Override + public boolean offer(final E e) { + return hasRemainedMemory() && super.offer(e); + } +} \ No newline at end of file diff --git a/core/common/src/main/java/com/alipay/sofa/rpc/common/utils/ThreadPoolUtils.java b/core/common/src/main/java/com/alipay/sofa/rpc/common/utils/ThreadPoolUtils.java index 4a474a2a5..f4c57019b 100644 --- a/core/common/src/main/java/com/alipay/sofa/rpc/common/utils/ThreadPoolUtils.java +++ b/core/common/src/main/java/com/alipay/sofa/rpc/common/utils/ThreadPoolUtils.java @@ -16,6 +16,8 @@ */ package com.alipay.sofa.rpc.common.utils; +import com.alipay.sofa.rpc.common.threadpool.MemorySafeLinkedBlockingQueue; + import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.PriorityBlockingQueue; @@ -248,7 +250,7 @@ public static BlockingQueue buildQueue(int size, boolean isPriority) { queue = size < 0 ? new PriorityBlockingQueue() : new PriorityBlockingQueue(size); } else { - queue = size < 0 ? new LinkedBlockingQueue() + queue = size < 0 ? new MemorySafeLinkedBlockingQueue() : new LinkedBlockingQueue(size); } } diff --git a/core/common/src/test/java/com/alipay/sofa/rpc/common/threadpool/MemorySafeLinkedBlockingQueueTest.java b/core/common/src/test/java/com/alipay/sofa/rpc/common/threadpool/MemorySafeLinkedBlockingQueueTest.java new file mode 100644 index 000000000..8a51bb365 --- /dev/null +++ b/core/common/src/test/java/com/alipay/sofa/rpc/common/threadpool/MemorySafeLinkedBlockingQueueTest.java @@ -0,0 +1,30 @@ +package com.alipay.sofa.rpc.common.threadpool; + +import net.bytebuddy.agent.ByteBuddyAgent; +import org.junit.Assert; +import org.junit.Test; + +import java.lang.instrument.Instrumentation; + +public class MemorySafeLinkedBlockingQueueTest { + + @Test + public void test() throws Exception { + ByteBuddyAgent.install(); + final Instrumentation instrumentation = ByteBuddyAgent.getInstrumentation(); + final long objectSize = instrumentation.getObjectSize((Runnable) () -> { + }); + int maxFreeMemory = (int) MemoryLimitCalculator.maxAvailable(); + MemorySafeLinkedBlockingQueue queue = new MemorySafeLinkedBlockingQueue<>(maxFreeMemory); + + // all memory is reserved for JVM, so it will fail here + Assert.assertEquals(queue.offer(() -> { + }), false); + + // maxFreeMemory-objectSize Byte memory is reserved for the JVM, so this will succeed + queue.setMaxFreeMemory((int) (MemoryLimitCalculator.maxAvailable() - objectSize)); + Assert.assertEquals(queue.offer(() -> { + }), true); + } + +} \ No newline at end of file From 94e41ca93544c57a864a64a76277efb7914e595e Mon Sep 17 00:00:00 2001 From: codingprh Date: Wed, 6 Jul 2022 14:12:17 +0800 Subject: [PATCH 2/4] feat: add MemorySafeLinkedBlockingQueue --- .../threadpool/MemoryLimitCalculator.java | 1 - .../rpc/common/threadpool/MemoryLimiter.java | 13 ++++++------- .../MemorySafeLinkedBlockingQueue.java | 5 ++--- .../MemorySafeLinkedBlockingQueueTest.java | 17 ++++++++++++++++- 4 files changed, 24 insertions(+), 12 deletions(-) diff --git a/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimitCalculator.java b/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimitCalculator.java index b188cdcfb..f1d2ede20 100644 --- a/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimitCalculator.java +++ b/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimitCalculator.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package com.alipay.sofa.rpc.common.threadpool; import java.util.concurrent.Executors; diff --git a/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimiter.java b/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimiter.java index e54c7e706..cd5ffc15e 100644 --- a/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimiter.java +++ b/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimiter.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package com.alipay.sofa.rpc.common.threadpool; import java.lang.instrument.Instrumentation; @@ -30,17 +29,17 @@ public class MemoryLimiter { private final Instrumentation inst; - private long memoryLimit; + private long memoryLimit; - private final LongAdder memory = new LongAdder(); + private final LongAdder memory = new LongAdder(); - private final ReentrantLock acquireLock = new ReentrantLock(); + private final ReentrantLock acquireLock = new ReentrantLock(); - private final Condition notLimited = acquireLock.newCondition(); + private final Condition notLimited = acquireLock.newCondition(); - private final ReentrantLock releaseLock = new ReentrantLock(); + private final ReentrantLock releaseLock = new ReentrantLock(); - private final Condition notEmpty = releaseLock.newCondition(); + private final Condition notEmpty = releaseLock.newCondition(); public MemoryLimiter(Instrumentation inst) { this(Integer.MAX_VALUE, inst); diff --git a/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemorySafeLinkedBlockingQueue.java b/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemorySafeLinkedBlockingQueue.java index 369757883..191f3894c 100644 --- a/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemorySafeLinkedBlockingQueue.java +++ b/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemorySafeLinkedBlockingQueue.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package com.alipay.sofa.rpc.common.threadpool; import java.util.Collection; @@ -31,9 +30,9 @@ public class MemorySafeLinkedBlockingQueue extends LinkedBlockingQueue { private static final long serialVersionUID = 8032578371739960142L; - public static int THE_256_MB = 256 * 1024 * 1024; + public static int THE_256_MB = 256 * 1024 * 1024; - private int maxFreeMemory; + private int maxFreeMemory; public MemorySafeLinkedBlockingQueue() { this(THE_256_MB); diff --git a/core/common/src/test/java/com/alipay/sofa/rpc/common/threadpool/MemorySafeLinkedBlockingQueueTest.java b/core/common/src/test/java/com/alipay/sofa/rpc/common/threadpool/MemorySafeLinkedBlockingQueueTest.java index 8a51bb365..a1cca3fc2 100644 --- a/core/common/src/test/java/com/alipay/sofa/rpc/common/threadpool/MemorySafeLinkedBlockingQueueTest.java +++ b/core/common/src/test/java/com/alipay/sofa/rpc/common/threadpool/MemorySafeLinkedBlockingQueueTest.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.alipay.sofa.rpc.common.threadpool; import net.bytebuddy.agent.ByteBuddyAgent; @@ -26,5 +42,4 @@ public void test() throws Exception { Assert.assertEquals(queue.offer(() -> { }), true); } - } \ No newline at end of file From 622ba0a251513b3d54a25ce31135fab09c21fc08 Mon Sep 17 00:00:00 2001 From: codingprh Date: Wed, 6 Jul 2022 19:26:45 +0800 Subject: [PATCH 3/4] feat: add MemorySafeLinkedBlockingQueue test --- .../alipay/sofa/rpc/common/utils/ThreadPoolUtilsTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/common/src/test/java/com/alipay/sofa/rpc/common/utils/ThreadPoolUtilsTest.java b/core/common/src/test/java/com/alipay/sofa/rpc/common/utils/ThreadPoolUtilsTest.java index dcf4e38d6..6da188a44 100644 --- a/core/common/src/test/java/com/alipay/sofa/rpc/common/utils/ThreadPoolUtilsTest.java +++ b/core/common/src/test/java/com/alipay/sofa/rpc/common/utils/ThreadPoolUtilsTest.java @@ -17,6 +17,7 @@ package com.alipay.sofa.rpc.common.utils; import com.alipay.sofa.rpc.common.struct.NamedThreadFactory; +import com.alipay.sofa.rpc.common.threadpool.MemorySafeLinkedBlockingQueue; import org.junit.Assert; import org.junit.Test; @@ -151,7 +152,7 @@ public void buildQueue() throws Exception { BlockingQueue queue = ThreadPoolUtils.buildQueue(0); Assert.assertEquals(queue.getClass(), SynchronousQueue.class); queue = ThreadPoolUtils.buildQueue(-1); - Assert.assertEquals(queue.getClass(), LinkedBlockingQueue.class); + Assert.assertEquals(queue.getClass(), MemorySafeLinkedBlockingQueue.class); queue = ThreadPoolUtils.buildQueue(10); Assert.assertEquals(queue.getClass(), LinkedBlockingQueue.class); } @@ -165,7 +166,7 @@ public void buildQueue1() throws Exception { queue = ThreadPoolUtils.buildQueue(100, true); Assert.assertEquals(queue.getClass(), PriorityBlockingQueue.class); queue = ThreadPoolUtils.buildQueue(-1, false); - Assert.assertEquals(queue.getClass(), LinkedBlockingQueue.class); + Assert.assertEquals(queue.getClass(), MemorySafeLinkedBlockingQueue.class); queue = ThreadPoolUtils.buildQueue(100, false); Assert.assertEquals(queue.getClass(), LinkedBlockingQueue.class); } From ae2e21f554e0f7317c7c73eedfd245be769403b1 Mon Sep 17 00:00:00 2001 From: codingprh Date: Tue, 19 Jul 2022 18:03:40 +0800 Subject: [PATCH 4/4] feat: remove useless class --- .../threadpool/MemoryLimitCalculator.java | 8 +- .../rpc/common/threadpool/MemoryLimiter.java | 269 ------------------ 2 files changed, 3 insertions(+), 274 deletions(-) delete mode 100644 core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimiter.java diff --git a/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimitCalculator.java b/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimitCalculator.java index f1d2ede20..283943494 100644 --- a/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimitCalculator.java +++ b/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimitCalculator.java @@ -21,12 +21,10 @@ import java.util.concurrent.TimeUnit; /** - * {@link java.lang.Runtime#freeMemory()} technology is used to calculate the - * memory limit by using the percentage of the current maximum available memory, - * which can be used with {@link MemoryLimiter}. + * Can completely solve the OOM problem caused by {@link java.util.concurrent.LinkedBlockingQueue}, + * does not depend on {@link java.lang.instrument.Instrumentation} * - * @see MemoryLimiter - * @see MemoryLimitCalculator + * @see MemorySafeLinkedBlockingQueue */ public class MemoryLimitCalculator { diff --git a/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimiter.java b/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimiter.java deleted file mode 100644 index cd5ffc15e..000000000 --- a/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimiter.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alipay.sofa.rpc.common.threadpool; - -import java.lang.instrument.Instrumentation; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.LongAdder; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -/** - * memory limiter. - */ -public class MemoryLimiter { - - private final Instrumentation inst; - - private long memoryLimit; - - private final LongAdder memory = new LongAdder(); - - private final ReentrantLock acquireLock = new ReentrantLock(); - - private final Condition notLimited = acquireLock.newCondition(); - - private final ReentrantLock releaseLock = new ReentrantLock(); - - private final Condition notEmpty = releaseLock.newCondition(); - - public MemoryLimiter(Instrumentation inst) { - this(Integer.MAX_VALUE, inst); - } - - public MemoryLimiter(long memoryLimit, Instrumentation inst) { - if (memoryLimit <= 0) { - throw new IllegalArgumentException(); - } - this.memoryLimit = memoryLimit; - this.inst = inst; - } - - public void setMemoryLimit(long memoryLimit) { - if (memoryLimit <= 0) { - throw new IllegalArgumentException(); - } - this.memoryLimit = memoryLimit; - } - - public long getMemoryLimit() { - return memoryLimit; - } - - public long getCurrentMemory() { - return memory.sum(); - } - - public long getCurrentRemainMemory() { - return getMemoryLimit() - getCurrentMemory(); - } - - private void signalNotEmpty() { - releaseLock.lock(); - try { - notEmpty.signal(); - } finally { - releaseLock.unlock(); - } - } - - private void signalNotLimited() { - acquireLock.lock(); - try { - notLimited.signal(); - } finally { - acquireLock.unlock(); - } - } - - /** - * Locks to prevent both acquires and releases. - */ - private void fullyLock() { - acquireLock.lock(); - releaseLock.lock(); - } - - /** - * Unlocks to allow both acquires and releases. - */ - private void fullyUnlock() { - releaseLock.unlock(); - acquireLock.unlock(); - } - - public boolean acquire(Object e) { - if (e == null) { - throw new NullPointerException(); - } - if (memory.sum() >= memoryLimit) { - return false; - } - acquireLock.lock(); - try { - final long sum = memory.sum(); - final long objectSize = inst.getObjectSize(e); - if (sum + objectSize >= memoryLimit) { - return false; - } - memory.add(objectSize); - // see https://github.com/apache/incubator-shenyu/pull/3356 - if (memory.sum() < memoryLimit) { - notLimited.signal(); - } - } finally { - acquireLock.unlock(); - } - if (memory.sum() > 0) { - signalNotEmpty(); - } - return true; - } - - public void acquireInterruptibly(Object e) throws InterruptedException { - if (e == null) { - throw new NullPointerException(); - } - acquireLock.lockInterruptibly(); - try { - final long objectSize = inst.getObjectSize(e); - // see https://github.com/apache/incubator-shenyu/pull/3335 - while (memory.sum() + objectSize >= memoryLimit) { - notLimited.await(); - } - memory.add(objectSize); - if (memory.sum() < memoryLimit) { - notLimited.signal(); - } - } finally { - acquireLock.unlock(); - } - if (memory.sum() > 0) { - signalNotEmpty(); - } - } - - public boolean acquire(Object e, long timeout, TimeUnit unit) throws InterruptedException { - if (e == null) { - throw new NullPointerException(); - } - long nanos = unit.toNanos(timeout); - acquireLock.lockInterruptibly(); - try { - final long objectSize = inst.getObjectSize(e); - while (memory.sum() + objectSize >= memoryLimit) { - if (nanos <= 0) { - return false; - } - nanos = notLimited.awaitNanos(nanos); - } - memory.add(objectSize); - if (memory.sum() < memoryLimit) { - notLimited.signal(); - } - } finally { - acquireLock.unlock(); - } - if (memory.sum() > 0) { - signalNotEmpty(); - } - return true; - } - - public void release(Object e) { - if (null == e) { - return; - } - if (memory.sum() == 0) { - return; - } - releaseLock.lock(); - try { - final long objectSize = inst.getObjectSize(e); - if (memory.sum() > 0) { - memory.add(-objectSize); - if (memory.sum() > 0) { - notEmpty.signal(); - } - } - } finally { - releaseLock.unlock(); - } - if (memory.sum() < memoryLimit) { - signalNotLimited(); - } - } - - public void releaseInterruptibly(Object e) throws InterruptedException { - if (null == e) { - return; - } - releaseLock.lockInterruptibly(); - try { - final long objectSize = inst.getObjectSize(e); - while (memory.sum() == 0) { - notEmpty.await(); - } - memory.add(-objectSize); - if (memory.sum() > 0) { - notEmpty.signal(); - } - } finally { - releaseLock.unlock(); - } - if (memory.sum() < memoryLimit) { - signalNotLimited(); - } - } - - public void releaseInterruptibly(Object e, long timeout, TimeUnit unit) throws InterruptedException { - if (null == e) { - return; - } - long nanos = unit.toNanos(timeout); - releaseLock.lockInterruptibly(); - try { - final long objectSize = inst.getObjectSize(e); - while (memory.sum() == 0) { - if (nanos <= 0) { - return; - } - nanos = notEmpty.awaitNanos(nanos); - } - memory.add(-objectSize); - if (memory.sum() > 0) { - notEmpty.signal(); - } - } finally { - releaseLock.unlock(); - } - if (memory.sum() < memoryLimit) { - signalNotLimited(); - } - } - - public void clear() { - fullyLock(); - try { - if (memory.sumThenReset() < memoryLimit) { - notLimited.signal(); - } - } finally { - fullyUnlock(); - } - } -} \ No newline at end of file