Java并发系列笔记之线程池

线程池是资源复用的典范之作,其基本思想是维护一个含有一定数量的在运行的线程集合,在需要运行线程任务的时候直接从这个集合中取出一个线程去运行任务,而不是重新创建一个。

如果我们自己去实现一个线程池,那么基本的想法是维护一个线程的集合,这些线程都从一个队列中去取任务,如果队列为空,则阻塞对应的线程,等待队列不空的消息通知。当线程完成了任务,应该将线程返回给线程队列,而不是关闭线程。基本思想是这样,那么Java中具体是如何实现线程池的呢,我们来看看。

ThreadPoolExecutor

一般情况下,我们使用线程池会使用Executors的静态方法获取线程池,如

  • public static ExecutorService newFixedThreadPool(int nThreads)
  • public static ExecutorService newSingleThreadExecutor()
  • public static ExecutorService newCachedThreadPool()
  • public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

除了最后一个方法外,其它三个方法都是返回一个ThreadPoolExecutor实例,它是ExecutorService接口的一个实现类。瞄一眼ThreadPoolExecutor的构造方法

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

它的构造方法有多个重载,其中最终都是使用了上面这个构造方法,它包括7个参数,corePoolSize,核心线程数量;maximumPoolSize,最大线程数量;keepAliveTime,超过核心线程数量线程的在空闲后的存活时间;unit,存活时间的时间单位;workQueue,超过核心线程后任务存放的阻塞队列;threadFactory,常用定义线程名字的线程工厂,可以使用默认工厂;最后一个handler,是阻塞队列已满,并且线程数达到maximumPoolSize的时候的处理策略,其中包括了抛出异常(AbortPolicy),谁请求谁调用(CallerRunsPolicy),丢弃线程池中的一个任务来执行现在的任务(DiscardOldesPolicy),直接丢弃掉(DiscardPolicy)默认使用抛异常策略。

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

可以看到newFixedThreadPool使用默认工厂,默认拒绝策略,链表阻塞队列,最大线程数和核心线程数相同,并且如果超过核心线程数的线程控制,立即失效。这种比较适合任务数固定的任务。

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

newSingleThreadExecutor的核心线程数和最大线程数都是1,并且使用LinkedBlockingQueue作为缓冲队列。

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

newCachedThreadPool在没有任务的时候并不创建线程,只有在任务出现的时候才使用SynchronousQueue队列传递任务给线程,但SynchronousQueue每次只能传递一个任务,新的任务首先会通过offer(非阻塞)方法尝试传递任务给线程,如果此时没有空闲的线程,会新生成线程来完成任务,而且线程的空闲时间是60s,所以比较适合任务多而且短的任务集(短的原因是总可以有空闲线程去SynchronousQueue中取任务)。关于SynchronousQueue,可以参考这几篇文章《SynchronousQueue使用实例》,《J.U.C之阻塞队列:SynchronousQueue》。

1
2
3
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

ScheduledThreadPoolExecutor可以使用schedule方法在给定时间延迟时进行调用。

ThreadPoolExecutor是如何管理线程池的?

从上面我们看到使用ThreadPoolExecutor可以获取线程池,并且可以调用execute(Runnable)和submit(Runnable)提交任务。那么在执行execute方法的时候,ThreadPoolExecutor是如何管理线程池来完成当前的任务的。我们可以从execute的源码中一窥这个处理策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {//第一步
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//第二步
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))//第三步
reject(command);
}

其基本策略有三大步,第一步,在当前的线程池中线程数量小于核心线程数量,直接调用addWork(Runnable,boolean)添加线程,并且执行任务任务,任务完成后这个线程会驻留在池中等待执行任务。如果线程数已经超过的核心线程数,第二步,则将任务使用阻塞队列的offer非阻塞方式放入队列中。如果阻塞队列也满了,则尝试addWork(Runnable,boolean),如果线程数量超过了最大线程数量,则添加失败,调用拒绝策略。
addWork(Runnable)方法基本思想是首先将Runable包装成Worker,并执行Worker。Worker的数据结构使得在执行完该方法后,会尝试从阻塞队列中获取任务继续执行。
另外值得注意的是JDK中的线程池标注的5种状态,如下图所示

只有RUNNING状态的线程池可以接收任务,处于SHUTDOWN状态的线程池不可以接受新任务,但是可以继续对已添加的任务进行处理。处于STOP状态的线程池不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。TIDYING状态的线程池在执行terminated函数,而TERMINATED状态说明已经执行完terminated函数,彻底终止。其中RUNNING可以转换成SHUTDOWN(shutdown()函数)或STOP状态(shutDownNow()函数),有STOP或SHUTDOWN状态可以转换成TIDYING,而TIDYING可以转换成TERMINATED。

参考文献

J.U.C之线程池:ThreadPoolExecutor
深度解读 java 线程池设计思想及源码实现

声明

本文首发表于我的博客,欢迎关注!转载须注明文章出处,作者保留文章所有权。

坚持原创技术分享,您的支持将鼓励我继续创作!