java并发-(4)同步队列AQS原理

前面我们介绍了使用ReentrantLock对象进行数据同步,以及使用ReentrantLock.newCondition()创建一个同步条件实现了一个生产者-消费者的编程模型。那么其内部机制是如何实现的呢?其实在ReentrantLock内部使用了一个AbstractQueuedSynchronizer来实现,这一节我们来分析一下ReentrantLock的源码,试着分析一下其原理吧

从ReentrantLock入手

《同步机制》这篇文章我们知道可以利用ReentrantLock,在多个线程之间对共享资源进行正确的同步。其实在ReentrantLock内部使用一个叫做队列同步器(AbstractQueuedSynchronizer)的类作为基础框架,实现了数据的同步。

ReentrantLock的类结构


可以看出,ReentrantLock在内部持有一个AbstractQueuedSynchronizer实现类

加锁

加锁的过程分析
1
2
3
4
5
6
7
8
public class ReentrantLock implements Lock, java.io.Serializable {
// 省略部分代码...

public void lock() {
// 在ReentrantLock内部通过代理模式,间接调用NonfairSync的lock()方法
sync.lock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
static final class NonfairSync extends Sync {
// 省略部分代码...

final void lock() {
// 成功修改同步状态,说明可以立即获取到锁
if (compareAndSetState(0, 1))
// 将当前获取到锁的线程绑定到NonfairSync上
setExclusiveOwnerThread(Thread.currentThread());
else
// 未成功修改同步状态,需要加入同步队列,通过自旋的形式再次尝试获取锁
acquire(1);
}
}
尝试获取锁时,同步状态立即修改成功的情况

从图中可以看出,当获取锁时内部通过调用compareAndSetState(0, 1)方法修改了同步状态state,如果该方法的调用结果为true时,则调用setExclusiveOwnerThread(Thread.currentThread())将当前线程对象绑定到AQS实现类(NonfairSync)的对象中,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

// 省略部分代码 ...

/**
* The synchronization state.
*/
// volatile修饰的字段,其内存语义包括(了解下特性即可,具体理论可以参考JVM内存模型相关资料):
// 1. 当写一个volatile变量时,JMM会把该线程对应的本地内存中的共享变量值刷新到主内存
// 2. 当读一个volatile变量时,JMM会把该线程对应的本地内存置为无效,线程接下来将从主内存中读取共享变量
private volatile int state;

/**
* The current owner of exclusive mode synchronization.
*/
// 该属性定义在AbstractOwnableSynchronizer类中,用于在AQS记录当前持有锁的线程
private transient Thread exclusiveOwnerThread;

// unsafe相关定义 -- begin
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
} catch (Exception ex) { throw new Error(ex); }
}
// unsafe相关定义 -- end

protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
// 通过Unsafe本地方法,对同步状态字段state进行CAS的原子性修改,确保了各线程之间的内存可见性
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

protected final void setExclusiveOwnerThread(Thread thread) {
// 将传入线程对象绑定到
exclusiveOwnerThread = thread;
}
}

尝试获取锁时,同步状态立即修改失败的情况

