前面刚学了CountDownLatch
,文末提到了CyclicBarrier
,可以重置计数,那么这篇笔记就顺便学习下CyclicBarrier
。
从CyclicBarrier
类单词就可以看出来,这是一个循环的屏障、栅栏,让所有线程都被阻止在这个栅栏之前,等到所有线程都到达栅栏(也就是都被阻塞了),然后打开栅栏,让所有被阻止的线程都执行。
show me code!还是写个栗子:
1 | public class CyclicBarrierTest { |
运行结果:
1 | 铁柱借钱凑彩礼,当前已有1百万 |
还是和CountDownLatch
例子一样现实的例子,为了体现CyclicBarrier
的可重置,这里假设铁柱借钱凑彩礼钱结婚(在我的老家,借钱凑彩礼钱的现象还是很普遍的),但这次比较倒霉,结婚后老婆卷钱跟隔壁老王跑了,然后忍痛卖血搬砖还钱,然后继续攒钱,最后不断地继续重蹈覆辙。。。
在这个例子中,「借钱凑彩礼」这类子线程在调用await()
方法后会被阻塞,直到被阻塞的子线程数量达到了CyclicBarrier
初始化时定义的3个,那么将会放开,唤醒继续执行这些被阻塞的线程(也就是例子中借钱后的还钱~),如果CyclicBarrier
使用的是和例子一样的构造方法(参数是阻塞线程数量和栅栏放开触发执行的线程),那么将会先执行定义的栅栏放开触发执行的线程,再继续执行被阻塞的线程。
从运行结果来看,似乎是当阻塞的线程达到了定义的数量,就会触发栅栏放开,并且栅栏又重置了,那么是怎么实现的呢,还是从学习源码入手。
await()
1 | public int await() throws InterruptedException, BrokenBarrierException { |
CyclicBarrier
是基于ReentrantLock
实现的,内部有一个ReentrantLock
对象属性,在调用的核心实现dowait()
方法中,首先使用lock.lock()
上锁,用来保证dowait()
操作是同步操作。
CyclicBarrier
中有个静态内部类Generation
,每个CyclicBarrier
实例中每一回合都会初始化一个Generation
对象属性: generation
(每一回合是指栅栏没被破坏或重置,如果被破坏或重置,就是新的一回合),Generation
类中只有一个属性:boolean broken
,用于记录当前栅栏是否被破坏(breakBarrier()
),当栅栏被破坏或重置(reset()
)时,该值都会改变。
在dowait()
中,获取generation
,判断当前栅栏当前这一回合的broken状态,如果为true,说明被破坏或者重置过,则抛出异常。再判断当前线程是否被中断,如果当前线程被中断,那么这个线程将不可能会到达栅栏,为了避免死锁,调用breakBarrier()
方法破坏栅栏,并抛出异常。校验过栅栏的broken状态和线程状态之后,将count
值减一(count
是CyclicBarrier
中用于记录栅栏前仍需要等待的线程数量),如果count
等于0,说明设定数量的线程都已到达了栅栏前,如果在CyclicBarrier
构造初始化时设置了栅栏放开触发执行的线程,那么执行栅栏放开触发执行的线程(从command.run()
可以看出,这里执行是同步执行该线程,而非异步并发执行),然后调用nextGeneration()
方法:
1 | private void nextGeneration() { |
调用nextGeneration()
方法唤醒在条件队列等待的线程(这里是条件队列,trip是ReentrantLock
的Condition
对象,调用signalAll()
方法将条件队列等待的线程添加到同步队列中参与竞争锁,这里参考前面学习AQS关于Condition的笔记,那么条件队列等待的线程是什么时候插入的呢,在后面await()
将会看到),并重置栅栏前仍需等待的线程数量,以及重新初始化generation对象,CyclicBarrier的循环重置就是在这里实现的。
调用nextGeneration()
重置之后返回,如果这里出现了异常,则调用breakBarrier()
破坏栅栏,使失败。
而对于count不等于0,也就是当前线程到达后,当前线程不是最后一个到达的线程,栅栏前还有线程没到达,则进入循环,直到栅栏被破坏或者线程中断或者超时。在循环中,根据timed参数判断是否是需要超时等待,如果不是,则调用trip.await()
直接进入等待,即AQS中ConditionObject#await()
:
1 | public final void await() throws InterruptedException { |
该方法实现可中断的条件等待,
CyclicBarrier
通过调用该方法将当前线程加入到AQS的条件队列中等待。
如果需要超时等待,并且参数nanos等待时间大于0,则调用trip.awaitNanos(nanos)
进行超时等待:
1 | public final long awaitNanos(long nanosTimeout) |
和
await()
差不多,只不过多了超时时间。
再回到dowait()
,在加入当前线程到条件队列进行等待之后,判断栅栏当前回合的broken状态,如果已被破坏,则抛出异常;g != generation
如果已经是新的回合,则直接返回;如果是需要超时等待,但时间已到,则调用breakBarrier()
破坏栅栏,唤醒在条件队列等待的线程,然后抛出异常。
CyclicBarrier
还可以使用reset()
来主动介入使栅栏重置:
1 | public void reset() { |
通过breakBarrier()
破坏当前回合栅栏,并重新开启新的回合。
CyclicBarrier
相对来说是比CountDownLatch
复杂的,巧妙地使用ReentrantLock
和Condition
实现了阻塞、计数,并且可以重用,相比CountDownLatch
更灵活,还可以使到达栅栏后先执行指定的触发任务再执行后续任务。