百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

如何理解 Java 并发编程中的 CyclicBarrier 和 ReentrantLock?

cac55 2024-09-19 17:04 22 浏览 0 评论


在 Java 并发编程中,如何有效地控制多个线程的协作与同步是一个重要的课题。本文将详细探讨两个关键工具类:CyclicBarrier 和 ReentrantLock。通过对其原理、源码和应用场景的深入分析,希望能帮助读者更好地理解和应用这些工具类。


CyclicBarrier 的应用与原理解析


CyclicBarrier 是 Java 并发编程中的一个重要工具类,用于协调多个线程在某个同步点共同等待,直到所有线程都到达同步点后才继续执行。它的主要用途是让一组线程在某个固定点上等待,直到所有线程都到达此点后再一起继续执行,类似于“栅栏”的功能。


基本用法


CyclicBarrier 的构造函数主要有两个:


  1. CyclicBarrier(int parties):创建一个新的 CyclicBarrier,其计数器的初始值为 parties。
  2. CyclicBarrier(int parties, Runnable barrierAction):创建一个新的 CyclicBarrier,其计数器的初始值为 parties,并且在所有线程都到达屏障时执行指定的 barrierAction 任务。


java


import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {
    public static void main(String[] args) {
        // 创建一个CyclicBarrier,指定需要同步的线程数为3
        CyclicBarrier barrier = new CyclicBarrier(3, () -> {
            // 所有线程到达屏障后执行的任务
            System.out.println("所有线程都到达屏障,继续执行...");
        });

        // 创建并启动三个线程
        for (int i = 0; i < 3; i++) {
            new Thread(new Worker(barrier)).start();
        }
    }
}

class Worker implements Runnable {
    private CyclicBarrier barrier;

    public Worker(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getName() + " 正在执行");
            Thread.sleep(1000);  // 模拟任务执行
            System.out.println(Thread.currentThread().getName() + " 到达屏障");
            barrier.await();  // 等待其他线程到达屏障
            System.out.println(Thread.currentThread().getName() + " 继续执行");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}



原理解析


CyclicBarrier 的内部实现基于 ReentrantLock 和 Condition,并通过一个 count 变量作为计数器来记录当前到达屏障的线程数。当计数器的值降为 0 时,表示所有线程都已到达屏障,此时屏障打开,所有线程继续执行。


构造方法


java


public CyclicBarrier(int parties) {
    this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;  // 初始化拦截线程数
    this.count = parties;    // 初始化计数器
    this.barrierCommand = barrierAction;  // 初始化屏障任务
}



await 方法


当线程调用 await() 方法时,实际上调用的是 dowait(false, 0L) 方法。await() 方法会将当前线程阻塞,直到所有线程都到达屏障。


java


public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // 不会发生
    }
}



dowait 方法


dowait 方法是 CyclicBarrier 的核心逻辑所在,负责实现线程的等待与屏障的打开。


java


private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException, TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();  // 获取锁
    try {
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        int index = --count;  // 递减计数器
        if (index == 0) {  // 当计数器为0,表示最后一个线程到达屏障
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();  // 执行屏障任务
                ranAction = true;
                nextGeneration();  // 开启下一轮
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 继续等待
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();  // 释放锁
    }
}



实际应用场景


  1. 多线程任务的阶段性同步:在一些复杂的多线程任务中,需要各个线程在某些阶段进行同步,然后再继续下一阶段的执行。
  2. 并行计算的结果合并:多个线程进行并行计算,计算完成后需要将结果汇总,此时可以使用 CyclicBarrier 来等待所有计算线程完成后再进行汇总。


ReentrantLock 的概述与深入解析


ReentrantLock 是 Java 并发包(java.util.concurrent)中的一个重要类,用于实现显式锁(Explicit Lock)。它提供了比 synchronized 关键字更灵活和更强大的锁机制。在实际开发中,ReentrantLock 常用于替代 synchronized 关键字,以实现更复杂的同步需求。


基本用法


ReentrantLock 提供了显式的锁和解锁操作,需要手动进行锁的获取和释放。下面是一个简单的示例,展示了 ReentrantLock 的基本用法:


java


import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockExample {
    private final ReentrantLock lock = new ReentrantLock();

    public void performTask() {
        lock.lock();  // 显式获取锁
        try {
            // 临界区代码
            System.out.println(Thread.currentThread().getName() + " 正在执行任务");
        } finally {
            lock.unlock();  // 显式释放锁
        }
    }

    public static void main(String[] args) {
        ReentrantLockExample example = new ReentrantLockExample();
        for (int i = 0; i < 5; i++) {
            new Thread(example::performTask).start();
        }
    }
}