通过在NonfairSync的lock()方法中调用AbstractQueuedSynchronizer的acquire(int arg)方法,尝试修改同步状态,并获取锁。其内部逻辑分为两步:

  1. 调用一次tryAcquire(arg)来获取并修改同步状态,成功则表示获取到锁,直接返回
  2. 调用一次tryAcquire(arg)之后,未成功获取到锁,将当前线程以串行的方式放入同步队列
    PS: tryAcquire(arg)由AbstractQueuedSynchronizer的子类NonfairSync实现,具体代码读者可以查看java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    // 省略部分代码...

    public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
    }

    // 将当前线程构造为一个Node的实例加入到同步队列
    private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
    pred.next = node;
    return node;
    }
    }
    enq(node);
    return node;
    }

    // 通过CAS和for循环,促使所有线程以串行的方式进入队列
    private Node enq(final Node node) {
    for (;;) {
    Node t = tail;
    if (t == null) { // Must initialize
    if (compareAndSetHead(new Node()))
    tail = head;
    } else {
    node.prev = t;
    if (compareAndSetTail(t, node)) {
    t.next = node;
    return t;
    }
    }
    }
    }

    // 对于已经在队列中的线程,以独占不间断模式获取同步状态
    final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
    boolean interrupted = false;
    for (;;) {
    // 如果前驱节点为head节点(即头节点),并且可以获取到同步状态时
    // 设置绑定当前线程的Node实例为头节点
    final Node p = node.predecessor();
    if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
    }
    // 如果获取同步状态失败时,检查并修改节点的状态(即waitStatus)
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }

    // 线程中断,则取消获取同步状态
    private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
    return;

    node.thread = null;

    // Skip cancelled predecessors
    // 从当前节点回溯,直到回溯到的节点的waitStatus状态不为CANCLE状态为止
    Node pred = node.prev;
    while (pred.waitStatus > 0)
    node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    // 当前节点的waitStatus置为CANCELLED
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    // 当前节点为尾节点时,tail指向回溯后的节点。并且设置回溯节点的next为null
    if (node == tail && compareAndSetTail(node, pred)) {
    compareAndSetNext(pred, predNext, null);
    } else {
    // If successor needs signal, try to set pred's next-link
    // so it will get one. Otherwise wake it up to propagate.
    // 回溯后的节点非head、tail节点时,如果waitStatus等于Node.SIGNAL将它的next指向当前节点的next
    int ws;
    if (pred != head &&
    ((ws = pred.waitStatus) == Node.SIGNAL ||
    (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
    pred.thread != null) {
    Node next = node.next;
    if (next != null && next.waitStatus <= 0)
    compareAndSetNext(pred, predNext, next);
    } else {
    // 唤醒当前节点的后继节点中的线程
    unparkSuccessor(node);
    }

    node.next = node; // help GC
    }
    }

    // 获取同步状态失败时执行该方法。
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
    /*
    * This node has already set status asking a release
    * to signal it, so it can safely park.
    */
    // 第二次尝试获取同步状态失败时,需要阻塞当前线程
    return true;
    if (ws > 0) {
    /*
    * Predecessor was cancelled. Skip over predecessors and
    * indicate retry.
    */
    do {
    node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
    } else {
    /*
    * waitStatus must be 0 or PROPAGATE. Indicate that we
    * need a signal, but don't park yet. Caller will need to
    * retry to make sure it cannot acquire before parking.
    */
    // 第一次尝试获取同步状态失败时,将节点的waitStatus设为Node.SIGNAL
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
    }

    private final boolean parkAndCheckInterrupt() {
    // LockSupport.park内部调用了UNSAFE.park(false, 0L)阻塞了当前线程
    // 要解除阻塞,需要两个条件
    // 1. 其他线程执行了UNSAFE.unpark(thread)
    // 2. 其他线程执行了UNSAFE.park(false, 0L)
    // 3. 其他异常情况导致了该方法返回
    LockSupport.park(this);
    return Thread.interrupted();
    }

    }

解锁

解锁的过程分析
1
2
3
4
5
6
7
8
public class ReentrantLock implements Lock, java.io.Serializable {
// 省略部分代码...

// 调用其内部sync成员(即NonfaireSync)的release方法
public void unlock() {
sync.release(1);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

// 省略部分代码...

public final boolean release(int arg) {
// 由子类实现的tryRelease方法
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒后继节点绑定的线程
unparkSuccessor(h);
return true;
}
return false;
}
}

Condition原理

Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁。Condition对象由Lock对象(调用Lock对象的newCondition()方法)创建出来的。Condition依赖Lock对象。在需要等待的时刻,调用condition对象的await()方法,在需要通知的时刻,调用condition对象的signal()或者signalAll()方法。

await()源码分析

在触发线程等待时,当前持有锁的线程进行了步骤:

  1. 构造一个绑定了当前线程的Node对象,将其加入到等待队列中
  2. 修改同步状态,释放锁,唤醒后继节点绑定的线程
  3. 确保node未加入到同步队列时,挂起当前线程
  4. 在当前线程被唤醒时,调用acquireQueued(Node node, int savedState)方法尝试获取锁
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public final void await() throws InterruptedException {
    if (Thread.interrupted())
    throw new InterruptedException();
    // 将当前线程构造为一个Node对象,加入到ConditionObject的等待队列中
    Node node = addConditionWaiter();
    // 释放同步状态,唤醒后继节点绑定的线程
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 当节点还未移动到同步队列时,挂起当前线程
    while (!isOnSyncQueue(node)) {
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    break;
    }
    // 其他线程调用了signal或者signalAll方法之后,该node对应的线程被唤醒。尝试获取锁,获取成功则继续往下执行,否则继续挂起(此时是在同步队列中被挂起)
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
    unlinkCancelledWaiters();
    if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);
    }
signal()源码分析

在触发线程通知时,当前持有锁的线程进行了:

  1. 当firstWaiter不为空时,将node放入到同步队列的末尾(PS:等待另一个线程修改同步状态,并唤醒后继节点携带的线程)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    public final void signal() {
    // 如果调用该方法的线程未获取到condition对象对应的锁,将抛出异常
    if (!isHeldExclusively())
    throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
    // 将ConditionObject对象中的firstWaiter中的Node对象唤醒
    doSignal(first);
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void doSignal(Node first) {
do {
// 如果firstWaiter存在nextWaiter时,解除指向
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// transferForSignal(first)将node对象移动到同步队列
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// 等待队列中的node对象,移动到同步队列中
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

总结

  1. AbstractQueuedSynchronizer在内部维护着一个FIFO队列来完成资源获取线程的排队工作
  2. LockSupport.park内部调用了UNSAFE.park(false, 0L)阻塞了当前线程。要解除阻塞,需要满足以下任一条件
    2.1 其他线程执行了UNSAFE.unpark(thread)
    2.2 其他线程执行了UNSAFE.park(false, 0L)
    2.3 其他异常情况导致了该方法返回
  3. 通过CAS机制来对同步状态进行原子性操作