百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

Java并发之CyclicBarrier(java中并发)

cac55 2024-09-19 17:03 27 浏览 0 评论

barrier(屏障)与互斥量、读写锁、自旋锁不同,它不是用来保护临界区的。相反,它跟条件变量一样,是用来协同多线程一起工作的。

条件变量是多线程间传递状态的改变来达到协同工作的效果。屏障是多线程各自做自己的工作,如果某一线程完成了工作,就等待在屏障那里,直到其他线程的工作都完成了,再一起做别的事。举个通俗的例子:

1.对于条件变量。在接力赛跑里,1号队员开始跑的时候,2,3,4号队员都站着不动,直到1号队员跑完一圈,把接力棒给2号队员,2号队员收到接力棒后就可以跑了,跑完再给3号队员。这里这个接力棒就相当于条件变量,条件满足后就可以由下一个队员(线程)跑。

2.对于屏障:在百米赛跑里,比赛没开始之前,每个运动员都在赛场上自由活动,有的热身,有的喝水,有的跟教练谈论。比赛快开始时,准备完毕的运动员就预备在起跑线上,如果有个运动员还没准备完(除去特殊情况),他们就一直等,直到运动员都在起跑线上,裁判喊口号后再开始跑。这里的起跑线就是屏障,做完准备工作的运动员都等在起跑线,直到其他运动员也把准备工作做完。

java.util.concurrent.CyclicBarrier类是一个同步机制。它可以通过一些算法来同步线程处理的过程。换言之,就是所有的线程必须等待对方,直到所有的线程到达屏障,然后继续运行。之所以叫做“循环屏障”,是因为这个屏障可以被重复使用。

CyclicBarrier有两个构造参数,分别是:

CyclicBarrier(int parties)

创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。

CyclicBarrier(int parties, Runnable barrierAction)

创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。

让线程在CyclicBarrier中等待

有两个方法可以让线程在CyclicBarrier处等待:

barrier.await();

barrier.await(10, TimeUnit.SECONDS);

第二个方法指线程等待的超时时间,当出现等待超时的时候,当前线程会被释放,但会像其他线程传播出BrokenBarrierException异常。

所有线程在CyclicBarrier等待,是指:

 ? 最后一个线程到达(调用await方法)
 ? 一个线程被被另外一个线程中断(另外一个线程调用了这个现场的interrupt()方法)
 ? 其中一个等待的线程被中断
 ? 其中一个等待的线程超时
 ? 一个外部的线程调用了CyclicBarrier.reset()方法。

下面以5个线程模拟5个运动员。运动员在赛跑的时候都会准备一段时间,当裁判发现所有的运动员都准备完毕的时候,就举起发令枪,比赛开始。

package thread;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* 模拟运动员
**/
public class MyThread extends Thread {
 private CyclicBarrier cyclicBarrier;
 private String name;
 public MyThread(CyclicBarrier cyclicBarrier, String name) {
 super();
 this.cyclicBarrier = cyclicBarrier;
 this.name = name;
 }
 @Override
 public void run() {
 System.out.println(name + "开始准备");
 try {
 Thread.currentThread().sleep(5000);
 System.out.println(name + "准备完毕!等待发令枪");
 try {
 cyclicBarrier.await();
 } catch (BrokenBarrierException e) { 
 e.printStackTrace();
 }
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
}
//测试类
public class Test {
 public static void main(String[] args) {
 CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
 @Override
 public void run() {
 System.out.println("发令枪响了,跑!");
 }
 });
 for (int i = 0; i < 5; i++) {
 new MyThread(barrier, "运动员" + i + "号").start();
 }
 }
}

当执行测试类的时候,输出如下的结果(顺序每次执行可能会不太一样):

运动员1号开始准备
运动员3号开始准备
运动员2号开始准备
运动员0号开始准备
运动员4号开始准备
运动员1号准备完毕!等待发令枪
运动员4号准备完毕!等待发令枪
运动员0号准备完毕!等待发令枪
运动员3号准备完毕!等待发令枪
运动员2号准备完毕!等待发令枪
发令枪响了,跑!

从输出可以看到,当给定数量的参与者(线程)调用了await()方法之后,屏障放开,CyclicBarrier中的屏障动作被触发了。如果没有达到指定的数量,就会一直被阻塞。

Barrier被破坏

BrokenBarrierException如果在参与者(线程)在等待的过程中,Barrier被破坏,就会抛出BrokenBarrierException。可以用isBroken()方法检测Barrier是否被破坏。

1.如果有线程已经处于等待状态,调用reset方法会导致已经在等待的线程出现BrokenBarrierException异常。并且由于出现了BrokenBarrierException,将会导致始终无法等待。

