百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

最常用的CountDownLatch, CyclicBarrier你知道多少?(必会知识)

cac55 2024-09-19 17:03 46 浏览 0 评论

CountdownLatch,CyclicBarrier是非常常用并发工具类,可以说是Java工程师必会技能了。不但在项目实战中经常涉及,而且在编写压测程序,多线程demo也是必不可少,所以掌握它们的用法和实现原理非常有必要。

等待多线程完成的CountDownLatch

CountDownLatch允许一个或多个线程等待其他线程完成操作。也就是说通过使用CountDownLatch工具类,可以让一组线程等待彼此执行完毕后在共同执行下一个操作。具体流程如下图所示,箭头表示任务,矩形表示栅栏,当三个任务都到达栅栏时,栅栏后wait的任务才开始执行。

CountDownLatch维护有个int型的状态码,每次调用countDown时状态值就会减1;调用wait方法的线程会阻塞,直到状态码为0时才会继续执行。

在多线程协同工作时,可能需要等待其他线程执行完毕之后,主线程才接着往下执行。首先我们可能会想到使用线程的join方法(调用join方法的线程优先执行,该线程执行完毕后才会执行其他线程),显然这是可以完成的。

使用Thread.join()方法实现


public class RunningRaceTest {
    public static void main(String[] args) throws InterruptedException {
        Thread runner1 = new Thread(new Runner(), "1号");
        Thread runner2 = new Thread(new Runner(), "2号");
        Thread runner3 = new Thread(new Runner(), "3号");
        Thread runner4 = new Thread(new Runner(), "4号");
        Thread runner5 = new Thread(new Runner(), "5号");
        runner1.start();
        runner2.start();
        runner3.start();
        runner4.start();
        runner5.start();

        runner1.join();
        runner2.join();
        runner3.join();
        runner4.join();
        runner5.join();

        // 裁判等待5名选手准备完毕
        System.out.println("裁判:比赛开始~~");
    }
}

class Runner implements Runnable {
    @Override
    public void run() {
        try {
            int sleepMills = ThreadLocalRandom.current().nextInt(1000);
            Thread.sleep(sleepMills);
            System.out.println(Thread.currentThread().getName() + " 选手已就位, 准备共用时: " + sleepMills + "ms");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Thread.join()完全可以实现这个需求,不过存在一个问题,如果调用join的线程一直存活,则当前线程则需要一直等待。这显然不够灵活,并且当前线程可能会出现死等的情况。

更加灵活的CountDownLatch

jdk1.5之后的并发包中提供了CountDownLatch并发工具了,也可以实现join的功能,并且功能更加强大。

// 参赛选手线程
class Runner implements Runnable {
    private CountDownLatch countdownLatch;
    
    public Runner(CountDownLatch countdownLatch) {
        this.countdownLatch = countdownLatch;
    }

    @Override
    public void run() {
        try {
            int sleepMills = ThreadLocalRandom.current().nextInt(1000);
            Thread.sleep(sleepMills);
            System.out.println(Thread.currentThread().getName() + " 选手已就位, 准备共用时: " + sleepMills + "ms");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 准备完毕,举手示意
            countdownLatch.countDown();
        }
    }
}

public class RunningRaceTest {
    public static void main(String[] args) throws InterruptedException {
        // 使用线程池的正确姿势
        int size = 5;
        AtomicInteger counter = new AtomicInteger();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(size, size, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), (r) -> new Thread(r, counter.addAndGet(1) + " 号 "), new ThreadPoolExecutor.AbortPolicy());
        
        CountDownLatch countDownLatch = new CountDownLatch(5);
        for (int i = 0; i < size; i++) {
            threadPoolExecutor.submit(new Runner(countDownLatch));
        }

        // 裁判等待5名选手准备完毕
        countDownLatch.await(); // 为了避免死等,也可以添加超时时间
        System.out.println("裁判:比赛开始~~");

        threadPoolExecutor.shutdownNow();
    }
}

输出结果:

5 号  选手已就位, 准备共用时: 20ms
4 号  选手已就位, 准备共用时: 156ms
1 号  选手已就位, 准备共用时: 288ms
2 号  选手已就位, 准备共用时: 519ms
3 号  选手已就位, 准备共用时: 945ms
比赛开始~~


同步屏障CyclicBarrier

CyclicBarrier可以实现CountDownLatch一样的功能,不同的是CountDownLatch属于一次性对象,声明后只能使用一次,而CyclicBarrier可以循环使用

从字面意义上来看,CyclicBarrier表示循环的屏障,当一组线程全部都到达屏障时,屏障才会被移除,否则只能阻塞在屏障处。

public class RunningRace {
    public static void main(String[] args) {
        // 使用线程池的正确姿势
        int size = 5;
        AtomicInteger counter = new AtomicInteger();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(size, size, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), (r) -> new Thread(r, counter.addAndGet(1) + " 号 "), new ThreadPoolExecutor.AbortPolicy());

        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("裁判:比赛开始~~"));
        for (int i = 0; i < 10; i++) {
            threadPoolExecutor.submit(new Runner(cyclicBarrier));
        }
    }
}

