Java多線程編程模式實戰指南一:Active Object模式(上)

seanpp 8年前發布 | 14K 次閱讀 Java開發

轉載自:http://www.infoq.com/cn/articles/Java-multithreaded-programming-mode-active-object-part1


Active Object模式簡介

Active Object模式是一種異步編程模式。它通過對方法的調用與方法的執行進行解耦來提高并發性。若以任務的概念來說,Active Object模式的核心則是它允許任務的提交(相當于對異步方法的調用)和任務的執行(相當于異步方法的真正執行)分離。這有點類似于System.gc()這個方法:客戶端代碼調用完gc()后,一個進行垃圾回收的任務被提交,但此時JVM并不一定進行了垃圾回收,而可能是在gc()方法調用返回后的某段時間才開始執行任務——回收垃圾。我們知道,System.gc()的調用方代碼是運行在自己的線程上(通常是main線程派生的子線程),而JVM的垃圾回收這個動作則由專門的線程(垃圾回收線程)來執行的。換言之,System.gc()這個方法所代表的動作(其所定義的功能)的調用方和執行方是運行在不同的線程中的,從而提高了并發性。

再進一步介紹Active Object模式,我們可先簡單地將其核心理解為一個名為ActiveObject的類,該類對外暴露了一些異步方法,如圖1所示。

圖 1. ActiveObject對象示例

doSomething方法的調用方和執行方運行在各自的線程上。在并發的環境下,doSomething方法會被多個線程調用。這時所需的線程安全控制封裝在doSomething方法背后,使得調用方代碼無需關心這點,從而簡化了調用方代碼:從調用方代碼來看,調用一個Active Object對象的方法與調用普通Java對象的方法并無太大差別。如清單1所示。

清單 1. Active Object方法調用示例

</div>
ActiveObject ao=...;
Future future = ao.doSomething("data");
//執行其它操作
String result = future.get();
System.out.println(result);

Active Object模式的架構

當Active Object模式對外暴露的異步方法被調用時,與該方法調用相關的上下文信息,包括被調用的異步方法名(或其代表的操作)、調用方代碼所傳遞的參數等,會被封裝成一個對象。該對象被稱為方法請求(Method Request)。方法請求對象會被存入Active Object模式所維護的緩沖區(Activation Queue)中,并由專門的工作線程負責根據其包含的上下文信息執行相應的操作。也就是說,方法請求對象是由運行調用方代碼的線程通過調用Active Object模式對外暴露的異步方法生成的,而方法請求所代表的操作則由專門的線程來執行,從而實現了方法的調用與執行的分離,產生了并發。

Active Object模式的主要參與者有以下幾種。其類圖如圖2所示。

圖 2. Active Object模式的類圖

(點擊圖像放大)

  • Proxy:負責對外暴露異步方法接口。當調用方代碼調用該參與者實例的異步方法doSomething時,該方法會生成一個相應的MethodRequest實例并將其存儲到Scheduler所維護的緩沖區中。doSomething方法的返回值是一個表示其執行結果的外包裝對象:Future參與者的實例。異步方法doSomething運行在調用方代碼所在的線程中。
  • MethodRequest:負責將調用方代碼對Proxy實例的異步方法的調用封裝為一個對象。該對象保留了異步方法的名稱及調用方代碼傳遞的參數等上下文信息。它使得將Proxy的異步方法的調用和執行分離成為可能。其call方法會根據其所包含上下文信息調用Servant實例的相應方法。
  • ActivationQueue:負責臨時存儲由Proxy的異步方法被調用時所創建的MethodRequest實例的緩沖區。
  • Scheduler:負責將Proxy的異步方法所創建的MethodRequest實例存入其維護的緩沖區中。并根據一定的調度策略,對其維護的緩沖區中的MethodRequest實例進行執行。其調度策略可以根據實際需要來定,如FIFO、LIFO和根據MethodRequest中包含的信息所定的優先級等。
  • Servant:負責對Proxy所暴露的異步方法的具體實現。
  • Future:負責存儲和返回Active Object異步方法的執行結果。

Active Object模式的序列圖如圖3所示。

圖 3. Active Object模式的序列圖

(點擊圖像放大)

第1步:調用方代碼調用Proxy的異步方法doSomething。

