一,读写锁

前面几篇讲解了AQS的一些具体的实现,在对共享资源进行操作时,要么就是只能使用独占锁,对每一个线程都进行一个加锁的操作,要么就是单独使用共享锁,同时存在对多个资源的操作,但是在遇到类似缓存这种读多写少,读写同时存在的时候,那么就需要考虑独占锁和共享锁同时存在的需求了。

在AQS的具体实现类中,就存在同时对共享资源读和写的操作的实现类,就是并发包中提供的ReentrantReadWriteLock 类,在该类内部,就实现了读锁和一个写锁。总而言之就是在多个线程只需要读取数据的时候,可以使用共享结点实现,如果存在一个线程需要去写数据的时候,那么就需要通过独占结点实现,用一句话概况就是:读读共享、写写互斥

public class ReentrantReadWriteLock implements ReadWriteLock{
   
     
    private static final long serialVersionUID = -6992448646407690164L;
    /** Inner class providing readlock */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** Inner class providing writelock */
    private final ReentrantReadWriteLock.WriteLock writerLock;
}

1,读写锁的基本使用

在深入地了解这个类的底层原理之前,先了解一下这个类的基本使用,在这个 ReentrantReadWriteLock 类的注释中,已经有一个实例来方便我们了解这个类的基本使用,不得不说AQS里面所有的类的注释是非常的详细的

 

接下来参考他给的注释,仿写一个用例,用hashMap作为一级缓存,实现数据的缓存和读取。首先先定义一个线程池的工具类

/**
 * 线程池工具
 * @author DDKK.COM 弟弟快看,程序员编程资料站
 * @date : 2023/10/06
 */
public class ThreadPoolUtil {
   
     
    /**
     * io密集型:最大核心线程数为2N,可以给cpu更好的轮换,
     *           核心线程数不超过2N即可,可以适当留点空间
     * cpu密集型:最大核心线程数为N或者N+1,N可以充分利用cpu资源,N加1是为了防止缺页造成cpu空闲,
     *           核心线程数不超过N+1即可
     * 使用线程池的时机:1,单个任务处理时间比较短 2,需要处理的任务数量很大
     * 参考:https://blog.csdn.net/yuyan_jia/article/details/120298564
     */
    public static synchronized ThreadPoolExecutor getThreadPool() {
   
     
        if (pool == null) {
   
     
            //获取当前机器的cpu
            int cpuNum = Runtime.getRuntime().availableProcessors();
            log.info("当前机器的cpu的个数为:" + cpuNum);
            int maximumPoolSize = cpuNum * 2 ;
            pool = new ThreadPoolExecutor(
                    maximumPoolSize - 2,
                    maximumPoolSize,
                    5L,   //5s
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(),  //数组有界队列
                    Executors.defaultThreadFactory(), //默认的线程工厂
                    new ThreadPoolExecutor.AbortPolicy());  //直接抛异常,默认异常
        }
        return pool;
    }
}

接下来创建一个 PutHashMapTask 线程的任务类,用于添加数据,实现一个callAble接口,可以将添加后的hashMap给返回

/**
 * @Author: zhenghuisheng
 * @Date: 2023/10/6 22:41
 */
@Data
public class PutHashMapTask implements Callable {
   
     

    Lock writeLock;
    Map<String,Object> hashMap;

    public PutHashMapTask(Lock writeLock,Map<String,Object> hashMap){
   
     
        this.writeLock = writeLock;
        this.hashMap = hashMap;
    }

    @Override
    public Object call() throws Exception {
   
     
        //上锁
        writeLock.lock();
        for (int i = 0; i < 1000; i++) {
   
     
            hashMap.put(i+"",i);
        }
        //解锁
        writeLock.unlock();
        return hashMap;
    }
}

随后创建一个 GetHashMapTask 的任务类,用于获取数据,不需要返回,直接打印出即可

/**
 * @Author: zhenghuisheng
 * @Date: 2023/10/6 22:41
 */
@Data
public class GetHashMapTask implements Runnable {
   
     
    Lock readLock;
    Map<String,Object> hashMap;

    public GetHashMapTask(Lock readLock, Map<String,Object> hashMap){
   
     
        this.readLock = readLock;
        this.hashMap = hashMap;
    }

    @Override
    public void run() {
   
     
        //上锁
        readLock.lock();
        System.out.println(hashMap.get(new Random(1000)));
        //解锁
        readLock.unlock();
    }
}

随后创建一个主线程类,内部定义一把 ReentrantReadWriteLock 读写锁,然后创建线程池一级创建一个hashMap作为一级缓存,随后读写都操作这个hash桶

/**
 * @Author: zhenghuisheng
 * @Date: 2023/10/6 22:36
 */
public class ReentrantReadWriteLockDemo {
   
     
    //定义一把读写锁
    private static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private static Lock readLock = readWriteLock.readLock();
    private static Lock writeLock = readWriteLock.writeLock();
    //初始化一个线程池
    static ThreadPoolExecutor threadPool = ThreadPoolUtil.getThreadPool();
    //创建一个hashMap作为一级缓存
    static Map<String,Object> hashMap = new HashMap<>();

    public static void main(String[] args) throws Exception {
   
     
        //写入数据任务线程
        PutHashMapTask putHashMapTask = new PutHashMapTask(readLock,hashMap);
        //向线程池中添加任务
        Future<?> submit = threadPool.submit(putHashMapTask);
        hashMap =  (Map<String,Object>)submit.get();
        System.out.println(hashMap.size());
        //读取数据任务线程
        GetHashMapTask getHashMapTask = new GetHashMapTask(writeLock,hashMap);
        threadPool.execute(getHashMapTask);
        //主线程休眠
        Thread.sleep(100);
        System.exit(0);
    }
}

上面只是讲解了这个基本使用,还有一些读读共享,写写互斥以及写锁降级成读锁在这个类中都有详细的注释。

在写锁降级成读锁时,会在写锁释放锁之前,给读锁进行一个加锁的操作,主要是为了解决在高并发的场景下,在修改状态时为了保证所有线程的可见性问题,因为在工作线程将变量刷新到主内存时,会有一定的延迟,这样做得好处就是提前将值刷回到缓存,从而解决可见性的延迟问题。总而言之,写锁到读锁降级是为了保证可见性的问题,通过提前刷盘,保证缓存一致性,防止来不及刷盘而读取到脏数据

{
   
     	
	rwl.readLock().lock();
	if (!cacheValid) {
   
     
     	rwl.readLock().unlock();
     	rwl.writeLock().lock();
     	try {
   
     
            //读锁在写锁释放锁之前先加锁
       		rwl.readLock().lock();
        }
     } finally {
   
     
       rwl.writeLock().unlock();
     }
}    

在这个读写锁中,目前还是只支持锁的降级,并没有支持锁的升级的,主要还是因为这个线程可见性的问题,就是假设在同一时刻多个线程来读取数据,其中一个线程修改了数据,那么此时读取线程的数据就变成了脏数据,这就是为啥有延迟双删这个实现的原因,尤其是在缓存中,网络问题等等不可避免的因素,以及最主要的可见性的问题,因此在读写锁中,并没有这个读锁到写锁升级的过程

2,读写锁的底层实现原理

在看底层源码之前,先看这个ReentrantReadWriteLock的类图,如下图所示,其顶层接口是一个readWriteLock,内部定义读锁和写锁的的抽象方法,内部同时引入了Sync,该接口是一个AQS的子类,具有AQS的所有特性,一次内部具有公平锁和非公平锁,可重入等特性

 

而不管是读锁还是写锁,内部都实现了这个Lock的接口,Lock内部定义了所有的加解锁的规范,整个AQS中,Lock是作为一个全局的规范类来使用的

public static class WriteLock implements Lock{
   
     }
public static class ReadLock implements Lock{
   
     }

2.1,读写状态state的设计

在之前的AQS系列中,都是通过cas + 同步等待队列来实现这个加锁的方式,不管是共享锁还是独占锁,同步状态器state的值都是只用于表示独占状态或者共享状态,就是一个单一的职责,那么读写锁是如何通过这个state来表示两种状态的呢,直接看源码吧

在这个ReentrantReadWriteLock的内部类中,有一个Sync的抽象的静态内部类,在中间有一段注释,其翻译结果如下

读取与写入计数提取常量和函数。锁状态在逻辑上分为两个无符号短路:较低的一个表示独占(写入器)锁保持计数,而较高的是共享(读取器)锁保持数

 

也就是说,读写锁也是通过这个state这个关键字来区分的,state是int的数据类型,占4个字节,就是32位,那么高16位表示的就是共享节点的数据,低16位表示的就是独占节点的数据

//高16位,共享节点	  低16位,独占节点	
00000000 00000000 00000000 00000000

因此就定义了变量 SHARED_SHIFT ,值为16,SHARED_UNIT 表示的是共享节点的单位,1右移16位;MAX_COUNT 表示的是最大线程的数据,不管是共享节点还是独占节点的最大值;EXCLUSIVE_MASK 表示的是独占节点的数据

static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

而且在获取数据时,内部已经定义好了返回值,如获取共享结点的线程数,独占结点的重入数等

//获取多少个线程占有这把共享锁,通过右移的方式获取
static int sharedCount(int c)    {
   
      return c >>> SHARED_SHIFT; }
//获取独占锁的总线程数,通过位运算
static int exclusiveCount(int c) {
   
      return c & EXCLUSIVE_MASK; }

在共享锁中,通过 HoldCounter 方法和 ThreadLocalHoldCounter 来作为计数器的,内部主要是通过ThreadLocal 变量来记录可重入锁的次数。

static final class HoldCounter {
   
     
	int count = 0;
    // Use id, not reference, to avoid garbage retention
    final long tid = getThreadId(Thread.currentThread());
}
static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
   
     
    public HoldCounter initialValue() {
   
     
        return new HoldCounter();
	}
}

2.2,写锁的底层实现

研究完这个state的底层设计之后,再来研究这个 writeLock 写锁底层的实现原理

private static Lock writeLock = readWriteLock.writeLock();
writeLock.lock();
writeLock.unLock();

2.2.1,lock加锁逻辑

首先依旧是通过这个lock方法作为入口,进去会进入这个 acquire 方法,首先会进入这个 tryAcquire 方法

public final void acquire(int arg) {
   
     
    if (!tryAcquire(arg) &&	//尝试加锁
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))	//加锁失败则进入队列,独占节点
        selfInterrupt();
}

随后查看这个方法,根据方法名称可知这是一个获取锁的方法。

由于是写锁加锁的逻辑,首先会获取state值,如果是0,那么高位肯定是没有值得,那么肯定是不存在读锁,后续直接操作这个写锁的逻辑即可。

如果state值不为0,那么就需要判断这个state是读锁还是写锁,首先会获取 exclusiveCount 写锁的总个数,如果个数为0,那么表示这个state的值都是读锁的,那么直接返回false,如果这个线程不是重入的,那么也会直接返回false;如果当前线程满足上面的任意条件,那么会判断当前线程重入的次数,不能超过2的32次方-1。总而言之可以总结成两句话:写写互斥,写读互斥

protected final boolean tryAcquire(int acquires) {
   
     
    Thread current = Thread.currentThread();	//获取当前线程 
    int c = getState();		//获取状态,state位32位的值
    int w = exclusiveCount(c);	//获取独占的总线程个数
    //如果state值不为0
    if (c != 0) {
   
     	
        //如果独占的写锁个数为0并且不是重入锁,那么整个状态为读锁状态
        if (w == 0	//写读互斥 
            || current != getExclusiveOwnerThread())	//写写互斥
            return false;	//则直接返回false
        //重入,个数不能大于2的16-1次方
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        //state的值+1,因为是写锁,只需要在低位加即可,不需要考虑左移
        setState(c + acquires);
        return true;
    }
	//如果获取的state值为0,那么高16位肯定是全为0,那么就进入独占锁逻辑 
    if (writerShouldBlock() ||	//阻塞操作,没有队列则创建
            !compareAndSetState(c, c + acquires))	//cas抢锁
        return false;
    //同步状态器设置当前线程为独占锁线程
    setExclusiveOwnerThread(current);
    return true;
}

