• Tags         
  • 2018-07-23  16:27:13        
  • 66 °C    

    CyclicBarrier用于使线程彼此等待。当不同的线程处理计算的一部分并且当所有线程都完成执行时,使用它,结果需要在父线程中组合。换句话说,当多个线程执行不同的子任务时,使用CyclicBarrier,并且需要组合这些子任务的输出以形成最终输出。完成执行后,线程调用await()方法并等待其他线程到达屏障。一旦所有线程到达,障碍就会为线程提供方法。

    CyclicBarrier的工作

    CyclicBarriers在java.util.concurrent包中定义。首先创建一个CyclicBarriers的新实例,指定屏障应该等待的线程数。

    CyclicBarrier newBarrier = new CyclicBarrier(numberOfThreads);

    每个线程都进行一些计算,在完成它的执行后,调用await()方法,如下所示:

    public void run()
    {
        //线程进行计算
        newBarrier.await();
    }
    

    Java中的Java.util.concurrent.CyclicBarrier
    一旦调用await()的线程数等于numberOfThreads,屏障就会为等待线程提供一种方法。CyclicBarrier也可以通过在所有线程到达屏障后执行的某些操作进行初始化。该动作可以组合/利用在屏障中等待的各个线程的计算结果。

    可行动作= ... 
    //当所有线程到达障碍时要执行的操作;
    CyclicBarrier newBarrier = new CyclicBarrier(numberOfThreads,action);

    CyclicBarrier的重要方法:

    1. getParties:返回跳过此障碍所需的参与方数量。
      句法:
    public int getParties()

    返回:
    跳过此障碍所需的参与方数量

    • reset:将屏障重置为其初始状态。
      句法:

     

    public void reset()

    返回:
    void但将屏障重置为其初始状态。如果任何一方当前正在屏障等待,他们将返回BrokenBarrierException。

    • isBroken:查询此屏障是否处于损坏状态。
      句法:

     

    public boolean isBroken()

    返回:
    如果一方或多方因构建或上次重置导致中断或超时,或由于异常而导致屏障操作失败,则返回true; 否则是假的。

    • getNumberWaiting:返回当前在屏障处等待的方数。
      句法:

     

    public int getNumberWaiting()

    返回:
    当前在await()中被阻止的聚会数量

    • await:等到所有各方都在这个障碍上等待。
      句法:

     

    public int await()抛出InterruptedException,BrokenBarrierException

    返回:
    当前线程的到达索引,其中index getParties() - 1表示第一个到达,零表示最后到达。

    • await:等待所有各方在此障碍上等待或等待指定的等待时间。
      句法:

     

    public int await(long timeout,TimeUnit unit) 
    抛出InterruptedException,
    BrokenBarrierException,TimeoutException

    返回:
    当前线程的到达索引,其中index getParties() - 1表示第一个到达,零表示最后到达

    //JAVA program to demonstrate execution on Cyclic Barrier
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    class Computation1 implements Runnable
    {
        public static int product = 0;
        public void run()
        {
            product = 2 * 3;
            try
            {
                Tester.newBarrier.await();
            }
            catch (InterruptedException | BrokenBarrierException e)
            {
                e.printStackTrace();
            }
        }
    }
    class Computation2 implements Runnable
    {
        public static int sum = 0;
        public void run()
        {
            // check if newBarrier is broken or not
            System.out.println("Is the barrier broken? - " + Tester.newBarrier.isBroken());
            sum = 10 + 20;
            try
            {
                Tester.newBarrier.await(3000, TimeUnit.MILLISECONDS);
            
                // number of parties waiting at the barrier
                System.out.println("Number of parties waiting at the barrier "+
                "at this point = " + Tester.newBarrier.getNumberWaiting());
            }
            catch (InterruptedException | BrokenBarrierException e)
            {
                e.printStackTrace();
            }
            catch (TimeoutException e)
            {
                e.printStackTrace();
            }
        }
    }
    public class Tester implements Runnable
    {
        public static CyclicBarrier newBarrier = new CyclicBarrier(3);
        
        public static void main(String[] args)
        {
            // parent thread
            Tester test = new Tester();
            
            Thread t1 = new Thread(test);
            t1.start();
        }
        public void run()
        {
            System.out.println("Number of parties required to trip the barrier = "+
            newBarrier.getParties());
            System.out.println("Sum of product and sum = " + (Computation1.product +
            Computation2.sum));
            
            // objects on which the child thread has to run
            Computation1 comp1 = new Computation1();
            Computation2 comp2 = new Computation2();
            
            // creation of child thread
            Thread t1 = new Thread(comp1);
            Thread t2 = new Thread(comp2);
            
            // moving child thread to runnable state
            t1.start();
            t2.start();
            try
            {
                Tester.newBarrier.await();
            }
            catch (InterruptedException | BrokenBarrierException e)
            {
                e.printStackTrace();
            }
            
            // barrier breaks as the number of thread waiting for the barrier
            // at this point = 3
            System.out.println("Sum of product and sum = " + (Computation1.product +
            Computation2.sum));
                    
            // Resetting the newBarrier
            newBarrier.reset();
            System.out.println("Barrier reset successful");
        }
    }
    

    输出:

    <Number of parties required to trip the barrier = 3
    Sum of product and sum = 0
    Is the barrier broken? - false
    Number of parties waiting at the barrier at this point = 0
    Sum of product and sum = 36
    Barrier reset successful

    说明:(sum + product)= 0的值打印在控制台上,因为子线程尚未运行以设置sum和product变量的值。在此之后,(sum + product)= 36将打印在控制台上,因为子线程运行时设置了sum和product的值。此外,屏障上等待线程的数量达到3,由此屏障然后允许所有线程通过并最终打印36。“此时在屏障处等待的参与方数量”的值= 0,因为所有三个线程都已调用await()方法,因此屏障不再处于活动状态。最后,newBarrier被重置并可以再次使用。

    BrokenBarrierException

    当任何等待的线离开障碍物时,障碍物会破裂。当一个或多个等待线程被中断或等待时间完成时,会发生这种情况,因为线程调用带有超时的await()方法,如下所示:

    newBarrier.await(1000,TimeUnit.MILLISECONDS);
    //线程调用此await() 
    //方法只等待1000毫秒。

    当障碍因一个或多个参与线程而中断时,所有其他线程的await()方法都会抛出BrokenThreadException。然而,已经在障碍中等待的线程已经终止了await()调用。

    CyclicBarrier和CountDownLatch之间的区别

    • CountDownLatch只能在程序中使用一次(直到它的计数达到0)。
    • 一旦释放障碍中的所有线程,就可以反复使用CyclicBarrier。

     
    转载请保留页面地址:https://www.breakyizhan.com/java/4977.html