主要功能


  1. 可重入性:ReentrantLock 是可重入的,这意味着一个线程可以多次获取同一个锁而不会被自己阻塞,类似于 synchronized。每次调用 lock() 方法都会增加锁的重入次数,调用 unlock() 方法会减少锁的重入次数,直到重入次数为 0 时才真正释放锁。
  2. 公平锁和非公平锁:ReentrantLock 可以选择公平锁和非公平锁。公平锁(Fair Lock)按照线程请求锁的顺序来分配锁,而非公平锁(Non-Fair Lock)可能会让某些线程“插队”。默认情况下,ReentrantLock 是非公平锁,但可以通过构造函数指定为公平锁。
  3. 条件变量(Condition):ReentrantLock 提供了条件变量(Condition)来实现等待/通知机制,比 Object 的 wait 和 notify 更加灵活和强大。


可重入性示例


java


import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockReentrantExample {
    private final ReentrantLock lock = new ReentrantLock();

    public void outer() {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " 进入外层方法");
            inner();
        } finally {
            lock.unlock();
        }
    }

    public void inner() {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " 进入内层方法");
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ReentrantLockReentrantExample example = new ReentrantLockReentrantExample();
        example.outer();
    }
}



公平锁和非公平锁


java


import java.util.concurrent.locks.ReentrantLock;

public class FairLockExample {
    private final ReentrantLock fairLock = new ReentrantLock(true);  // 公平锁
    private final ReentrantLock nonFairLock = new ReentrantLock(false);  // 非公平锁

    public void performTaskWithFairLock() {
        fairLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " 获取了公平锁");
        } finally {
            fairLock.unlock();
        }
    }

    public void performTaskWithNonFairLock() {
        nonFairLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " 获取了非公平锁");
        } finally {
            nonFairLock.unlock();
        }
    }

    public static void main(String[] args) {
        FairLockExample example = new FairLockExample();
        for (int i = 0; i < 5; i++) {
            new Thread(example::performTaskWithFairLock).start();
            new Thread(example::performTaskWithNonFairLock).start();
        }
    }
}



源码解析


ReentrantLock 的实现基于 AbstractQueuedSynchronizer(AQS),这是一个用于构建锁和同步器的框架。下面我们通过源码来深入解析 ReentrantLock 的实现。


构造函数


java


public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}



ReentrantLock 通过 sync 变量来持有锁的具体实现,sync 可以是 FairSync(公平锁)或 NonfairSync(非公平锁)。


锁的获取


java


public void lock() {
    sync.lock();
}

abstract static class Sync extends AbstractQueuedSynchronizer {
    abstract void lock();

    // 非公平锁的获取
    static final class NonfairSync extends Sync {
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

    // 公平锁的获取
    static final class FairSync extends Sync {
        final void lock() {
            acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() && compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
}



在 NonfairSync 中,lock() 方法首先尝试通过 CAS 操作直接获取锁,如果失败则调用 acquire(1),这是 AQS 提供的模板方法,用于获取独占锁。在 FairSync 中,直接调用 acquire(1) 来获取锁。


锁的释放


java


public void unlock() {
    sync.release(1);
}

abstract static class Sync extends AbstractQueuedSynchronizer {
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
}



unlock() 方法调用 sync.release(1) 来释放锁,release(1) 是 AQS 提供的模板方法,用于释放独占锁。在 tryRelease 方法中,通过减小锁的重入次数来释放锁,当重入次数为 0 时,真正释放锁。


具体应用场景


  1. 高度竞争的共享资源访问:在多个线程频繁竞争同一资源的情况下,使用 ReentrantLock 可以提供更高效的锁机制,特别是当需要实现公平性时。


java


import java.util.concurrent.locks.ReentrantLock;

public class SharedResource {
    private final ReentrantLock lock = new ReentrantLock(true);  // 公平锁

    public void accessResource() {
        lock.lock();
        try {
            // 资源访问操作
            System.out.println(Thread.currentThread().getName() + " 正在访问资源");
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        SharedResource resource = new SharedResource();
        for (int i = 0; i < 10; i++) {
            new Thread(resource::accessResource).start();
        }
    }
}



  1. 实现读写锁:虽然 Java 提供了专门的 ReadWriteLock 接口和 ReentrantReadWriteLock 实现,但我们也可以通过 ReentrantLock 自己实现一个简单的读写锁。


java


import java.util.concurrent.locks.ReentrantLock;

public class SimpleReadWriteLock {
    private final ReentrantLock lock = new ReentrantLock();
    private int readers = 0;

    public void readLock() {
        lock.lock();
        try {
            readers++;
        } finally {
            lock.unlock();
        }
    }

    public void readUnlock() {
        lock.lock();
        try {
            readers--;
            if (readers == 0) {
                lock.notifyAll();
            }
        } finally {
            lock.unlock();
        }
    }