第2~7步:doSomething方法創建Future實例作為該方法的返回值。并將調用方代碼對該方法的調用封裝為MethodRequest對象。然后以所創建的MethodRequest對象作為參數調用Scheduler的enqueue方法,以將MethodRequest對象存入緩沖區。Scheduler的enqueue方法會調用Scheduler所維護的ActivationQueue實例的enqueue方法,將MethodRequest對象存入緩沖區。

第8步:doSomething返回其所創建的Future實例。

第9步:Scheduler實例采用專門的工作線程運行dispatch方法。

第10~12步:dispatch方法調用ActivationQueue實例的dequeue方法,獲取一個MethodRequest對象。然后調用MethodRequest對象的call方法

第13~16步:MethodRequest對象的call方法調用與其關聯的Servant實例的相應方法doSomething。并將Servant.doSomething方法的返回值設置到Future實例上。

第17步:MethodRequest對象的call方法返回。

上述步驟中,第1~8步是運行在Active Object的調用者線程中的,這幾個步驟實現了將調用方代碼對Active Object所提供的異步方法的調用封裝成對象(Method Request),并將其存入緩沖區。這幾個步驟實現了任務的提交。第9~17步是運行在Active Object的工作線程中,這些步驟實現從緩沖區中讀取Method Request,并對其進行執行,實現了任務的執行。從而實現了Active Object對外暴露的異步方法的調用與執行的分離。

如果調用方代碼關心Active Object的異步方法的返回值,則可以在其需要時,調用Future實例的get方法來獲得異步方法的真正執行結果。

Active Object模式實戰案例

某電信軟件有一個彩信短號模塊。其主要功能是實現手機用戶給其它手機用戶發送彩信時,接收方號碼可以填寫為對方的短號。例如,用戶13612345678給其同事13787654321發送彩信時,可以將接收方號碼填寫為對方的短號,如776,而非其真實的號碼。

該模塊處理其接收到的下發彩信請求的一個關鍵操作是查詢數據庫以獲得接收方短號對應的真實號碼(長號)。該操作可能因為數據庫故障而失敗,從而使整個請求無法繼續被處理。而數據庫故障是可恢復的故障,因此在短號轉換為長號的過程中如果出現數據庫異常,可以先將整個下發彩信請求消息緩存到磁盤中,等到數據庫恢復后,再從磁盤中讀取請求消息,進行重試。為方便起見,我們可以通過Java的對象序列化API,將表示下發彩信的對象序列化到磁盤文件中從而實現請求緩存。下面我們討論這個請求緩存操作還需要考慮的其它因素,以及Active Object模式如何幫助我們滿足這些考慮。

首先,請求消息緩存到磁盤中涉及文件I/O這種慢的操作,我們不希望它在請求處理的主線程(即Web服務器的工作線程)中執行。因為這樣會使該模塊的響應延時增大,降低系統的響應性。并使得Web服務器的工作線程因等待文件I/O而降低了系統的吞吐量。這時,異步處理就派上用場了。Active Object模式可以幫助我們實現請求緩存這個任務的提交和執行分離:任務的提交是在Web服務器的工作線程中完成,而任務的執行(包括序列化對象到磁盤文件中等操作)則是在Active Object工作線程中執行。這樣,請求處理的主線程在偵測到短號轉長號失敗時即可以觸發對當前彩信下發請求進行緩存,接著繼續其請求處理,如給客戶端響應。而此時,當前請求消息可能正在被Active Object線程緩存到文件中。如圖4所示。

圖 4 .異步實現緩存

其次,每個短號轉長號失敗的彩信下發請求消息會被緩存為一個磁盤文件。但我們不希望這些緩存文件被存在同一個子目錄下。而是希望多個緩存文件會被存儲到多個子目錄中。每個子目錄最多可以存儲指定個數(如2000個)的緩存文件。若當前子目錄已存滿,則新建一個子目錄存放新的緩存文件,直到該子目錄也存滿,依此類推。當這些子目錄的個數到達指定數量(如100個)時,最老的子目錄(連同其下的緩存文件,如果有的話)會被刪除。從而保證子目錄的個數也是固定的。顯然,在并發環境下,實現這種控制需要一些并發訪問控制(如通過鎖來控制),但是我們不希望這種控制暴露給處理請求的其它代碼。而Active Object模式中的Proxy參與者可以幫助我們封裝并發訪問控制。

