CyclicBarrier用于使线程彼此等待。当不同的线程处理计算的一部分并且当所有线程都完成执行时,使用它,结果需要在父线程中组合。换句话说,当多个线程执行不同的子任务时,使用CyclicBarrier,并且需要组合这些子任务的输出以形成最终输出。完成执行后,线程调用await()方法并等待其他线程到达屏障。一旦所有线程到达,障碍就会为线程提供方法。

CyclicBarrier的工作

CyclicBarriers在java.util.concurrent包中定义。首先创建一个CyclicBarriers的新实例,指定屏障应该等待的线程数。

CyclicBarrier newBarrier = new CyclicBarrier(numberOfThreads);

每个线程都进行一些计算,在完成它的执行后,调用await()方法,如下所示:

public void run()
{
    //线程进行计算
    newBarrier.await();
}

Java中的Java.util.concurrent.CyclicBarrier 一旦调用await()的线程数等于numberOfThreads,屏障就会为等待线程提供一种方法。CyclicBarrier也可以通过在所有线程到达屏障后执行的某些操作进行初始化。该动作可以组合/利用在屏障中等待的各个线程的计算结果。

可行动作= ... 
//当所有线程到达障碍时要执行的操作;
CyclicBarrier newBarrier = new CyclicBarrier(numberOfThreads,action);

CyclicBarrier的重要方法:

  1. getParties:返回跳过此障碍所需的参与方数量。 句法:
public int getParties()

返回: 跳过此障碍所需的参与方数量

  • reset:将屏障重置为其初始状态。 句法:

 

public void reset()

返回: void但将屏障重置为其初始状态。如果任何一方当前正在屏障等待,他们将返回BrokenBarrierException。

  • isBroken:查询此屏障是否处于损坏状态。 句法:

 

public boolean isBroken()

返回: 如果一方或多方因构建或上次重置导致中断或超时,或由于异常而导致屏障操作失败,则返回true; 否则是假的。

  • getNumberWaiting:返回当前在屏障处等待的方数。 句法:

 

public int getNumberWaiting()

返回: 当前在await()中被阻止的聚会数量

  • await:等到所有各方都在这个障碍上等待。 句法:

 

public int await()抛出InterruptedException,BrokenBarrierException

返回: 当前线程的到达索引,其中index getParties() - 1表示第一个到达,零表示最后到达。

  • await:等待所有各方在此障碍上等待或等待指定的等待时间。 句法:

 

public int await(long timeout,TimeUnit unit) 
抛出InterruptedException,
BrokenBarrierException,TimeoutException

返回: 当前线程的到达索引,其中index getParties() - 1表示第一个到达,零表示最后到达

//JAVA program to demonstrate execution on Cyclic Barrier
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
class Computation1 implements Runnable
{
    public static int product = 0;
    public void run()
    {
        product = 2 * 3;
        try
        {
            Tester.newBarrier.await();
        }
        catch (InterruptedException | BrokenBarrierException e)
        {
            e.printStackTrace();
        }
    }
}
class Computation2 implements Runnable
{
    public static int sum = 0;
    public void run()
    {
        // check if newBarrier is broken or not
        System.out.println("Is the barrier broken? - " + Tester.newBarrier.isBroken());
        sum = 10 + 20;
        try
        {
            Tester.newBarrier.await(3000, TimeUnit.MILLISECONDS);
        
            // number of parties waiting at the barrier
            System.out.println("Number of parties waiting at the barrier "+
            "at this point = " + Tester.newBarrier.getNumberWaiting());
        }
        catch (InterruptedException | BrokenBarrierException e)
        {
            e.printStackTrace();
        }
        catch (TimeoutException e)
        {
            e.printStackTrace();
        }
    }
}
public class Tester implements Runnable
{
    public static CyclicBarrier newBarrier = new CyclicBarrier(3);
    
    public static void main(String[] args)
    {
        // parent thread
        Tester test = new Tester();
        
        Thread t1 = new Thread(test);
        t1.start();
    }
    public void run()
    {
        System.out.println("Number of parties required to trip the barrier = "+
        newBarrier.getParties());
        System.out.println("Sum of product and sum = " + (Computation1.product +
        Computation2.sum));
        
        // objects on which the child thread has to run
        Computation1 comp1 = new Computation1();
        Computation2 comp2 = new Computation2();
        
        // creation of child thread
        Thread t1 = new Thread(comp1);
        Thread t2 = new Thread(comp2);
        
        // moving child thread to runnable state
        t1.start();
        t2.start();
        try
        {
            Tester.newBarrier.await();
        }
        catch (InterruptedException | BrokenBarrierException e)
        {
            e.printStackTrace();
        }
        
        // barrier breaks as the number of thread waiting for the barrier
        // at this point = 3
        System.out.println("Sum of product and sum = " + (Computation1.product +
        Computation2.sum));
                
        // Resetting the newBarrier
        newBarrier.reset();
        System.out.println("Barrier reset successful");
    }
}

输出:

<Number of parties required to trip the barrier = 3
Sum of product and sum = 0
Is the barrier broken? - false
Number of parties waiting at the barrier at this point = 0
Sum of product and sum = 36
Barrier reset successful

说明:(sum + product)= 0的值打印在控制台上,因为子线程尚未运行以设置sum和product变量的值。在此之后,(sum + product)= 36将打印在控制台上,因为子线程运行时设置了sum和product的值。此外,屏障上等待线程的数量达到3,由此屏障然后允许所有线程通过并最终打印36。“此时在屏障处等待的参与方数量”的值= 0,因为所有三个线程都已调用await()方法,因此屏障不再处于活动状态。最后,newBarrier被重置并可以再次使用。

BrokenBarrierException

当任何等待的线离开障碍物时,障碍物会破裂。当一个或多个等待线程被中断或等待时间完成时,会发生这种情况,因为线程调用带有超时的await()方法,如下所示:

newBarrier.await(1000,TimeUnit.MILLISECONDS);
//线程调用此await() 
//方法只等待1000毫秒。

当障碍因一个或多个参与线程而中断时,所有其他线程的await()方法都会抛出BrokenThreadException。然而,已经在障碍中等待的线程已经终止了await()调用。

CyclicBarrier和CountDownLatch之间的区别

  • CountDownLatch只能在程序中使用一次(直到它的计数达到0)。
  • 一旦释放障碍中的所有线程,就可以反复使用CyclicBarrier。

 
转载请保留页面地址:https://www.breakyizhan.com/java/4977.html
扫描二维码添加微信 
  • ,每次淘宝领取淘宝优惠券,淘宝购物更优惠。现在添加微信,还可以领取机械键盘50元优惠券!添加微信后回复机械键盘即可领取!
    支持我们,就用微信淘宝!