比如,五个运动员,其中一个在等待发令枪的过程中错误地接收到裁判传过来的指令,导致这个运动员以为今天比赛取消就离开了赛场。但是其他运动员都领会的裁判正确的指令,剩余的运动员在起跑线上无限地等待下去,并且裁判看到运动员没有到齐,也不会打发令枪。

package thread;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class MyThread extends Thread {
 private CyclicBarrier cyclicBarrier;
 private String name;
 private int ID;
 public MyThread(CyclicBarrier cyclicBarrier, String name,int ID) {
 super();
 this.cyclicBarrier = cyclicBarrier;
 this.name = name;
 this.ID=ID;
 }
 @Override
 public void run() {
 System.out.println(name + "开始准备");
 try {
 Thread.sleep(ID*1000); //不同运动员准备时间不一样,方便模拟不同情况
 System.out.println(name + "准备完毕!在起跑线等待发令枪");
 try {
 cyclicBarrier.await();
 System.out.println(name + "跑完了路程!");
 } catch (BrokenBarrierException e) {
 e.printStackTrace();
 System.out.println(name+"看不见起跑线了");
 }
 System.out.println(name+"退场!");
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
}
public class Test {
 public static void main(String[] args) throws InterruptedException {
 CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
 @Override
 public void run() {
 System.out.println("发令枪响了,跑!");
 }
 });
 for (int i = 0; i < 5; i++) {
 new MyThread(barrier, "运动员" + i + "号", i).start();
 }
 Thread.sleep(1000);
 barrier.reset();
 }
}

输出结果:

运动员0号开始准备
运动员1号开始准备
运动员2号开始准备
运动员3号开始准备
运动员4号开始准备
运动员0号准备完毕!在起跑线等待发令枪
运动员1号准备完毕!在起跑线等待发令枪
java.util.concurrent.BrokenBarrierException
运动员0号看不见起跑线了
运动员0号退场!
 at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
 at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
 at thread.MyThread.run(MyThread.java:27)
运动员2号准备完毕!在起跑线等待发令枪
运动员3号准备完毕!在起跑线等待发令枪
运动员4号准备完毕!在起跑线等待发令枪

从输出可以看到,运动员0号在等待的过程中,主线程调用了reset方法,导致抛出BrokenBarrierException异常。但是其他线程并没有受到影响,它们会一直等待下去,从而一直被阻塞。

2.如果在等待的过程中,线程被中断,也会抛出BrokenBarrierException异常,并且这个异常会传播到其他所有的线程。

package thread;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CyclicBarrier;
public class Test {
static Map<Integer,Thread> threads=new HashMap<>();
 public static void main(String[] args) throws InterruptedException {
 CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
 @Override
 public void run() {
 System.out.println("发令枪响了,跑!");
 }
 });
 for (int i = 0; i < 5; i++) {
 MyThread t = new MyThread(barrier, "运动员" + i + "号", i);
 threads.put(i, t);
 t.start();
 }
 Thread.sleep(3000);
 threads.get(1).interrupt();
 }
}

输出:

运动员0号开始准备
运动员2号开始准备
运动员3号开始准备
运动员1号开始准备
运动员0号准备完毕!在起跑线等待发令枪
运动员4号开始准备
运动员1号准备完毕!在起跑线等待发令枪
运动员2号准备完毕!在起跑线等待发令枪
运动员3号准备完毕!在起跑线等待发令枪
java.lang.InterruptedException
运动员3号看不见起跑线了
运动员3号退场!
运动员2号看不见起跑线了
运动员2号退场!
运动员0号看不见起跑线了
运动员0号退场!
 at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
 at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
 at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:234)
 at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
 at thread.MyThread.run(MyThread.java:27)
java.util.concurrent.BrokenBarrierException
 at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
 at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
 at thread.MyThread.run(MyThread.java:27)
java.util.concurrent.BrokenBarrierException
 at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
 at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
 at thread.MyThread.run(MyThread.java:27)
java.util.concurrent.BrokenBarrierException
 at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
 at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
 at thread.MyThread.run(MyThread.java:27)
运动员4号准备完毕!在起跑线等待发令枪
java.util.concurrent.BrokenBarrierException
 at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
 at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
 at thread.MyThread.run(MyThread.java:27)
运动员4号看不见起跑线了
运动员4号退场!

从输出可以看到,其中一个线程被中断,那么所有的运动员都退场了。

3.如果在执行屏障操作过程中发生异常,则该异常将传播到当前线程中,其他线程会抛出BrokenBarrierException,屏障被损坏。

