前言

JUC 包里面有很多内容,AQS是核心,锁和队列的实现都要依赖于AQS。

本文纯个人学习笔记,为了加深印象。

一、AQS

AbstractOwnableSynchronizer 抽象类,只有一个属性

当前独占线程

private transient Thread exclusiveOwnerThread;
AbstractQueuedSynchronizer

Node 存储当前线程,FIFO,尾部入队,有前后两个Node引用

static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;

// 这些表示节点状态
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
}
// 头部节点
private transient volatile Node head;
// 尾部节点
private transient volatile Node tail;
// 状态 子类实现的 tryAcquire tryRelease 等 都是在改变这个值
private volatile int state;
获取锁
public final void acquire(int arg) {
// 子类实现 当前线程获取锁 获取了就正常执行,失败,就要入队
if (!tryAcquire(arg) &&
// 独占锁
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
private Node addWaiter(Node mode) {
// 构造线程节点
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 如果尾节点存在,那就通过Cas 把当前节点放到尾部
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 没有则进行初始化入队
enq(node);
return node;
}

private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 不存在则需要构造一个空节点
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 初始化好尾部节点后,通过Cas将当前节点设置到尾部,则addWaiter 结束,进行acquireQueued操作
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

// 入队后
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 看前置节点是不是头节点,是就尝试获取锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 不是的话 看需要不需要 park,只有前置节点的waitstatus = -1 也就是 SIGNAL 才能park,park后就等待 unpark;如果不满足自旋
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 只有前置节点就是 SIGNAL,当前节点才能park
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
// 大于0 就是 canceled ,需要从队列中剔除,一直往前找,找到不是大于0的,然后将当前节点挂在这个后面
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
如果线程被unpark后,重新开始自旋,直到成功获取锁,或者被打断;如果被中断,就是进入acquireQueued 的finally cancelAcquire(node);

cancelAcquire 主要是做:

继续从当前节点往前找,找到 waitStatus <= 0 ,如果当前节点是 尾节点,就把找到的节点cas 设置成尾节点;

将当前节点设置 CANCELLED

如果当前是头节点,就会去unpark 最前面一个 waitStatus <= 0 的节点
**/
释放锁
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 如果头节点不存在 或者 是空节点 说明 线程队伍不存在 直接 true
if (h != null && h.waitStatus != 0)
// 进行unpark 操作
unparkSuccessor(h);
return true;
}
return false;
}

private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

// FIFO,取头节点的后置节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 如果后置节点不存在,或者后置节点 已经 Canceled,就从尾部开始找,找到最前面一个有效节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 找到了 就唤醒 当前节点
if (s != null)
LockSupport.unpark(s.thread);
}

共享模式区别在于会唤醒后继的共享节点,一直到临界值后(tryAcquireShared 返回的值),在进入自选阻塞

AbstractQueuedLongSynchronizer 跟上面区别在用 用 long 控制 state
  • ReentrantLock

    基于AQS实现,通过

    abstract static class Sync extends AbstractQueuedSynchronizer {

    // FairSync NonfairSync 实现
    abstract void lock();

    // 非公平锁的 tryAcquire 实现
    final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
    if (compareAndSetState(0, acquires)) {
    setExclusiveOwnerThread(current);
    return true;
    }
    }
    // 重入
    else if (current == getExclusiveOwnerThread()) {
    int nextc = c + acquires;
    if (nextc < 0)
    throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
    }
    return false;
    }

    // 释放
    protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
    throw new IllegalMonitorStateException();
    boolean free = false;
    // 为0 就释放 不是0 就未释放,因为重入 会多次获取,对应要多次释放
    if (c == 0) {
    free = true;
    setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
    }
    }
    // 非公平锁
    static final class NonfairSync extends Sync {

    final void lock() {
    if (compareAndSetState(0, 1))
    setExclusiveOwnerThread(Thread.currentThread());
    else
    acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
    }
    }
    // 公平锁
    static final class FairSync extends Sync {

    final void lock() {
    acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
    /** 这里体现公平锁,释放锁时,会unpark 前置的节点是head的node节点,开始尝试获取锁,但此时又有线程尝试获取锁,这个时候就会跟unpark的node节点产生竞争;但是公平锁,会去检查是否有等待够久的节点,
    **/
    if (!hasQueuedPredecessors() &&
    compareAndSetState(0, acquires)) {
    setExclusiveOwnerThread(current);
    return true;
    }
    }
    else if (current == getExclusiveOwnerThread()) {
    int nextc = c + acquires;
    if (nextc < 0)
    throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
    }
    return false;
    }
    }

二、其他组件

Semaphore

基于AQS 共享模式,permits 信号量这个值其实就是 设置给AQS的state变量,通过对state变量的操作,判断是否获取到锁,获取失败入队

abstract static class Sync extends AbstractQueuedSynchronizer {
// 构造器 传入 state 变量
Sync(int permits) {
setState(permits);
}

final int getPermits() {
return getState();
}

//默认非公平的 tryAcquire 实现方式
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

// 释放
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
}
// 非公平
static final class NonfairSync extends Sync {

NonfairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
// 公平
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;

FairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
for (;;) {
// 公平在于判断 队列中是否有node
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}

CyclicBarrier

同步屏障 基于 ReentrantLock Condition

每次调用 dowait count —,到0 则到达屏障,开始唤醒

CountDownLatch

调用await 如果不是0,就会阻塞

countDown 每次减1,到0的时候唤醒,因为是共享模式,所有的共享节点都会唤醒

基于AQS

private static final class Sync extends AbstractQueuedSynchronizer {

Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
// 所有节点唤醒,就会重新执行 tryAcquireShared 操作,当state 为0,为真,这样所有的工作线程都可以工作了
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// countDown 的实现,如果当前state 不是0,那就执行cas 操作,减到0,就可以执行AQS.doReleaseShared 唤醒阻塞线程
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}