一,深入理解DelayQueue延迟队列
延时队列,顾名思义,就是可以实现在一段时间之后在执行这个任务。在分布式场景下可能会更加的选择使用MQ来完成这些操作,但是在单JVM进程中,或者在mq挂了的兜底方案中,会考虑使用这个DelayQueue来完成这个延时任务的。如一些订单超时未支付,任务超时管理,短信异步通知等情况,就可以使用这个延时队列来完成了。
在了解这个DelayQueue延迟队列之前,需要先熟悉上一篇PriorityQueue的基本使用和底层原理,因为这个延迟队列的底层的数据结构,就是通过这个优先级队列来实现的
class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E>{
//组合了一个优先级队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
}
由于这个优先级队列采用的是二叉堆的数据结构,并且采用的是小顶堆的数据结构,因此很容易猜出这个DelayQueue的底层原理了,就是假设5个延时任务,会将最近到期的这个任务排在阻塞队列的前面,因此在出队的时候,就可以保证先过期的先出队。
由于底层是通过这个PriorityQueue的优先级队列实现的,因此这个DelayQueue也是一个无界的阻塞队列,在使用这个延迟队列时,需要实现一个Delayed 的接口。总而言之就是:不保证先进先出,下一个即将过期的任务会排到队列的最前面
1,DelayQueue的基本使用
由于在实际开发中,会有这种订单超时的场景,因此这里主要是模拟一个订单的超时任务,来体验一下这个DelayQueue的基本使用
首先创建一个实现了Delayed接口的OrderDelay订单延时类,Delayed也是Comparable类的一个具体实现
/**
* Delayed的具体的方法实现
* @Author: zhenghuisheng
* @Date: 2023/10/14 0:32
*/
@Data
public class OrderDelay implements Delayed {
//需要延迟的时间
private long delayTime;
//订单id
private Integer orderId;
//商品名称
private String productName;
//构造方法
public OrderDelay(long delayTime,Integer orderId,String productName){
//需要延迟的 时间 + 当前系统的时间
this.delayTime = delayTime + System.currentTimeMillis();
this.orderId = orderId;
this.productName = productName;
}
//获取剩余的延时时间
@Override
public long getDelay(TimeUnit unit) {
//到达时间 - 剩余时间
long residueTime = this.delayTime - System.currentTimeMillis();
return unit.convert(residueTime,TimeUnit.MILLISECONDS);
}
//实现这个比较器方法
@Override
public int compareTo(Delayed o) {
OrderDelay orderDelay = (OrderDelay)o;
return orderDelay.delayTime > this.delayTime ? - 1 : 1;
}
}
随后创建一个生产者Producer线程任务类,用于将为支付的订单加入到这个延时队列中
@Data
public class Producer implements Runnable {
//全局的阻塞队列
private DelayQueue queue;
//延迟队列订单类对象
private OrderDelay orderDelay;
public Producer(DelayQueue queue,OrderDelay orderDelay){
this.queue = queue;
this.orderDelay = orderDelay;
}
//添加文件
@Override
public void run() {
try {
queue.put(orderDelay); //加入阻塞队列
System.out.println(orderDelay.getProductName() + "加入完毕...");
} catch (Exception e) {
e.printStackTrace();
}
}
}
随后创建一个消费者Consumer线程任务类,用于取出即将过期的订单任务
/**
* 消费者线程
* @Author: zhenghuisheng
* @Date: 2023/10/8 20:21
*/
@Data
public class Consumer implements Runnable {
private DelayQueue queue;
public Consumer(DelayQueue delayQueue){
this.queue = delayQueue;
}
@Override
public void run() {
try {
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
然后再创建一个线程池的工具类,用于更好的监控和管理线程
/**
* 线程池工具
* @author DDKK.COM 弟弟快看,程序员编程资料站
* @date : 2023/3/22
*/
public class ThreadPoolUtil {
/**
* io密集型:最大核心线程数为2N,可以给cpu更好的轮换,
* 核心线程数不超过2N即可,可以适当留点空间
* cpu密集型:最大核心线程数为N或者N+1,N可以充分利用cpu资源,N加1是为了防止缺页造成cpu空闲,
* 核心线程数不超过N+1即可
* 使用线程池的时机:1,单个任务处理时间比较短 2,需要处理的任务数量很大
*/
public static synchronized ThreadPoolExecutor getThreadPool() {
if (pool == null) {
//获取当前机器的cpu
int cpuNum = Runtime.getRuntime().availableProcessors();
log.info("当前机器的cpu的个数为:" + cpuNum);
int maximumPoolSize = cpuNum * 2 ;
pool = new ThreadPoolExecutor(
maximumPoolSize - 2,
maximumPoolSize,
5L, //5s
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), //数组有界队列
Executors.defaultThreadFactory(), //默认的线程工厂
new ThreadPoolExecutor.AbortPolicy()); //直接抛异常,默认异常
}
return pool;
}
}
最后创建一个有Main方法的测试类,用于对这个DelayQueue进行测试
/**
* @Author: zhenghuisheng
* @Date: 2023/10/14 1:41
*/
public class DelayQueueDemo {
//创建一个线程池
static ThreadPoolExecutor pool = ThreadPoolUtil.getThreadPool();
//创建一个全局的延迟队列
static DelayQueue<OrderDelay> delayQueue = new DelayQueue();
public static void main(String[] args) throws Exception {
//生产者创建任务
for (int i = 7; i > 2; i--) {
OrderDelay orderDelay = new OrderDelay(i * 1000, i, "id_" + i);
//创建生产者线程
Producer producerTask = new Producer(delayQueue, orderDelay);
//提交到线程池
pool.execute(producerTask);
}
Thread.sleep(50);
System.out.println("====生产者线程创建完毕====");
//创建消费者线程
for (int i = 0; i < 5; i++) {
Consumer consumerTask = new Consumer(delayQueue);
pool.execute(consumerTask);
}
}
}
最后看执行结果,先进来但是延迟时间长,所以后出去
id_7加入完毕…
id_5加入完毕…
id_6加入完毕…
id_4加入完毕…
id_3加入完毕…
生产者线程创建完毕
OrderDelay(delayTime=1697221724133, orderId=3, productName=id_3)
OrderDelay(delayTime=1697221725133, orderId=4, productName=id_4)
OrderDelay(delayTime=1697221726132, orderId=5, productName=id_5)
OrderDelay(delayTime=1697221727132, orderId=6, productName=id_6)
OrderDelay(delayTime=1697221728129, orderId=7, productName=id_7)
2,DelayQueue的底层源码分析
2.1,DelayQueue类属性
首先查看这个 DelayQueue 类,也是继承了这个抽象类,也是实现了这个BlockingQueue
class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E>
在这个类中首先最重要的就是这个PriorityQueue优先级队列,说明这个延迟队列的底层是通过这个优先级队列实现的
//组合了一个优先级队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
随后就是一把互斥锁加一个条件队列组成,互斥锁就是offer方法和take方法的互斥,然后这个条件队列是在队列为空时存储这个线程结点的
private final transient ReentrantLock lock = new ReentrantLock();
private final Condition available = lock.newCondition();
还有一个重要的属性,就是一个leader的线程标记,用对记录队头的线程,谁最早过期就记录谁
private Thread leader = null;
最后来看看该类的构造方法,里面是空的,因为里面的offer和take主要是操作这个PriorityQueue类
public DelayQueue() {
}
2.2,入队offer方法
接下来直接看这个类的offer方法的具体实现,这下面的逻辑是比较简单的,就是先入队,如果是第一个元素入队,那么回去唤醒条件队列中被阻塞的结点,因为这些结点是队列为空而将线程阻塞的,现在队列已经不为空了
public boolean offer(E e) {
final ReentrantLock lock = this.lock; //获取这把互斥锁
lock.lock(); //加锁
try {
q.offer(e); //线程入队
if (q.peek() == e) {
//如果堆顶为当前元素,表示第一个元素入队
leader = null;
available.signal(); //那么就会去唤醒因对列为空而被阻塞的线程结点
}
return true;
} finally {
lock.unlock(); //解锁
}
}
随后依旧是进入上面的offer方法,做一个具体的入队操作,这里需要结合PriorityQueue的属性来看,首先会判断这个数组是否达到设置的最大值或者扩容后的最大值,如果是,则继续扩容
public boolean offer(E e) {
if (e == null) throw new NullPointerException(); //结点为空
modCount++;
int i = size;
if (i >= queue.length) //达到最大值
grow(i + 1); //扩容
size = i + 1;
if (i == 0) queue[0] = e; //队列为空则直接加入堆顶
else siftUp(i, e); //否则上浮,堆算法
return true;
}
数组扩容的方法如下,先做一个扩容操作,并且最后创建一个新数组,将旧值copy到新数组中,随后将新数组返回假设此时的容量小于64,则扩大原来的容量+2,如果大于64,则扩大原来的容量一倍。
就是说假设此时容量为16,那么第一次扩容就是 16+16+2为34,第二次扩容为34 + 34 + 2为70,第三次扩容为70 + 70*2 = 210。
private void grow(int minCapacity) {
int oldCapacity = queue.length;
// Double size if small; else grow by 50%
int newCapacity = oldCapacity + ((oldCapacity < 64) ?
(oldCapacity + 2) :
(oldCapacity >> 1));
// overflow-conscious code
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
queue = Arrays.copyOf(queue, newCapacity);
}
如果此时不是第一个结点入队,那么就会调用这个 siftUp 方法,如果有自定义实现的比较器,则用自定义的,否则则直接使用内部默认的比较器
private void siftUp(int k, E x) {
if (comparator != null)
siftUpUsingComparator(k, x);
else
siftUpComparable(k, x);
}
接着直接来看内部默认实现的这个上浮的方法吧,就是一个小顶堆的入队操作
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x; //创建一个比较构造器
while (k > 0) {
//队列的元素值
int parent = (k - 1) >>> 1; //获取当前结点的父节点的索引,左移一位即可
Object e = array[parent]; //根据索引下标取值
if (key.compareTo((T) e) >= 0) //比较和交换,如果当前值大于父节点则不动
break;
array[k] = e; //如果当前结点的值小于父结点,则将当前结点改成父结点的值(默认使用的是小顶堆)
k = parent; //k在这个while循环下一定会等于0,因此会走最下面的赋值,就是不断地通过while循环将最小的交换到最上面
}
array[k] = key; //如果队列的长度为0,则直接将堆顶元素赋值
}
在成功入队之后,最后会调用这个unlock方法,用于解锁,并且唤醒被阻塞的结点
lock.unlock();
2.3,出队take方法
在结点入队之后,那么接下来就看这个结点出队的方法,出队方法相对来说是稍微多一点的。首先出队第一个头结点,如果已经过期则直接出队,否者获取这个即将过期的时间延迟阻塞,即阻塞到到一定的时间主动唤醒,最后执行这个任务,会在这个for自旋中,可以保证所有的结点出队。并且通过一个临时变量 leader,只需获取最早过期的结点进行阻塞,从而不需关心比该结点更晚过期的结点,从而减少阻塞的数量。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
//自旋
E first = q.peek(); //头结点出队
if (first == null) //如果队列为空
available.await(); //则加入条件队列阻塞
else {
//获取头结点的过期时间
long delay = first.getDelay(NANOSECONDS);
//头结点的过期时间小于0,说明已经过期。则直接出队
if (delay <= 0) return q.poll();
first = null; // don't retain ref while waiting
//特别说明,这个leader就是用于记录最早过期的那个线程
if (leader != null) //如果已经存在记录的最近过期的结点
available.await(); //则阻塞
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//延时阻塞,阻塞到一定时间主动唤醒
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal(); //优化,主动唤醒
lock.unlock(); //解锁
}
}
最后会通过unlock进行一个解锁操作。
lock.unlock();
3,总结
延迟队列的底层是通过这个优先级队列来实现的,越早过期的结点越先出队,内部也是采用ReentrantLock+条件队列来实现安全问题以及性能问题。延迟队列的结构也是无界队列形成的数组,在入队的结点元素需要时Delayed类的具体实现。