class Runner implements Runnable {
    private CyclicBarrier cyclicBarrier;

    public Runner(CyclicBarrier countdownLatch) {
        this.cyclicBarrier = countdownLatch;
    }

    @Override
    public void run() {
        try {
            int sleepMills = ThreadLocalRandom.current().nextInt(1000);
            Thread.sleep(sleepMills);
            System.out.println(Thread.currentThread().getName() + " 选手已就位, 准备共用时: " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting());
            cyclicBarrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

由于CyclicBarrier可以循环使用,所以CyclicBarrier的构造方法中可以传入一个Runnable参数,在每一轮执行完毕之后就会立刻执行这个Runnable任务

CountDownLatch设计与实现

CountDownLath是基于AQS框架的一种简单实现,有两个核心的方法,即await()和countDown(),通过构造方法传入一个状态值,调用await()方法时线程会阻塞,直到状态码被修改成0时才会返回,每次调用countDown()时会将状态值减1。


wait方法:执行wait方法后,会尝试获取同步状态,如果为状态为0则方法继续执行,否择当前线程会被加入到同步队列中,详情可见笔者关于AQS的两篇文章。

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 如果状态码不为0,尝试获取同步状态,如果失败则被加入到同步队列中
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
// 当状态码为0时返回1,否择返回-1,这个方法中参数没有任何用处
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

countDown方法:每次执行countDown方法时,会将状态码的值减1.

public void countDown() {
    sync.releaseShared(1);
}


CyclicBarrier的设计与实现

CyclicBarrier与CountDownLatch实现思想相同,也是基于AQS框架实现。不同的是CyclicBarrier内部维护一个状态值借助基于AQS实现的锁ReentrantLock来实现状态值的同步更新,以及AQS除了同步状态之外的另一个核心概念条件队列来完成线程的阻塞。

parties: 和CountdownLatch中的状态值一样,用来记录每次要相互等待的线程数量,只有parties个线程同时到达屏障时,才会唤醒阻塞的线程。

count临时计数器: 由于CyclicBarrier是可以循环使用的,count可以理解为是一个临时变量,每一轮执行完毕或者被打断都会重置count为parties值。

Generation内部类: 只有一个属性 broken表示当前这一轮执行是否被中断,如果被中断后其他线程再执行await方法会抛出异常(目的是停止本轮线程未执行线程的继续执行)。

await方法: 当执行await方法时,会同步得对内部的count执行--count操作, 如果count = 0,则执行barrierCommand任务(通过构造方法传来的Runnable参数)。

reset方法:中断本轮执行,重置count值,唤醒等待的线程然后开始下一轮,此时本轮正在执行的线程调用await方法会抛出异常。

// await方法实际执行的代码
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
TimeoutException {
    final ReentrantLock lock = this.lock;
    // 加锁,保证并发操作的一致性
    lock.lock();
    try {
        // 如果当前这一轮操作被中断,抛出中断异常(该异常只是起警示作用,没有任何其他信息)
        final Generation g = generation;
        if (g.broken)
            throw new BrokenBarrierException();
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        // 本轮执行的计数器 数值-1
        int index = --count;
        if (index == 0) {  // 计数器值=1, 本轮线程全部到达屏障,执行barrierCommand任务
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration();// 唤醒所有等待在条件队列上的任务
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 如果状态不等于0,循环等待直到计数器值为0,本轮执行被打破,线程被中断,或者等待超时
        for (;;) {
            try {
                if (!timed)
                    // 状态码不为0,将当前线程加入到条件队列中,进入阻塞状态
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();// 唤醒所有条件队列中的线程,重置count的值
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

重置栅栏的状态

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}
/**
 * Sets current barrier generation as broken and wakes up everyone.
 * Called only while holding lock.
 */
private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

当一轮执行完毕之后,既count=0后,CyclicBarrier的临时状态会重置为parties

/**
 * 进入下一轮
 * 唤醒所有等待线程,充值count
 */
private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();
    // set up next generation
    count = parties;
    generation = new Generation();
}

总结

  1. CountDownLatch创建后只能使用一次,而CyclicBarrier可以循环使用,并且CyclicBarrier功能更完善。
  2. CountDownLatch内部的状态是基于AQS中的状态信息,而CyclicBarrier中的状态值是单独维护的,使用ReentrantLock加锁保证并发修改状态值的数据一致性。
  3. 它们的使用场景:允许一个或多个线程等待其他线程完成操作, 即当指定数量线程执行完某个操作再继续执行下一个操作。

笔者是95后java工程师,专注后端技术,小伙伴本可以关注wx公众号cruder,一起探讨学习和工作,一起进步!

相关推荐

如何屏蔽色情网站?_怎么能屏蔽网站

一、基础防御:全网DNS劫持阻断1.修改全网DNS服务器推荐DNS:安全DNS:CleanBrowsing(成人内容过滤):185.228.168.168/185.228.169.168Open...

容器、Pod、虚拟机与宿主机网络通信全解:看这一篇就够了

在日常开发与部署过程中,很多人一开始都会有这样的疑惑:容器之间是怎么通信的?容器怎么访问宿主机?宿主机又如何访问容器?Kubernetes中Pod的网络和Docker容器一样吗?容器跨机器是...

Win11专业版找不到共享打印机的问题

有很多深度官网的用户,都是在办公室上班的。而上班就需要使用打印机,但更新win11系统后,却出现同一个办公室里面的打印机都找不到的问题,这该如何处理呢?其实,可能是由于我们并没有打开共享打印机而造成的...

常用电脑快捷键大全,摆脱鼠标依赖,建议收藏

Ctrl+C复制Ctrl+X剪切Ctrl+V粘贴Ctrl+Z撤销Ctrl+Y重做Ctrl+B加粗Ctrl+A全选所有文件Ctrl+S保存Ctrl+N新建Ctrl+O打开Ctrl+E...

Win11实现自动追剧Jellyfin硬解,免NAS复杂操作

大家好,欢迎来到思赞数码。本期将详细介绍如何通过安装和配置Sonarr、Radarr、Prowlarr、qBittorrent和Jellyfin,打造一套自动化的影视管理系统。很多人认为,要实现自动追...

微软Win11安卓子系统WSA 2308.40000.3.0更新推送下载

IT之家9月21日消息,微软官方博客今日宣布,已面向所有WindowsInsider用户推送了Windows11安卓子系统的2308.40000.3.0版本更新。本次更新和之前...

路由器总掉线 一个命令就能猜出八九分

明明网络强度满格或有线图标正常,但视频卡成PPT、网页刷不开、游戏动不了,闲心这些问题很多小伙伴都碰到过。每次都要开关路由、宽带/光猫、插拔网线……一通忙。有没有啥办法能快速确定故障到底在哪儿,方便处...

windows电脑如何修改hosts文件?_windows怎么修改hosts

先来简单说下电脑host的作用hosts文件的作用:hosts文件是一个用于储存计算机网络中各节点信息的计算机文件;作用是将一些常用的网址域名与其对应的IP地址建立一个关联“数据库”,当用户在浏览器中...

win10广告弹窗ShellExperienceHost.exe

win10右下角老是弹出广告弹窗,排查为以下程序引起,但是这个是系统菜单的程序不能动:C:\Windows\SystemApps\ShellExperienceHost_cw5n1h2txyewy\S...

Win10 Mobile预览版10512/10166越狱解锁部署已被黑客攻破

看起来统一的WindowsPhone和Windows越加吸引人们的关注,特别是黑客们的好奇心。XDA论坛宣称,在Win10Mobile预览版10512/10166上,已取得越狱/解锁部署突破,比如可...

6款冷门小众软件,都是宝藏,建议收藏

真的很不错(。-ω-)zzzBearhttps://bear.app/cn/Bear是一个漂亮,灵活的Markdown的写作工具。它一样只支持苹果家的全平台。它一出现就惊艳四方,就被AppSto...

如何让不符合条件的设备升级Windows 11

如果你是最近(6月24日之后)加入WindowsInsider项目并且你的设备并不符合升级条件,那么当你在尝试升级Windows11的时候可能会看到以下错误:你的PC不符合Wi...

windows host文件怎么恢复?局域网访问全靠这些!

windowshost文件怎么恢复?windowshost文件是常用网址域名及其相应IP地址建立一个关联文件,通过这个host文件配置域名和IP的映射关系,以提高域名解析的速度,方便局域网用户使用...

Mac Hosts管理工具---SwitchHosts

switchhosts!formac是一款帮助用户快速切换hosts文件的工具,switchhosts!formac能够帮助你快速方便的打造个人专用的网络环境,支持本地和在线两种方式,并且支持...

「浅谈趣说网络知识」 第十二弹 老而不死的Hosts,它还很有用

【浅谈趣说网络知识】第十二弹老而不死的Hosts,它还很有用什么时候才觉得自己真的老了,不是35岁以上的数字,不是头上的点点白发,而是不知觉中的怀旧。风口上的IT界讲的就是"长江后浪推前浪...

取消回复欢迎 发表评论: