造轮子之MemorySafeLinkedBlockingQueue-LinkBlockingQueue改进

LinkBlockingQueue改进

问题背景

https://github.com/apache/dubbo/pull/9722/files使用线程池的同学对于标题中的队列想必都有过使用,但上述队列使用不当时则会造成程序OOM,那怎么来控制呢?

使用ArrayBlockingQueue?如何来评估长度?

是否有一个完美的解决方案呢,MemorySafeLinkedBlockingQueue则通过对内存的限制判断尽面控制队列的容量,完成解决了可能存在的OOM问题。

获取内存大小(注:单位大B;支持准实时更新):

Runtime.getRuntime().freeMemory()//JVM中已经申请到的堆内存中还未使用的大小Runtime.getRuntime().maxMemory()// JVM可从操作系统申请到的最大内存值 -XxmRuntime.getRuntime().totalMemory()// JVM已从操作系统申请到的内存大小 —Xxs可设置该值大小-初始堆的大小

线程池在excute任务时,放队列,放不进去,使用新线程运行任务。这个放不进行,是使用的offer??非阻塞方法吗?

参考:https://blog.csdn.net/weixin_43108539/article/details/125190023

public void execute(Runnable command) {if (command == null)throw new NullPointerException(); //拿到32位的intint c = ctl.get(); //工作线程数<核心线程数if (workerCountOf(c) < corePoolSize) {//进入if,代表可以创建 核心 线程数if (addWorker(command, true))return;//如果没进入if,代表创建核心线程数失败,重新获取 ctlc = ctl.get();}//判断线程池为Running状态,将任务添加入阻塞队列,使用offerif (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();//再次判断是否为Running状态,若不是Running状态,remove任务if (! isRunning(recheck) && remove(command))reject(command);//如果线程池在Running状态,线程池数量为0else if (workerCountOf(recheck) == 0)//阻塞队列有任务,但是没有工作线程,添加一个任务为空的工作线程处理阻塞队列中的任务addWorker(null, false);}//阻塞队列已满,创建非核心线程,拒绝策略-addWorker中有判断核心线程数是否超过最大线程数else if (!addWorker(command, false))reject(command);}

空闲内存计算

package com.zte.sdn.oscp.queue;import cn.hutool.core.thread.NamedThreadFactory;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicBoolean;public class MemoryLimitCalculator {private static volatile long maxAvailable;private static final AtomicBoolean refreshStarted = new AtomicBoolean(false);private static void refresh() {maxAvailable = Runtime.getRuntime().freeMemory();}private static void checkAndScheduleRefresh() {if (!refreshStarted.get()) {// immediately refresh when first call to prevent maxAvailable from being 0// to ensure that being refreshed before refreshStarted being set as true// notice: refresh may be called for more than once because there is no lockrefresh();if (refreshStarted.compareAndSet(false, true)) {ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(“Dubbo-Memory-Calculator”));// check every 50 ms to improve performancescheduledExecutorService.scheduleWithFixedDelay(MemoryLimitCalculator::refresh, 50, 50, TimeUnit.MILLISECONDS);Runtime.getRuntime().addShutdownHook(new Thread(() -> {refreshStarted.set(false);scheduledExecutorService.shutdown();}));}}}/** * Get the maximum available memory of the current JVM. * * @return maximum available memory */public static long maxAvailable() {checkAndScheduleRefresh();return maxAvailable;}/** * Take the current JVM’s maximum available memory * as a percentage of the result as the limit. * * @param percentage percentage * @return available memory */public static long calculate(final float percentage) {if (percentage1) {throw new IllegalArgumentException();}checkAndScheduleRefresh();return (long) (maxAvailable() * percentage);}/** * By default, it takes 80% of the maximum available memory of the current JVM. * * @return available memory */public static long defaultLimit() {checkAndScheduleRefresh();return (long) (maxAvailable() * 0.8);}}

内存安全队列

package com.zte.sdn.oscp.queue;import java.util.Collection;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.TimeUnit;public class MemorySafeLinkedBlockingQueue extends LinkedBlockingQueue {private static final long serialVersionUID = 8032578371739960142L;public static int THE_256_MB = 256 * 1024 * 1024;private int maxFreeMemory;private Rejector rejector;public MemorySafeLinkedBlockingQueue() {this(THE_256_MB);}public MemorySafeLinkedBlockingQueue(final int maxFreeMemory) {super(Integer.MAX_VALUE);this.maxFreeMemory = maxFreeMemory;//default as DiscardPolicy to ensure compatibility with the old versionthis.rejector = new DiscardPolicy();}public MemorySafeLinkedBlockingQueue(final Collection c, final int maxFreeMemory) {super(c);this.maxFreeMemory = maxFreeMemory;//default as DiscardPolicy to ensure compatibility with the old versionthis.rejector = new DiscardPolicy();}/** * set the max free memory. * * @param maxFreeMemory the max free memory */public void setMaxFreeMemory(final int maxFreeMemory) {this.maxFreeMemory = maxFreeMemory;}/** * get the max free memory. * * @return the max free memory limit */public int getMaxFreeMemory() {return maxFreeMemory;}/** * set the rejector. * * @param rejector the rejector */public void setRejector(final Rejector rejector) {this.rejector = rejector;}/** * determine if there is any remaining free memory. * * @return true if has free memory */public boolean hasRemainedMemory() {return MemoryLimitCalculator.maxAvailable() > maxFreeMemory;}@Overridepublic void put(final E e) throws InterruptedException {if (hasRemainedMemory()) {super.put(e);} else {rejector.reject(e, this);}}@Overridepublic boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {if (!hasRemainedMemory()) {rejector.reject(e, this);return false;}return super.offer(e, timeout, unit);}@Overridepublic boolean offer(final E e) {if (!hasRemainedMemory()) {rejector.reject(e, this);return false;}return super.offer(e);}}

拒绝策略

注意其中的rejector是拒绝策略,默认的DiscardPolicy什么也不处理;

而DiscardOldPolicy的处理逻辑很简单

public class DiscardOldestPolicy implements Rejector {@Overridepublic void reject(final E e, final Queue queue) {queue.poll();queue.offer(e);}}

AbortPolicy则直接抛出异常

public class AbortPolicy implements Rejector {@Overridepublic void reject(final E e, final Queue queue) {throw new RejectException(“no more memory can be used !”);}}

个人建议增加日志打印即可。

郑重声明:本文内容及图片均整理自互联网,不代表本站立场,版权归原作者所有,如有侵权请联系管理员(admin#wlmqw.com)删除。
(0)
用户投稿
上一篇 2022年9月9日
下一篇 2022年9月9日

相关推荐

  • 采用N5095处理器 大唐NUC迷你主机只需1199

    大唐NUC迷你电脑主机现在到手只需1199元。 这款主机采用N5095处理器,具有四核心四线程,动态加速频率达2.9GHz,还支持双4K高清显示输出。其搭配LPDDR4 8GB内存…

    2022年7月31日
  • 我国再次成功发射“一箭三星” 中国航天3天3连发

    北京时间2022年9月27日7时50分,我国在太原卫星发射中心使用长征六号运载火箭,以“一箭三星”方式,成功将试验十六号A/B星和试验十七号卫星发射升空,卫星顺利进入预定轨道,发射…

    2022年9月27日
  • 神舟十五号计划近日择机发射

    本报北京11月21日电(中青报·中青网记者 邱晨辉)据中国载人航天工程办公室消息,北京时间2022年11月21日,神舟十五号载人飞船与长征二号F遥十五运载火箭组合体已转运至发射区,…

    2022年11月24日
  • 《光遇》11月21日每日任务怎么做 11.21每日任务攻略

    光遇每天都会更新四个日常任务,玩家完成任务可以获得蜡烛奖励,一些小伙伴还不知道今天的任务要怎么做,下面小编就给大家带来了光遇11.21每日任务攻略,一起来看看吧。 光遇11.21每…

    2022年11月21日
  • 把电动车送上太空?美电动车商Canoo成为NASA供应商

    西班牙《公众》日报网站近日报道称,在世界范围内更加以NASA之名为人所知的美国航空航天局,选择由电动汽车初创公司Canoo来提供电动汽车,将阿耳忒弥斯任务的宇航员从准备大楼运送到发…

    2022年6月28日
  • 魔兽世界卡鲁亚克声望任务怎么做 wowwlk卡鲁亚克声望任务流程攻略

    魔兽世界的wlk已经正式的上线了,游戏中新增了一些声望任务,今天给大家带来的是其中一个声望任务卡鲁亚克声望任务流程攻略,下面就是具体的内容. 魔兽世界卡鲁亚克声望任务流程攻略 开启…

    2022年9月30日
  • 介绍利用微信赚钱的4种方式(微信赚钱的方法有哪些)

    微信上的赚钱方法很多,比如微商,代购等,但不适合所有人。邵连虎今天给大家推荐的,是不需要投资,而且都是非常简单,人人可操作的,所以,有兴趣的可以研究下:1,关注公众号赚钱在微信上有…

    2022年10月22日
  • C++关键字整理

    atomic #include 原子操作(atomic): 互斥量的加锁一般是针对一个代码段,而原子操作针对的一般都是一个变量。原子变量既不可复制亦不可移动。(1)它表示在多个线程…

    2022年6月30日
  • 《魔兽世界》怀旧服某种邀请任务怎么完成?某种邀请任务内容介绍

    魔兽世界wlk某种邀请任务怎么完成?很多玩家还不清楚魔兽世界wlk怀旧服某种邀请任务怎么完成,下面一起来看一下魔兽世界wlk某种邀请任务内容介绍。希望可以帮助各位玩家顺利的进行游戏…

    2022年10月9日
  • Java子线程中的异常处理(通用)

    在普通的单线程程序中,捕获异常只需要通过try … catch … finally …代码块就可以了。那么,在并发情况下,比如在父线程中启动了子…

    2022年6月21日

联系我们

联系邮箱:admin#wlmqw.com
工作时间:周一至周五,10:30-18:30,节假日休息