百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

最常用的CountDownLatch, CyclicBarrier你知道多少?(必会知识)

cac55 2024-09-19 17:03 35 浏览 0 评论

CountdownLatch,CyclicBarrier是非常常用并发工具类,可以说是Java工程师必会技能了。不但在项目实战中经常涉及,而且在编写压测程序,多线程demo也是必不可少,所以掌握它们的用法和实现原理非常有必要。

等待多线程完成的CountDownLatch

CountDownLatch允许一个或多个线程等待其他线程完成操作。也就是说通过使用CountDownLatch工具类,可以让一组线程等待彼此执行完毕后在共同执行下一个操作。具体流程如下图所示,箭头表示任务,矩形表示栅栏,当三个任务都到达栅栏时,栅栏后wait的任务才开始执行。

CountDownLatch维护有个int型的状态码,每次调用countDown时状态值就会减1;调用wait方法的线程会阻塞,直到状态码为0时才会继续执行。

在多线程协同工作时,可能需要等待其他线程执行完毕之后,主线程才接着往下执行。首先我们可能会想到使用线程的join方法(调用join方法的线程优先执行,该线程执行完毕后才会执行其他线程),显然这是可以完成的。

使用Thread.join()方法实现


public class RunningRaceTest {
    public static void main(String[] args) throws InterruptedException {
        Thread runner1 = new Thread(new Runner(), "1号");
        Thread runner2 = new Thread(new Runner(), "2号");
        Thread runner3 = new Thread(new Runner(), "3号");
        Thread runner4 = new Thread(new Runner(), "4号");
        Thread runner5 = new Thread(new Runner(), "5号");
        runner1.start();
        runner2.start();
        runner3.start();
        runner4.start();
        runner5.start();

        runner1.join();
        runner2.join();
        runner3.join();
        runner4.join();
        runner5.join();

        // 裁判等待5名选手准备完毕
        System.out.println("裁判:比赛开始~~");
    }
}

class Runner implements Runnable {
    @Override
    public void run() {
        try {
            int sleepMills = ThreadLocalRandom.current().nextInt(1000);
            Thread.sleep(sleepMills);
            System.out.println(Thread.currentThread().getName() + " 选手已就位, 准备共用时: " + sleepMills + "ms");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Thread.join()完全可以实现这个需求,不过存在一个问题,如果调用join的线程一直存活,则当前线程则需要一直等待。这显然不够灵活,并且当前线程可能会出现死等的情况。

更加灵活的CountDownLatch

jdk1.5之后的并发包中提供了CountDownLatch并发工具了,也可以实现join的功能,并且功能更加强大。

// 参赛选手线程
class Runner implements Runnable {
    private CountDownLatch countdownLatch;
    
    public Runner(CountDownLatch countdownLatch) {
        this.countdownLatch = countdownLatch;
    }

    @Override
    public void run() {
        try {
            int sleepMills = ThreadLocalRandom.current().nextInt(1000);
            Thread.sleep(sleepMills);
            System.out.println(Thread.currentThread().getName() + " 选手已就位, 准备共用时: " + sleepMills + "ms");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 准备完毕,举手示意
            countdownLatch.countDown();
        }
    }
}

public class RunningRaceTest {
    public static void main(String[] args) throws InterruptedException {
        // 使用线程池的正确姿势
        int size = 5;
        AtomicInteger counter = new AtomicInteger();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(size, size, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), (r) -> new Thread(r, counter.addAndGet(1) + " 号 "), new ThreadPoolExecutor.AbortPolicy());
        
        CountDownLatch countDownLatch = new CountDownLatch(5);
        for (int i = 0; i < size; i++) {
            threadPoolExecutor.submit(new Runner(countDownLatch));
        }

        // 裁判等待5名选手准备完毕
        countDownLatch.await(); // 为了避免死等,也可以添加超时时间
        System.out.println("裁判:比赛开始~~");

        threadPoolExecutor.shutdownNow();
    }
}

输出结果:

5 号  选手已就位, 准备共用时: 20ms
4 号  选手已就位, 准备共用时: 156ms
1 号  选手已就位, 准备共用时: 288ms
2 号  选手已就位, 准备共用时: 519ms
3 号  选手已就位, 准备共用时: 945ms
比赛开始~~


同步屏障CyclicBarrier

CyclicBarrier可以实现CountDownLatch一样的功能,不同的是CountDownLatch属于一次性对象,声明后只能使用一次,而CyclicBarrier可以循环使用

从字面意义上来看,CyclicBarrier表示循环的屏障,当一组线程全部都到达屏障时,屏障才会被移除,否则只能阻塞在屏障处。

public class RunningRace {
    public static void main(String[] args) {
        // 使用线程池的正确姿势
        int size = 5;
        AtomicInteger counter = new AtomicInteger();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(size, size, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), (r) -> new Thread(r, counter.addAndGet(1) + " 号 "), new ThreadPoolExecutor.AbortPolicy());

        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("裁判:比赛开始~~"));
        for (int i = 0; i < 10; i++) {
            threadPoolExecutor.submit(new Runner(cyclicBarrier));
        }
    }
}

