Java 7 Phaser 有什么新特點

openkk 12年前發布 | 33K 次閱讀 Java Java開發

1 Overview

Java 7的并發包中推出了Phaser,其功能跟CyclicBarrier和CountDownLatch有些重疊,但是提供了更靈活的用法,例如支持動態調整注冊任務的數量等。本文在Phaser自帶的示例代碼基礎上進行一下簡單的分析。

2 Glossary

2.1 Registration

Phaser支持通過register()和bulkRegister(int parties)方法來動態調整注冊任務的數量,此外也支持通過其構造函數進行指定初始數量。在適當的時機,Phaser支持減少注冊任務的數量,例如arriveAndDeregister()。單個Phaser實例允許的注冊任務數的上限是65535。


2.2 Arrival

正如Phaser類的名字所暗示,每個Phaser實例都會維護一個phase number,初始值為0。每當所有注冊的任務都到達Phaser時,phase number累加,并在超過Integer.MAX_VALUE后清零。arrive()和arriveAndDeregister()方法用于記錄到 達,arriveAndAwaitAdvance()方法用于記錄到達,并且等待其它未到達的任務。


2.3 Termination

Phaser支持終止。Phaser終止之后,調用register()和bulkRegister(int parties)方法沒有任何效果,arriveAndAwaitAdvance()方法也會立即返回。觸發終止的時機是在protected boolean onAdvance(int phase, int registeredParties)方法返回時,如果該方法返回true,那么Phaser會被終止。默認實現是在注冊任務數為0時返回true(即return registeredParties == 0;)。此外,forceTermination()方法用于強制終止,isTerminated()方法用于判斷是否已經終止。


2.4 Tiering

Phaser支持層次結構,即通過構造函數Phaser(Phaser parent)和Phaser(Phaser parent, int parties)構造一個樹形結構。這有助于減輕因在單個的Phaser上注冊過多的任務而導致的競爭,從而提升吞吐量,代價是增加單個操作的開銷。

3 Sample Usage

3.1 Sample 1

在有些場景下,我們希望控制多個線程的啟動時機:例如在并發相關的單元測試中,有時需要控制線程的啟動時機,以期獲得最大程度的并發,通常我們會使用CountDownLatch,以下是使用Phaser的版本。

import java.util.concurrent.Phaser;

public class PhaserTest1 {

    public static void main(String args[]) {
        //
        final int count = 5;
        final Phaser phaser = new Phaser(count);
        for(int i = 0; i < count; i++) {
            System.out.println("starting thread, id: " + i);
            final Thread thread = new Thread(new Task(i, phaser));
            thread.start();
        }
    }

    public static class Task implements Runnable {
        //
        private final int id;
        private final Phaser phaser;

        public Task(int id, Phaser phaser) {
            this.id = id;
            this.phaser = phaser;

        }

        @Override
        public void run() {
            phaser.arriveAndAwaitAdvance();
            System.out.println("in Task.run(), phase: " + phaser.getPhase() + ", id: " + this.id);
        }
    }
}

以上例子中,由于線程是在一個循環中start,因此start的時機有一定的間隔。本例中這些線程實際開始工作的時機是在所有的線程都調用了phaser.arriveAndAwaitAdvance()之后。

此外,如果留心arriveAndAwaitAdvance()方法的簽名,會發現它并沒有拋出InterruptedException,實際上,即使 當前線程被中斷,arriveAndAwaitAdvance()方法也不會返回,而是繼續等待。如果在等待時希望可中斷,或者可超時,那么需要使用以下 方法:

awaitAdvance(arrive())  // 等效于arriveAndAwaitAdvance()
awaitAdvanceInterruptibly(int phase)
awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)

3.2 Sample 2

有些時候我們希望只有在某些外部條件滿足時,才真正開始任務的執行,例如:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.concurrent.Phaser;

public class PhaserTest2 {

    public static void main(String args[]) throws Exception {
        //
        final Phaser phaser = new Phaser(1);
        for(int i = 0; i < 5; i++) {
            phaser.register();
            System.out.println("starting thread, id: " + i);
            final Thread thread = new Thread(new Task(i, phaser));
            thread.start();
        }

        //
        System.out.println("Press ENTER to continue");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        reader.readLine();
        phaser.arriveAndDeregister();
    }

    public static class Task implements Runnable {
        //
        private final int id;
        private final Phaser phaser;

        public Task(int id, Phaser phaser) {
            this.id = id;
            this.phaser = phaser;
        }

        @Override
        public void run() {
            phaser.arriveAndAwaitAdvance();
            System.out.println("in Task.run(), phase: " + phaser.getPhase() + ", id: " + this.id);
        }
    }
}

以上例子中,只有當用戶按下回車之后,任務才真正開始執行。需要注意的是,arriveAndDeregister()方法不會被阻塞,并且返回到達時的phase number(arrive方法也是如此)。

3.3 Sample 3

CyclicBarrier支持barrier action, Phaser同樣也支持。不同之處是Phaser的barrier action需要改寫onAdvance方法來進行定制。

import java.util.concurrent.Phaser;

public class PhaserTest3 {

