上一篇博客我们介绍了ReentrantLock,其实如果单纯解决线程安全问题ReentrantLock就足够用了,但是ReentrantLock是独占锁,也就是说一次只能有一个线程获取到该锁,但是实际中会有写少读多的情况发生。我们知道,读取数据如果同时只能一个线程读取,那么会严重影响效率,显然我们的ReentrantLock满足不了这个需求,所以ReentrantReadWriteLock读写锁应运而生。它采用读写分离的策略,允许多个线程同时获取读锁。
1、初探ReentrantReadWriteLock
要想了解ReentrantReadWriteLock内部原理,我们先来看看他的类图吧:
我们可以看到ReentrantReadWriteLock内部维护了两把锁,ReadLock和WriteLock,他们都依赖Sync去实现具体的功能。Sync继承自AQS,并且也提供了公平与非公平的实现,在上一篇博客中我们介绍了公平与非公平的区别,及源码分析。这里就不做过多的赘述了。我这里就直接介绍非公平下的读写锁的实现。
我们知道AQS中只维护了state这个变量,且继承的子类需要根据具体用处,来给它定义。那么一个state如何用来表示读锁和写锁呢?ReentrantReadWriteLock巧妙地把state的高16位表示读状态,也就是获取读锁的次数,用state的低16位表示获取到写锁的可重入次数。我们可以简单来看一下Sync里面维护的变量的代码:
static final int SHARED_SHIFT = 16;
//共享锁的单位值1左移16位=65536
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
//共享锁的线程的最大数:1左移16位-1 = 65535
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
//写锁掩码,二进制,15个1
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
//返回读锁的线程数
static int sharedCount(int c) {
return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
//返回写锁的重入次数
static int exclusiveCount(int c) {
return c & EXCLUSIVE_MASK; }
/**
*用来记录除去第一个获取到读锁的线程外的其他线程的重入次数
*/
private transient ThreadLocalHoldCounter readHolds;
/**
*用来记录最后一个获取到读锁的线程的重入次数
*/
private transient HoldCounter cachedHoldCounter;
/**
*用来记录第一个获取到读锁的线程
*/
private transient Thread firstReader = null;
/**
*用来记录第一个获取到读锁的线程的重入次数
*/
private transient int firstReaderHoldCount;
读到这里,你肯定一堆疑惑,不要紧,你接着往下看:
2、写锁的获取与释放
2.1、void lock()
ReentrantReadWriteLock实现写锁是通过WriteLock来实现的。WriteLock写锁是一个独占锁也是一个可重入锁,也就是说同时只能有一个线程占有该锁。如果当前没有线程获取读锁和写锁,那么当前线程请求后可以直接获得(因为我们这里用的是非公平的方式)。如果当前有线程占用读锁和写锁,那么请求写锁的线程就会被阻塞挂起。如果当前线程已经获得了写锁,那么它还可以再次获取。话不多说,我们来看看他的源码:
public void lock() {
sync.acquire(1);
}
public final void acquire(int arg) {
//尝试获取锁,失败后执行后面的代码
if (!tryAcquire(arg) &&
//获取锁失败,将当前线程变为Node.EXCLUSIVE的节点,加入AQS阻塞队列
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//将自己阻塞挂起
selfInterrupt();
}
那么我们接着看看tryLockAcquire的代码:
protected final boolean tryAcquire(int acquires) {
//获得当前线程
Thread current = Thread.currentThread();
//获取AQS的state
int c = getState();
//写锁的重入次数
int w = exclusiveCount(c);
//c!=0说明读锁或者写锁已经被获取了
if (c != 0) {
// w=0说明已经有线程获取了读锁,但是没获取写锁
//w!=0并且线程也不是这个锁的拥有者,直接返回false,获取写锁失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//执行到这里,说明当前线程获取了写锁,然后判断一下重入的次数有没有越界
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 重新设置state的值
setState(c + acquires);
return true;
}
//执行到这里,说明读锁或者写锁没被获取了
//所以这是,第一个获取写锁的线程
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
//设置写锁的拥有者
setExclusiveOwnerThread(current);
return true;
}
对于上面的代码,基本流程注释都写的很详细了,我这里只说一下writerShouldBlock,在各个方法其实就是用来区别公平锁和非公平锁的。非公平锁的实现方式:
final boolean writerShouldBlock() {
return false; // writers can always barge
}
就是直接返回false,也就是直接进行下面的CAS操作,并设置状态state,和锁的拥有者。也就是不管先来后到,只要到了这里就拿锁就完事了。
而公平锁的处理方式就像下面这样:
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
它会先查看一下,AQS阻塞队列里面有没有比当前线程先来的线程。如果有,那么当前线程就会放弃争夺。直接返回false。
2.2、void lockInterruptibly()
类似于lock方法,他与lock方法的不同就在于,他对中断作出了响应。,当其他线程调用该线程的interrupt方法中断当前线程,那么当前线程会抛出InterruptedException异常。
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
2.3、boolean tryLock()
尝试获取写锁,如果当前没有其他线程占用读锁或者写锁,那么当前线程尝试获取锁就会成功,返回true。如果当前锁被其他线程占用,那么执行该方法会直接返回false,且不会阻塞线程,如果当前线程已经持有了写锁,那么增加AQS的状态值,并返回true。
public boolean tryLock( ) {
return sync.tryWriteLock();
}
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
int w = exclusiveCount(c);
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}
上面的tryWriteLock方法和tryAcquire类似,这里我就不做多的赘述了。
2.4、boolean tryLock(long timeout , TimeUnit unit)
这个方法的不同之处就在于他设置了一个超时时间,如果尝试获取写锁失败则会把当前线程挂起指定时间,待超时以后就会激活该线程,如果还是没有获取到锁,那么就会返回false。另外,这个方法还对中断做了响应。
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
2.5、void unlock()
这个方法为释放锁,如果当前线程持有该锁,调用该方法会把AQS的状态值减1,减去1后为0,那么就释放该锁,否则就仅仅减1,特别要注意,我们一定要获取锁,才能释放锁。如果没有获得锁就释放,那么就会抛出IllegalMonitorStateException异常。我们一起看看源码吧:
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
//尝试释放锁
if (tryRelease(arg)) {
//释放成功,则激活阻塞队列里面的队头线程
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
//判断释放锁的线程,是否应有这把锁,如果没有锁就抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//状态值-1
int nextc = getState() - releases;
//判断重入的次数是否为0
boolean free = exclusiveCount(nextc) == 0;
//重入次数为0,就把写锁的所有者设置为null
if (free)
setExclusiveOwnerThread(null);
//把新的状态值设置回AQS
setState(nextc);
return free;
}
3、读锁的获取与释放
ReentrantReadWriteLock实现读锁是通过ReadLock来实现的。读锁是一个共享锁,也就是允许多个线程去读取,也是一个可重入锁。
3.1、 void lock()
当一个线程获取读锁时,如果当前没有其他线程持有读锁和写锁,则当前线程可以获取读锁,然后AQS的state的高16位的值就会加1,然后方法返回。但是如果其他线程持有写锁,注意是写锁,那么当前线程就会被阻塞。因为有线程持有写锁,说明他正在修改数据,你这个时候去读到的数据必然会产生不一致性。
接下来我们就来看看读锁的lock的源码:
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
//获得当前线程
Thread current = Thread.currentThread();
//获得AQS的state值
int c = getState();
//如果写锁的重入次数不为0,写锁的拥有者不为当前线程,则返回-1
//也就是写锁被其他线程占用了
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//获得读锁的计数,也就是有多少个线程拥有写锁
int r = sharedCount(c);
//和上面说的writeShouldBlock()方法大同小异,
//不同的实现,会得到公平锁和非公平锁
//整个判断为尝试获取锁,多个线程只有一个会成功,不成功的进入fullTryAcquireShared自旋
if (!readerShouldBlock() &&
//判断当前线程有没有超过最大数
r < MAX_COUNT &&
//上面都通过了,当前线程拿到写锁,并执行CAS修改c
compareAndSetState(c, c + SHARED_UNIT)) {
//如果r为0,说明他是第一个获取读锁的线程
if (r == 0) {
//设置相关属性
firstReader = current;
firstReaderHoldCount = 1;
//否则就判断一下当前线程和第一个线程是否为同一个线程
} else if (firstReader == current) {
//相等,重入次数加一
firstReaderHoldCount++;
} else {
//获得最后一个获取读锁的线程的重入次数
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
//或者记录其他线程可重入的次数
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//类似于TtryAcquireShared方法,但是是自旋获取锁
return fullTryAcquireShared(current);
}
那么我们接下来看看fullTryAcquireShared这个方法吧:
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
//自旋
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
你会发现,其实和上面的真的大同小异,只是多了一个自旋而已。
接下来,我们直接来看释放锁吧
3.2、void unlock()
我们找到他的释放锁的代码:
public void unlock() {
sync.releaseShared(1);
}
我们发现他的释放锁的方法,其实是委托给Sync的releaseShared方法来实现的。
那么我们来看看releaseShared的代码:
public final boolean releaseShared(int arg) {
//尝试释放锁,成功返回true,失败返回false
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int unused) {
//获取当前线程
Thread current = Thread.currentThread();
//判断当前线程是不是第一个获得读锁的线程
if (firstReader == current) {
//如果是,判断他的重入次数是否为1
if (firstReaderHoldCount == 1)
//如果为1,就释放第一次获取到的锁的线程
firstReader = null;
else
//不是,则重入次数-1
firstReaderHoldCount--;
} else {
//不是第一个获得锁的线程
//获得最后一个获得锁的线程的重入次数
HoldCounter rh = cachedHoldCounter;
//判断当前线程是否为最后一个获得锁的线程
if (rh == null || rh.tid != getThreadId(current))
//不是,则拿当前线程的重入次数
rh = readHolds.get();
int count = rh.count;
//如果重入次数小于=1
if (count <= 1) {
//释放
readHolds.remove();
//如果小于等于0,那么抛出异常
if (count <= 0)
throw unmatchedUnlockException();
}
//将重入次数-1
--rh.count;
}
//自旋进行CAS,CAS成功则返回true,释放成功
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
4、案例介绍
上一篇博客介绍了ReentrantLock实现线程安全的ArrayList,但是ReentrantLock是独占锁,所以在读多写少的情况下,他的效率非常低下。那么下面我来使用ReentrantReadWriteLock来实现。读取list的元素使用读锁,增加删除操作使用写锁。
public class ReadWriteArrayList {
//用线程不安全的ArrayList当做底层容器
private ArrayList<String> list = new ArrayList<String>();
//创建读写锁
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
//获得读锁
private final Lock read = lock.readLock();
//获得写锁
private final Lock write = lock.writeLock();
//添加元素
public boolean add(String e){
write.lock();
try{
list.add(e);
return true;
}catch (Exception ex){
return false;
} finally {
write.unlock();
}
}
//获得元素
public String get(int index){
read.lock();
try{
return list.get(index);
}finally {
read.unlock();
}
}
//删除元素
public boolean remove(String e){
write.lock();
try{
list.remove(e);
return true;
}catch (Exception ex){
return false;
} finally {
write.unlock();
}
}
}