class Runner implements Runnable {
    private CyclicBarrier cyclicBarrier;

    public Runner(CyclicBarrier countdownLatch) {
        this.cyclicBarrier = countdownLatch;
    }

    @Override
    public void run() {
        try {
            int sleepMills = ThreadLocalRandom.current().nextInt(1000);
            Thread.sleep(sleepMills);
            System.out.println(Thread.currentThread().getName() + " 选手已就位, 准备共用时: " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting());
            cyclicBarrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

由于CyclicBarrier可以循环使用,所以CyclicBarrier的构造方法中可以传入一个Runnable参数,在每一轮执行完毕之后就会立刻执行这个Runnable任务

CountDownLatch设计与实现

CountDownLath是基于AQS框架的一种简单实现,有两个核心的方法,即await()和countDown(),通过构造方法传入一个状态值,调用await()方法时线程会阻塞,直到状态码被修改成0时才会返回,每次调用countDown()时会将状态值减1。


wait方法:执行wait方法后,会尝试获取同步状态,如果为状态为0则方法继续执行,否择当前线程会被加入到同步队列中,详情可见笔者关于AQS的两篇文章。

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 如果状态码不为0,尝试获取同步状态,如果失败则被加入到同步队列中
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
// 当状态码为0时返回1,否择返回-1,这个方法中参数没有任何用处
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

countDown方法:每次执行countDown方法时,会将状态码的值减1.

public void countDown() {
    sync.releaseShared(1);
}


CyclicBarrier的设计与实现

CyclicBarrier与CountDownLatch实现思想相同,也是基于AQS框架实现。不同的是CyclicBarrier内部维护一个状态值借助基于AQS实现的锁ReentrantLock来实现状态值的同步更新,以及AQS除了同步状态之外的另一个核心概念条件队列来完成线程的阻塞。

parties: 和CountdownLatch中的状态值一样,用来记录每次要相互等待的线程数量,只有parties个线程同时到达屏障时,才会唤醒阻塞的线程。

count临时计数器: 由于CyclicBarrier是可以循环使用的,count可以理解为是一个临时变量,每一轮执行完毕或者被打断都会重置count为parties值。

Generation内部类: 只有一个属性 broken表示当前这一轮执行是否被中断,如果被中断后其他线程再执行await方法会抛出异常(目的是停止本轮线程未执行线程的继续执行)。

await方法: 当执行await方法时,会同步得对内部的count执行--count操作, 如果count = 0,则执行barrierCommand任务(通过构造方法传来的Runnable参数)。

reset方法:中断本轮执行,重置count值,唤醒等待的线程然后开始下一轮,此时本轮正在执行的线程调用await方法会抛出异常。

// await方法实际执行的代码
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
TimeoutException {
    final ReentrantLock lock = this.lock;
    // 加锁,保证并发操作的一致性
    lock.lock();
    try {
        // 如果当前这一轮操作被中断,抛出中断异常(该异常只是起警示作用,没有任何其他信息)
        final Generation g = generation;
        if (g.broken)
            throw new BrokenBarrierException();
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        // 本轮执行的计数器 数值-1
        int index = --count;
        if (index == 0) {  // 计数器值=1, 本轮线程全部到达屏障,执行barrierCommand任务
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration();// 唤醒所有等待在条件队列上的任务
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 如果状态不等于0,循环等待直到计数器值为0,本轮执行被打破,线程被中断,或者等待超时
        for (;;) {
            try {
                if (!timed)
                    // 状态码不为0,将当前线程加入到条件队列中,进入阻塞状态
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();// 唤醒所有条件队列中的线程,重置count的值
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

重置栅栏的状态

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}
/**
 * Sets current barrier generation as broken and wakes up everyone.
 * Called only while holding lock.
 */
private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

当一轮执行完毕之后,既count=0后,CyclicBarrier的临时状态会重置为parties

/**
 * 进入下一轮
 * 唤醒所有等待线程,充值count
 */
private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();
    // set up next generation
    count = parties;
    generation = new Generation();
}

总结

  1. CountDownLatch创建后只能使用一次,而CyclicBarrier可以循环使用,并且CyclicBarrier功能更完善。
  2. CountDownLatch内部的状态是基于AQS中的状态信息,而CyclicBarrier中的状态值是单独维护的,使用ReentrantLock加锁保证并发修改状态值的数据一致性。
  3. 它们的使用场景:允许一个或多个线程等待其他线程完成操作, 即当指定数量线程执行完某个操作再继续执行下一个操作。

笔者是95后java工程师,专注后端技术,小伙伴本可以关注wx公众号cruder,一起探讨学习和工作,一起进步!

相关推荐

无力吐槽的自动续费(你被自动续费困扰过吗?)

今天因为工作需要,需要在百度文库上下载一篇文章。没办法,确实需要也有必要,只能老老实实的按要求买了个VIP。过去在百度文库上有过类似经历,当时为了写论文买了一个月的VIP,后面也没有太注意,直到第二个...

百度文库推出“文源计划”创作者可一键认领文档

11月7日,百度文库发布了旨在保护创作者权益的“文源计划”。所谓“文源计划”,即为每一篇文档找到源头,让创作者享受更多的权益。据百度文库总经理李小婉介绍,文源计划分为三部分,分别是版权认证、版权扶持和...

有开放大学学号的同学,百度文库高校版可以用了。

还在网上找百度文库的下载方式,只要从身边的朋友在读开放大学的,那他(她)的学号就可以登陆到国家开放大学图书馆,还使用百度文库高校版来下载。与百度文库稍有不同,但足够使用了。现转国图链接如下:htt...

搜索资源方法推荐(搜索资源的方法)

今天msgbox就要教大家如何又快又准的搜到各类资源,第一点,排除干扰百度搜索出来啊经常前排展示它的产品以及百度文库,如何去除呢?很简单,后面输入空格减号百度文库,比如你搜高等数学百度文库很多,只要后...

一行代码搞定百度文库VIP功能(2021百度文库vip账号密码共享)

百度文库作为大家常用查资料找文档的平台,大多数文档我们都可以直接在百度文库找到,然而百度文库也有让人头痛的时候。好不容易找到一篇合适的文档,当你准备复制的时候他却提示你需要开通VIP才能复制~~~下载...

百度文库文档批量上传工具用户说明书

百度文库文档批量上传工具用户说明书1、软件主要功能1、批量上传文档到百度文库,支持上传到收费、VIP专享、优享以及共享。2、支持自动分类和自动获取标签3、支持多用户切换,一个账户传满可以切换到...

百度文库现在都看不到文档是否上传成功,要凉了吗?

打开知识店铺,百度文库文档里显示都是下载这一按键,上传的文档也看不到是否成功?咋情况,要取消了吗?没通过审核的也不让你删除,是几个意思,想通吃吗?现在百度上传文档也很费劲,有时弄了半天的资料上传审核过...

微信推广引流108式:利用百度文库长期分享软文引流

百度文库相对于百度知道、百度百科来说,操作上没那么多条条框框,规则上也相对好把握些。做一条百度知道所花费的精力一般都会比做一条百度文库的要多些,老马个人操作下来觉得百度文库更好把握。但见仁见智吧,今天...

职场“避雷”指南 百度文库推出标准化劳动合同范本

轰轰烈烈的毕业季结束了,众多应届生在经过了“职场海选”后,已正式成为职场生力军的一员。这一阶段,除了熟悉业务,签订劳动合同、了解职场福利也迅速被提上日程。而随着国人法律意识的增强,百度文库内《劳动合同...

《百度文库》:素材精选宝库(百度文库官网首页)

《百度文库》:独特功能助力选择高质量素材在当今信息爆炸的时代,如何高效地获取并利用有价值的素材成为了许多人面临的挑战。而《百度文库》作为百度公司推出的一款在线文档分享平台,凭借其丰富的资源、强大的功能...

深度整合和开放AI能力 百度文库和网盘推出内容操作系统「沧舟OS」

【TechWeb】4月25日消息,Create2025百度AI开发者大会上,百度文库和百度网盘推出全球首个内容操作系统——沧舟OS。基于沧舟OS,百度文库APP全新上线「GenFlow超能搭子」...

女子发现大二作业被百度文库要求付费下载,律师:平台侵权,应赔偿

近日,28岁的黎女士在百度百科搜索家乡的小地名时,发现了自己在大二完成的课题作业。她继续搜索,发现多个平台收录了该文,比如豆丁网和文档之家等,有的还设置了付费或积分下载。2月15日,九派新闻记者以用户...

2016杀入百度文库的新捷径,只有少数人才知道的喔

百度的产品在SEO优化中的分量真不用多说,其实很多人都像我一样一直在找捷径。但是我经常发现很多人都是在用死方法。比如发贴吧发帖而不知道去申请一个吧主,知道自问自答而不知道去申请一个合作资格。口碑和贴吧...

百度文库付费文档搜索方法(百度文库付费文档搜索方法有哪些)

一直以来,百度文库中无论是个人中心还是个人主页,都没有像淘宝一样的店内搜索功能,连最近新开的知识店铺也没有设计店内搜索功能,这无论是对上传用户还是下载用户都不方便,上传用户想要搜索自己的文档无法办到...

供读者免费使用!泰达图书馆机构版百度文库新年上新啦

在泰达图书馆读者使用百度文库数字资源不需要VIP,免-费-用!惊不惊喜?快来了解一下吧……新年伊始,为满足区域企业、高校、科研院所以及居民群众在教学、科研及学习过程中,对各类文献资源的需求,泰达图书馆...

取消回复欢迎 发表评论: