Java线程池深度揭秘(java 线程池详解)
cac55 2024-09-20 12:42 27 浏览 0 评论
作为 Java 程序员,无论是技术面试、项目研发或者是学习框架源码,不彻底掌握 Java 多线程的知识,做不到心中有数,干啥都没底气,尤其是技术深究时往往略显发憷。
坐稳扶好,通过今天的分享,能让你轻松 get 如下几点。
1. Executor 框架家族简介;
2. 源码解读:线程池状态以及状态流转;
3. 源码解读:部分成员变量及方法;
4. 源码解读:任务提交submit方法背后;
5. 源码揭秘之后的反思;
6. 寄语。
Executor 家族简介
一图胜千言,脑中有图心不慌。
(一)Executor 接口。
public interface Executor {
void execute(Runnable command);
}
Executor 是一个接口(主要用于定义规范),定义了 execute 方法,用于接收 Runnable 对象。
(二)ExecutorService 接口。
public interface ExecutorService extends Executor {
// ... ...
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
// ... ...
}
ExecutorService 也是一个接口,继承了 Executor 接口,增加了更多方法,相当于扩展了 Executor 接口的功能,例如定义了 submit() 系列方法,支持任务执行后得到返回结果。
(三)AbstractExecutorService 抽象类。
public abstract class AbstractExecutorService implements ExecutorService {
// ... ...
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
// ... ...
}
AbstractExecutorService 是一个抽象类,实现了 ExecutorService 接口中的部分方法,例如提供了任务提交的 submit 方法的默认实现,而 submit 方法最终会调用 execute 方法。
不过 AbstractExecutorService 并没有实现 execute 方法,相当于为子类留了个口子,让子类去灵活扩展(钩子函数)。
(四)ScheduledExecutorService 接口。
public interface ScheduledExecutorService extends ExecutorService {
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
ScheduledExecutorService 接口继承了 ExecutorService,增加定时调度的方法,使其成为一个可定时调度任务的接口,相当于扩展了 ExecutorService 的功能。
(五)ScheduledThreadPoolExecutor 类。
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
// ... ...
}
ScheduledThreadPoolExecutor 类继承自 ThreadPoolExecutor 类,并且实现了 ScheduledExecutorService 接口,变成一个可定时调度任务的线程池。
(六)ThreadPoolExecutor 类。
public class ThreadPoolExecutor extends AbstractExecutorService {
// ... ...
}
ThreadPoolExecutor 继承 AbstractExecutorService 抽象类,并实现了 execute 等一系列方法。
(七)Executors 类。
public class Executors {
// ... ...
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
// ... ...
}
研发人员可以通过 Executors 工厂类来创建线程池并返回一个ExecutorService 对象,而内部几乎全是对 ThreadPoolExecutor 的封装。
通过 Executor 的家族简单认识,应该能感觉到 ThreadPoolExecutor 类的重要性,所以接下来要重点对 ThreadPoolExecutor 类的源码进行剖析。
源码解读:线程池状态以及状态流转
上面注释截图来源于 ThreadPoolExecutor 的源码,别懵圈,仔细看差不多都能懂,能够看出线程池的五种状态以及对应的状态流转。
不知道你能看懂多少,看不懂也没关系,接下来把上面的注释用图呈现给大家。通过源码中的注释,能够勾勒出如下线程池的状态流转图(好的注释是多么的重要啊,感叹号!)。
源码解读:部分成员变量及方法
/**
* ctl 是一个 AtomicInteger 类型的原子对象。
* 其实设计很有意思:ctl 共包括 32 位(高 3 位表示"线程池状态",低 29 位表示"线程池中的线程数量")。
* 个人感觉:线程池状态与线程数量合二为一,用一个变量来表示,来减少锁竞争,提高并发效率。
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/** 表示线程池线程数的位数:32 - 3 = 29 位 */
private static final int COUNT_BITS = Integer.SIZE - 3;
/** 表示最大线程容量(000,11111111111111111111111111111)*/
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits(运行状态保存在 int 值的高 3 位)
/** 111,00000000000000000000000000000 */
private static final int RUNNING = -1 << COUNT_BITS;
/** 000,00000000000000000000000000000 */
private static final int SHUTDOWN = 0 << COUNT_BITS;
/** 001,00000000000000000000000000000 */
private static final int STOP = 1 << COUNT_BITS;
/** 010,00000000000000000000000000000 */
private static final int TIDYING = 2 << COUNT_BITS;
/** 011,00000000000000000000000000000 */
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
/** 获取线程池的运行状态 */
private static int runStateOf(int c) { return c & ~CAPACITY; }
/** 线程池内有效线程的数量 (workerCount) */
private static int workerCountOf(int c) { return c & CAPACITY; }
/** 线程池的状态和线程的数量组装,成为 ctl */
private static int ctlOf(int rs, int wc) { return rs | wc; }
仔细去看上面的代码,注释已经很清晰啦。重点关注 ctl 变量,这个变量将线程池自身状态和线程数量,融合在这一个变量中,其中高 3 位表示线程池状态,低 29 位表示线程池中的线程数量,这样在多线程环境下更易保证线程池自身状态和线程数量的统一,不得不佩服源代码作者 Doug Lea,可谓是设计甚妙!
源码解读:任务提交 submit 方法背后
疑问?当调用 submit() 方法,把一个任务提交给线程池去处理的时候,线程池的处理过程是什么样的呢?
通过开篇对 Executor 的家族简介,能够看到 submit() 方法最终会调用 ThreadPoolExecutor 的 execute 方法,走进源码好好看看 execute 方法都做了啥?
重点关注源码中的注释(红框圈住部分),若看懂注释,提交任务时线程池对应的处理,也就懂了一半啦(感触:好的编码规范真的好重要,业务开发时,核心代码一定要有注释)。
若依然很懵逼,一图胜千言,那就继续上图吧。
了解上图的整体流程,再去看看源码就彻悟啦。
public void execute(Runnable command) {
//【Step 0. 如果任务为空则抛出 NPE 异常】
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//【Step 1. 判断核心线程是否已满】
// 1.1. 判断当前线程数是否已经达到核心线程数的限制
if (workerCountOf(c) < corePoolSize) {
// 1.2. 如果未达到核心线程数的限制,则会直接添加一个核心线程,并指定首次执行的任务,进行任务处理
if (addWorker(command, true))
return;
// 1.3. 如果添加失败,则刷新线程池的状态和线程的数量对应的变量 ctl
c = ctl.get();
}
//【Step 2. 判断阻塞队列是否已满】
// 2.1. 检查线程池是否是运行状态,然后将任务添加到等待队列
if (isRunning(c) && workQueue.offer(command)) {
// 2.2. 任务成功添加到等待队列,再次刷新 ctl
int recheck = ctl.get();
// 2.3. 添加任务到等待队列成功后,如果线程池不是运行状态,则将刚添加的任务从队列移除并执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 2.4. 判断当前线程数量,如果线程数量为 0,则添加一个非核心线程,并且不指定首次执行任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//【Step 3. 判断最大线程数量是否已经达到】
// 3.1. 添加非核心线程,指定首次执行任务,如果添加失败,执行异常策略
else if (!addWorker(command, false))
reject(command);
}
结合注释去读代码,应该都能搞懂。很显然 execute 方法中,多处都调用了 addWorker 方法,接下来简单剖析一下 addWorker 方法。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// ... ...
for (;;) {
// ... ...
// 通过 CAS 自旋,增加线程数 +1,增加成功跳出双层循环,继续往下执行
if (compareAndIncrementWorkerCount(c))
break retry;
// ... ...
}
}
// 到这儿,说明已经成功的将线程数 +1 了,但是真正的线程还没有被添加
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 添加线程,Worker 是继承了 AQS,实现了 Runnable 接口的包装类
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// ... ...
// 添加新增的 Worker
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
// ... ...
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动 Worker
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
为了简明扼要,方法酌情进行了删减。addWorker 方法主要是通过双重 for 循环进行线程数 +1,然后创建 Worker,并进行添加到 HashSet<Worker> workers 列表中,然后调用 t.start() 启动 Worker。
那么接下来再一起看看 Worker 里面都做了啥?
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
// ... ...
final Thread thread;
Runnable firstTask;
/**
* 通过指定的 firstTask 任务创建 Worker 对象
*/
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
// 通过当前 Worker 对象创建对应的线程对象 t,
// 所以调用 t.start() 时最终会调用 Worker 的 run 方法
this.thread = getThreadFactory().newThread(this);
}
public void run() {
// run 方法最终会调用 ThreadPoolExecutor 的 runWorker 方法
runWorker(this);
}
// ... ...
}
通过 Worker 的构造函数能够了解到,会通过创建的 Worker 对象去构建线程对象,当线程对象启动时最终会调用 runWorker 方法。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 取出需要执行的任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 如果 task 不是 null 或者去 workQueue 队列中取到待执行的任务不为 null
while (task != null || (task = getTask()) != null) {
// ... ...
try {
// 开始执行任务前的钩子方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
// ... ...
} finally {
// 任务执行后的钩子方法
afterExecute(task, thrown);
}
} finally {
// ... ...
}
}
completedAbruptly = false;
} finally {
// Worker 退出
processWorkerExit(w, completedAbruptly);
}
}
runWorker 方法,首先会取出要执行的任务 task,如果为空则会调用 getTask 方法从任务队列中获取,然后调用任务对应的 run 方法进行执行,另外预置了 beforeExecute、afterExecute 两个钩子函数,让研发人员监控线程执行成为可能。
另外,线程池中的线程如何从队列中获取待执行的任务的呢?走进 getTask 方法看一看。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
// 这块体现了:线程池的线程是复用的,通过循环去获取队列中的任务去执行。
for (;;) {
int c = ctl.get();
// ... ...
int wc = workerCountOf(c);
// allowCoreThreadTimeOut: 是否允许核心线程超时.
// 如果设置为 false,那么线程池在达到 corePoolSize 个工作线程之前,不会让闲置的工作线程退出。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// ... ...
try {
// 从 workQueue 队列中取待执行的任务,根据 timed 来选择等待时间
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
为了便于理解,源码做了部分删减。重点关注从任务队列中获取待执行任务的对象的方法调用:workQueue.poll()、workQueue.take() ,前者是移除并返回队列中的头部元素,如果队列为空,则返回 null,而后者是移除并返回队列中的头部元素,如果队列为空,则阻塞。
烟未灭,酒过半 ... ... 源码探讨就谈到这儿... ...
源码揭秘之后的反思
(一)钩子函数的使用场景
场景一:
如上面自定义的 MyThreadPoolExecutor,可以让日志打印线程及线程数等等信息。意味着研发人员可以扩展 ThreadPoolExecutor,对钩子函数 beforeExecute、afterExecute 进行实现,进而可以知晓线程池内部的调度细节,可以有效进行监控,针对故障排查应该很有帮助。
场景二:
AbstractExecutorService 并没有实现 execute 方法,而是为子类 ThreadPoolExecutor 留了个口子,让子类去灵活扩展(钩子函数)。
仔细想想业务开发时,诸多的使用场景,何尝不是如此呢?
(二)线程池的 submit 方法与 execute 方法啥区别呢?
execute 方法,适用于不需要关注返回值的场景,只需要将线程丢到线程池中去执行就可以了。
而 submit() 方法,适用于需要关注返回值的场景,不过最终会调用 execute() 方法。
考虑到性能提升,如果不需要关注返回值,则建议直接调用 execute() 方法,因为那样会屏蔽很多中间调度。
(三)线程池状态与线程数量用一个 ctl 变量表示的好处?
线程池状态和线程数量合二为一,用一个原子变量来表示,来减少锁竞争,提高并发效率。
(四)清晰的注释是否有必要?
通过探秘源码,很多图都是根据源码注释勾勒出来的。可以看出清晰的注释,对于核心流程而言真的很重要,一定要养成良好的编码习惯,关键业务逻辑、核心流程,建议一定要写好注释,利人又利己,何乐而不为之。
(五)Executor 家族框架,若写基础框架时,是否有借鉴意义呢?
个人感觉很有借鉴意义,因为无论业务开发还是基础服务,总会看到类似模式框架的身影,总会有大牛模仿着造轮子,所以闲暇之余可以抽象一下。
寄语写最后
本次,主要对 Executor 家族进行了简单介绍,并着重对线程池背后的 ThreadPoolExecutor 类进行深度剖析,知其然知其所以然,希望对大家有帮助。
好了,本次就谈到这里,一起聊技术、谈业务、喷架构,少走弯路,不踩大坑。欢迎关注「一猿小讲」,会持续输出原创精彩分享,敬请期待!
推荐阅读:
相关推荐
- 电工电路图中二极管、三极管的符号标识
-
1、二极管二极管是一种常用的具有一个PN结的半导体器件,它具有单向导电性,通过二极管的电流只能沿一个方向流动。二极管只有在所加的正向电压达到一定值后才能导通。在电工电路图中,二极管以专用的图形符号和电...
- 开关部件在电工电路中的符号标识
-
1、在电工电路中还常常绘制有具有专门含义的图形符号,认识这些符号对于快速和准确理解电路图十分必要。在识读电工电路的过程中,还常常会遇到各种各样的功能部件的图形符号,用于标识其所代表的物理部件,例如各种...
- 走过路过 别错过!整理最全电工电路各种元器件及辅料字母符号解析
-
走过路过别错过!整理最全电工电路各种元器件及辅料字母符号解析建议收藏备用起来以备不时之需!每天学习一点点就会有收获!...
- 熬夜吐血整理的电工电路的字母符号!及各种元器件实物图解符号!
-
熬夜吐血整理的电工电路的字母符号!及各种元器件实物图解符号!...
- 电气人士接好了!史上最全的电气符号介绍
-
有没有人像小编一样看到这样的图纸就犯晕啊?像这样的图纸对于电气人士来说应该不陌生吧,可是对于一些刚入行的,或者在电气行业却不是技术岗位的人来说,那与天书没什么区别。今天小编狠狠心,为大家搜集了一些关于...
- 新手工程师,这些电路图符号你都了解吗?
-
以下电路图符号大全,千万别弄错了噢~~更多行业信息可查阅快点PCB平台订阅号:eqpcb_cp。...
- 电工学习通(一):电路图符号知识大全(安科瑞任心怡、许玉龙)
-
电路图符号知识我们常说的电路图呢是一种以物理电学标准符号来绘制各MOS管电子元器件组成和关系的电路原理布局图,听不懂也没关系,我们只要记住以下几点就可以了:电路图符号数量众多,大致可以分为四个类别:传...
- 常用电子元器件电路符号及实物外形图,你值得拥有
-
作为一名电工初学者,认识并了解常用的电子元器件是一项必备的基本技能,这包括电子元器件的电路符号、实物、用途等。本文电工学习网小编和大家分享一些电子元器件的电路符号及实物外形图,希望对大家的学习有所帮助...
- 电工常用的符号及单位
-
常用的符号及单位①欧姆定律I=U/R(适用于电阻电路,如白炽灯)②电能计算W=P·t(W为我们常说的电度,P为功率多少瓦或千瓦,t为时间小时计量)例如一个220V,60W的白炽灯,在220V电压工...
- 电路图常用的字母符号及释义(详细版!)
-
你是不是在查看电路图时常遇到一些看不懂的字母或字符,不明白它们表示什么含义?今天小编整理了一些电路图常用的字母符号及其释义,供大家查阅,赶快收藏吧!在之前的文章,小编大致整理了绘制电路图常涉及的电路符...
- 最全电工电路的字母符号大全!电工必备知识技能!建议收藏备用
-
最全电工电路的字母符号大全!电工必备知识技能!建议收藏备用!每天学习一点点就会有收获!学海无涯!...
- 电路符号大全,赶快收藏
-
认识电路符号是绘制电路图的前提。绘制电路图需要涉及的电路符号罗列出来有很多,大致可以分为五个类别:基本电路符号、传输路径符号、开关和继电器符号、集成电路组件以及限定符号。基本电路符号绘制基础电路图可能...
- 电气电路的图形符号,不怕看不懂电路图啦
-
一、电压、电流、电池的图形符号//二、信号灯、信号器件、按钮、旋钮开关和测量仪表的图形符号//三、负载开关的图形符号//四、熔断器的图形符号//五、继电器、接触器、接触器触点和操作器件的图形符号//六...
- 图解普通电阻器电路符号的含义,初学者必看
-
电子元器件的电路符号中含有许多有用的、对电路分析有益的识图信息,掌握了电子元器件电路符号的识图,电路分析就会简单一些。电阻器电路符号图1-1所示是普通电阻器电路符号图解示意图。在电路分析中,为了表述方...
- 电路图符号大全(电容、电阻、二极管、三极官、集成电路)
-
基础知识薄弱,不懂工作原理,不会看图、识图,这里更多电路图(原理图)符号大全、电路图形符号(指用一种书画图形代表一种电子元件)(比如:电容、电阻、二极管、三极官、集成电路等等)的符号为初学...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- 如何绘制折线图 (52)
- javaabstract (48)
- 新浪微博头像 (53)
- grub4dos (66)
- s扫描器 (51)
- httpfile dll (48)
- ps实例教程 (55)
- taskmgr (51)
- s spline (61)
- vnc远程控制 (47)
- 数据丢失 (47)
- wbem (57)
- flac文件 (72)
- 网页制作基础教程 (53)
- 镜像文件刻录 (61)
- ug5 0软件免费下载 (78)
- debian下载 (53)
- ubuntu10 04 (60)
- web qq登录 (59)
- 笔记本变成无线路由 (52)
- flash player 11 4 (50)
- 右键菜单清理 (78)
- cuteftp 注册码 (57)
- ospf协议 (53)
- ms17 010 下载 (60)