下面,我們看該案例的相關代碼通過應用Active Object模式在實現緩存功能時滿足上述兩個目標。首先看請求處理的入口類。該類就是本案例的Active Object模式的客調用方代碼。如清單2所示。

清單 2. 彩信下發請求處理的入口類

</div>
public class MMSDeliveryServlet extends HttpServlet {

private static final long serialVersionUID = 5886933373599895099L;

@Override
public void doPost(HttpServletRequest req, HttpServletResponse resp)
        throws ServletException, IOException {
    //將請求中的數據解析為內部對象
    MMSDeliverRequest mmsDeliverReq = this.parseRequest(req.getInputStream());
    Recipient shortNumberRecipient = mmsDeliverReq.getRecipient();
    Recipient originalNumberRecipient = null;

    try {
        // 將接收方短號轉換為長號
        originalNumberRecipient = convertShortNumber(shortNumberRecipient);
    } catch (SQLException e) {

        // 接收方短號轉換為長號時發生數據庫異常,觸發請求消息的緩存
        AsyncRequestPersistence.getInstance().store(mmsDeliverReq);

        // 繼續對當前請求的其它處理,如給客戶端響應
        resp.setStatus(202);
    }

}

private MMSDeliverRequest parseRequest(InputStream reqInputStream) {
    MMSDeliverRequest mmsDeliverReq = new MMSDeliverRequest();
    //省略其它代碼
    return mmsDeliverReq;
}

private Recipient convertShortNumber(Recipient shortNumberRecipient)
        throws SQLException {
    Recipient recipent = null;
    //省略其它代碼
    return recipent;
}

}</pre>

清單2中的doPost方法在偵測到短號轉換過程中發生的數據庫異常后,通過調用AsyncRequestPersistence類的store方法觸發對彩信下發請求消息的緩存。這里,AsyncRequestPersistence類相當于Active Object模式中的Proxy參與者。盡管本案例涉及的是一個并發環境,但從清單2中的代碼可見,AsyncRequestPersistence類的調用方代碼無需處理多線程同步問題。這是因為多線程同步問題被封裝在AsyncRequestPersistence類之后。

AsyncRequestPersistence類的代碼如清單3所示。

清單 3. 彩信下發請求緩存入口類(Active Object模式的Proxy)

</div>
// ActiveObjectPattern.Proxy
public class AsyncRequestPersistence implements RequestPersistence {
    private static final long ONE_MINUTE_IN_SECONDS = 60;
    private final Logger logger;
    private final AtomicLong taskTimeConsumedPerInterval = new AtomicLong(0);
    private final AtomicInteger requestSubmittedPerIterval = new AtomicInteger(0);

// ActiveObjectPattern.Servant
private final DiskbasedRequestPersistence 
                    delegate = new DiskbasedRequestPersistence();
// ActiveObjectPattern.Scheduler
private final ThreadPoolExecutor scheduler;

private static class InstanceHolder {
    final static RequestPersistence INSTANCE = new AsyncRequestPersistence();
}

private AsyncRequestPersistence() {
    logger = Logger.getLogger(AsyncRequestPersistence.class);
    scheduler = new ThreadPoolExecutor(1, 3, 
            60 * ONE_MINUTE_IN_SECONDS,
            TimeUnit.SECONDS,
            // ActiveObjectPattern.ActivationQueue
            new LinkedBlockingQueue(200), 
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t;
                    t = new Thread(r, "AsyncRequestPersistence");
                    return t;
                }

            });

    scheduler.setRejectedExecutionHandler(
            new ThreadPoolExecutor.DiscardOldestPolicy());

    // 啟動隊列監控定時任務
    Timer monitorTimer = new Timer(true);
    monitorTimer.scheduleAtFixedRate(
        new TimerTask() {

        @Override
        public void run() {
            if (logger.isInfoEnabled()) {

                logger.info("task count:" 
                        + requestSubmittedPerIterval
                        + ",Queue size:" 
                        + scheduler.getQueue().size()
                        + ",taskTimeConsumedPerInterval:"
                        + taskTimeConsumedPerInterval.get() 
                        + " ms");
            }

            taskTimeConsumedPerInterval.set(0);
            requestSubmittedPerIterval.set(0);

        }
    }, 0, ONE_MINUTE_IN_SECONDS * 1000);
}