这个就好比运动员都没有问题,而是裁判出问题了。裁判权力比较大,直接告诉所有的运动员,今天不比赛了,你们都回家吧!

package thread;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CyclicBarrier;
public class Test {
 static Map<Integer, Thread> threads = new HashMap<>();
 public static void main(String[] args) throws InterruptedException {
 CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
 @Override
 public void run() {
 String str = null;
 str.substring(0, 1);
 System.out.println("发令枪响了,跑!");
 }
 });
 for (int i = 0; i < 5; i++) {
 MyThread t = new MyThread(barrier, "运动员" + i + "号", i);
 threads.put(i, t);
 t.start();
 }
 }
}

输出:

运动员0号开始准备
运动员3号开始准备
运动员2号开始准备
运动员1号开始准备
运动员4号开始准备
运动员0号准备完毕!在起跑线等待发令枪
运动员1号准备完毕!在起跑线等待发令枪
运动员2号准备完毕!在起跑线等待发令枪
运动员3号准备完毕!在起跑线等待发令枪
运动员4号准备完毕!在起跑线等待发令枪
Exception in thread "Thread-4" java.util.concurrent.BrokenBarrierException
 at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
 at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
 at thread.MyThread.run(MyThread.java:27)
运动员0号看不见起跑线了
运动员0号退场!
java.util.concurrent.BrokenBarrierException
 at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
 at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
 at thread.MyThread.run(MyThread.java:27)
运动员3号看不见起跑线了
运动员3号退场!
java.util.concurrent.BrokenBarrierException
 at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
 at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
 at thread.MyThread.run(MyThread.java:27)
运动员1号看不见起跑线了
运动员1号退场!
java.lang.NullPointerException
 at thread.Test$1.run(Test.java:15)
 at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:220)
 at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
 at thread.MyThread.run(MyThread.java:27)
java.util.concurrent.BrokenBarrierException
 at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
 at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
 at thread.MyThread.run(MyThread.java:27)
运动员2号看不见起跑线了
运动员2号退场!

可以看到,如果在执行屏障动作的过程中出现异常,那么所有的线程都会抛出BrokenBarrierException异常。

4.如果超出指定的等待时间,当前线程会抛出 TimeoutException 异常,其他线程会抛出BrokenBarrierException异常。

