一、简介
ReentrantLock
是一个可重入 且独占式 的锁,相较于传统的 Synchronized
,它增加了轮询、超时、中断等高级功能。其类图如下:
ReentrantLock 是 java.util.concurrent(J.U.C)包中的锁,相比于 synchronized,它多了以下高级功能:
1. 等待可中断
当持有锁的线程长期不释放锁的时候,正在等待的线程可以选择放弃等待,改为处理其他事情。
2. 可实现公平锁
公平锁是指多个线程在等待同一个锁时,必须按照申请锁的时间顺序来依次获得锁。
synchronized 中的锁是非公平的,ReentrantLock 默认情况下也是非公平的,但可以通过带布尔值的构造函数要求使用公平锁。
3. 锁绑定多个条件
一个 ReentrantLock 对象可以同时绑定多个 Condition 对象。
ReentrantLock 有一个内部类 Sync
,它继承了 AbstractQueuedSynchronizer(下文简称“AQS”),抽象了锁的获取和释放操作。Sync 有两个实现类,分别是 FairSync
和 NonfairSync
,分别公平锁实现和非公平锁实现。
一、基本使用
ReentrantLock 的使用十分简单,如下所示。通过 lock()
方法加锁,通过 unlock()
方法释放锁,为了避免死锁,释放锁应当放在 finally 块中,确保锁一定能够释放。
1 2 3 4 5 6 7 8 9 10 11 12 class X { private final ReentrantLock lock = new ReentrantLock(); public void m () { lock.lock(); try { doSomething... } finally { lock.unlock() } } }
下面来简单实验下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class ReentrantLockDemo { private Lock lock = new ReentrantLock(); public static void main (String[] args) { ReentrantLockDemo demo = new ReentrantLockDemo(); ExecutorService executorService = Executors.newCachedThreadPool(); executorService.execute(demo::func); executorService.execute(demo::func); } public void func () { lock.lock(); try { for (int i = 0 ; i < 10 ; i++) { System.out.print(i + " " ); } } finally { lock.unlock(); } } }
三、公平锁与非公平锁
ReentrantLock 的公平锁和非公平锁是通过构造方法实现的,默认无参情况下构造的是非公平锁。
1 2 3 4 5 6 7 public ReentrantLock () { sync = new NonfairSync(); } public ReentrantLock (boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
3.1 NonfairSync
3.1.1 Lock
首先说下非公平锁的获取锁操作。当调用 lock()
方法时,首先判断 compareAndSetState(0, 1)
,该方法实际上做的事情就是对 state
变量做了一个 CAS 操作(利用反射实现),如果 state 值为 0,就将其修改为 1,且继续执行 setExclusiveOwnerThread() 方法,那么这个 state 是什么呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); } protected final boolean compareAndSetState (int expect, int update) { return unsafe.compareAndSwapInt(this , stateOffset, expect, update); } private static final long stateOffset;static { try { stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state" )); } catch (Exception ex) { throw new Error(ex); } } private volatile int state;
一开始在类图中我们说过 Sync 继承了 AQS
, 在 AQS 类中,有一个 volatile 变量 state
,它代表了ReentrantLock 的重入数 。也就是说如果 ReentrantLock 没有线程占用(即 state = 0),那就就将它占用(state 置为 1)。
接下来看之后的 setExclusiveOwnerThread()
方法做了什么。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public abstract class AbstractOwnableSynchronizer implements java .io .Serializable { protected AbstractOwnableSynchronizer () { } private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread (Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread () { return exclusiveOwnerThread; } }
如上所示,setExclusiveOwnerThread()
方法位于 AQS 的父类 AbstractOwnableSynchronizer(下文简称“AOS”)中。就是将 exclusiveOwnerThread
变量设置为当前线程。
以上都是 CAS 成功的逻辑,如果 CAS 操作失败,也就是说 ReentrantLock 已经被其他线程占用了,看看 acquire(1)
方法逻辑。
1 2 3 4 5 6 7 8 9 10 11 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected boolean tryAcquire (int arg) { throw new UnsupportedOperationException(); }
可以看到 acquire()
方法是定义在 AQS 类中的,内部调用的 tryAcquire()
方法发现也是一个抽象方法,需要子类去具体实现,在非公平锁中,tryAcquire() 方法的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } private volatile int state;protected final int getState () { return state; }protected final void setState (int newState) { state = newState; }
nonfairTryAcquire()
方法首先根据 state 的值判断 ReentrantLock 是否已经被占用了,如果没有线程占用,则将其占用。如果已经被占用了且当前线程就是 ReentrantLock 的占用者,就增加重入的次数。
在上文的 acquire()
方法中,当 tryAcquire()
方法执行失败,也就是获取锁失败后,继续执行后面的 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
语句,先来说下 addWaiter(Node.EXCLUSIVE)
。
在 addWaiter()
方法中,根据当前线程加上传入的 Node 节点类型 Node.EXCLUSIVE(独占类型),构造了一个新的 Node 节点,然后取到 AQS 类的 tail 尾节点,然后尝试将新构造出来的节点加入到队列的末尾中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 private Node addWaiter (Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } private transient volatile Node tail;
通过 addWaiter(Node.EXCLUSIVE)
方法已经成功将当前线程加入到 AQS 队列末尾,然后就是调用 acquireQueued()
自旋判断是否处于 AQS 队列头部,如果处于头部,说明轮到当前线程执行了,就可以结束自旋返回了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } final Node predecessor () throws NullPointerException { Node p = prev; if (p == null ) throw new NullPointerException(); else return p; }
3.1.2 UnLock
下面来看下重入锁的释放操作,底层调用 AQS 类的 release()
方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public void unlock () { sync.release(1 ); } public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
其中的判断条件 tryRelease()
实现如下,代码比较简单,看注释就应该能够明白含义了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; }
如果该方法返回 true,即代表锁已经没有线程独占了,下面的处理就是一些对 AQS 同步队列的收尾工作,这里暂且不做展开。
3.2 FairSync
说完了非公平锁,下面来看看公平锁的实现,公平锁相较于非公平锁主要的不同就是 lock()
方法的逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 final void lock () { acquire(1 ); } public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected boolean tryAcquire (int arg) { throw new UnsupportedOperationException(); }
以上的逻辑都是 AQS 类的逻辑,直接看 tryAcquire()
方法的公平锁实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; }
该方法和非公平锁的 nonfairTryAcquire()
比较,唯一不同的是判断条件多了 hasQueuedPredecessors()
方法,其定义如下:
1 2 3 4 5 6 7 8 9 public final boolean hasQueuedPredecessors () { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
该方法是实现“公平 ”的具体逻辑。它对 AQS 同步队列中当前节点是否有前驱节点进行判断,如果该方法返回 true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁 ,以此来实现公平锁。
3.3 测试
下面来分别测试下公平锁和非公平锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 import java.util.ArrayList;import java.util.Collection;import java.util.Collections;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class ReentrantLockDemo { private static CountDownLatch latch; public static void main (String[] args) { Lock fairLock = new MyReentrantLock(true ); Lock unFairLock = new MyReentrantLock(false ); testLock(fairLock); } private static void testLock (Lock lock) { latch = new CountDownLatch(1 ); for (int i = 0 ; i < 5 ; i++) { Thread thread = new Worker(lock, latch); thread.setName("Thread-" + i); thread.start(); } latch.countDown(); } } class Worker extends Thread { private Lock lock; private CountDownLatch latch; public Worker (Lock lock, CountDownLatch latch) { this .lock = lock; this .latch = latch; } @Override public void run () { try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } for (int i = 0 ; i < 2 ; i++) { lock.lock(); try { System.out.println("Lock by [" + getName() + "], Waiting by " + ((MyReentrantLock) lock).getQueuedThreads()); } finally { lock.unlock(); } } } @Override public String toString () { return getName(); } } class MyReentrantLock extends ReentrantLock { MyReentrantLock(boolean fair) { super (fair); } @Override public Collection<Thread> getQueuedThreads () { List<Thread> arrayList = new ArrayList<>(super .getQueuedThreads()); Collections.reverse(arrayList); return arrayList; } }
当使用公平锁运行时,输出大致如下:
1 2 3 4 5 6 7 8 9 10 Lock by [Thread-3], Waiting by [Thread-4] Lock by [Thread-4], Waiting by [Thread-0, Thread-1, Thread-2, Thread-3] Lock by [Thread-0], Waiting by [Thread-1, Thread-2, Thread-3, Thread-4] Lock by [Thread-1], Waiting by [Thread-2, Thread-3, Thread-4, Thread-0] Lock by [Thread-2], Waiting by [Thread-3, Thread-4, Thread-0, Thread-1] Lock by [Thread-3], Waiting by [Thread-4, Thread-0, Thread-1, Thread-2] Lock by [Thread-4], Waiting by [Thread-0, Thread-1, Thread-2] Lock by [Thread-0], Waiting by [Thread-1, Thread-2] Lock by [Thread-1], Waiting by [Thread-2] Lock by [Thread-2], Waiting by []
当使用非公平锁运行时,输出大致如下:
1 2 3 4 5 6 7 8 9 10 Lock by [Thread-3], Waiting by [Thread-4] Lock by [Thread-3], Waiting by [Thread-4, Thread-1, Thread-0, Thread-2] Lock by [Thread-4], Waiting by [Thread-1, Thread-0, Thread-2] Lock by [Thread-4], Waiting by [Thread-1, Thread-0, Thread-2] Lock by [Thread-1], Waiting by [Thread-0, Thread-2] Lock by [Thread-1], Waiting by [Thread-0, Thread-2] Lock by [Thread-0], Waiting by [Thread-2] Lock by [Thread-0], Waiting by [Thread-2] Lock by [Thread-2], Waiting by [] Lock by [Thread-2], Waiting by []
从上述结果可以看到,公平锁每次都是队列中的第一个节点获取到锁,而非公平锁出现了一个线程连续获取锁的情况。
为什么会出现连续获取锁的情况呢?因为在 nonfairTryAcquire(int)
方法中,每当一个线程请求锁时,只要获取了同步状态就成功获取了锁。在此前提下,刚刚释放锁的线程再次获取到同步状态的几率很大,而其他线程只能在同步队列中等待。
3.4 总结
事实上,公平锁往往没有非公平锁的效率高,但是,并不是任何场景都是以 TPS 作为唯一指标,公平锁能够减少“饥饿”发生的概率,等待越久的请求越能够得到优先满足。
非公平锁有可能使线程饥饿,那为什么还要将它设置为默认模式呢?我们再次观察上面的运行结果,如果把每次不同线程获取到锁定义为1次切换,公平锁在测试中进行了10次切换,而非公平锁只有5次切换,这说明非公平锁的开销更小。
四、lockInterruptibly & tryLock
4.1 lockInterruptibly
一开始就说过 ReentrantLock 支持等待可中断。在使用 synchronized 时,阻塞在锁上的线程除非获得锁否则将一直等待下去,也就是说这种无限等待获取锁的行为无法被中断。而 ReentrantLock 给我们提供了一个可以响应中断的获取锁的方法 lockInterruptibly()
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public void lockInterruptibly () throws InterruptedException { sync.acquireInterruptibly(1 ); } public final void acquireInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } private void doAcquireInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return ; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
在前面介绍 lock()
方法时,其中的 acquireQueued()
方法在加锁失败后会设置一个中断标识 interrupted,死循环休眠加锁。而 doAcquireInterruptibly()
方法相较于 acquireQueued()
方法取消了中断标识,直接返回来实现响应中断。
4.2 tryLock
获取锁除了使用 lock()
和 lockInterruptibly ()
这类阻塞方法以外,ReentrantLock 还提供了非阻塞加锁方法,也就是 tryLock()
。
tryLock()
立即返回,获取成功返回 true,获取失败返回 false。
tryLock(long timeout, TimeUnit unit)
在给定时间内,获取成功返回 true,获取失败返回 false。
五、对比 Synchronized
5.1 相同点
1. 都是独占锁
ReentrantLock 和 synchronized 都是独占锁,只允许线程互斥的访问临界区。但是实现上两者不同,synchronized 加锁解锁的过程是隐式的,用户不用手动操作,优点是操作简单,但显得不够灵活,ReentrantLock 需要手动加锁和解锁。
2. 都是可重入
ReentrantLock 和 synchronized 都是可重入的。synchronized 因为可重入因此可以放在被递归执行的方法上,且不用担心线程最后能否正确释放锁。而 ReentrantLock 在重入时要却确保重复获取锁的次数必须和重复释放锁的次数一样,否则可能导致其他线程无法获得该锁。
5.2 不同点
1. 锁的实现
synchronized 是 JVM 实现的,而 ReentrantLock 是 JDK 实现的。
2. 性能
新版本 Java 对 synchronized 进行了很多优化,例如自旋锁等。目前来看它和 ReentrantLock 的性能基本持平了,因此性能因素不再是选择 ReentrantLock 的理由。synchronized 有更大的性能优化空间,应该优先考虑 synchronized。
3. 功能
ReentrantLock 多了一些高级功能。
4. 使用选择
除非需要使用 ReentrantLock 的高级功能,否则优先使用 synchronized。这是因为 synchronized 是 JVM 实现的一种锁机制,JVM 原生地支持它,而 ReentrantLock 不是所有的 JDK 版本都支持。并且使用 synchronized 不用担心没有释放锁而导致死锁问题,因为 JVM 会确保锁的释放。