Java AQS实现
本篇理清锁和线程相关知识,主要知识点:
AQS;
ReentrantLock;
Condition;
Synchronized;
Semaphore,CountDownLatch;
AQS
AQS:AbstractQueuedSynchronizer,抽象队列同步器。
提供实现阻塞锁的框架。像ReentrantLock, Semaphore等都基于AQS来实现并发资源控制逻辑。
abstract class AbstractQueuedSynchronizer{
//等待队列头结点
private transient volatile Node head;
//等待队列尾结点
private transient volatile Node tail;
//同步状态,计算可重入次数
private volatile int state;
//静态内部类Node
static final class Node {...}
//内部类ConditionObject
public class ConditionObject implements Condition{...}
}
AQS类可重写方法:
tryAcquire:以独占模式获取锁。
tryRelease:独占模式下修改状态释放锁。
tryAcquireShared:以共享模式获取锁。
在读写锁中使用。读写锁允许多个读线程同时访问,不允许读写,写写同时访问。
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//设置读锁获取次数相关信息
…
return 1;
}
//再次尝试获取读锁,直到跳出循环
return fullTryAcquireShared(current);
}
此方法大概逻辑:
判断锁被独占并且不是当前线程,返回-1,添加到等待队列中。
判断等待队列下个结点是共享结点,共享计数小于最大值65535(1<<16 – 1), 并且cas操作更新计数+1值成功,则返回1。代码中用state字段的高16位表示读锁,低16位表示写锁。
举例:当前线程C尝试获取读锁
如果A已获取写锁,那么返回-1,等待入队。
如果C已获取写锁,那么readerShouldBlock判断读是否阻塞,主要逻辑是判断等待的下一个结点是否为独占结点,是则走后续逻辑,避免队列中写线程饥饿。
如果是共享结点,则此线程读不阻塞(意味着可以获取读锁。写锁可降级,获取写锁的线程不释放也能获取读锁)。
总结:判断读是否阻塞的条件是等待队列是否有独占线程在等待,有则入队排后面,没有则可以获取锁。
tryReleaseShared:共享模式下修改状态释放锁。
isHeldExclusively:判断独占线程是否为当前线程。
Node:等待队列(CLH队列)结点,为AQS静态内部类。
基于阻塞的CLH队列容纳所有的阻塞线程。
相关属性:
static final class Node {
//共享模式
static final Node SHARED = new Node();
//独占模式
static final Node EXCLUSIVE = null;
/**
* 等待状态
* 初始值0
* CANCELLED值1:表示此线程结点被取消。
* SIGNAL值-1:表示后继线程需要唤醒。
* CONDITION值-2:表示线程在条件队列(Condition)中等待。
* PROPAGATE值-3:共享模式下,向后传播。可理解为被唤醒的线程会接着唤醒在等待队列中的后继线程。
*/
volatile int waitStatus;
volatile Node prev; //前驱结点
volatile Node next; //后继结点
volatile Thread thread; //同步状态的线程
Node nextWaiter; //条件队列中的下一个等待结点
…
}
ReentrantLock
可重入锁ReentrantLock基于AQS来实现同步控制。
静态内部类Sync继承AQS。重写了父类tryRelease方法。
public class ReentrantLock implements Lock{
private final Sync sync;
//锁的同步控制类,继承AQS
abstract static class Sync extends AbstractQueuedSynchronizer {…}
//非公平锁
static final class NonfairSync extends Sync {
final void lock() {
acquire(1);
}
…
}
//公平锁
static final class FairSync extends Sync {
final void lock() {
//cas操作成功,表示获取锁,并更新独占线程信息
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
}
//构造函数
public ReentrantLock() {
sync = new NonfairSync();
}
}
创建ReentrantLock对象默认为非公平锁。可在构造方法传true实现公平锁。
公平锁和非公平锁都实现了Sync类的lock抽象方法,重写了AQS中tryAcquire方法。
区别:非公平锁获取时,先更新同步状态,成功则表示获取到锁并更新独占模式线程exclusiveOwnerThread 值为当前线程。未获取到锁则进入等待队列。
ReentrantLock#lock()分析
以NonfairSync非公平锁分析
ReentrantLock lock = new ReentrantLock();
//线程A执行
lock.lock();
final void lock() {
//cas操作更新state内存值为1
if (compareAndSetState(0, 1))
//设置独占线程值
setExclusiveOwnerThread(Thread.currentThread());
else
//执行锁取锁逻辑
acquire(1);
}
线程A执行lock()获取锁,锁未被其它线程获取,所以线程A更新state成功,AQS内存属性值为:
线程B执行lock(),cas操作失败,走acquire(1)逻辑。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire(arg)方法逻辑:
获取同步状态值state;
state为0,表示锁未被其它线程获取。执行cas操作,将state值从0变1,成功则表示到锁,并设置独占线程值。
state不为0,表示锁已被占用,判断获取锁的线程是不是自己,是则state +1。
addWaiter(Node.EXCLUSIVE):创建独占结点,准备入队。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
线程B执行addWatier方法时,内存中head和tail都是空的,所以会先执行enq方法。
enq(node):
先初始化一个空结点node1, 此时头结点=尾结点=node1;
设置当前结点node2为尾结点,node2的前驱结点=node1, node1的后继结点=node2。
第一步:第二步:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取前驱结点
final Node p = node.predecessor();
//前驱结点等于头结点,则再次尝试获取锁
if (p == head && tryAcquire(arg)) {
//获取到锁,则设置头结点为当前结点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//1. 前驱结点为唤醒态,返回true。
//2. 前驱结点为取消态,do while移除取消结点。
//3. 前驱结点为初始状态0或者传播态,则cas操作更新状态值为唤醒态。
需要等待,那么就调用parkAndCheckInterrupt()方法阻塞等待
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
线程B接着执行acquireQueued方法,如果前驱结点是头结点,则再次尝试获取锁,获取到锁,则把当前结点设置为头结点。
未获取到锁,则判断前驱结点waitStatus值,如果waitStatus为0或-3时,则cas操作将waitStatus更新为-1,表示后继结点线程需要唤醒。
parkAndCheckInterrupt():阻塞自个,等待LockSupport.unpark(Thread)唤醒。(有种场景:如果线程A释放锁后,唤醒了线程B,线程B尝试获取锁时被线程F抢到了锁,那么线程B继续阻塞,所以非公平锁就是入队之前先获取锁,这里就会导致B未获取到锁,不公平)
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
如果有第三个线程Thread-C来获取锁,那么此时结构图为:
ReentrantLock#unlock()
解锁步骤:
同步状态state值-1,也就是可重入次数-1,如果state为0,则把独占线程值设置为空,表示这个线程不再持有锁。
判断头结点等待状态waitStatus,不为0,则唤醒头结点的后继结点。
对比ReentrantLock和Synchronized区别:
ReentrantLock是JDK提供的锁;Synchronized是JVM内置锁。
两者都是可重入锁,ReentrantLock通过AQS中state值表示重入次数。
ReentrantLock通过lock,unlock加锁解锁。Synchronized通过JVM指令MONITORENTER,MONITOREXIT来加解锁。
ReentrantLock可实现多个队列,newCondition创建条件队列。Synchronized本质一个队列。
Condition
AQS内部类ConditionObject实现Condition接口。条件队列实现为单向链表。通过nextWaiter指针指向下一个结点。
示例:
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
lock.lock();
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
Contion接口中方法可参考Object中方法:
await() — wait();
signal() — notify();
signalAll() — notifyAll()。
内存中队列结构如下:
Semaphore,CountDownLatch
Sempahore 信号量
计数信号量。信号量维护一组许可,通常用于限制线程的数量访问一些(物理或逻辑)资源。
许可采用AQS的state字段表示,初始化若定义10个许可,那么state的值为10。获取许可则cas操作将state值减一。
和ReentrantLock类似,Semaphore有公平版和非公平版实现,也定义了静态内部类Sync继承自AQS。
Sempahore主要方法:
acquire():获取许可。未获取到则入队阻塞,入队创建的结点为共享结点。
release():释放许可。唤醒队列中第一个需要唤醒的线程结点。
availablePermits():返回信号量可用许可数。
getQueueLength():返回等待获取的线程数的预估值。
CountDownLatch
同步辅助类,允许一个线程或多个线程等待直到其它线程中执行的一组操作完成。
创建CountDownLatch对象时需指定初始化计数值count,count值对应的就是AQS中的state同步状态值。此计数值count归0后无法重置。
CountDownLatch主要方法:
countDown():将计数值count减一,当count为0时,则唤醒所有等待的线程。
await():await方法用于入队阻塞。
getCount():返回当前count值。
countDown()会调用到doReleaseShared方法。
doReleaseShared():共享模式下释放锁会唤醒第一个等待的线程,然后会条件判断是否继续唤醒下一个等待的线程。符合条件一直向下传递唤醒所有线程。
至于代码中创建线程为空的头结点,我理解为:
初始化时创建虚拟头结点,是在代码中避免null判断。
可理解头结点为已获取锁的线程,把thread设置null,方便垃圾回收。
声明:来自阿飞技术,仅代表创作者观点。链接:https://eyangzhen.com/6882.html