一、BlockingQueue
1.1 什么是阻塞队列?
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:
- 在队列为空时,获取元素的线程会等待队列变为非空。
- 当队列满时,存储元素的线程会等待队列可用。
阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
阻塞队列提供了四种处理方法:
方法\处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
检查方法 | element() | peek() | 不可用 | 不可用 |
- 异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。
- 返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null
- 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
- 超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。
1.2 API介绍
阻塞队列接口:
public interface BlockingQueue<E> extends Queue<E> {
//插入元素e到队列中,成功返回true, 否则抛出异常。如果向限定了容量的队列中插入值,推荐使用offer()方法。
boolean add(E e);
//插入元素e到队列中,如果设置成功返回true, 否则返回false. e的值不能为空,否则抛出空指针异常。
boolean offer(E e);
//插入元素e到队列中,,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。
void put(E e) throws InterruptedException;
//在给定的时间插入元素e到队列中,如果设置成功返回true, 否则返回false.
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
//检索并从队列的头部删除元素,如果队列中没有值,线程会一直阻塞,直到队列中有值,并且该方法取得了该值。
E take() throws InterruptedException;
//在给定的时间范围内,检索并从队列的头部删除元素,从队列中获取值,如果没有取到会返回null
E poll(long timeout, TimeUnit unit) throws InterruptedException;
//获取队列中剩余的空间。
int remainingCapacity();
//从队列中移除指定的值。
boolean remove(Object o);
//判断队列中是否包含该值。
public boolean contains(Object o);
//将队列中值,全部移除,并追加到给定的集合中。
int drainTo(Collection<? super E> c);
//指定最多数量限制将队列中值,全部移除,并追加到给定的集合中。
int drainTo(Collection<? super E> c, int maxElements);
}
1.3 子接口
BlockingDeque
TransferQueue
TransferQueue继承了BlockingQueue,并扩展了一些新方法。
BlockingQueue是指这样的一个队列:当生产者向队列添加元素但队列已满时,生产者会被阻塞;当消费者从队列移除元素但队列为空时,消费者会被阻塞。
TransferQueue则更进一步,生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费(不仅仅是添加到队列里就完事)。新添加的transfer方法用来实现这种约束。顾名思义,阻塞就是发生在元素从一个线程transfer到另一个线程的过程中,它有效地实现了元素在线程之间的传递(以建立Java内存模型中的happens-before关系的方式)。
TransferQueue还包括了其他的一些方法:两个tryTransfer方法,一个是非阻塞的,另一个带有timeout参数设置超时时间的。还有两个辅助方法hasWaitingConsumer()和getWaitingConsumerCount()。
1.4 实现类
JUC一共提供了7种实现类,我们会依次讲解:
ArrayBlockingQueue
PriorityBlockingQueue
DelayQueue
LinkedBlockingQueue
LinkedBlockingDeque
SynchronousQueue
LinkedTransferQueue
二、ArrayBlockingQueue
2.1 API介绍
ArrayBlockingQueue
是一个线程安全的、基于数组、有界的、阻塞的、FIFO 队列
。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。
此类基于 java.util.concurrent.locks.ReentrantLock
来实现线程安全,所以提供了 ReentrantLock
所能支持的公平性选择。
2.2 源码简析
底层是通过ReentrantLock,和Condition实现的阻塞、唤醒,原理比较简单:
以put方法添加元素为例:
队列空的话取元素也会阻塞:
dequeue是出队方法:
2.3 案例演示
public class ArrayBlockingQueueDemo {
public static void main(String[] args) {
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(3,true);
Producer producer = new Producer(blockingQueue);
Consumer consumer = new Consumer(blockingQueue);
new Thread(producer).start();
new Thread(consumer).start();
}
}
class Producer implements Runnable {
private BlockingQueue<Integer> blockingQueue;
private static int element = 0;
public Producer(BlockingQueue<Integer> blockingQueue) {
this.blockingQueue = blockingQueue;
}
public void run() {
try {
while(element < 20) {
System.out.println("生产元素:"+element);
blockingQueue.put(element++);
}
} catch (Exception e) {
System.out.println("生产者在等待空闲空间的时候发生异常!");
e.printStackTrace();
}
System.out.println("生产者终止了生产过程!");
}
}
class Consumer implements Runnable {
private BlockingQueue<Integer> blockingQueue;
public Consumer(BlockingQueue<Integer> blockingQueue) {
this.blockingQueue = blockingQueue;
}
public void run() {
try {
while(true) {
System.out.println("消费元素:"+blockingQueue.take());
}
} catch (Exception e) {
System.out.println("消费者在等待新产品的时候发生异常!");
e.printStackTrace();
}
System.out.println("消费者终止了消费过程!");
}
}
三、PriorityBlockingQueue
3.1 API介绍
PriorityBlockingQueue是带优先级的无界阻塞队列,每次出队都返回优先级最高的元素,是二叉树最小堆的实现。
- 无界阻塞队列,并不是无界而是说在容量快满的时候会自动扩容。
- 不允许空元素,并且添加的元素需要实现Comparable接口,实现自然排序。
- 不能保证优先级一样的元素的顺序,需要用额外属性进行排序。
3.2 源码简析
底层数据结构是二叉堆
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
...
//可以看到底层是用平衡二叉堆存储元素的。
/**
* Priority queue represented as a balanced binary heap: the two
* children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
* priority queue is ordered by comparator, or by the elements'
* natural ordering, if comparator is null: For each node n in the
* heap and each descendant d of n, n <= d. The element with the
* lowest value is in queue[0], assuming the queue is nonempty.
*/
private transient Object[] queue;
...
}
关于二叉堆这里就不细说了。
扩容逻辑
//扩容的时候会调用
/**
* Tries to grow array to accommodate at least one more element
* (but normally expand by about 50%), giving up (allowing retry)
* on contention (which we expect to be rare). Call only while
* holding lock.
*
* @param array the heap array
* @param oldCap the length of the array
*/
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));// 这里是扩容规则,一开始扩容速度会很慢
if (newCap - MAX_ARRAY_SIZE > 0) {
// possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();//超过最大容量会导致内存溢出
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
if (newArray == null) // back off if another thread is allocating
Thread.yield();
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
获取元素
//出队的操作,用二叉堆的下浮操作
/**
* Mechanics for poll(). Call only while holding lock.
*/
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
E result = (E) array[0];//二叉队出队逻辑是弹出顶端元素
E x = (E) array[n];//将最后一个元素放到顶端,执行下浮操作
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)//使用比较器排序执行下浮操作
siftDownComparable(0, x, array, n);
else//使用自然排序执行下浮操作
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
下浮操作是二叉堆移除元素的算法,本节不细究。
//可以看到take就是调用dequeue出队方法获取元素的
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)//通过dequeue方法获取元素,如果队列为空就会进行一个阻塞操作
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
添加元素
//追加元素,用上浮操作
/**
* Inserts the specified element into this priority queue.
* As the queue is unbounded, this method will never return {@code false}.
*
* @param e the element to add
* @return {@code true} (as specified by {@link Queue#offer})
* @throws ClassCastException if the specified element cannot be compared
* with elements currently in the priority queue according to the
* priority queue's ordering
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();//先获取锁进行锁定
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))//判断队列容量是否需要扩容
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)//比较器为空就自然排序进行插入
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;//插入成功,容量大小+1
notEmpty.signal();//发一个信号,如果有线程在等待队列里的元素,做一个唤醒。
} finally {
lock.unlock();
}
return true;
}
上浮操作是二叉堆的添加元素算法,本节不细究。
//可以看到put、offer(timeout)都调用的的offer,因为这个队列是无界的
//所以不会产生因为队列满而阻塞的情况
public void put(E e) {
offer(e); // never need to block
}
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e); // never need to block
}
3.3 案例演示
public class PriorityBlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
PriorityBlockingQueue<PriorityElement> queue = new PriorityBlockingQueue<>();
for (int i = 0; i < 5; i++) {
Random random=new Random();
PriorityElement ele = new PriorityElement(random.nextInt(10));
queue.put(ele);
}
while(!queue.isEmpty()){
System.out.println(queue.take());
}
}
}
class PriorityElement implements Comparable<PriorityElement> {
private int priority;//定义优先级
PriorityElement(int priority) {
//初始化优先级
this.priority = priority;
}
@Override
public int compareTo(PriorityElement o) {
//按照优先级大小进行排序
return priority >= o.getPriority() ? 1 : -1;
}
public int getPriority() {
return priority;
}
public void setPriority(int priority) {
this.priority = priority;
}
@Override
public String toString() {
return "PriorityElement [priority=" + priority + "]";
}
}
四、DelayQueue
4.1 API介绍
DelayQueue队列中每个元素都有个过期时间,并且队列是个优先级队列,当从队列获取元素时候,只有过期元素才会出队列。
应用场景:
- 比如买东西下订单,一天之内没有付款就会关闭,或者秒杀业务,有时间限制等,就可以使用这个延迟队列。
- 比如连接服务器,服务器会监测到很多连接是空闲的,这个时候超过一定时间也可以把空闲连接进行关闭。
三个特点:无界、阻塞、过期、只能取到队首的过期元素
Delayed 接口
放入DelayQueue的元素必须实现Delayed
接口
Delayed继承自Comparable,所以也需要实现自然排序。
4.2 源码简析
底层数据结构是PriorityQueue
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
//看到底层用带优先级的队列来存储元素
private final PriorityQueue<E> q = new PriorityQueue<E>();
...
}
等待优化
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
...
/**
* Thread designated to wait for the element at the head of
* the queue. This variant of the Leader-Follower pattern
* (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
* minimize unnecessary timed waiting. When a thread becomes
* the leader, it waits only for the next delay to elapse, but
* other threads await indefinitely. The leader thread must
* signal some other thread before returning from take() or
* poll(...), unless some other thread becomes leader in the
* interim. Whenever the head of the queue is replaced with
* an element with an earlier expiration time, the leader
* field is invalidated by being reset to null, and some
* waiting thread, but not necessarily the current leader, is
* signalled. So waiting threads must be prepared to acquire
* and lose leadership while waiting.
*
* 指定等待队列头元素的线程。这个Leader-Follower模式的变体(http://www.cs.wustl.edu/~schmidt/POSA/POSA2/)
* 可以将不必要的时间等待最小化。当一个线程成为leader时,它只等待下一次延迟,而其他线程则无限期地等待。
* 在从take()或 poll(…)返回之前,领头线程必须向其他线程发出信号,除非其他线程在过渡期间成为领头线程。
* 当队列的头被具有较早过期时间的元素替换时,leader字段将被重置为null,并向一些等待的线程发出信号,但
* 不一定是当前的leader。因此,等待线程必须准备好在等待时获得或失去领导权。
*/
private Thread leader = null;
/**
* Condition signalled when a newer element becomes available
* at the head of the queue or a new thread may need to
* become leader.
* 当一个新的元素在队列的最前面可用或者一个新的线程需要成为leader时,发出信号的条件。
*/
private final Condition available = lock.newCondition();
...
}
DelayQueue做了一个优化,如果一个线程获取队首元素,但是该元素还没有过期,那么会将当前线程记录到leader变量上,让其阻塞到该元素刚好过期的时间,期间其他线程获取元素的时候会直接无限期等待。这样可以将不必要的时间等待最小化。
如果leader等待过程中添加了更早的的元素,会做相应的唤醒等一系列处理。
获取元素
//获取元素的方法
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue.
*
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//阻塞可中断
try {
for (;;) {
//循环
E first = q.peek(); //优先级队列中取出头元素
if (first == null)
available.await();//空队列则阻塞线程
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)//判断是否过期
return q.poll();//过期直接出队返回
first = null; // don't retain ref while waiting
if (leader != null)
available.await();//leader不为null,说明已经有其他线程比自己先等待了,当前线程就无限期阻塞
else {
//空的话说明没有其他线程在等待
Thread thisThread = Thread.currentThread();
//当前线程变为leader
leader = thisThread;
try {
available.awaitNanos(delay);//使当前线程等待,直到发出信号或中断,或者指定的等待时间过去。
//等待时间刚好是元素过期的时间
} finally {
if (leader == thisThread)
leader = null;//释放leader
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
添加元素
//追加元素
/**
* Inserts the specified element into this delay queue.
*
* @param e the element to add
* @return {@code true}
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
//成立说明刚添加的元素是队列里面延迟时间最短的元素
//刷新了最短记录,这个时候需要把leader情况,然后唤醒所有等待的线程
//重新执行获取元素的逻辑
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
4.3 案例演示
public class DelayQueueTest {
public static void main(String[] args) throws InterruptedException {
Item item1 = new Item("item1", 5, TimeUnit.SECONDS);
Item item2 = new Item("item2",10, TimeUnit.SECONDS);
Item item3 = new Item("item3",15, TimeUnit.SECONDS);
DelayQueue<Item> queue = new DelayQueue<>();
queue.put(item1);
queue.put(item2);
queue.put(item3);
System.out.println("begin time:" + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
for (int i = 0; i < 3; i++) {
Item take = queue.take();//阻塞获取
System.out.format("name:{%s}, time:{%s}\n",take.name, LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME));
}
}
}
class Item implements Delayed {
/* 触发时间*/
private long time;
String name;
public Item(String name, long time, TimeUnit unit) {
this.name = name;
this.time = System.currentTimeMillis() + (time > 0? unit.toMillis(time): 0);
}
@Override
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
Item item = (Item) o;
long diff = this.time - item.time;
if (diff <= 0) {
// 改成>=会造成问题
return -1;
}else {
return 1;
}
}
@Override
public String toString() {
return "Item{" +
"time=" + time +
", name='" + name + '\'' +
'}';
}
}