一、前言
今天来介绍下 concurrent 包下的一个工具类——CountDownLatch
,这算是一个比较实用的工具类,在我们日常开发中使用的比较多,而且 API 也很简单,总结记录下。
二、基本使用
CountdownLatch
的主要功能是允许一个或多个线程等待直到在其他线程中一组操作执行完成,用人话说就是多个线程分别执行任务,另外某个线程等待这些线程全部执行完毕后,再做其他操作。
举个例子,麻麻在炒菜的时候,总是喜欢先将炒菜材料都准备好后,再准备开锅炒菜。由于今天家里来了客人,要准备的炒菜材料太多了,妈妈让你和你的弟弟一起来准备炒菜材料,麻麻等你们都准备好后开始炒菜。
在这个例子中,你和弟弟就分别是一个线程,你们做的工作就是准备炒菜材料。麻麻是另外一个线程,她会先休息会,等待你们都准备好后继续执行炒菜任务。
CountdownLatch 的主要 API 只有三个,如下所示:
1 | public CountDownLatch(int count); |
第一个是构造方法,参数指定了麻麻要等待的次数。假设要等待你和弟弟一人一次,那么值就是2。
第二个 countDown()
方法是当你或者弟弟执行完毕后,调用一次,麻麻要等待的次数就减1了。
第三个 await()
方法是对于麻麻用的,当麻麻执行该方法时,如果等待的次数不为0,则一直等待。如果为0,表示你和弟弟都准备完毕了,可以开始炒菜了。
1 | public class CountdownLatchDemo { |
程序运行结果如下:
1 | 你开始干活了... |
下面有一些需要注意的:
countdown()
方法并没有规定一个线程只能调用一次,每调用一次计数器就会减一。await()
方法并没有规定只能一个线程调用,如果有多个线程调用,那么这几个线程都会阻塞直到计数器为0。
三、源码浅析
CountDownLatch
的底层实现其实和 ReentrantLock 是一样的,都是 AQS。对这两个有兴趣的,可以移步《Java 并发编程——ReentrantLock》。
下面是 CountDownLatch 的类图,可以看到它有一个内部类 Sync,Sync 的父类也就是 AQS。
3.1 constructors
构造方法很简单,实参 count 最后设置到了 AQS 的 state 变量上,注意该变量是多线程可共享的。
1 | public CountDownLatch(int count) { |
3.2 countdown
countdown() 方法调用的底层实际上是 AQS 的 releaseShared()
方法,releaseShared() 方法代码如下:
1 | // java.util.concurrent.CountDownLatch#countDown |
releaseShared() 方法中判断 tryReleaseShared() 是否为 true,为 true 执行 doReleaseShared()
方法。先看下 releaseShared() 代码:
1 | // java.util.concurrent.CountDownLatch.Sync#tryReleaseShared |
ReleaseShared 的意思是释放计数器,因此 tryReleaseShared()
中只有 state 减1后为0时才需要调用 doReleaseShared()
方法,看下该方法逻辑:
1 | // java.util.concurrent.locks.AbstractQueuedSynchronizer#doReleaseShared |
在 doReleaseShared()
方法中,就是对 AQS 的同步队列中其它线程的唤醒操作。首先判断头结点不为空且不为尾节点,说明等待队列中有等待唤醒的线程,这里需要说明的是,在等待队列中,头节点中并没有保存正在等待的线程,其只是一个空的 Node 对象,真正等待的线程是从头节点的下一个节点开始存放的,因而会有对头结点是否等于尾节点的判断。
在判断等待队列中有正在等待的线程之后,其会清除头结点的状态信息,并且调用 unparkSuccessor(Node)
方法唤醒头结点的下一个节点。
1 | // java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor |
可以看到,unparkSuccessor(Node)
方法的作用是唤醒离传入节点最近的一个处于等待状态的线程,使其继续往下执行。
前面我们讲到过,等待队列中的线程可能有多个,而调用 countDown()
方法的线程只唤醒了一个处于等待状态的线程,这里剩下的等待线程是如何被唤醒的呢?其实这些线程是被当前唤醒的线程唤醒的。具体的我们可以看看 await()
方法的具体执行过程。
3.3 await
1 | public void await() throws InterruptedException { |
可以看到 await() 实际调用了 acquireSharedInterruptibly()
方法,作用是判断当前线程是否需要以共享状态获取执行权限。首先判断 state 值是否为0。如果为0,表示当前线程不需要进行权限获取,返回-1则表示当前线程需要进行共享权限,具体看 doAcquireSharedInterruptibly()
方法:
1 | // java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireSharedInterruptibly |
在 doAcquireSharedInterruptibly()
方法中,首先使用当前线程创建一个共享模式的节点。然后死循环判断当前线程是否获取到执行权限,如果有则将当前节点设置为头节点,并且唤醒后续处于共享模式的节点;如果没有,则调用shouldParkAfterFailedAcquire()
和 parkAndCheckInterrupt()
方法使当前线程处于阻塞状态,该阻塞状态是由操作系统进行的,这样可以避免该线程无限循环而获取不到执行权限,造成资源浪费。
当有多个线程调用 await() 方法而进入等待状态时,这几个线程都将等待在此处。这里回过头来看前面将的 countDown() 方法,其会唤醒处于等待队列中离头节点最近的一个处于等待状态的线程,也就是说该线程被唤醒之后会继续从这个位置开始往下执行,此时执行到 tryAcquireShared() 方法时,发现大于0(因为 state 已经被置为 0 了),该线程就会调用 setHeadAndPropagate() 方法,并且退出当前循环,也就开始执行 awat() 方法之后的代码。
下面我们看看 setHeadAndPropagate()
方法的具体实现:
1 | // java.util.concurrent.locks.AbstractQueuedSynchronizer#setHeadAndPropagate |
setHeadAndPropagate() 方法主要作用是设置当前节点为头结点,并且将唤醒工作往下传递,在传递的过程中,其会判断被传递的节点是否是以共享模式尝试获取执行权限的,如果不是,则传递到该节点处为止(一般情况下,等待队列中都只会都是处于共享模式或者处于独占模式的节点)。
也就是说,头结点会依次唤醒后续处于共享状态的节点。这里doReleaseShared()方法也就是我们前面讲到的会将离头结点最近的一个处于等待状态的节点唤醒的方法。
四、超时阻塞
在上面介绍 await()
时,只有当计数器为 0 时,或者线程被中断后,才会结束阻塞。但是万一执行任务的线程出现了死锁或者死循环,那么岂不是一直阻塞在那,永远都无法继续下去了?
因此同时提供了一个带有超时时间的 await() 方法,当到达超时时间后,立即返回。根据返回值来决定是否执行完毕。
1 | public boolean await(long timeout, TimeUnit unit) throws InterruptedException { |
在实际使用过程中,CountdownLatch 有可能被应用于多个微服务之间的数据获取,一旦接口没有做好超时处理,那么 CountDownLatch 就有可能长时间甚至一直阻塞,因此使用带超时的 await() 方法是十分必要的。
五、使用实例
CountDownLatch 非常适合对于任务进行拆分,使子任务能够并行执行,这也是我目前实际开发使用中的一个主要应用点。后续我接触到了其它应用场景,会继续分享给大家~
下面是一个任务拆分模拟实现,将 List 中 0 ~ 100 这些数据进行分批处理,每个线程处理 10 个,等到全部处理完毕后,打印一下 CountDownLatch complete!
1 | public class CountdownLatchDemo { |
运行结果如下:
1 | Thread-pool-1-thread-1: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] |