java并发-(6)线程池

Java中的线程池是日常开发中进行并发编程,使用最多的一个开发模式。合理使用线程池可以带来三个好处

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗
  2. 提高响应速度。线程池预先创建了线程,不必等到任务到来时才开始线程的创建
  3. 提高线程的可管理性。进行统一分配、调优、监控

线程池原理

ThreadPoolExecutor作为Java中线程池的实现,基于这个实现,这里分析一下线程池的原理。

ThreadPoolExecutor的类结构

  1. 提供接口void execute(Runnable command),解耦了提交任务、任务具体执行;
  2. 提供了管理接口:void shutdown(),提供能够执行有返回值任务的接口: Future submit(Callable task)

ThreadPoolExecutor源码分析

1. 状态和线程数管理

ThreadPoolExecutor使用ctl变量来描述线程池的运行状态(runState,使用ctl高三位表示)和有效线程数量(workerCount,使用ctl低29位表示),基于位运算的高效、简单的算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// rs表示runState,wc表示workerCount。这个函数通过位或运算,得出ctl的值
private static int ctlOf(int rs, int wc) { return rs | wc; }

// Packing and unpacking ctl
// 通过位与运算,获取c的高三位的值。即runState的值(也就是获取线程状态)
private static int runStateOf(int c) { return c & ~CAPACITY; }

// 通过位与运算,获取c的除高三位除外的低位的值,即workerCount的值(也就是获取线程数)
private static int workerCountOf(int c) { return c & CAPACITY; }

// 初始化时,线程池状态rs为RUNNING,线程数为wc为0,得到ctl的初始值为 111000...(省略26个0)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 29
private static final int COUNT_BITS = Integer.SIZE - 3;

// 000111...(省略26个1),前面说到ThreadPoolExecutor中使用ctl的低29位描述线程数,这里的CAPACITY也就是该线程池中线程数的容量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
// 可以看出各线程池状态的大小排序:RUNNING < SHUTDOWN < STOP < TINYING < TERMINATED
// 111000...(省略26个0)
private static final int RUNNING = -1 << COUNT_BITS;
// 0
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001000...(省略26个0)
private static final int STOP = 1 << COUNT_BITS;
// 010000...(省略26个0)
private static final int TIDYING = 2 << COUNT_BITS;
// 011000...(省略26个0)
private static final int TERMINATED = 3 << COUNT_BITS;
2. 任务的提交

线程池中的线程分为两种类型(核心线程和非核心线程),分别用corePoolSize、maximumPoolSize来限定其线程数上限;
当核心线程数达到上限时,先把任务添加到工作队列workQueue中
当工作队列workQueue为有界队列(即队列大小有限制)时,可以通过判断整个线程池的有效线程数是否达到maximumPoolSize,未达到则可以通过创建一个临时线程来执行任务(即非核心线程,存活时间keepAliveTime)
如果有效线程已达到maximumPoolSize,则使用reject策略拒绝任务提交

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// 使用一个新创建的线程、或者一个已存在的线程来执行提交的任务command
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取ctl的值
int c = ctl.get();
// workerCountOf(c)获取当前线程数,跟核心线程数(corePoolSize)限定数进行比较
if (workerCountOf(c) < corePoolSize) {
// 在线程池中创建核心线程,执行提交的任务,如果正常提交任务则直接返回该方法,证明任务已经提交到线程池中
if (addWorker(command, true))
return;
c = ctl.get();
}
// 上一步中未正常提交任务。(原因:1. 线程池处于关闭状态;2. 线程数大于设定的最大限制)
// 如果线程池为RUNNING状态,则提交到workQueue
if (isRunning(c) && workQueue.offer(command)) {
// double check运行状态,如果线程池当前不为RUNNING状态,避免因线程池关闭导致的任务丢失,把之前丢到workQueue中的任务删除,之后用reject策略告知调用者
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
// 检查线程池有效线程数,如果数量为0,则创建一个Worker(用于消费提交到queue中的任务)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果workQueue为有界队列(即大小有限制),尝试通过创建线程来执行任务(只有在当前线程数小于maximumPoolSize才能成功添加)
else if (!addWorker(command, false))
// 当前线程数大于maximumPoolSize,直接拒绝任务提交
reject(command);


private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 获取执行状态
int rs = runStateOf(c);

// Check if queue empty only if necessary.
// 满足以下任意一种情况,将直接退出该方法,不会进行创建Worker的创建和任务的执行,返回false告知上层调用,未正常提交任务
// 1. 线程池处于STOP、TYDING、TERMINATED状态
// 2. 线程池处于SHUTDOWN状态,并且firstTask(提交的任务)不为null时
// 3. 线程池处于SHUTDOWN状态,并且firstTask(提交的任务)为null时,而且阻塞队列workQueue为空时
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
// 线程数达到了线程池容量,或者达到了核心线程数(corePoolSize)(如果此时创建的是核心线程),或者达到了最大线程数(maximumPoolSize),不创建线程
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 增加线程数
if (compareAndIncrementWorkerCount(c))
break retry; // 跳出循环,进入实际创建Worker的流程
// 重新读取ctl的值,如果和方法开头获取的值不一致,则重新进入retry循环
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 如果满足以下两个条件中的一个,需要通过方法(isAlive)判断线程的运行状态
// 1. rs < SHUTDOWN说明,处于RUNNING状态
// 2. rs == SHUTDOWN并且提交的任务为null
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
// 记录该线程池启动到现在的最大线程数
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动线程,提交的任务在此时进行真正开始执行
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

3. 任务的执行

任务的执行,在Worker中的run方法执行,其内部调用runWorker(Worker w)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 如果创建Worker时,firstTask带上了提交任务,则直接执行。否则从阻塞队列中获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 执行任务前的钩子函数
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行任务后的钩子函数
afterExecute(task, thrown);
}
} finally {
task = null;
// 记录当前Worker完整执行了多少个任务
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 线程池关闭时调用
processWorkerExit(w, completedAbruptly);
}
}

4. 线程的关闭

通过中断worker中的线程来中止worker生命周期,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 线程池关闭,通过中断Worker中的线程来关闭线程池
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
// 中断所有worker
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}

总结

通过以下源码阅读,在使用线程池时需要注意以下几点

  1. 指定合理的corePoolSize、maximumPoolSize参数,避免创建过多的线程,导致过多的上下文切换
  2. 应该使用有界队列来保存任务,否则maximumPoolSize参数不会起作用,始终只有corePoolSize在处理队列中的任务,缺乏伸缩性
  3. 在任务执行的前后,分别存在两个钩子函数:beforeExecute、afterExecute;如果有需要可以子类化ThreadPoolExecutor来重写这两个方法,实现一些我们自己的定制逻辑