1,线程池的基本使用
在了解这个线程池的底层原理之前,来先了解一下线程池的基本使用,首先定义一个线程池的工具类,用于构建一个全局的线程池,这里通过获取空闲的cpu的个数来设置最大的线程数和核心线程数,阻塞队列选择带有容量的链表有界队列, 选用的默认的线程工厂
/**
* 线程池工具
* @author DDKK.COM 弟弟快看,程序员编程资料站
* @date : 2023/10/14
*/
public class ThreadPoolUtil {
/**
* io密集型:最大核心线程数为2N,可以给cpu更好的轮换,
* 核心线程数不超过2N即可,可以适当留点空间
* cpu密集型:最大核心线程数为N或者N+1,N可以充分利用cpu资源,N加1是为了防止缺页造成cpu空闲,
* 核心线程数不超过N+1即可
* 使用线程池的时机:1,单个任务处理时间比较短 2,需要处理的任务数量很大
*/
public static synchronized ThreadPoolExecutor getThreadPool() {
if (pool == null) {
//获取当前机器的cpu
int cpuNum = Runtime.getRuntime().availableProcessors();
log.info("当前机器的cpu的个数为:" + cpuNum);
int maximumPoolSize = cpuNum * 2 ;
pool = new ThreadPoolExecutor(
maximumPoolSize - 2,
maximumPoolSize,
5L, //5s
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(50), //链表有界队列
Executors.defaultThreadFactory(), //默认的线程工厂
new ThreadPoolExecutor.AbortPolicy()); //直接抛异常,默认异常
}
return pool;
}
}
随后创建一个线程任务Task类,用于将线程加入到线程池中
/**
* @Author: zhenghuisheng
* @Date: 2023/10/14 23:19
*/
public class Task implements Runnable {
@Override
public void run() {
System.out.println("线程名称:" + Thread.currentThread().getName());
try {
//模拟业务
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
最后来一个测试主类,用于添加任务,并且获取阻塞队列的长度
/**
* @Author: zhenghuisheng
* @Date: 2023/10/14 23:17
*/
public class ThreadPoolDemo {
//获取线程池
static ThreadPoolExecutor pool = ThreadPoolUtil.getThreadPool();
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
//创建线程任务
Task task = new Task();
//线程池添加任务
pool.execute(task);
//获取阻塞队列
BlockingQueue<Runnable> queue = pool.getQueue();
System.out.println("阻塞队列的长度为: " + queue.size());
}
}
}
在前面的阻塞队列系列中,有很多事关于线程池的使用的,可以参考那里的使用
2,线程池的核心参数
在线程池中,最重要的就是7个参数的设置,需要根据不同的场景设置不同的参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){
}
接下来对上面的7个参数做一个初步的定义
int corePoolSize:核心线程数,默认不设置超时时间,当然可以手动设置过期超时时间
int maximumPoolSize:最大线程数 = 非核心线程 + 核心线程数,即线程池里面最多容纳的数量
long keepAliveTime:线程池维护线程所允许的空闲时间,非核心线程超时则会默认被销毁
TimeUnit unit:时间单位,有分钟,s,毫秒,微秒,纳秒等
BlockingQueue<Runnable> workQueue:存放未来得及提交的任务,其本质为一个BlockingQueue的阻塞队列,当线程池
中的核心线程满了之后就会将任务放入到阻塞队列里面
ThreadFactory threadFactory:线程工厂默认使用Executors里面的 defaultThreadFactory方法 来创建线程
RejectedExecutionHandler handler:拒绝策略,当线程池里面满了以及阻塞阻塞队列满了的情况下,会触发拒绝策略
2.1,通过银行案例说明这7个参数
1,假设今天只开了三个窗口,对应的就是三个核心线程的数量,一个人来办理业务就对应一个窗口,即为对应的核心线程
2,如果三个窗口满了,则下一个人在等候区等待,就是对应的阻塞队列,当然等候区的座位有限,即对应队列的长度也是有 限的,不过第一个人来到等候区会第一个办理业务,即对应队列的性质,先进先出
3,等候区也满了,银行发现今天来办理业务的人多,因此就会多开窗口临时接待这些办理业务的人员,该窗口对应的就是非核心线程,当然窗口有限,因此非核心线程的数量也是有限的,又因为该窗口为临时窗口,所以在没人的时候或者人数少的时候,该窗口也会被关闭,让临时工休息或者解雇,以防止资源的浪费,因此对应的非核心线程就被设置了空闲状态下的超时时间,也是为了避免资源的浪费
4,如果柜台的人数也满了,等候区的人数也满了,即达到饱和状态,银行就会劝退接下来要来办理业务的人,让他们下午来或者明天来,当然不同人有不同的劝退方式,这就是线程池的存在的多种拒绝策略的原因
2.2,如何设置核心线程数和最大线程数
为了更加充分合理的理由这个cpu和内存等资源,因此需要根据实际开发的工作类型来设置核心数的大小。这里是参考与Doug Lea写的 《Java并发编程实战》
2.2.1,cpu密集型
cpu的职责主要就是两件事,取指令、执行指令 ,在设计到大量的计算、排序、加密解密等,则可以优先的考虑使用这个cpu密集型任务。因此为了充分的利用cpu,核心线程数可以设置为剩余cpu的个数n,最大线程数可以设置为n+1到2n都可以,最好设置成n+1。这里加1主要是为了防止缺页中断而导致cpu的空闲
//获取系统可用的处理器的个数,1个cpu对应两个处理器
int n = Runtime.getRuntime().availableProcessors();
2.2.1,io密集型
io密集型,指的是存在大量的磁盘io、网络io等,这种特点是不会特别的消耗cpu资源,但是io会耗时很长,如果一个cpu对应一个线程,那么cpu就会长时间的等待,因此为了充分的利用cpu,一般会设置这个核心线程数为n,但是最大线程数为2n,甚至更多
//获取系统可用的处理器的个数,1个cpu对应两个处理器
int n = Runtime.getRuntime().availableProcessors();
但是最合理的线程数设置应该是需要进行压测的,这里的设置只是一个初步的方案
2.3,如何挑选阻塞队列
在前面的几篇文章中,详细的描述了阻塞队列的几种具体实现,分别是: ArrayBlockingQueue、LinkedBlockingqueue、PriorityQueue、DelayQueue,这几种队列的特性和源码在前面几篇都有详细的描述。
如果是为了追求性能和吞吐量,并且在经过压测之后不会出现OOM的情况下,可以优先选择使用带有容量的链表实现的阻塞队列 LinkedBlockingqueue,因为内部用了两把互斥锁,使得生产者和消费者职责更加单一
LinkedBlockingqueue queue = new LinkedBlockingqueue(50);
如果是需要排队等设置优先级的情况,如VIP优先,会员优先等情况,可以考虑使用二叉堆实现的PriorityQueue
//默认长度是11,会扩容
Priorityqueue queue = new PriorityQueue();
如果是需要用到延时队列,如一些订单方案,超时方案等,可以优先选择 DelayQueue
DelayQueue Queue = new DelayQueue();
如果没有要求,或者说生产者生产速度和消费者的消费的速度接近,那么可以选择这个数组的方式,但是数组内部使用的环状数组,不支持扩容的操作。如果可以的话,链表实现的阻塞队列绝大多数是可以代替这个数组的
ArrayBlockingQueue queue = new ArrayBlockingQueue(50);
2.4,如何挑选拒绝策略
在线程池的拒绝策略中,主要有四种拒绝策略
CallerRunsPolicy:由execute方法提交任务来执行这个任务
AbortPolicy:抛出异常、拒绝提交任务
DiscardPolicy:直接抛异常,不做任何处理
DiscardOldestPolicy:去除队列中的第一个任务
线程池中,默认使用的是抛出异常,拒绝提交任务的策略
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
但是在实际开发中,如果真的有关于订单,用户信息等重要的任务时,是真的不可能直接使用上面这四种任务的,而是使用自定义的拒绝策略,如写一个 RejectedExecutionHandler 的具体实现,并重写里面的抽象方法即可。
/**
* @Author: zhenghuisheng
* @Date: 2023/10/16 5:20
*/
@Component
public class MyRejectException implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//获取线程
Task task = (Task) r;
Order order = task.getOrder();
//入库
order.getId();
//...
}
}
3,源码分析
3.1,ThreadPoolExecutor的基本属性
在了解了线程池的基本使用之后,再来深入的了解一下这个线程池的底层原路
public class ThreadPoolExecutor extends AbstractExecutorService {
}
首先最下面是一个线程的状态,用整型32位的高三位表示
private static final int RUNNING = -1 << COUNT_BITS; //运行
private static final int SHUTDOWN = 0 << COUNT_BITS; //关闭
private static final int STOP = 1 << COUNT_BITS; //停止
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
随后内部定义了这些个参数,除了这个阻塞队列是一个final固定的工作队列之外,其他的关键字都用了volatile关键字修饰,都是为了保证线程间的可见性的
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private volatile long keepAliveTime;
private final BlockingQueue<Runnable> workQueue;
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
既然涉及到阻塞队列的入队出队,那么就需要一个互斥锁以及条件队列来保证这个线程安全,条件队列肯定是满了的时候被阻塞的
private final ReentrantLock mainLock = new ReentrantLock(); //互斥锁
private final Condition termination = mainLock.newCondition(); //条件队列
接着就是这个 Worker 的静态内部类,很明显,该类是继承于AQS实现的,那这个整体设计就是基于同步队列来实现的一个工作类,因此可以先熟读前面的AQS系列的文章
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
在这个类中还有一个重要的工具,就是hashset,会将所有的worker加入到这个hashset里面
private final HashSet<Worker> workers = new HashSet<Worker>();
3.2,Worker类的基本属性
3.2.1,线程工厂创建线程
接着就是查看一下这个Worker类的内部,在这个构造方法,在构建这个Wroker类的结点时,默认会将该结点的状态设置成-1,即运行状态,随后从线程工厂中创建一个线程
Worker(Runnable firstTask) {
setState(-1); // 设置线程不可中断
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
随后就是查看这个默认的工厂的创建,即直接查看这个 getThreadFactory 方法,由此可知线程创建的底层最终是通过这个 System.getSecurityManager 系统底层来创建的,
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
随后通过调用这个 newThread 方法来获取创建的线程
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
3.2.2,如何通过AQS自定义设置一把锁
继续看这个Worker的这个内部类,发现里面有两个重要的方法获取锁和释放锁的方法,在前面讲解这个AQS的CLH同步队列中,画了一个图,如下所示
根据上面的图就知道这个AQS是如何保证线程安全的,那么就是下面的俩个方法。在抢锁时,通过cas判断这个state是否为0,进入一个抢锁逻辑,是的话设置这个同步状态器的exclusive的值为当前线程;在释放锁时,直接将同步状态器的exclusive值设置为null,将state的值设置为0,这样就简单的实现了一把aqs锁了
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
3.3,线程池执行逻辑
在创建任务以及线程池后,一般有两种方式将线程池加入到线程池中,一种是execute,一种是submit,前者没有返回值,后者有返回值,但是最终都是要调用这个execute方法的,因此这里以execute方法作为入口
pool.execute(task); //将任务加入到线程池中
接着直接进入这个 execute 方法中,
public void execute(Runnable command) {
if (command == null) //判空
throw new NullPointerException();
int c = ctl.get(); //获取线程数的数量
if (workerCountOf(c) < corePoolSize) {
//判断已有线程数是否小于核心线程
if (addWorker(command, true)) //创建核心线程
return;
c = ctl.get();
}
//线程为运行状态,并且线程入队成功
if (isRunning(c) && workQueue.offer(command)) {
//入队逻辑
int recheck = ctl.get();
//双重检测,防止在那一刻变成了不可运行状态
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) //拒绝策略
reject(command);
}
他的整体流程,可以直接通过下面这种图片来概括
接下来进入这个 addWorker 方法,首先会对这个worker进行一个校验,判断他的状态是否为运行状态。随后就是一个AQS的入队操作,会将所有创建的wroker加入到hashset里面,最后会调用这个start方法运行这个线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get(); //线程记录数
int rs = runStateOf(c); //获取线程的状态
//如果是上面的1,2,3的三种形态,则直接false
//或者说队列不是空的
if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c); //获取最大的核心线程数
if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize)) return false;
if (compareAndIncrementWorkerCount(c)) break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) continue retry;
}
}
//上面一堆判断主要是验证是否能增加核心线程或者非核心线程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
//接下来的就是AQS的逻辑
try {
w = new Worker(firstTask); //创建一个Worker工作结点
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock; //获取互斥锁
mainLock.lock(); //加锁
try {
int rs = runStateOf(ctl.get()); //判断结点的状态
//状态小于0或者等于0,头结点为空
if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) throw new IllegalThreadStateException();
workers.add(w); //加入hashset里面
int s = workers.size();
if (s > largestPoolSize) largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock(); //解锁
}
if (workerAdded) {
t.start(); //执行start方法
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return
}
在调用它的start方法之后,最后会执行他的run方法
public void run() {
runWorker(this);
}
接着再研究他的 runWorker 的这个方法,在这个执行的run方法中,可以得知队列中的任务是最后执行的,核心线程和非核心线程时先执行的。并且在run运行这个线程任务的时候,可以手动的重写beforeExecute 前置任务和 afterExecute 后置任务
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//精髓点,先执行非核心线程和核心线程,最后执行队列中的任务
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); //前置任务
Throwable thrown = null;
try {
task.run();
} finally {
afterExecute(task, thrown); //后置任务
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
在线程池中,是通过有限的线程个数,加上对应的run方法的数量来加快响应速率的。在多线程中,需要创建一个线程对象调用一个start方法,如果要100个线程,那么就得创建100个线程,调用100个run方法,但是如果是线程池的话,只需要创建最大的线程数量,加上100个run方法即可,因此线程池的速率会比多线程的快。总而言之,线程池是对线程run方法的调用
在这个运行线程的方法最后,有一个 processWorkerExit 方法,表示当前线程已经执行完毕,可以继续执行下一个任务的run方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false); //增加一个线程任务,空线程
}
}
4,为何不推荐jdk自带的线程池
在jdk中有四种自带的线程池,分别是:newSingleThreadExecutor,newCachedThreadPool,newFixedThreadPool,newScheduledThreadPool ,接下来通过这些构建线程池的具体参数来分析为何会不推荐使用
先看这个单例的线程池,首先最大线程数和核心线程数是1,强行由多线程变成单线程,一般场景是不会选择的,但是这并不是重点,而是阻塞队列选择的是无界的阻塞队列,LinkedBlockingqueue无界阻塞队列的长度是整型的最大值,假设一个对象占1m,那么这么多线程加入队列,得2^32 - 1m的容量,那么迟早OOM
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
接下来看这个固定长度的线程池,和单例的是一个问题,使用的LinkedBlockingqueue无界阻塞队列,OOM
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
下一个缓存的线程池,好不容易用的是同步队列不OOM了,但是最大线程时整型最大值,就是丢这么多任务到内存中,那么CPU和内存迟早溢出
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
最后一个延时的线程池,和上面的缓存的一样,最大核心线程数整型最大值,cpu打满和内存溢出
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
5,总结
首先通过execute方法将任务加入到线程池,随后会先判断线程池中已有线程数是否小于核心线程数,小于则继续创建核心线程,线程数大于或者等于核心线程数时,则将线程加入到阻塞队列中,队列满了的情况下则创建非核心线程。线程的数量是有限的,通过有限的线程去执行任务线程的run方法,从而让线程池更加的高效,在执行任务时,会先执行完非核心线程和核心线程的任务,最后再执行阻塞队列的任务。
在实际开发中,最好选择自定义线程池的方式,而不是选择jdk的自带的线程,并且根据具体的业务场景选择相应的阻塞队列,设置相应大小的核心线程数和最大线程数,重写对应的拒绝则略做补偿机制等。