    public void writeLock() {
        lock.lock();
        try {
            while (readers > 0) {
                lock.await();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void writeUnlock() {
        lock.unlock();
    }
}



  1. 中断响应锁:ReentrantLock 提供了可中断的锁获取方法 lockInterruptibly(),允许线程在等待锁的过程中响应中断。


java


import java.util.concurrent.locks.ReentrantLock;

public class InterruptibleLockExample {
    private final ReentrantLock lock = new ReentrantLock();

    public void performTask() {
        try {
            lock.lockInterruptibly();  // 可中断的锁获取
            try {
                // 临界区代码
                System.out.println(Thread.currentThread().getName() + " 正在执行任务");
                Thread.sleep(2000);  // 模拟长时间操作
            } finally {
                lock.unlock();
            }
        }
catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " 被中断");
        }
    }

    public static void main(String[] args) {
        InterruptibleLockExample example = new InterruptibleLockExample();
        Thread t1 = new Thread(example::performTask);
        Thread t2 = new Thread(() -> {
            try {
                Thread.sleep(1000);
                t1.interrupt();  // 中断线程t1
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        t1.start();
        t2.start();
    }
}



在这个示例中,performTask 方法使用 lockInterruptibly() 获取锁,这允许线程在等待锁的过程中响应中断。当线程 t2 中断线程 t1 时,线程 t1 能够响应中断并退出锁的等待。


  1. 超时锁获取:ReentrantLock 提供了带超时的锁获取方法 tryLock(long timeout, TimeUnit unit),允许线程在指定时间内尝试获取锁,如果超时则返回 false。


java


import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class TimeoutLockExample {
    private final ReentrantLock lock = new ReentrantLock();

    public void performTask() {
        try {
            if (lock.tryLock(2, TimeUnit.SECONDS)) {  // 带超时的锁获取
                try {
                    // 临界区代码
                    System.out.println(Thread.currentThread().getName() + " 获取到锁并执行任务");
                } finally {
                    lock.unlock();
                }
            } else {
                System.out.println(Thread.currentThread().getName() + " 获取锁超时");
            }
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " 被中断");
        }
    }

    public static void main(String[] args) {
        TimeoutLockExample example = new TimeoutLockExample();
        Thread t1 = new Thread(example::performTask);
        Thread t2 = new Thread(example::performTask);

        t1.start();
        t2.start();
    }
}



在这个示例中,performTask 方法使用 tryLock 方法尝试在 2 秒内获取锁,如果超时则打印超时信息。这避免了线程长时间等待锁的情况。


  1. 实现线程间的条件等待:ReentrantLock 提供了 Condition 对象,用于实现线程间的条件等待和通知机制。Condition 类似于 Object 的 wait 和 notify 方法,但提供了更灵活的功能,如指定多个条件变量。


java


import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionExample {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private boolean ready = false;

    public void awaitTask() {
        lock.lock();
        try {
            while (!ready) {
                System.out.println(Thread.currentThread().getName() + " 等待条件");
                condition.await();  // 等待条件满足
            }
            System.out.println(Thread.currentThread().getName() + " 条件满足,继续执行");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void signalTask() {
        lock.lock();
        try {
            ready = true;
            condition.signalAll();  // 通知所有等待的线程
            System.out.println(Thread.currentThread().getName() + " 通知所有等待的线程");
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ConditionExample example = new ConditionExample();
        Thread t1 = new Thread(example::awaitTask);
        Thread t2 = new Thread(example::awaitTask);
        Thread t3 = new Thread(example::signalTask);

        t1.start();
        t2.start();
        try {
            Thread.sleep(1000);  // 确保t1和t2进入等待状态
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        t3.start();
    }
}



在这个示例中,awaitTask 方法使用 condition.await() 进行条件等待,signalTask 方法使用 condition.signalAll() 通知所有等待的线程。这展示了 ReentrantLock 如何通过条件变量实现线程间的条件等待和通知机制。


总结


CyclicBarrier 和 ReentrantLock 是 Java 并发编程中非常重要的工具类。CyclicBarrier 主要用于多个线程在某个固定点上进行同步,常用于并行计算结果的汇总和阶段性同步。ReentrantLock 提供了比 synchronized 更灵活和强大的锁机制,适用于各种复杂的同步需求,如高度竞争的共享资源访问、实现读写锁、中断响应锁、超时锁获取以及条件变量等功能。


理解这些工具类的内部原理和使用方法,有助于我们在实际开发中更好地进行并发编程,避免一些常见的并发问题。希望通过本文的讲解,能让您对 CyclicBarrier 和 ReentrantLock 有一个更深入的理解,并在实际开发中灵活应用。

相关推荐

无力吐槽的自动续费(你被自动续费困扰过吗?)

今天因为工作需要,需要在百度文库上下载一篇文章。没办法,确实需要也有必要,只能老老实实的按要求买了个VIP。过去在百度文库上有过类似经历,当时为了写论文买了一个月的VIP,后面也没有太注意,直到第二个...

百度文库推出“文源计划”创作者可一键认领文档

11月7日,百度文库发布了旨在保护创作者权益的“文源计划”。所谓“文源计划”,即为每一篇文档找到源头,让创作者享受更多的权益。据百度文库总经理李小婉介绍,文源计划分为三部分,分别是版权认证、版权扶持和...

有开放大学学号的同学,百度文库高校版可以用了。

还在网上找百度文库的下载方式,只要从身边的朋友在读开放大学的,那他(她)的学号就可以登陆到国家开放大学图书馆,还使用百度文库高校版来下载。与百度文库稍有不同,但足够使用了。现转国图链接如下:htt...

搜索资源方法推荐(搜索资源的方法)

今天msgbox就要教大家如何又快又准的搜到各类资源,第一点,排除干扰百度搜索出来啊经常前排展示它的产品以及百度文库,如何去除呢?很简单,后面输入空格减号百度文库,比如你搜高等数学百度文库很多,只要后...

一行代码搞定百度文库VIP功能(2021百度文库vip账号密码共享)

百度文库作为大家常用查资料找文档的平台,大多数文档我们都可以直接在百度文库找到,然而百度文库也有让人头痛的时候。好不容易找到一篇合适的文档,当你准备复制的时候他却提示你需要开通VIP才能复制~~~下载...

百度文库文档批量上传工具用户说明书

百度文库文档批量上传工具用户说明书1、软件主要功能1、批量上传文档到百度文库,支持上传到收费、VIP专享、优享以及共享。2、支持自动分类和自动获取标签3、支持多用户切换,一个账户传满可以切换到...

百度文库现在都看不到文档是否上传成功,要凉了吗?

打开知识店铺,百度文库文档里显示都是下载这一按键,上传的文档也看不到是否成功?咋情况,要取消了吗?没通过审核的也不让你删除,是几个意思,想通吃吗?现在百度上传文档也很费劲,有时弄了半天的资料上传审核过...

微信推广引流108式:利用百度文库长期分享软文引流

百度文库相对于百度知道、百度百科来说,操作上没那么多条条框框,规则上也相对好把握些。做一条百度知道所花费的精力一般都会比做一条百度文库的要多些,老马个人操作下来觉得百度文库更好把握。但见仁见智吧,今天...

职场“避雷”指南 百度文库推出标准化劳动合同范本

轰轰烈烈的毕业季结束了,众多应届生在经过了“职场海选”后,已正式成为职场生力军的一员。这一阶段,除了熟悉业务,签订劳动合同、了解职场福利也迅速被提上日程。而随着国人法律意识的增强,百度文库内《劳动合同...

《百度文库》:素材精选宝库(百度文库官网首页)

《百度文库》:独特功能助力选择高质量素材在当今信息爆炸的时代,如何高效地获取并利用有价值的素材成为了许多人面临的挑战。而《百度文库》作为百度公司推出的一款在线文档分享平台,凭借其丰富的资源、强大的功能...

深度整合和开放AI能力 百度文库和网盘推出内容操作系统「沧舟OS」

【TechWeb】4月25日消息,Create2025百度AI开发者大会上,百度文库和百度网盘推出全球首个内容操作系统——沧舟OS。基于沧舟OS,百度文库APP全新上线「GenFlow超能搭子」...

女子发现大二作业被百度文库要求付费下载,律师:平台侵权,应赔偿

近日,28岁的黎女士在百度百科搜索家乡的小地名时,发现了自己在大二完成的课题作业。她继续搜索,发现多个平台收录了该文,比如豆丁网和文档之家等,有的还设置了付费或积分下载。2月15日,九派新闻记者以用户...

2016杀入百度文库的新捷径,只有少数人才知道的喔

百度的产品在SEO优化中的分量真不用多说,其实很多人都像我一样一直在找捷径。但是我经常发现很多人都是在用死方法。比如发贴吧发帖而不知道去申请一个吧主,知道自问自答而不知道去申请一个合作资格。口碑和贴吧...

百度文库付费文档搜索方法(百度文库付费文档搜索方法有哪些)

一直以来,百度文库中无论是个人中心还是个人主页,都没有像淘宝一样的店内搜索功能,连最近新开的知识店铺也没有设计店内搜索功能,这无论是对上传用户还是下载用户都不方便,上传用户想要搜索自己的文档无法办到...

供读者免费使用!泰达图书馆机构版百度文库新年上新啦

在泰达图书馆读者使用百度文库数字资源不需要VIP,免-费-用!惊不惊喜?快来了解一下吧……新年伊始,为满足区域企业、高校、科研院所以及居民群众在教学、科研及学习过程中,对各类文献资源的需求,泰达图书馆...

取消回复欢迎 发表评论: