一,深入理解ArrayBlockingQueue的底层实现
在理解阻塞队列BlockingQueue之前,先理解Queue的特性,Queue是作为一种经典的数据结构使用的,与之相对应的的就是栈。队列采用的是先进先出的策略模式,在Java中也有Queue的具体实现,内部也有着一些对应的api,如add,remove入队出队等
public interface Queue<E> extends Collection<E> {
...}
在Queue这个接口内有一个子类,就是本文要讲解的主角 BlockingQueue ,在原来的Queue基础上,新增了两个附加操作put和take ,这两个操作可支持阻塞,而在阻塞队列中有多个阻塞队列的具体实现,因此本文主要是讲解 ArrayBlockingQueue 的具体使用
public interface BlockingQueue<E> extends Queue<E> {
void put(E e) throws InterruptedException;
E take() throws InterruptedException;
}
1,阻塞队列的api基本使用
在阻塞队列中,有以下的api可以使用,如一些入队,出队等操作,但是同一个操作会有多个方法
支持将数据加入到队列,但是不支持阻塞的方法有下面两种
boolean add(E e); //数据入队,队列已满则抛出异常
boolean offer(E e); //数据入队,队列已满则返回false
支持将数据加入到队列,但是支持阻塞的方法有下面两种,表示的是队列未满则插入,队列已满则阻塞
//数据入队,线程阻塞
void put(E e) throws InterruptedException;
//数据入队,线程阻塞一段时间
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
支持数据出队,但是不支持阻塞的方法有以下
boolean remove(Object o); //数据出队,队列为空则抛异常
支持数据出队,同时支持数据阻塞的方法有以下,表示的是队列有数据则删除,队列没有数据则阻塞
//数据出队,允许中断,线程阻塞一段时间
E poll(long timeout, TimeUnit unit) throws InterruptedException;
//数据出队,线程阻塞
E take() throws InterruptedException;
2,ArrayBlockingQueue的基本使用
上面的这些方法是阻塞队列的通用方法,因此下面主要针对这个 ArrayBlockingQueue 来讲解内部的使用。举一个简单的生产者和消费者模型,来描述这个ArrayBlockingQueue的使用
首先定义一个产品类,内部的属性变量比较简单
/**
* @Author: zhenghuisheng
* @Date: 2023/10/8 20:24
*/
@Data
public class Product {
private Integer id;
private String productName;
}
随后定义一个生产者Producer的线程任务类,内部用于生产产品,并将生产的产品加入到阻塞队列中
/**
* 生产者线程
* @Author: zhenghuisheng
* @Date: 2023/10/8 20:21
*/
@Data
public class Producer implements Runnable {
private ArrayBlockingQueue arrayBlockingQueue;
public Producer(ArrayBlockingQueue arrayBlockingQueue){
this.arrayBlockingQueue = arrayBlockingQueue;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
Product product = new Product();
product.setId(i);
product.setProductName("商品" + i + "号");
try {
//加入阻塞队列
arrayBlockingQueue.put(product);
System.out.println("生产者" + i + "号生产完毕");
Thread.sleep(50);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
随后创建一个消费者Consumer线程的任务类,主要用于消费者消费线程
/**
* 消费者线程
* @Author: zhenghuisheng
* @Date: 2023/10/8 20:21
*/
@Data
public class Consumer implements Runnable {
private ArrayBlockingQueue arrayBlockingQueue;
public Consumer(ArrayBlockingQueue arrayBlockingQueue){
this.arrayBlockingQueue = arrayBlockingQueue;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
//消费者消费
Object take = arrayBlockingQueue.take();
System.out.println(take);
} catch (Exception e) {
e.printStackTrace();
}
}
System.out.println("消费者消费完毕");
}
}
随后创建一个线程池的根据类,用于定义线程池的各个参数以及初始化线程池
/**
* 线程池工具
* @author DDKK.COM 弟弟快看,程序员编程资料站
* @date : 2023/3/22
*/
public class ThreadPoolUtil {
/**
* io密集型:最大核心线程数为2N,可以给cpu更好的轮换,
* 核心线程数不超过2N即可,可以适当留点空间
* cpu密集型:最大核心线程数为N或者N+1,N可以充分利用cpu资源,N加1是为了防止缺页造成cpu空闲,
* 核心线程数不超过N+1即可
* 使用线程池的时机:1,单个任务处理时间比较短 2,需要处理的任务数量很大
*/
public static synchronized ThreadPoolExecutor getThreadPool() {
if (pool == null) {
//获取当前机器的cpu
int cpuNum = Runtime.getRuntime().availableProcessors();
log.info("当前机器的cpu的个数为:" + cpuNum);
int maximumPoolSize = cpuNum * 2 ;
pool = new ThreadPoolExecutor(
maximumPoolSize - 2,
maximumPoolSize,
5L, //5s
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), //数组有界队列
Executors.defaultThreadFactory(), //默认的线程工厂
new ThreadPoolExecutor.AbortPolicy()); //直接抛异常,默认异常
}
return pool;
}
}
最后在创建一个测试类作为主类的入口,随后就可以看到生产者消费者各个的消费信息了
/**
* @Author: zhenghuisheng
* @Date: 2023/10/8 20:27
*/
public class ArrayBlockingQueueDemo {
//创建一个线程池
static ThreadPoolExecutor pool = ThreadPoolUtil.getThreadPool();
//创建一个全局阻塞队列
private static ArrayBlockingQueue queue = new ArrayBlockingQueue(16);
public static void main(String[] args) throws InterruptedException {
//创建生产者线程
Producer producer = new Producer(queue);
//创建消费者线程
Consumer consumer = new Consumer(queue);
//线程加入线程池
pool.execute(producer);
pool.execute(consumer);
Thread.sleep(10000);
System.exit(0);
}
}
其打印结果如下,乱序是没多大问题的
Product(id=0, productName=商品0号)
生产者0号生产完毕
生产者1号生产完毕
Product(id=1, productName=商品1号)
Product(id=2, productName=商品2号)
生产者2号生产完毕
生产者3号生产完毕
Product(id=3, productName=商品3号)
Product(id=4, productName=商品4号)
生产者4号生产完毕
3,ArrayBlockingQueue的底层源码
3.1,ArrayBlockingQueue的基本属性
在讲解这个类的源码之前,先看一下这个类的继承以及这个类中重要的一些属性,该类是继承了一个抽象的队列,并且是BlockingQueue的一个具体的实现
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>{
}
在这个类中,有一个ReentrantLock锁,说明这个ArrayBlockingQueue的底层是通过这个互斥锁实现的,并且还引入了两个条件等待队列,很明显一个是队列满了put的时候阻塞,一个是队列空了take的阻塞。根据前面几篇的AQS的文章的讲解,很容易想到这两个条件队列的作用;用的ReentrantLock锁,也大概得可以知道take和put操作时互斥的,因此在性能上肯定是会出现问题的
//互斥锁
final ReentrantLock lock;
//为空时take的条件队列
private final Condition notEmpty;
//满时put的条件队列
private final Condition notFull;
除了上面这些属性之外,还有一些关于数组的定义,删除到哪个位置,添加到那个数组的索引位置,这些都是有记录的。由于数组是一块连续的空间,并且底层是通过队列的方式实现,因此根据先进先出原则,进来的数据都是在队尾,删除的都是队头的数据,整个操作都是对这个数组进行操作的
final Object[] items; //数组
int takeIndex; //前驱指针,用于记录删除到了哪个结点
int putIndex; //后继指针,用于记录添加到了哪个结点
int count; //数组数量
大概就是这么回事,takeindex记录的是出队到哪个位置的下标,putindex记录的入队到哪个位置的下标
最后再看看这个类的构造器方法,在初始化一个ArrayBlockingQueue时,就会将数组以及容量,还有两个条件队列全部构建完成,并且这把ReentrantLock互斥锁使用的是公平锁
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity]; //初始化一个数组
lock = new ReentrantLock(fair); //初始化一把互斥锁
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
3.2,put入队操作
在初步的了解完上面的这些属性之后,那么接下来主要讲解一下阻塞队列入队的操作,那么直接看这个put方法
//入队操作
public void put(E e) throws InterruptedException {
checkNotNull(e); //判断是否为空的操作
final ReentrantLock lock = this.lock; //获取全局的互斥锁
lock.lockInterruptibly(); //可中断锁
try {
while (count == items.length) //当队列满的时候
notFull.await(); //线程阻塞
enqueue(e); //如数组没满,则入队
} finally {
lock.unlock(); //解锁
}
}
入队的逻辑也比较简单,将值加入到数组中,并将putindex的值+1,数组中的结点数+1,最后调用signal唤醒数组为空时被阻塞的线程,将Empty条件等待队列的结点加入到同步等待队列中,最后被put中的finally中的unlock方法给唤醒
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x; //添加数据
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
在这里有一个重点方法,就是上面的这一句,当长度达到外部设置的长度之后,不会进行一个扩容操作,而是直接通过追加写的方式,从头开始。这就不禁有一个疑问了,为何这样设计,在印象中通过这种方式设计的结构,那么久只有环状的数组了
if (++putIndex == items.length)
putIndex = 0;
我直接好家伙,简直太精髓了,因为数组在删除数据的时候,那么就会有大量的数据需要移动,那么无论在空间还是时间都会产生一定的成本,因此为了优化这一步骤,直接采用环状的形式。
比如take一个数据,由于是要保证先进先出的原则,那么就需要删除头结点的数据,那么后面的全部数据都要发生一次移动,那不会影响出队的效率问题吗,因此这就不得不说这个takeindex的作用了
如果在数据出队后,采用环状的数据结构,那么就不需要因为删除数组的前面数据而移动数组后面的数据,只需要修改这个takeindex的指向就可以了,这样就成功的优化了出队的效率,从而减少大量数据的移动,不得不说真的是太强了
详细的可以参考本人写的JUC系列8,循环屏障的的底层实现原理,讲解的更加的详细,从条件队列转换到同步队列的逻辑都讲解的比较清楚
3.3,出队操作
接下来查看数据出队的操作,如下面的take方法,也会先获取这把全局的互斥锁,随后会判断队列是否为空,然后再入队的操作
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock; //获取全局的互斥锁
lock.lockInterruptibly(); //允许锁可中断
try {
while (count == 0) //如果队列中数据为空
notEmpty.await(); //阻塞
return dequeue(); //队列中数据不为空,出队
} finally {
lock.unlock(); //解锁
}
}
如果队列的数据不为空,则进入出队的逻辑,即这个dequeue方法。这里面的逻辑也比较简单,上面说了使用环状+修改下标的位置来解决这个大量数据移动的问题,就是通过这段代码来实现的。在将数据取出完成之后,说明此时队列处于没满的状态,那么就会唤醒因为满了而加入条件队列的数据,从而从条件队列转移到同步队列,最后通过take方法的finally方法中的unlock将结点唤醒
private E dequeue() {
final Object[] items = this.items; //获取这个数组
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null; //将头结点数据置为空
if (++takeIndex == items.length) //takeindex加1,修改指针指向位置
takeIndex = 0; //环状数组
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); //唤醒因数组满而加入到条件队列的数据
return x;
}
4,总结
ArrayBlockingQueue内部主要是通过数组的方式实现的阻塞队列,内部采用的是环状数组,并且记录了入队和出队的两个下标,从而优化在数据出队时产生数据大量移动的问题。队列保证先进先出原则
ArrayBlockingQueue内部用了ReentrantLock互斥锁和两个条件队列组成,在put加入数据时,如果队列满了则会将这个线程加入条件队列,在take取出数据时,如果数组数据为空也会将这个线程加入条件队列。put和take成功都会唤醒另一个条件队列的线程
ArrayBlockingQueue更加的适用于生产者和消费者的模型,并且生产速度和消费速度比较接近的情况下使用。并且生产者和消费者共用一把互斥锁,没有单一的职责,即不能并行的工作,那么在高并发的场景下,是会会存在的一定的瓶颈的