    public static void main(String args[]) throws Exception {
        //
        final int count = 5;
        final int phaseToTerminate = 3;
        final Phaser phaser = new Phaser(count) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("====== " + phase + " ======");
                return phase >= phaseToTerminate || registeredParties == 0;
            }
        };

        //
        for(int i = 0; i < count; i++) {
            System.out.println("starting thread, id: " + i);
            final Thread thread = new Thread(new Task(i, phaser));
            thread.start();
        }
    }

    public static class Task implements Runnable {
        //
        private final int id;
        private final Phaser phaser;

        public Task(int id, Phaser phaser) {
            this.id = id;
            this.phaser = phaser;
        }

        @Override
        public void run() {
            do {
                try {
                    Thread.sleep(500);
                } catch(InterruptedException e) {
                    // NOP
                }
                System.out.println("in Task.run(), phase: " + phaser.getPhase() + ", id: " + this.id);
                phaser.arriveAndAwaitAdvance();
            } while(!phaser.isTerminated());
        }
    }
}

本例中的barrier action只是簡單地打印了一條信息,此外在超過指定的迭代次數后終止了Phaser。


3.4 Sample 4

在Smaple 3的例子中,主線程在其它工作線程結束之前已經終止。如果希望主線程等待這些工作線程結束,除了使用Thread.join()之外,也可以嘗試以下的方式:

import java.util.concurrent.Phaser;

public class PhaserTest4 {

    public static void main(String args[]) throws Exception {
        //
        final int count = 5;
        final int phaseToTerminate = 3;
        final Phaser phaser = new Phaser(count) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("====== " + phase + " ======");
                return phase == phaseToTerminate || registeredParties == 0;
            }
        };

        //
        for(int i = 0; i < count; i++) {
            System.out.println("starting thread, id: " + i);
            final Thread thread = new Thread(new Task(i, phaser));
            thread.start();
        }

        //
        phaser.register();
        while (!phaser.isTerminated()) {
            phaser.arriveAndAwaitAdvance();
        }
        System.out.println("done");
    }

    public static class Task implements Runnable {
        //
        private final int id;
        private final Phaser phaser;

        public Task(int id, Phaser phaser) {
            this.id = id;
            this.phaser = phaser;
        }

        @Override
        public void run() {
            while(!phaser.isTerminated()) {
                try {
                    Thread.sleep(500);
                } catch(InterruptedException e) {
                    // NOP
                }
                System.out.println("in Task.run(), phase: " + phaser.getPhase() + ", id: " + this.id);
                phaser.arriveAndAwaitAdvance();
            }
        }
    }
}

如果希望主線程在特定的phase結束之后終止,那么可以在主線程中調用下述方法:
public static void awaitPhase(Phaser phaser, int phase) {
    int p = phaser.register(); // assumes caller not already registered
    while (p < phase) {
        if (phaser.isTerminated()) {
            break; // ... deal with unexpected termination
        } else {
            p = phaser.arriveAndAwaitAdvance();
        }
    }
    phaser.arriveAndDeregister();
}

需要注意的是,awaitPhase方法中的if (phaser.isTerminated()) 分支里需要能夠正確處理Phaser終止的情況。否則由于在Phaser終止之后, phaser.register()和arriveAndAwaitAdvance()方法均返回負值,那么上述方法可能陷入死循環。


3.5 Sample 5

以下對Phaser進行分層的例子:

import java.util.concurrent.Phaser;

public class PhaserTest6 {
    //
    private static final int TASKS_PER_PHASER = 4;

    public static void main(String args[]) throws Exception {
        //
        final int phaseToTerminate = 3;
        final Phaser phaser = new Phaser() {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("====== " + phase + " ======");
                return phase == phaseToTerminate || registeredParties == 0;
            }
        };

        //
        final Task tasks[] = new Task[10];
        build(tasks, 0, tasks.length, phaser);
        for (int i = 0; i < tasks.length; i++) {
            System.out.println("starting thread, id: " + i);
            final Thread thread = new Thread(tasks[i]);
            thread.start();
        }
    }

    public static void build(Task[] tasks, int lo, int hi, Phaser ph) {
        if (hi - lo > TASKS_PER_PHASER) {
            for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
                int j = Math.min(i + TASKS_PER_PHASER, hi);
                build(tasks, i, j, new Phaser(ph));
            }
        } else {
            for (int i = lo; i < hi; ++i)
                tasks[i] = new Task(i, ph);
        }
    }

    public static class Task implements Runnable {
        //
        private final int id;
        private final Phaser phaser;

        public Task(int id, Phaser phaser) {
            this.id = id;
            this.phaser = phaser;
            this.phaser.register();
        }

        @Override
        public void run() {
            while (!phaser.isTerminated()) {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    // NOP
                }
                System.out.println("in Task.run(), phase: " + phaser.getPhase()    + ", id: " + this.id);
                phaser.arriveAndAwaitAdvance();
            }
        }
    }
}

需要注意的是,TASKS_PER_PHASER的值取決于具體的Task實現。對于Task執行時間很短的場景(也就是競爭相對激烈),可以考慮使用較小的TASKS_PER_PHASER值,例如4。反之可以適當增大TASKS_PER_PHASER。

原文出處:http://whitesock.iteye.com/blog/1135457

 本文由用戶 openkk 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!