聊聊并發(10)生產者消費者模式
來自: http://www.importnew.com/17592.html
本系列:
聊聊并發(2)Java SE1.6中的Synchronized
聊聊并發(6)ConcurrentLinkedQueue的實現原理分析
在并發編程中使用生產者和消費者模式能夠解決絕大多數并發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。
為什么要使用生產者和消費者模式
在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大于生產者,那么消費者就必須等待生產者。為了解決這種生產消費能力不均衡的問題,所以便有了生產者和消費者模式。
什么是生產者消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力。
這個阻塞隊列就是用來給生產者和消費者解耦的。縱觀大多數 設計模式 ,都會找一個第三者出來進行解耦,如工廠模式的第三者是工廠類,模板模式的第三者是模板類。在學習一些設計模式的過程中,如果先找到這個模式的第三者,能幫助我們快速熟悉一個設計模式。
生產者消費者模式實戰
我和同事一起利用業余時間開發的Yuna工具中使用了生產者和消費者模式。首先我先介紹下Yuna工具,在阿里巴巴很多同事都喜歡通過郵件分享技術文章,因為通過郵件分享很方便,同學們在網上看到好的技術文章,復制粘貼發送就完成了分享,但是我們發現技術文章不能沉淀下來,對于新來的同學看不到以前分享的技術文章,大家也很難找到以前分享過的技術文章。為了解決這問題,我們開發了Yuna工具。Yuna取名自我非常喜歡的一款RPG游戲”最終幻想”中女主角的名字。
首先我們申請了一個專門用來收集分享郵件的郵箱,比如share@alibaba.com,同學將分享的文章發送到這個郵箱,讓同學們每次都抄送到這個郵箱肯定很麻煩,所以我們的做法是將這個郵箱地址放在部門郵件列表里,所以分享的同學只需要象以前一樣向整個部門分享文章就行,Yuna工具通過讀取郵件服務器里該郵箱的郵件,把所有分享的郵件下載下來,包括郵件的附件,圖片,和郵件回復,我們可能會從這個郵箱里下載到一些非分享的文章,所以我們要求分享的郵件標題必須帶有一個關鍵字,比如[內貿技術分享],下載完郵件之后,通過confluence的web service接口,把文章插入到confluence里,這樣新同事就可以在confluence里看以前分享過的文章,并且Yuna工具還可以自動把文章進行分類和歸檔。
為了快速上線該功能,當時我們花了三天業余時間快速開發了Yuna1.0版本。在1.0版本中我并沒有使用生產者消費模式,而是使用單線程來處理,因為當時只需要處理我們一個部門的郵件,所以單線程明顯夠用,整個過程是串行執行的。在一個線程里,程序先抽取全部的郵件,轉化為文章對象,然后添加全部的文章,最后刪除抽取過的郵件。代碼如下:
public void extract() {
logger.debug("開始" + getExtractorName() + "。。");
//抽取郵件
List<Article> articles = extractEmail();
//添加文章
for (Article article : articles) {
addArticleOrComment(article);
}
//清空郵件
cleanEmail();
logger.debug("完成" + getExtractorName() + "。。");
} Yuna工具在推廣后,越來越多的部門使用這個工具,處理的時間越來越慢,Yuna是每隔5分鐘進行一次抽取的,而當郵件多的時候一次處理可能就花了幾分鐘,于是我在Yuna2.0版本里使用了生產者消費者模式來處理郵件,首先生產者線程按一定的規則去郵件系統里抽取郵件,然后存放在阻塞隊列里,消費者從阻塞隊列里取出文章后插入到conflunce里。代碼如下:
public class QuickEmailToWikiExtractor extends AbstractExtractor {
private ThreadPoolExecutor threadsPool;
private ArticleBlockingQueue<ExchangeEmailShallowDTO> emailQueue;
public QuickEmailToWikiExtractor() {
emailQueue= new ArticleBlockingQueue<ExchangeEmailShallowDTO>();
int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
threadsPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, 10l, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000));
}
public void extract() {
logger.debug("開始" + getExtractorName() + "。。");
long start = System.currentTimeMillis();
//抽取所有郵件放到隊列里
new ExtractEmailTask().start();
// 把隊列里的文章插入到Wiki
insertToWiki();
long end = System.currentTimeMillis();
double cost = (end - start) / 1000;
logger.debug("完成" + getExtractorName() + ",花費時間:" + cost + "秒");
}
/**
* 把隊列里的文章插入到Wiki
*/
private void insertToWiki() {
//登錄wiki,每間隔一段時間需要登錄一次
confluenceService.login(RuleFactory.USER_NAME, RuleFactory.PASSWORD);
while (true) {
//2秒內取不到就退出
ExchangeEmailShallowDTO email = emailQueue.poll(2, TimeUnit.SECONDS);
if (email == null) {
break;
}
threadsPool.submit(new insertToWikiTask(email));
}
}
protected List<Article> extractEmail() {
List<ExchangeEmailShallowDTO> allEmails = getEmailService().queryAllEmails();
if (allEmails == null) {
return null;
}
for (ExchangeEmailShallowDTO exchangeEmailShallowDTO : allEmails) {
emailQueue.offer(exchangeEmailShallowDTO);
}
return null;
}
/**
* 抽取郵件任務
*
* @author tengfei.fangtf
*/
public class ExtractEmailTask extends Thread {
public void run() {
extractEmail();
}
}
} 使用了生產者和消費者模式后,郵件的整體處理速度比以前要快了很多。
多生產者和多消費者場景
在多核時代,多線程并發處理速度比單線程處理速度更快,所以我們可以使用多個線程來生產數據,同樣可以使用多個消費線程來消費數據。而更復雜的情況是,消費者消費的數據,有可能需要繼續處理,于是消費者處理完數據之后,它又要作為生產者把數據放在新的隊列里,交給其他消費者繼續處理。如下圖:
我們在一個長連接服務器中使用了這種模式,生產者1負責將所有客戶端發送的消息存放在阻塞隊列1里,消費者1從隊列里讀消息,然后通過消息ID進行hash得到N個隊列中的一個,然后根據編號將消息存放在到不同的隊列里,每個阻塞隊列會分配一個線程來消費阻塞隊列里的數據。如果消費者2無法消費消息,就將消息再拋回到阻塞隊列1中,交給其他消費者處理。
以下是消息總隊列的代碼;
/**
* 總消息隊列管理
*
* @author tengfei.fangtf
*/
public class MsgQueueManager implements IMsgQueue{
private static final Logger LOGGER
= LoggerFactory.getLogger(MsgQueueManager.class);
/**
* 消息總隊列
*/
public final BlockingQueue<Message> messageQueue;
private MsgQueueManager() {
messageQueue = new LinkedTransferQueue<Message>();
}
public void put(Message msg) {
try {
messageQueue.put(msg);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public Message take() {
try {
return messageQueue.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return null;
}
} 啟動一個消息分發線程。在這個線程里子隊列自動去總隊列里獲取消息。
/**
* 分發消息,負責把消息從大隊列塞到小隊列里
*
* @author tengfei.fangtf
*/
static class DispatchMessageTask implements Runnable {
@Override
public void run() {
BlockingQueue<Message> subQueue;
for (;;) {
//如果沒有數據,則阻塞在這里
Message msg = MsgQueueFactory.getMessageQueue().take();
//如果為空,則表示沒有Session機器連接上來,
需要等待,直到有Session機器連接上來
while ((subQueue = getInstance().getSubQueue()) == null) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
//把消息放到小隊列里
try {
subQueue.put(msg);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
} 使用Hash算法獲取一個子隊列。
/**
* 均衡獲取一個子隊列。
*
* @return
*/
public BlockingQueue<Message> getSubQueue() {
int errorCount = 0;
for (;;) {
if (subMsgQueues.isEmpty()) {
return null;
}
int index = (int) (System.nanoTime() % subMsgQueues.size());
try {
return subMsgQueues.get(index);
} catch (Exception e) {
//出現錯誤表示,在獲取隊列大小之后,隊列進行了一次刪除操作
LOGGER.error("獲取子隊列出現錯誤", e);
if ((++errorCount) < 3) {
continue;
}
}
}
} 使用的時候我們只需要往總隊列里發消息。
//往消息隊列里添加一條消息
IMsgQueue messageQueue = MsgQueueFactory.getMessageQueue();
Packet msg = Packet.createPacket(Packet64FrameType.
TYPE_DATA, "{}".getBytes(), (short) 1);
messageQueue.put(msg); 線程池與生產消費者模式
Java中的線程池類其實就是一種生產者和消費者模式的實現方式,但是我覺得其實現方式更加高明。生產者把任務丟給線程池,線程池創建線程并處理任務,如果將要運行的任務數大于線程池的基本線程數就把任務扔到阻塞隊列里,這種做法比只使用一個阻塞隊列來實現生產者和消費者模式顯然要高明很多,因為消費者能夠處理直接就處理掉了,這樣速度更快,而生產者先存,消費者再取這種方式顯然慢一些。
我們的系統也可以使用線程池來實現多生產者消費者模式。比如創建N個不同規模的Java線程池來處理不同性質的任務,比如線程池1將數據讀到內存之后,交給線程池2里的線程繼續處理壓縮數據。線程池1主要處理IO密集型任務,線程池2主要處理CPU密集型任務。
小結
本章講解了生產者消費者模式,并給出了實例。讀者可以在平時的工作中思考下哪些場景可以使用生產者消費者模式,我相信這種場景應該非常之多,特別是需要處理任務時間比較長的場景,比如上傳附件并處理,用戶把文件上傳到系統后,系統把文件丟到隊列里,然后立刻返回告訴用戶上傳成功,最后消費者再去隊列里取出文件處理。比如調用一個遠程接口查詢數據,如果遠程服務接口查詢時需要幾十秒的時間,那么它可以提供一個申請查詢的接口,這個接口把要申請查詢任務放數據庫中,然后該接口立刻返回。然后服務器端用線程輪詢并獲取申請任務進行處理,處理完之后發消息給調用方,讓調用方再來調用另外一個接口拿數據。