Fork me on GitHub

并发编程学习(三):CountDownLatch的实现原理及使用

什么是CountDownLatch?

什么是CountDownLatch

在本篇博客的封面,我放了一个截图,上面对于CountDownLatch的翻译是这样的:闭锁,倒计时门闩。其实顾名思义,CountDownLatch实际上就是一个计数器:计数-计数完成后做一些事。其实这个东西可以类比为一个水坝:当水还没有装满水库的时候水坝是关闭的,当水装满之后开闸放水,水库中的水”一起”涌出水库。

拥有同样功能的还有CyclicBarrier这个类,但是这个类相对较复杂,并且相对于CountDownLatch还可以重复使用,实际上前者一般被叫做线程计数器,后者被叫做循环屏障,还是有很大区别的。这个 在后面再进行源码学习

CountDownLatch是如何实现的?

ReentrantLock类似,内部也是有一个实现了AbstractQueueSynchronizer的内部类。内部类做了父类的共享式的显示锁的方法实现,维护一个初始为N的状态state,每次有线程调用之后阻塞,然后state减1,直到减为0之后所有阻塞的线程重新开始执行。

首先是内部类Sync的实现

构造器接收一个int参数初始化state的值。tryAcquireShared()方法不会对state做改变,当state不为0的时候返回-1即失败,当state等于0其返回1,表示计数器已经计数完成,await()方法不再阻塞。tryReleaseShared()方法会使用原子操作当countDown()被调用的时候释放一个state的占用,即state-1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

CountDownLatch的countDown方法

countDown方法主要作用就是使state-1

1
2
3
public void countDown() {
sync.releaseShared(1);
}

AQS中的releaseShared()方法的实现,如果释放成功执行doReleaseShared();

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

CountDownLatch的await方法

await方法会等待当前state值是否是0,如果不是的话就一直阻塞。直到state为0。

1
2
3
4
5
6
7
8
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

AQS中的acquireSharedInterruptibly()方法实现如下,在AQS的实现中,判断当前线程是否中断,是的话抛出中断异常,否则判断当前线程是否继续需要阻塞,即调用tryAcquireShared()。是的话进入doAcquireSharedInterruptibly()方法,不断的判断int r = tryAcquireShared(arg);,state如果一直不等于0,r就一直是负数,就会继续进入循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

其实以上代码的整体流程非常简单,即初始化CountDownLatch的state=N,每次调用countDown时state-1,减到0的时候停止阻塞,继续向下执行。

我可以用CountDownLatch来做什么事情?

使用CountDownLatch模拟并发场景

  • 可以使用CountDownLatch,创建多个线程并等待线程全部就绪之后唤醒所有线程。可以用这种方式测试代码的可用性,或者测试单例类等;

我在自己学习过程中也有写过类似的测试类 - github

使用CountDownLatch等待依赖线程执行

  • CountDownLatch用来等待其他依赖服务都启动好之后在进行自身线程的任务处理

总结

CountDownLatch是面试的时候多线程这块很容易被问到的点,实际上会考察这几个方面:

  • 1、内部实现原理 —— 使用内部类继承AQS实现;
  • 2、需要注意的方面 —— 计数器为0时,await后面的方法才会执行,否则一直阻塞,countDown方法尽量写在finally代码块中,避免出现异常导致死锁;
  • 3、使用场景 —— 监控一些依赖服务启动完成之后执行代码,或者造“水坝”,即模拟大量并发场景等。
陈年风楼 wechat
Add my WeChat, share tech-skills to each other 🙆‍