public static RequestPersistence getInstance() {
    return InstanceHolder.INSTANCE;
}

@Override
public void store(final MMSDeliverRequest request) {
    /*
     * 將對store方法的調用封裝成MethodRequest對象, 并存入緩沖區。
     */
    // ActiveObjectPattern.MethodRequest
    Callable methodRequest = new Callable() {
        @Override
        public Boolean call() throws Exception {
            long start = System.currentTimeMillis();
            try {
                delegate.store(request);
            } finally {
                taskTimeConsumedPerInterval.addAndGet(
                        System.currentTimeMillis() - start);
            }

            return Boolean.TRUE;
        }

    };
    scheduler.submit(methodRequest);

    requestSubmittedPerIterval.incrementAndGet();
}

}</pre>

AsyncRequestPersistence類所實現的接口RequestPersistence定義了Active Object對外暴露的異步方法:store方法。由于本案例不關心請求緩存的結果,故該方法沒有返回值。其代碼如清單4所示。

清單 4. RequestPersistence接口源碼

</div>
public interface RequestPersistence {

 void store(MMSDeliverRequest request);

}</pre>

AsyncRequestPersistence類的實例變量scheduler相當于Active Object模式中的Scheduler參與者實例。這里我們直接使用了JDK1.5引入的Executor Framework中的ThreadPoolExecutor。在ThreadPoolExecutor類的實例化時,其構造器的第5個參數(BlockingQueue<Runnable> workQueue)我們指定了一個有界阻塞隊列:new LinkedBlockingQueue<Runnable>(200)。該隊列相當于Active Object模式中的ActivationQueue參與者實例。

AsyncRequestPersistence類的實例變量delegate相當于Active Object模式中的Servant參與者實例。

AsyncRequestPersistence類的store方法利用匿名類生成一個java.util.concurrent.Callable實例methodRequest。該實例相當于Active Object模式中的MethodRequest參與者實例。利用閉包(Closure),該實例封裝了對store方法調用的上下文信息(包括調用參數、所調用的方法對應的操作信息)。AsyncRequestPersistence類的store方法通過調用scheduler的submit方法,將methodRequest送入ThreadPoolExecutor所維護的緩沖區(阻塞隊列)中。確切地說,ThreadPoolExecutor是Scheduler參與者的一個“近似”實現。ThreadPoolExecutor的submit方法相對于Scheduler的enqueue方法,該方法用于接納MethodRequest對象,以將其存入緩沖區。當ThreadPoolExecutor當前使用的線程數量小于其核心線程數量時,submit方法所接收的任務會直接被新建的線程執行。當ThreadPoolExecutor當前使用的線程數量大于其核心線程數時,submit方法所接收的任務才會被存入其維護的阻塞隊列中。不過,ThreadPoolExecutor的這種任務處理機制,并不妨礙我們將它用作Scheduler的實現。

methodRequest的call方法會調用delegate的store方法來真正實現請求緩存功能。delegate實例對應的類DiskbasedRequestPersistence是請求消息緩存功能的真正實現者。其代碼如清單5所示。

清單 5. DiskbasedRequestPersistence類的源碼

</div>
public class DiskbasedRequestPersistence implements RequestPersistence {
    // 負責緩存文件的存儲管理
    private final SectionBasedDiskStorage storage = new SectionBasedDiskStorage();
    private final Logger logger = Logger
                                     .getLogger(DiskbasedRequestPersistence.class);

@Override
public void store(MMSDeliverRequest request) {
    // 申請緩存文件的文件名
    String[] fileNameParts = storage.apply4Filename(request);
    File file = new File(fileNameParts[0]);
    try {
        ObjectOutputStream objOut = new ObjectOutputStream(
        new FileOutputStream(file));
        try {
            objOut.writeObject(request);
        } finally {
            objOut.close();
        }
    } catch (FileNotFoundException e) {
        storage.decrementSectionFileCount(fileNameParts[1]);
        logger.error("Failed to store request", e);
    } catch (IOException e) {
        storage.decrementSectionFileCount(fileNameParts[1]);
        logger.error("Failed to store request", e);
    }

}

class SectionBasedDiskStorage {
    private Deque sectionNames = new LinkedList();
    /*
     * Key->value: 存儲子目錄名->子目錄下緩存文件計數器
     */
    private Map sectionFileCountMap 
                    = new HashMap();
    private int maxFilesPerSection = 2000;
    private int maxSectionCount = 100;
    private String storageBaseDir = System.getProperty("user.dir") + "/V*N";

