面试官:有了解过线程池的工作原理吗?说说看

面试官: 有了解过线程池的工作原理吗?说说看

前言

目前正在出一个Java多线程专题长期系列教程,从入门到进阶源码解读, 篇幅会较多, 喜欢的话,给个关注 ~

本节主要带大家从ThreadPoolExecutor源码角度来了解一下线程池的工作原理,一起来看下吧~

Executor 接口

首先Executor这个接口是线程池实现的顶层接口类,我们上节遇到的ExecutorService也是继承了Executor

public interface ExecutorService extends Executor {…}

ExecutorService的上层AbstractExecutorService这个抽象类实现了接口ExecutorService

public abstract class AbstractExecutorService implements ExecutorService {…}

ThreadPoolExecutor继承了AbstractExecutorService

public class ThreadPoolExecutor extends AbstractExecutorService {…}

ThreadPoolExecutor这个类我们需要重点看一下,它是接口的实现类,我们以newCachedThreadPool为例

public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }

可以看到内部其实还是调用了ThreadPoolExecutor,我们再看newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }

内部也是调了它, 下面我们就看下这个类

ThreadPoolExecutor

首先我们从构造函数看起,它主要有四个构造函数

构造函数一

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }

一共有五个参数:

  • corePoolSize – 保留在池中的线 程数,即使是空闲的,除非设置allowCoreThreadTimeOut
  • maximumPoolSize – 池中允许的最大线程数
  • keepAliveTime – 当线程数大于核心时,这是多余的空闲线程在终止前等待新任务的最长时间。
  • unit – keepAliveTime参数的时间单位
  • workQueue – 用于在执行任务之前保存任务的队列。此队列将仅保存由execute方法提交的Runnable任务。

构造函数二

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); }

其它参数同上

  • threadFactory 执行器创建新线程时使用的工厂

构造函数三

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }

  • handler 由于达到线程边界和队列容量而阻塞执行时使用的处理程序

构造函数四

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

这个把之前都综合了一下,其实可以看到前几个内部都调用了this,调用自身,也就是调用这个构造函数,进行一些初始化

BlockingQueue 阻塞队列

有几个参数比较好理解,我们来看下这个参数workQueue, 它是一个阻塞队列,这里简要给大家提一下,这块内容也比较重要,后边会专门去讲

BlockingQueue本身是一个还接口,它有几个比较常用的阻塞队列

  • LinkedBlockingQueue 链式阻塞队列,底层数据结构是链表
  • ArrayBlockingQueue 数组阻塞队列,底层数据结构是数组,需要指定队列的大小。
  • SynchronousQueue 同步队列,内部容量为0,每个put操作必须等待一个take操作
  • DelayQueue 延迟队列,该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素

ThreadFactory 线程工厂

这个是线程工厂类,统一在创建线程时设置一些参数,如是否守护线程、线程的优先级等, 同样它也是一个接口,我们在ThreadPoolExecutor内部看到了 Executors.defaultThreadFactory(),这个是一个默认工厂

static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = “pool-” + poolNumber.getAndIncrement() + “-thread-“; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }

没有指定参数,就会默认创建DefaultThreadFactory,还有其它的factory,大家可以自行看下,区别就在创建线程时指定的参数

RejectedExecutionHandler 拒绝策略

RejectedExecutionHandler同样是一个接口,这个处理器是用来专门处理拒绝的任务,也就是ThreadPoolExecutor无法处理的程序。同理,我们可以看到ThreadPoolExecutor内部有调了defaultHandler

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

这个是默认的拒绝策略, 可以看到它的默认处理是抛出拒绝的异常

public static class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code AbortPolicy}. */ public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException(“Task ” + r.toString() + ” rejected from ” + e.toString()); } }

再带大家看下另外的策略, DiscardPolicy,这个策略不会抛出异常,它会丢弃这个任务

public static class DiscardPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardPolicy}. */ public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }

DiscardOldestPolicy 该策略丢弃最旧的未处理请求,然后重试execute ,除非执行程序被关闭,在这种情况下任务被丢弃。

public static class DiscardOldestPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardOldestPolicy} for the given executor. */ public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 判断是否关闭 if (!e.isShutdown()) { e.getQueue().poll(); // 任务重试 e.execute(r); } } }

CallerRunsPolicy 直接在execute方法的调用线程中运行被拒绝的任务,除非执行程序已关闭,在这种情况下,任务将被丢弃。

public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 直接判断是否关闭 未关闭就执行 if (!e.isShutdown()) { r.run(); } } }

线程调度策略

看完构造函数,下面看下它的一些常量

private static final int COUNT_BITS = Integer.SIZE – 3;// runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;