如果获取锁失败,则会执行入队的操作,前面几篇AQS的文章在写入队,阻塞的这些方法已经写了很多次了,因此这里不做详细的解释

    private Node addWaiter(Node mode) {
   
     
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
   
     
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
   
     
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

上面的是入队的操作,下面的是阻塞的逻辑,可以查看前面几篇AQS的文章,写的很详细了

    final boolean acquireQueued(final Node node, int arg) {
   
     
        boolean failed = true;
        try {
   
     
            boolean interrupted = false;
            for (;;) {
   
     
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
   
     
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
   
     
            if (failed)
                cancelAcquire(node);
        }
    }

2.2.2,unlock解锁逻辑

上面讲解了加锁的逻辑,下面的是writeLock解锁的逻辑,比较简单

public final boolean release(int arg) {
   
     
	if (tryRelease(arg)) {
   
     
		Node h = head;
		if (h != null && h.waitStatus != 0)
			unparkSuccessor(h);
		return true;
	}
}

首先会进入 tryRelease 方法中释放锁,就是将state的状态减1,由于是写锁,因此对低位操作刚好就是减少写锁的状态值

protected final boolean tryRelease(int releases) {
   
     
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;	//状态减1
    boolean free = exclusiveCount(nextc) == 0;	//判断写锁状态是否为0
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

最后会调用这个 unparkSuccessor 进行出队和唤醒的操作,最终是调用这个LockSupport.unpark方法

LockSupport.unpark(s.thread);

2.3,读锁的底层实现

写锁的逻辑相对而言是比较简单的,接下来查看读锁的底层逻辑实现

private static Lock readLock = readWriteLock.readLock();
readLock.lock();
readLock.unLock();

2.3.1,读锁的加锁逻辑

依旧先查看这个readLock的lock加锁方法,很明显,操作的结点是一个共享结点,上面的写锁是一个独占结点。

public final void acquireShared(int arg) {
   
     
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

继续查看内部tryAcquireShared方法的加锁逻辑,很明显代码比写锁的更多。

首先依旧是会先判断是否存在写结点,如果存在直接返回-1,并且会判断在写结点降级成读结点时,结点是否为同一个结点,如果不是同一个结点则返回-1

如果只存在读结点,则会进行尝试加锁的操作,如果是第一次加这个读锁,则会单独记录,如果不是第一次加,则会将这些可重入锁存储在ThreadLocal里面,从而记录可重入锁的总数。最后如果加锁失败,则会自旋重试加锁

protected final int tryAcquireShared(int unused) {
   
     
    Thread current = Thread.currentThread();
    int c = getState();		//获取同步状态器的值
    if (exclusiveCount(c) != 0 &&	//低位写结点的值不为0,读写互斥
            getExclusiveOwnerThread() != current)	//判断当前线程是否写线程,降级操作
        return -1;
    int r = sharedCount(c);		//读锁的总数
    if (!readerShouldBlock() &&	
            r < MAX_COUNT &&	//小于65535
            compareAndSetState(c, c + SHARED_UNIT)) {
   
     	//尝试加锁,加65535
        if (r == 0) {
   
     	//第一次进来获取读锁
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
   
     	//代表重入
            firstReaderHoldCount++;	
        } else {
   
     
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh); 
            rh.count++; //threadLocl + 1
        }
        return 1;
    }
    return fullTryAcquireShared(current);	//	加锁失败再次尝试获取
}

2.3.1,读锁的解锁逻辑

接下来继续查看这个unlock方法,首先会进入这个 releaseShared 方法

public final boolean releaseShared(int arg) {
   
     
    if (tryReleaseShared(arg)) {
   
     
        doReleaseShared();
        return true;
    }
    return false;
}

接下来查看这个 tryReleaseShared 释放锁的方法,其内部主要就是做了两件事,第一件就是将重入锁的值降低为0,第二件事就是讲高位的值进行减1的操作

protected final boolean tryReleaseShared(int unused) {
   
        
	Thread current = Thread.currentThread();
    //如果有可重入锁,则清0
    if (firstReader == current) {
   
     	
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1) firstReader = null;
        else firstReaderHoldCount--;
    } else {
   
     
        ReentrantReadWriteLock.Sync.HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
   
     
            readHolds.remove();
            if (count <= 0) throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
   
     
        int c = getState();
        int nextc = c - SHARED_UNIT;	//减高位的值
        if (compareAndSetState(c, nextc)) return nextc == 0;
    }
}

3,总结

ReentrantReadWriteLock读写锁底层也是通过AQS实现,通过state的高低位来区分读锁和写锁,高位表示读锁,低位表示写锁,高位读锁采用的是共享结点,低位写锁采用的是独占节点,共享结点中采用ThreadLocal来保存重入锁的次数,在整个读写锁中,主要是满足以下的规则:读读共享、读写互斥、写写互斥、写读降级

内部的ReadLock和WriteLock满足AQS的所有特性,内部也全部实现了CAS抢锁、失败入队、队列不存在则先创建队列,线程阻塞、修改结点状态、结点出队、线程唤醒等操作