如何手寫一個輕量級消息隊列?
寫在前面
最近因為項目需要,自己寫了個單生產者-多消費者的消息隊列模型。多線程真的不是等閑之輩能玩兒的,我花了兩個小時進行設計與編碼,卻花了兩天的時間調試與運行。在這里,我把我遇到的坑與大家分享。
需求的由來
一開始我需要實現一個記錄用戶操作日志的功能,目的是給商家用戶提供客戶行為分析的能力。要記錄的信息包括客戶的訪問時間、IP、在網站上所做的操作等。其中,客戶的地域信息是個重要的分析項,所以必須要把IP轉化成省市縣。那么究竟何時完成這個轉化的動作呢?有兩種方案:
1. 在用戶進行數據分析 時 完成轉化
2. 在用戶進行數據分析 前 完成轉化
第一種方案顯然不靠譜,因為需要轉化的IP數量很大,而且轉化采用第三方接口,因此整個轉化過程將持續很長很長很長……的時間。
而在分析前就把轉化過程完成掉,這樣當用戶需要分析的時候就可以減少這部分時間的開銷,提高了響應速度。因此第二種方案顯然比較合理。
那么隨之而來的問題是:究竟在數據分析前的哪個時機進行轉化?
這個問題又有兩種方案:
1. 在記錄日志的時候就立即完成IP向省市縣的轉換;
2. 每天半夜服務器統一把當天的IP轉化成省市縣;
這兩種方案應該來說各有千秋。
第一種方案比較消耗服務器資源,因為IP向省市縣轉化需要向第三方接口發送GET請求,因此需要消耗一定的出口帶寬和內存資源,在服務器資源一定的前提下,分給用戶訪問的資源就會被減少,從而可能會影響請求響應速度。但這個問題可以用錢來解決,只要花錢砸服務器就行了;而第二種方案在服務器空閑的時候進行轉化雖然節約了服務器資源,但這也導致了商家的分析結果會有一天的滯后,影響用戶體驗。
于是,這個問題就變成了老板的錢重要還是用戶體驗重要。因此我毫不猶豫地選擇了第一種方案。
初步設計
我使用Servlet Filter攔截用戶的所有請求,并在Filter中獲取用戶的各項信息(其中包括IP),然后再請求第三方接口,完成IP向省市縣的轉化,最后將這些信息入庫。
這個流程很顯然有重大缺陷:請求響應時間將被拉的很長。
因為Filter是同步的,只有當Filter中的任務完成后才會放行用戶的請求,而這個Filter中有兩處耗時操作:請求第三方接口、數據入庫,這無疑增加了用戶的等待時間。
因此,我需要將耗時操作異步執行,減少Filter的阻塞時間。
我把這兩個耗時操作放入一個新線程中,只要請求一來,就創建一條新線程去處理這兩步操作。和先前的方式比對之后發現,確實響應速度提高了不少!
但仔細一想,發現不妙。這種方式沒辦法控制線程的數量,當訪問量很高的情況下,線程數量將會無限增加,這時候會搞垮服務器的!
所以需要一個機制來管理所有的線程,于是我就設計了一個消息隊列模型。
模型設計
這個模型很簡單,由一個任務隊列和多個工作線程組成。生產者只需不停地往任務隊列中添加任務,消費者(工作線程)不停地從任務隊列的另一端取任務執行。
這個模型在項目中的應用是這樣的:當一個請求被Filter攔截后,Filter從請求中獲取用戶的各項信息,然后把這些信息封裝成一個任務對象,扔給任務隊列,此刻這個Filter的使命就完成了,它完全不用管任務的執行過程。工作線程會不停地從任務隊列中取任務執行。
類圖設計
從代碼層面來看,整個消息隊列由三個類構成:
消息隊列類MsgQueue
這個類管理整個消息隊列的運行,是主控程序,它包含以下方法:
-
init:初始化整個消息隊列
在初始化過程中,它會依次做以下事情:
- 創建一個任務隊列
- 調用initWorkThread函數,創建指定數量的工作線程(工作線程一旦被創建,就會不停地讀取任務隊列中的任務)
- 調用loadTask函數,從數據庫中加載所有任務
-
loadTask:加載數據庫中的所有任務
這是一個抽象函數,若要使用這個消息隊列,必須實現這個函數。
消息隊列初始化的時候會調用這個函數,從數據庫中加載上次沒有執行完的任務。
作為消息隊列來講,它并不知道你提供的任務是啥,因此它沒辦法知道你的任務應該存在哪里,以何種形式存儲?因此,這個過程就需要讓消息隊列使用者自己去實現。
-
saveTask:持久化當前任務隊列中的任務
這也是個抽象函數,若要使用這個消息隊列,也必須實現這個函數。
當使用者調用消息隊列的stop函數時,它會被執行,用于存儲當前消息隊列中尚未被執行的任務,并且在下次啟動消息隊列的時候通過loadTask函數再次加載進任務隊列,這樣能確保所有任務不會被遺漏。
-
addTask:向任務隊列添加一個任務
- stop:停止所有工作線程
- initWorkThread:初始化所有工作線程
這是一個私有函數,當初始化整個消息隊列的時候被init函數調用。
工作線程類WorkThread
工作線程會不斷地檢查任務隊列中是否有任務,若有任務,就會取一個任務執行;若沒有任務,就會等待一定時間后再次檢查。
它是MsgQueue的一個內部類。因為WorkThread的行為完全由MsgQueue管理,外界不需要知道它的存在。
任務類Task
它是一個接口,并且只有一個函數run,用于封裝任務具體的執行過程。
附上代碼
以下代碼還沒將消息隊列單獨抽象出來,相當于是一個專門用于IP向省市縣轉化的消息隊列,有空把它整一下。
代碼中有詳細的注釋來解釋線程安全性問題。
- 消息隊列主控程序
package com.sdata.foundation.web.filter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import javax.servlet.FilterConfig;
import javax.servlet.ServletContext;
import org.apache.log4j.Logger;
import org.springframework.web.context.support.WebApplicationContextUtils;
import org.springframework.web.context.support.XmlWebApplicationContext;
import com.sdata.foundation.web.service.util.DelDataUtilService;
import com.sdata.foundation.web.service.util.InsertDataUtilService;
import com.sdata.foundation.web.service.util.QueryDataUtilService;
import com.thinkgem.jeesite.modules.sys.service.LogService;
/**
- 記錄請求IP的消息隊列
@author Chai
/
public class RecordLocationMQ {
// 工作線程的個數
private static int MaxWorkThread;
// 工作線程隊列
private static List<WorkThread> workThreadQueue = new ArrayList<WorkThread>();
// 任務隊列(存放等待執行的任務)
private static List<RecordLocationTask> msgQueue = new ArrayList<RecordLocationTask>();
// 控制所有工作線程的運行與否
private static boolean isRunning = true;
private static LogService LogService;
// 數據庫查詢的service(用于任務的持久化)
private static QueryDataUtilService QueryService;
// 數據庫刪除的service(用于任務的持久化)
private static DelDataUtilService DelService;
// 數據庫插入的service(用于任務的持久化)
private static InsertDataUtilService InsertService;
// 日志
private static final Logger logger = Logger.getLogger(RecordLocationMQ.class);
// 一些常量
private static final int SUCCESS = 1;
private static final int FAIL = 0;
/**
* 本消息隊列的初始化函數
* @param config 用于獲取數據庫操作的service
*/
public static void init ( FilterConfig config ) {
RecordLocationMQ.init( 10, config );
}
/**
* 本消息隊列的初始化函數
* @param MaxWorkThread 工作線程的個數
* @param config 用于獲取數據庫操作的service
*/
public static void init ( int MaxWorkThread, FilterConfig config ) {
RecordLocationMQ.MaxWorkThread = MaxWorkThread;
// 初始化LogService
if (null == RecordLocationMQ.LogService) {
ServletContext sc = config.getServletContext();
XmlWebApplicationContext cxt = (XmlWebApplicationContext) WebApplicationContextUtils.getWebApplicationContext(sc);
if (cxt != null && cxt.getBean("logService") != null) {
RecordLocationMQ.LogService = (LogService) cxt.getBean("logService");
}
}
// 初始化QueryService
if (null == RecordLocationMQ.QueryService) {
ServletContext sc = config.getServletContext();
XmlWebApplicationContext cxt = (XmlWebApplicationContext) WebApplicationContextUtils.getWebApplicationContext(sc);
if (cxt != null && cxt.getBean("queryDataUtilService") != null) {
RecordLocationMQ.QueryService = (QueryDataUtilService) cxt.getBean("queryDataUtilService");
}
}
// 初始化DelService
if (null == RecordLocationMQ.DelService) {
ServletContext sc = config.getServletContext();
XmlWebApplicationContext cxt = (XmlWebApplicationContext) WebApplicationContextUtils.getWebApplicationContext(sc);
if (cxt != null && cxt.getBean("delDataUtilService") != null) {
RecordLocationMQ.DelService = (DelDataUtilService) cxt.getBean("delDataUtilService");
}
}
// 初始化InsertService
if (null == RecordLocationMQ.InsertService) {
ServletContext sc = config.getServletContext();
XmlWebApplicationContext cxt = (XmlWebApplicationContext) WebApplicationContextUtils.getWebApplicationContext(sc);
if (cxt != null && cxt.getBean("insertDataUtilService") != null) {
RecordLocationMQ.InsertService = (InsertDataUtilService) cxt.getBean("insertDataUtilService");
}
}
// 從DB中加載尚未完成的任務
// PS:在新線程中執行,防止tomcat啟動時間過長
new RecordLocationMQ().new loadTaskThread().start();
// 初始化工作線程,并開始工作
initWorkThread( MaxWorkThread, workThreadQueue );
}
/**
* 初始化工作線程,并開始工作
* @param maxWorkThread 工作線程數量
* @param workThreadQueue 工作線程隊列
*/
private static void initWorkThread(int maxWorkThread, List<WorkThread> workThreadQueue) {
for ( int i=0; i<maxWorkThread; i++ ) {
WorkThread workThread = new RecordLocationMQ().new WorkThread("WorkThread"+(i+1));
workThreadQueue.add( workThread );
workThread.start();
System.out.println("已開啟線程:WorkThread"+(i+1));
}
}
/**
* 從DB中加載尚未完成的任務
* 并插入傳入的消息隊列中
* @param msgQueue
* @param logger
* @param logService
*/
private static void loadTask ( List<RecordLocationTask> msgQueue, QueryDataUtilService QueryService, DelDataUtilService DelService ) {
String querySQL = "select * from sys_log_temp";
String delSQL = "delete from sys_log_temp";
// 查詢DB中的任務
try {
List<Map<String, Object>> queryResultList = QueryService.query( Arrays.asList( querySQL ) );
for ( Map<String,Object> map : queryResultList ) {
String ip = map.get("ip").toString();
String logId = map.get("log_id").toString();
if ( null!=ip && null!=logId ) {
RecordLocationMQ.addTask( new RecordLocationTask(ip, logId, LogService ) );
}
}
}
// 查詢失敗,不能執行delte操作
catch (Exception e) {
e.printStackTrace();
return;
}
// 清空DB中的任務
DelService.del( Arrays.asList( delSQL ) );
}
/**
* 持久化當前任務隊列
*/
public static void saveTask () {
// PS1:為什么要使用同步?
// Java不允許在遍歷集合過程中新增/刪除元素,
// 因此在遍歷任務隊列前必須要先凍結任務隊列,
// 防止其他線程新增/刪除元素;
// 此外,為了凍結任務隊列,就必須使用msgQueue鎖。
// PS2:為何要先使isRunning為false?
// 將isRunning設為false能立即停止所有工作線程(見PS3),
// 從而所有工作線程都將釋放msgQueue鎖,
// 從而確保這里的同步塊能順利拿到msgQueue鎖。
// 若不執行isRunning = false的話,
// 所有工作線程就會繼續執行,
// 如果任務隊列為空,工作線程就會一直持有msgQueue鎖,并等待任務的到來,
// 然而添加任務的功能得在當前同步塊執行完成后才會執行,
// 因此就出現了死鎖。
// PS3:為何將isRunning設為false就能立即停止所有工作線程?
// 因為isRunning為工作線程的共享資源,
// 并且工作線程的運行依賴于它的值;
// 因此當isRunning設為false后,
// 工作線程執行完當前任務或發現任務隊列為空后,就會紛紛停止。
isRunning = false;
synchronized ( msgQueue ) {
for ( RecordLocationTask task : msgQueue ) {
if ( task!=null && !task.persisted ) {
int result = InsertService.insert( Arrays.asList( "insert into sys_log_temp (id, ip, log_id) values('"+new Date().getTime()+"', '"+task.getIp()+"', '"+task.getLogId()+"')" ) );
if ( result == SUCCESS ) {
task.persisted = true;
}
}
}
isRunning = true;
}
}
/**
* 向任務隊列中添加一個任務
* @param task 任務對象
*/
public static void addTask ( RecordLocationTask task ) {
// 添加任務
// PS:加判斷的原因:由于當前這個類的這個函數是提供給別人使用的,
// 我們沒辦法保證別人一定會傳入一個非空的task,
// 因此加個判斷能提高程序的健壯性。
if ( task!=null ) {
// PS1:加同步塊的原因:由于msgQueue是ArrayList類型,
// ArrayList所有函數都是線程不安全的,
// 這里加一個同步塊使add函數具有原子性。
// PS2:千萬不能使用msgQueue作為鎖!
// 因為工作線程獲取一個任務的過程,使用的鎖就是msgQueue,
// 并且在這個過程中,如果任務隊列為空就會一直循環等待,
// 因此在等待的過程中工作線程就一直占用的msgQueue鎖;
// 然而如果這里添加任務還需要msgQueue鎖,那么就會出現死鎖,
// 工作線程因為任務隊列為空就一直占用著msgQueue鎖,
// 而添加任務的進程獲取不到msgQueue鎖就無法添加任務。
synchronized ( new Object() ) {
msgQueue.add(task);
// System.out.println("向消息隊列添加了一條task!");
}
}
// 持久化任務隊列
// PS:不使用同步的原因:這里對于數據的實時性要求沒那么高。
if ( msgQueue.size() > 100 ) {
saveTask();
}
}
public static void stop () {
isRunning = false;
}
/**
* 工作線程內部類
*/
private class WorkThread extends Thread {
public WorkThread ( String threadName ) {
super(threadName);
}
@Override
public void run() {
RecordLocationTask task = null;
while ( isRunning ) {
// 獲取一個任務
synchronized ( msgQueue ) {
// 任務隊列為空,則等待
while ( isRunning && msgQueue.isEmpty() ) {
// System.out.println("消息隊列為空!");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 取一個任務
// PS:加判斷的原因:上述while循環的結束有兩種可能:
// 1.msgQueue不為空;
// 2.isRunning為false
// 因此要加判斷排除msgQueue為空,但isRunning為false的情況,
// 防止msgQueue.remove時出現空指針!
if ( !msgQueue.isEmpty() ) {
task = msgQueue.remove(0);
}
// System.out.println(this.getName() + "取了一個task!");
}
// 執行任務
// PS1:加try-catch的原因:捕獲任務執行過程中發生的一切異常,
// 只要發生異常,就說明該任務執行失敗,
// 因此需要把它重新放進任務隊列等待下一次執行。
// PS2:加判斷的原因:同上述“取一個任務”加判斷的原因一樣。
try {
if ( task!=null ) {
task.run();
}
} catch (Exception e) {
// e.printStackTrace();
RecordLocationMQ.addTask( task ); // 使用addTask函數添加,統一添加的入口
}
}
}
}
/**
* 從數據庫加載任務的內部類
*/
private class loadTaskThread extends Thread {
@Override
public void run() {
String querySQL = "select * from sys_log_temp";
String delSQL = "delete from sys_log_temp";
// 查詢DB中的任務
try {
List<Map<String, Object>> queryResultList = QueryService.query( Arrays.asList( querySQL ) );
for ( Map<String,Object> map : queryResultList ) {
String ip = map.get("ip").toString();
String logId = map.get("log_id").toString();
if ( null!=ip && null!=logId ) {
RecordLocationMQ.addTask( new RecordLocationTask(ip, logId, LogService ) );
}
}
}
// 查詢失敗,不能執行delte操作
catch (Exception e) {
e.printStackTrace();
return;
}
// 清空DB中的任務
DelService.del( Arrays.asList( delSQL ) );
}
}
// 禁用構造函數
private RecordLocationMQ () {}
}</code></pre>
- 任務接口
package com.sdata.foundation.web.filter;
public interface Task {
public void run() throws Exception;
}</code></pre>
- 用于IP向省市縣轉化的任務線程
package com.sdata.foundation.web.filter;
import java.util.Date;
import org.apache.log4j.Logger;
import com.thinkgem.jeesite.common.utils.IdGen;
import com.thinkgem.jeesite.modules.sys.entity.Log;
import com.thinkgem.jeesite.modules.sys.service.LogService;
public class RecordLocationTask implements Task {
private static final Logger logger = Logger.getLogger(RecordLocationTask.class);
private LogService logService;
private String ip;
private String logId;
//
public boolean persisted = false;
public RecordLocationTask(String ip, String logId, LogService logService ) {
super();
this.logService = logService;
this.ip = ip;
this.logId = logId;
}
@Override
public void run() throws Exception {
// 查詢IP
if ( (new Date().getTime() - TransferIPTool.lastOperaTime) < 100 ){
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
String location = TransferIPTool.transferIP(ip);
// 更新log
Log log = new Log();
log.setIsNewRecord(false);
log.setId(logId);
log.setLocation(location);
logService.save(log);
System.out.println("完成一個task!");
}
public String getIp() {
return ip;
}
public String getLogId() {
return logId;
}
}</code></pre>
- 用于IP向省市縣轉化的工具類
package com.sdata.foundation.web.filter;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import com.alibaba.fastjson.JSONObject;
import com.sdata.foundation.web.service.util.HTTPHelper;
/**
- @author Chai
本類用于將IP轉化為位置信息
*/
public class TransferIPTool {
// 本次請求taobao接口的開始時間
public static long lastOperaTime = new Date().getTime();
// 模擬IP隊列
private static List<String> IPList = Arrays.asList("49.65.250.135","115.28.217.42","114.80.166.240","122.92.218.0","218.28.191.23","218.12.41.179","221.239.16.227","59.108.49.35","124.117.66.0","218.21.128.31","116.52.147.50");
// IP轉換接口
private static final String RequestIP = "
/**
* 將IP轉化為省份
* @param ip
* @return 省份字符串
* @throws Exception
*/
public static String transferIP ( String ip ) throws Exception {
// 記錄本次請求taobao接口的開始時間
TransferIPTool.lastOperaTime = new Date().getTime();
// 打亂IPList
Collections.shuffle( IPList );
String resultJsonStr = HTTPHelper.executeGet(RequestIP + "?ip=" + IPList.get(0));
JSONObject resultJsonObj = JSONObject.parseObject( resultJsonStr );
JSONObject data = resultJsonObj.getJSONObject("data");
return data.getString("region");
}
}</code></pre>
來自:http://blog.csdn.net/u010425776/article/details/53837721