「高并发」这样理解线程池中Worker线程的执行流程才正确

Worker类分析

Worker类从类的结构上来看,继承了AQS(AbstractQueuedSynchronizer类)并实现了Runnable接口。本质上,Worker类既是一个同步组件,也是一个执行任务的线程。接下来,我们看下Worker类的源码,如下所示。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {private static final long serialVersionUID = 6138294804551838833L;//执行任务的线程类final Thread thread;//初始化执行的任务,第一次执行的任务Runnable firstTask;//完成任务的计数volatile long completedTasks;//Worker类的构造方法,初始化任务并调用线程工厂创建执行任务的线程Worker(Runnable firstTask) {setState(-1); this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}//重写Runnable接口的run()方法public void run() {//调用ThreadPoolExecutor类的runWorker(Worker)方法runWorker(this);}//检测是否是否获取到锁//state=0表示未获取到锁//state=1表示已获取到锁protected boolean isHeldExclusively() {return getState() != 0;}//使用AQS设置线程状态protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}//尝试释放锁protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}复制代码

在Worker类的构造方法中,可以看出,首先将同步状态state设置为-1,设置为-1是为了防止runWorker方法运行之前被中断。这是因为如果其他线程调用线程池的shutdownNow()方法时,如果Worker类中的state状态的值大于0,则会中断线程,如果state状态的值为-1,则不会中断线程。

Worker类实现了Runnable接口,需要重写run方法,而Worker的run方法本质上调用的是ThreadPoolExecutor类的runWorker方法,在runWorker方法中,会首先调用unlock方法,该方法会将state置为0,所以这个时候调用shutDownNow方法就会中断当前线程,而这个时候已经进入了runWork方法,就不会在还没有执行runWorker方法的时候就中断线程。

注意:大家需要重点理解Worker类的实现。

Worker类中调用了ThreadPoolExecutor类的runWorker(Worker)方法。接下来,我们一起看下ThreadPoolExecutor类的runWorker(Worker)方法的实现。

runWorker(Worker)方法

首先,我们看下RunWorker(Worker)方法的源码,如下所示。

final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;//释放锁,将state设置为0,允许中断任务的执行w.unlock();boolean completedAbruptly = true;try {//如果任务不为空,或者从任务队列中获取的任务不为空,则执行while循环while (task != null || (task = getTask()) != null) {//如果任务不为空,则获取Worker工作线程的独占锁w.lock();//如果线程已经停止,或者中断线程后线程终止并且没有成功中断线程//大家好好理解下这个逻辑if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())//中断线程wt.interrupt();try {//执行任务前执行的逻辑beforeExecute(wt, task);Throwable thrown = null;try {//调用Runable接口的run方法执行任务task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {//执行任务后执行的逻辑afterExecute(task, thrown);}} finally {//任务执行完成后,将其设置为空task = null;//完成的任务数量加1w.completedTasks++;//释放工作线程获得的锁w.unlock();}}completedAbruptly = false;} finally {//执行退出Worker线程的逻辑processWorkerExit(w, completedAbruptly);}}复制代码

这里,我们拆解runWorker(Worker)方法。

(1)获取当前线程的句柄和工作线程中的任务,并将工作线程中的任务设置为空,执行unlock方法释放锁,将state状态设置为0,此时可以中断工作线程,代码如下所示。

Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;//释放锁,将state设置为0,允许中断任务的执行w.unlock();复制代码

(2)在while循环中进行判断,如果任务不为空,或者从任务队列中获取的任务不为空,则执行while循环,否则,调用processWorkerExit(Worker, boolean)方法退出Worker工作线程。

while (task != null || (task = getTask()) != null)复制代码

(3)如果满足while的循环条件,首先获取工作线程内部的独占锁,并执行一系列的逻辑判断来检测是否需要中断当前线程的执行,代码如下所示。

//如果任务不为空,则获取Worker工作线程的独占锁w.lock();//如果线程已经停止,或者中断线程后线程终止并且没有成功中断线程//大家好好理解下这个逻辑if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())//中断线程wt.interrupt();复制代码

(4)调用执行任务前执行的逻辑,如下所示

//执行任务前执行的逻辑beforeExecute(wt, task);复制代码

(5)调用Runable接口的run方法执行任务

//调用Runable接口的run方法执行任务task.run();复制代码

(6)调用执行任务后执行的逻辑

//执行任务后执行的逻辑afterExecute(task, thrown);复制代码

(7)将完成的任务设置为空,完成的任务数量加1并释放工作线程的锁。

//任务执行完成后,将其设置为空task = null;//完成的任务数量加1w.completedTasks++;//释放工作线程获得的锁w.unlock();复制代码

(8)退出Worker线程的执行,如下所示

//执行退出Worker线程的逻辑processWorkerExit(w, completedAbruptly);复制代码

从代码分析上可以看到,当从Worker线程中获取的任务为空时,会调用getTask()方法从任务队列中获取任务,接下来,我们看下getTask()方法的实现。

getTask()方法

我们先来看下getTask()方法的源代码,如下所示。

private Runnable getTask() {//轮询是否超时的标识boolean timedOut = false;//自旋for循环for (;;) {//获取ctlint c = ctl.get();//获取线程池的状态int rs = runStateOf(c);//检测任务队列是否在线程池停止或关闭的时候为空//也就是说任务队列是否在线程池未正常运行时为空if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {//减少Worker线程的数量decrementWorkerCount();return null;}//获取线程池中线程的数量int wc = workerCountOf(c);//检测当前线程池中的线程数量是否大于corePoolSize的值或者是否正在等待执行任务boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//如果线程池中的线程数量大于corePoolSize//获取大于corePoolSize或者是否正在等待执行任务并且轮询超时//并且当前线程池中的线程数量大于1或者任务队列为空if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {//成功减少线程池中的工作线程数量if (compareAndDecrementWorkerCount(c))return null;continue;}try {//从任务队列中获取任务Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();//任务不为空直接返回任务if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}复制代码

getTask()方法的逻辑比较简单,大家看源码就可以了,我这里就不重复描述了。

接下来,我们看下在正式调用Runnable的run()方法前后,执行的beforeExecute方法和afterExecute方法。

beforeExecute(Thread, Runnable)方法

beforeExecute(Thread, Runnable)方法的源代码如下所示。

protected void beforeExecute(Thread t, Runnable r) { }复制代码

可以看到,beforeExecute(Thread, Runnable)方法的方法体为空,我们可以创建ThreadPoolExecutor的子类来重写beforeExecute(Thread, Runnable)方法,使得线程池正式执行任务之前,执行我们自己定义的业务逻辑。

afterExecute(Runnable, Throwable)方法

afterExecute(Runnable, Throwable)方法的源代码如下所示。

protected void afterExecute(Runnable r, Throwable t) { }复制代码

可以看到,afterExecute(Runnable, Throwable)方法的方法体同样为空,我们可以创建ThreadPoolExecutor的子类来重写afterExecute(Runnable, Throwable)方法,使得线程池在执行任务之后执行我们自己定义的业务逻辑。

接下来,就是退出工作线程的processWorkerExit(Worker, boolean)方法。

processWorkerExit(Worker, boolean)方法

processWorkerExit(Worker, boolean)方法的逻辑主要是执行退出Worker线程,并且对一些资源进行清理,源代码如下所示。

private void processWorkerExit(Worker w, boolean completedAbruptly) {//执行过程中出现了异常,突然中断if (completedAbruptly)//将工作线程的数量减1decrementWorkerCount();//获取全局锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//累加完成的任务数量completedTaskCount += w.completedTasks;//将完成的任务从workers集合中移除workers.remove(w);} finally {//释放锁mainLock.unlock();}//尝试终止工作线程的执行tryTerminate();//获取ctlint c = ctl.get();//判断当前线程池的状态是否小于STOP(RUNNING或者SHUTDOWN)if (runStateLessThan(c, STOP)) {//如果没有突然中断完成if (!completedAbruptly) {//如果allowCoreThreadTimeOut为true,为min赋值为0,否则赋值为corePoolSizeint min = allowCoreThreadTimeOut ? 0 : corePoolSize;//如果min为0并且工作队列不为空if (min == 0 && ! workQueue.isEmpty())//min的值设置为1min = 1;//如果线程池中的线程数量大于min的值if (workerCountOf(c) >= min)//返回,不再执行程序return; }//调用addWorker方法addWorker(null, false);}}复制代码

接下来,我们拆解processWorkerExit(Worker, boolean)方法。

(1)执行过程中出现了异常,突然中断执行,则将工作线程数量减1,如下所示。

//执行过程中出现了异常,突然中断if (completedAbruptly)//将工作线程的数量减1decrementWorkerCount();复制代码

(2)获取锁累加完成的任务数量,并将完成的任务从workers集合中移除,并释放,如下所示。

//获取全局锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//累加完成的任务数量completedTaskCount += w.completedTasks;//将完成的任务从workers集合中移除workers.remove(w);} finally {//释放锁mainLock.unlock();}复制代码

(3)尝试终止工作线程的执行

//尝试终止工作线程的执行tryTerminate();复制代码

(4)处判断当前线程池中的线程个数是否小于核心线程数,如果是,需要新增一个线程保证有足够的线程可以执行任务队列中的任务或者提交的任务。

//获取ctlint c = ctl.get();//判断当前线程池的状态是否小于STOP(RUNNING或者SHUTDOWN)if (runStateLessThan(c, STOP)) {//如果没有突然中断完成if (!completedAbruptly) {//如果allowCoreThreadTimeOut为true,为min赋值为0,否则赋值为corePoolSizeint min = allowCoreThreadTimeOut ? 0 : corePoolSize;//如果min为0并且工作队列不为空if (min == 0 && ! workQueue.isEmpty())//min的值设置为1min = 1;//如果线程池中的线程数量大于min的值if (workerCountOf(c) >= min)//返回,不再执行程序return; }//调用addWorker方法addWorker(null, false);}复制代码

接下来,我们看下tryTerminate()方法。

tryTerminate()方法

tryTerminate()方法的源代码如下所示。

final void tryTerminate() {//自旋for循环for (;;) {//获取ctlint c = ctl.get();//如果线程池的状态为RUNNING//或者状态大于TIDYING//或者状态为SHUTDOWN并且任务队列为空//直接返回程序,不再执行后续逻辑if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;//如果当前线程池中的线程数量不等于0if (workerCountOf(c) != 0) { //中断线程的执行interruptIdleWorkers(ONLY_ONE);return;}//获取线程池的全局锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//通过CAS将线程池的状态设置为TIDYINGif (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {//调用terminated()方法terminated();} finally {//将线程池状态设置为TERMINATEDctl.set(ctlOf(TERMINATED, 0));//唤醒所有因为调用线程池的awaitTermination方法而被阻塞的线程termination.signalAll();}return;}} finally {//释放锁mainLock.unlock();}}}复制代码

(1)获取ctl,根据情况设置线程池状态或者中断线程的执行,并返回。

//获取ctlint c = ctl.get();//如果线程池的状态为RUNNING//或者状态大于TIDYING//或者状态为SHUTDOWN并且任务队列为空//直接返回程序,不再执行后续逻辑if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;//如果当前线程池中的线程数量不等于0if (workerCountOf(c) != 0) { //中断线程的执行interruptIdleWorkers(ONLY_ONE);return;}复制代码

(2)获取全局锁,通过CAS设置线程池的状态,调用terminated()方法执行逻辑,最终将线程池的状态设置为TERMINATED,唤醒所有因为调用线程池的awaitTermination方法而被阻塞的线程,最终释放锁,如下所示。

//获取线程池的全局final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//通过CAS将线程池的状态设置为TIDYINGif (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {//调用terminated()方法terminated();} finally {//将线程池状态设置为TERMINATEDctl.set(ctlOf(TERMINATED, 0));//唤醒所有因为调用线程池的awaitTermination方法而被阻塞的线程termination.signalAll();}return;}} finally {//释放锁mainLock.unlock();}复制代码

接下来,看下terminated()方法。

terminated()方法

terminated()方法的源代码如下所示。

protected void terminated() { }复制代码

可以看到,terminated()方法的方法体为空,我们可以创建ThreadPoolExecutor的子类来重写terminated()方法,值得Worker线程调用tryTerminate()方法时执行我们自己定义的terminated()方法的业务逻辑。

郑重声明:本文内容及图片均整理自互联网,不代表本站立场,版权归原作者所有,如有侵权请联系管理员(admin#wlmqw.com)删除。
(0)
用户投稿
上一篇 2022年8月20日
下一篇 2022年8月20日

相关推荐

  • 奶茶妹妹现状2022章泽天为什么会选择原谅刘强东?

      章泽天是名聪明的女人,她自身才华出众。知道自己想要什么,所以她活的精彩。邓文迪和奶茶妹妹,一个是把野心写在脸上,一个是放在心里。看她们的朋友圈就知道,外交手腕,两人都很强。  …

    2022年7月26日
  • 习惯的可怕

    当你把任何一件事和人当成习惯的时候,你的人生就会被定格。 如果你某个清晨醒来的时候,你习惯性的准备好一切,当你踏出房门的时候,你发现自己工作十几年的单位不复存在,你是重整旗鼓还是一…

    2022年6月28日
  • 10道不得不会的JVM面试题 看完对你有不少的帮助

    JVM 基于JDK8 1. 说一说JVM的主要组成部分 方法区和堆是所有线程共享的内存区域;而虚拟机栈、本地方法栈和程序计数器的运行是线程私有的内存区域,运行时数据区域就是我们常说…

    2022年6月23日
  • 科技部等七部门部署科研助理岗位开发和落实工作

    光明网6月30日讯 日前,科技部网站公布《科技部等七部门关于做好科研助理岗位开发和落实工作的通知》,要求统筹推进科技研发、高新技术企业成长、高新技术产业发展和科研助理岗位开发工作,…

    2022年7月5日
  • 吕中:儿女是她一生的骄傲,老伴离世后,“影后”儿媳最贴心

    青梅竹马,举案齐眉,携手一生,是人人所期盼的爱情。 不管是普通人家,还是明星大腕儿,都希望与自己青梅竹马之人携手一生。 试问又有几个人能做到,尤其是在娱乐圈,你又能找到几对青梅竹马…

    2022年7月4日
  • 单位没有给我登记特殊工种,但从事特殊工种10年了,怎么维护自己权益?

    感谢邀请,更感谢楼主的提问。 楼主您好,如果因单位的过错没有登记特殊工种,并且从事特殊工种10年了,这种情况下是很难再办理特殊工种提前退休的。但是由于是单位的原因,造成的过错可以要…

    2022年5月16日
  • 在学习、工作和生活中“受气”时如何处理

    无论什么人,干什么样的工作,做多大的领导,有时候都会“受气”。 小孩不听话经常受父母的气;父母老了,有些事糊涂,受儿女的气;工作没干好,受工友和领导的气;有些时候没给老板的面子,受…

    2022年6月23日
  • 一些法医对案发现场的尸体进行勘察和解剖时,他们的内心真的不害怕吗?

    我是个搞现场的法医,就是专门跟尸体打交道的法医,这个问题还是我们法医@法医阿一@王登峰法医最有发言权,我来回答一下吧。喜欢法医学知识的可以关注我,有什么法医方面的疑问可以私信我。 …

    2022年3月25日
  • 有什么方法可以缩小毛孔吗?

    毛孔粗大一直是妹子们最讨厌的皮肤问题。它会让我们的脸看起来坑坑洼洼的,不仅影响了脸部的美观,也拉低的颜值。那么有什么方法可以缩小毛孔吗?下面就由小编给大家介绍了几种缩毛孔的方法,一…

    2022年8月10日
  • 60岁之后,年轻且外貌漂亮的人,一般都有这些习惯,你有几个?

    前言: 很多人到60岁之后,都已经进入老年状态,我们会发现在现实生活当中,有些人过了60岁,而且还非常年轻,外貌也特别漂亮,当这些人走在别人面前的时候,别人都觉得年纪没有60岁,而…

    2022年4月29日

联系我们

联系邮箱:admin#wlmqw.com
工作时间:周一至周五,10:30-18:30,节假日休息