    private final Object sectionLock = new Object();

    public String[] apply4Filename(MMSDeliverRequest request) {
        String sectionName;
        int iFileCount;
        boolean need2RemoveSection = false;
        String[] fileName = new String[2];
        synchronized (sectionLock) {
            //獲取當前的存儲子目錄名
            sectionName = this.getSectionName();
            AtomicInteger fileCount;
            fileCount = sectionFileCountMap.get(sectionName);
            iFileCount = fileCount.get();
            //當前存儲子目錄已滿
            if (iFileCount >= maxFilesPerSection) {
                if (sectionNames.size() >= maxSectionCount) {
                    need2RemoveSection = true;
                }
                //創建新的存儲子目錄
                sectionName = this.makeNewSectionDir();
                fileCount = sectionFileCountMap.get(sectionName);

            }
            iFileCount = fileCount.addAndGet(1);

        }

        fileName[0] = storageBaseDir + "/" + sectionName + "/"
            + new DecimalFormat("0000").format(iFileCount) + "-"
            + request.getTimeStamp().getTime() / 1000 + "-" 
                           + request.getExpiry()
            + ".rq";
        fileName[1] = sectionName;

        if (need2RemoveSection) {
            //刪除最老的存儲子目錄
            String oldestSectionName = sectionNames.removeFirst();
            this.removeSection(oldestSectionName);
        }

        return fileName;
    }

    public void decrementSectionFileCount(String sectionName) {
        AtomicInteger fileCount = sectionFileCountMap.get(sectionName);
        if (null != fileCount) {
            fileCount.decrementAndGet();
        }
    }

    private boolean removeSection(String sectionName) {
        boolean result = true;
        File dir = new File(storageBaseDir + "/" + sectionName);
        for (File file : dir.listFiles()) {
            result = result && file.delete();
        }
        result = result && dir.delete();
        return result;
    }

    private String getSectionName() {
        String sectionName;

        if (sectionNames.isEmpty()) {
            sectionName = this.makeNewSectionDir();

        } else {
            sectionName = sectionNames.getLast();
        }

        return sectionName;
    }

    private String makeNewSectionDir() {
        String sectionName;
        SimpleDateFormat sdf = new SimpleDateFormat("MMddHHmmss");
        sectionName = sdf.format(new Date());
        File dir = new File(storageBaseDir + "/" + sectionName);
        if (dir.mkdir()) {
            sectionNames.addLast(sectionName);
            sectionFileCountMap.put(sectionName, new AtomicInteger(0));
        } else {
            throw new RuntimeException(
                             "Cannot create section dir " + sectionName);
        }

        return sectionName;
    }
}

}</pre>

methodRequest的call方法的調用者代碼是運行在ThreadPoolExecutor所維護的工作者線程中,這就保證了store方法的調用方和真正的執行方是分別運行在不同的線程中:服務器工作線程負責觸發請求消息緩存,ThreadPoolExecutor所維護的工作線程負責將請求消息序列化到磁盤文件中。

DiskbasedRequestPersistence類的store方法中調用的SectionBasedDiskStorage類的apply4Filename方法包含了一些多線程同步控制代碼(見清單5)。這部分控制由于是封裝在DiskbasedRequestPersistence的內部類中,對于該類之外的代碼是不可見的。因此,AsyncRequestPersistence的調用方代碼無法知道該細節,這體現了Active Object模式對并發訪問控制的封裝。

小結

本篇介紹了Active Object模式的意圖及架構,并以一個實際的案例展示了該模式的代碼實現。下篇將對Active Object模式進行評價,并結合本文案例介紹實際運用Active Object模式時需要注意的一些事項。


 本文由用戶 seanpp 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!