通过变量名,我们大致知道是用来表示线程池的状态。线程池本身有一个调度线程,这个线程就是用于管理整个线程池的各种任务和事务,例如创建线程、销毁线程、任务队列管理、线程队列管理等等,所以它本身也有上面的状态值。

当线程池被创建后就会处于RUNNING状态, 主池控制状态ctl是一个原子整数

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

调用shutdown()方法后处于「SHUTDOWN」状态,线程池不能接受新的任务,清除一些空闲worker,不会等待阻塞队列的任务完成。

public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }

另外,还有一个shutdownNow,调用后处于「STOP」状态,线程池不能接受新的任务,中断所有线程,阻塞队列中没有被执行的任务全部丢弃。此时,poolsize=0,阻塞队列的size也为0。

public List shutdownNow() { List tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); // 中断所有线程 interruptWorkers(); // 将任务队列排空到一个新列表中 这里要注意下 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }

当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为「TIDYING」状态。接着会执行terminated()函数。

final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 不等于0时 中断任务线程 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 将状态设置为 TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 终止 terminated(); } finally { // 执行完 terminated 转为 TERMINATED状态 ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }

execute

这个是执行任务的核心方法,我们一起看一下

public void execute(Runnable command) { // 如果任务不存在 抛空异常 if (command == null) throw new NullPointerException(); // 获取当前状态值 int c = ctl.get(); // 当前线程数小于corePoolSize,则调用addWorker创建核心线程执行任务 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 如果不小于corePoolSize,则将任务添加到workQueue队列。 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 如果isRunning返回false(状态检查),则remove这个任务,然后执行拒绝策略。 if (! isRunning(recheck) && remove(command)) reject(command); // 线程池处于running状态,但是没有线程,则创建线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果放入workQueue失败,则创建非核心线程执行任务, else if (!addWorker(command, false)) // 如果这时创建失败,就会执行拒绝策略。 reject(command); }

在源码中,我们可以看到,多次进行了isRunning判断。在多线程的环境下,线程池的状态是多变的。很有可能刚获取线程池状态后线程池状态就改变了

总结

下面给大家简要的总结一下线程池的处理流程

  • 线程总数量小于线程池中保留的线程数量(corePoolSize),无论线程是否空闲,都会新建一个核心线程执行任务,这一步需要获取全局锁
  • 线程总数量大于corePoolSize时,新来的线程任务会进入任务队列中等待,然后空闲的核心线程会依次去缓存队列中取任务来执行,从而达到线程的复用
  • 当缓存队列满了,会创建非核心线程去执行这个任务。
  • 缓存队列满了, 且总线程数达到了maximumPoolSize,则会采取拒绝策略进行处理。
  • 结束语

    它的源码还是比较长的,一篇文章说不清楚,有兴趣的同学可以通过本篇文章的理解继续阅读它的源码。

    下一节, 继续带大家详探讨ThreadPoolExecutor中是如何进行线程复用 ~

    往期内容推荐

    • Java多线程专题之线程与进程概述
    • Java多线程专题之线程类和接口入门
    • Java多线程专题之进阶学习Thread(含源码分析)
    • Java多线程专题之Callable、Future与FutureTask(含源码分析)
    • 面试官: 有了解过线程组和线程优先级吗
    • 面试官: 说一下线程的生命周期过程
    • 面试官: 说一下线程间的通信
    • 面试官: 说一下Java的共享内存模型
    • 面试官: 有了解过指令重排吗,什么是happens-before
    • 面试官: 有了解过volatile关键字吗 说说看
    • 面试官: 有了解过Synchronized吗 说说看
    • Java多线程专题之Lock锁的使用
    • 面试官: 有了解过ReentrantLock的底层实现吗?说说看
    • 面试官: 有了解过CAS和原子操作吗?说说看
    • Java多线程专题之线程池的基本使用
    • 我的博客(阅读体验较佳)
    • 写给初学者的Java基础教程
    • 一文带你快速学习Java集合类
    • 花几分钟快速了解一下泛型与枚举
    • Java注解与反射入门到进阶
    • JavaIO教程从入门到进阶

    项目源码(源码已更新 欢迎star )

    • java-thread-all
    • 地址: https://github.com/qiuChengleiy/java-thread-all.git

    推荐 SpringBoot & SpringCloud (源码已更新 欢迎star )

    • springboot-all
    • 地址: https://github.com/qiuChengleiy/springboot-all.git
    • SpringBoot系列教程合集
    • 一起来学SpringCloud合集
    郑重声明:本文内容及图片均整理自互联网,不代表本站立场,版权归原作者所有,如有侵权请联系管理员(admin#wlmqw.com)删除。
    (0)
    用户投稿
    上一篇 2022年6月23日
    下一篇 2022年6月23日

    相关推荐

    联系我们

    联系邮箱:admin#wlmqw.com
    工作时间:周一至周五,10:30-18:30,节假日休息