package thread;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class MyThread extends Thread {
 private CyclicBarrier cyclicBarrier;
 private String name;
 private int ID;
 public MyThread(CyclicBarrier cyclicBarrier, String name, int ID) {
 super();
 this.cyclicBarrier = cyclicBarrier;
 this.name = name;
 this.ID = ID;
 }
 @Override
 public void run() {
 System.out.println(name + "开始准备");
 try {
 Thread.sleep(ID * 1000);
 System.out.println(name + "准备完毕!在起跑线等待发令枪");
 try {
 try {
 cyclicBarrier.await(ID * 1000, TimeUnit.MILLISECONDS);
 } catch (TimeoutException e) {
 // TODO Auto-generated catch block
 e.printStackTrace();
 }
 System.out.println(name + "跑完了路程!");
 } catch (BrokenBarrierException e) {
 e.printStackTrace();
 System.out.println(name + "看不见起跑线了");
 }
 System.out.println(name + "退场!");
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
}

输出:

运动员0号开始准备
运动员2号开始准备
运动员3号开始准备
运动员1号开始准备
运动员0号准备完毕!在起跑线等待发令枪
运动员4号开始准备
java.util.concurrent.TimeoutException运动员0号跑完了路程!
运动员0号退场!
 at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:257)
 at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
 at thread.MyThread.run(MyThread.java:29)
运动员1号准备完毕!在起跑线等待发令枪
java.util.concurrent.BrokenBarrierException
 at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
 at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
 at thread.MyThread.run(MyThread.java:29)
运动员1号看不见起跑线了
运动员1号退场!
运动员2号准备完毕!在起跑线等待发令枪
java.util.concurrent.BrokenBarrierException
运动员2号看不见起跑线了
运动员2号退场!
 at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
 at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
 at thread.MyThread.run(MyThread.java:29)
运动员3号准备完毕!在起跑线等待发令枪
java.util.concurrent.BrokenBarrierException
 at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
 at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
 at thread.MyThread.run(MyThread.java:29)
运动员3号看不见起跑线了
运动员3号退场!
运动员4号准备完毕!在起跑线等待发令枪
java.util.concurrent.BrokenBarrierException
运动员4号看不见起跑线了
运动员4号退场!
 at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
 at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
 at thread.MyThread.run(MyThread.java:29)

从输出可以看到,如果其中一个参与者抛出TimeoutException,其他参与者会抛出BrokenBarrierException。

相关推荐

如何屏蔽色情网站?_怎么能屏蔽网站

一、基础防御:全网DNS劫持阻断1.修改全网DNS服务器推荐DNS:安全DNS:CleanBrowsing(成人内容过滤):185.228.168.168/185.228.169.168Open...

容器、Pod、虚拟机与宿主机网络通信全解:看这一篇就够了

在日常开发与部署过程中,很多人一开始都会有这样的疑惑:容器之间是怎么通信的?容器怎么访问宿主机?宿主机又如何访问容器?Kubernetes中Pod的网络和Docker容器一样吗?容器跨机器是...

Win11专业版找不到共享打印机的问题

有很多深度官网的用户,都是在办公室上班的。而上班就需要使用打印机,但更新win11系统后,却出现同一个办公室里面的打印机都找不到的问题,这该如何处理呢?其实,可能是由于我们并没有打开共享打印机而造成的...

常用电脑快捷键大全,摆脱鼠标依赖,建议收藏

Ctrl+C复制Ctrl+X剪切Ctrl+V粘贴Ctrl+Z撤销Ctrl+Y重做Ctrl+B加粗Ctrl+A全选所有文件Ctrl+S保存Ctrl+N新建Ctrl+O打开Ctrl+E...

Win11实现自动追剧Jellyfin硬解,免NAS复杂操作

大家好,欢迎来到思赞数码。本期将详细介绍如何通过安装和配置Sonarr、Radarr、Prowlarr、qBittorrent和Jellyfin,打造一套自动化的影视管理系统。很多人认为,要实现自动追...

微软Win11安卓子系统WSA 2308.40000.3.0更新推送下载

IT之家9月21日消息,微软官方博客今日宣布,已面向所有WindowsInsider用户推送了Windows11安卓子系统的2308.40000.3.0版本更新。本次更新和之前...

路由器总掉线 一个命令就能猜出八九分

明明网络强度满格或有线图标正常,但视频卡成PPT、网页刷不开、游戏动不了,闲心这些问题很多小伙伴都碰到过。每次都要开关路由、宽带/光猫、插拔网线……一通忙。有没有啥办法能快速确定故障到底在哪儿,方便处...

windows电脑如何修改hosts文件?_windows怎么修改hosts

先来简单说下电脑host的作用hosts文件的作用:hosts文件是一个用于储存计算机网络中各节点信息的计算机文件;作用是将一些常用的网址域名与其对应的IP地址建立一个关联“数据库”,当用户在浏览器中...

win10广告弹窗ShellExperienceHost.exe

win10右下角老是弹出广告弹窗,排查为以下程序引起,但是这个是系统菜单的程序不能动:C:\Windows\SystemApps\ShellExperienceHost_cw5n1h2txyewy\S...

Win10 Mobile预览版10512/10166越狱解锁部署已被黑客攻破

看起来统一的WindowsPhone和Windows越加吸引人们的关注,特别是黑客们的好奇心。XDA论坛宣称,在Win10Mobile预览版10512/10166上,已取得越狱/解锁部署突破,比如可...

6款冷门小众软件,都是宝藏,建议收藏

真的很不错(。-ω-)zzzBearhttps://bear.app/cn/Bear是一个漂亮,灵活的Markdown的写作工具。它一样只支持苹果家的全平台。它一出现就惊艳四方,就被AppSto...

如何让不符合条件的设备升级Windows 11

如果你是最近(6月24日之后)加入WindowsInsider项目并且你的设备并不符合升级条件,那么当你在尝试升级Windows11的时候可能会看到以下错误:你的PC不符合Wi...

windows host文件怎么恢复?局域网访问全靠这些!

windowshost文件怎么恢复?windowshost文件是常用网址域名及其相应IP地址建立一个关联文件,通过这个host文件配置域名和IP的映射关系,以提高域名解析的速度,方便局域网用户使用...

Mac Hosts管理工具---SwitchHosts

switchhosts!formac是一款帮助用户快速切换hosts文件的工具,switchhosts!formac能够帮助你快速方便的打造个人专用的网络环境,支持本地和在线两种方式,并且支持...

「浅谈趣说网络知识」 第十二弹 老而不死的Hosts,它还很有用

【浅谈趣说网络知识】第十二弹老而不死的Hosts,它还很有用什么时候才觉得自己真的老了,不是35岁以上的数字,不是头上的点点白发,而是不知觉中的怀旧。风口上的IT界讲的就是"长江后浪推前浪...

取消回复欢迎 发表评论: