如何手寫一個輕量級消息隊列?

xjtuxjt 8年前發布 | 47K 次閱讀 消息系統

寫在前面

最近因為項目需要,自己寫了個單生產者-多消費者的消息隊列模型。多線程真的不是等閑之輩能玩兒的,我花了兩個小時進行設計與編碼,卻花了兩天的時間調試與運行。在這里,我把我遇到的坑與大家分享。

需求的由來

一開始我需要實現一個記錄用戶操作日志的功能,目的是給商家用戶提供客戶行為分析的能力。要記錄的信息包括客戶的訪問時間、IP、在網站上所做的操作等。其中,客戶的地域信息是個重要的分析項,所以必須要把IP轉化成省市縣。那么究竟何時完成這個轉化的動作呢?有兩種方案:

1. 在用戶進行數據分析 完成轉化

2. 在用戶進行數據分析 完成轉化

第一種方案顯然不靠譜,因為需要轉化的IP數量很大,而且轉化采用第三方接口,因此整個轉化過程將持續很長很長很長……的時間。

而在分析前就把轉化過程完成掉,這樣當用戶需要分析的時候就可以減少這部分時間的開銷,提高了響應速度。因此第二種方案顯然比較合理。

那么隨之而來的問題是:究竟在數據分析前的哪個時機進行轉化?

這個問題又有兩種方案:

1. 在記錄日志的時候就立即完成IP向省市縣的轉換;

2. 每天半夜服務器統一把當天的IP轉化成省市縣;

這兩種方案應該來說各有千秋。

第一種方案比較消耗服務器資源,因為IP向省市縣轉化需要向第三方接口發送GET請求,因此需要消耗一定的出口帶寬和內存資源,在服務器資源一定的前提下,分給用戶訪問的資源就會被減少,從而可能會影響請求響應速度。但這個問題可以用錢來解決,只要花錢砸服務器就行了;而第二種方案在服務器空閑的時候進行轉化雖然節約了服務器資源,但這也導致了商家的分析結果會有一天的滯后,影響用戶體驗。

于是,這個問題就變成了老板的錢重要還是用戶體驗重要。因此我毫不猶豫地選擇了第一種方案。

初步設計

我使用Servlet Filter攔截用戶的所有請求,并在Filter中獲取用戶的各項信息(其中包括IP),然后再請求第三方接口,完成IP向省市縣的轉化,最后將這些信息入庫。

這個流程很顯然有重大缺陷:請求響應時間將被拉的很長。

因為Filter是同步的,只有當Filter中的任務完成后才會放行用戶的請求,而這個Filter中有兩處耗時操作:請求第三方接口、數據入庫,這無疑增加了用戶的等待時間。

因此,我需要將耗時操作異步執行,減少Filter的阻塞時間。

我把這兩個耗時操作放入一個新線程中,只要請求一來,就創建一條新線程去處理這兩步操作。和先前的方式比對之后發現,確實響應速度提高了不少!

但仔細一想,發現不妙。這種方式沒辦法控制線程的數量,當訪問量很高的情況下,線程數量將會無限增加,這時候會搞垮服務器的!

所以需要一個機制來管理所有的線程,于是我就設計了一個消息隊列模型。

模型設計

這個模型很簡單,由一個任務隊列和多個工作線程組成。生產者只需不停地往任務隊列中添加任務,消費者(工作線程)不停地從任務隊列的另一端取任務執行。

這個模型在項目中的應用是這樣的:當一個請求被Filter攔截后,Filter從請求中獲取用戶的各項信息,然后把這些信息封裝成一個任務對象,扔給任務隊列,此刻這個Filter的使命就完成了,它完全不用管任務的執行過程。工作線程會不停地從任務隊列中取任務執行。

類圖設計

從代碼層面來看,整個消息隊列由三個類構成:

消息隊列類MsgQueue

這個類管理整個消息隊列的運行,是主控程序,它包含以下方法:

  • init:初始化整個消息隊列

    在初始化過程中,它會依次做以下事情:

    1. 創建一個任務隊列
    2. 調用initWorkThread函數,創建指定數量的工作線程(工作線程一旦被創建,就會不停地讀取任務隊列中的任務)
    3. 調用loadTask函數,從數據庫中加載所有任務
  • loadTask:加載數據庫中的所有任務

    這是一個抽象函數,若要使用這個消息隊列,必須實現這個函數。

    消息隊列初始化的時候會調用這個函數,從數據庫中加載上次沒有執行完的任務。

    作為消息隊列來講,它并不知道你提供的任務是啥,因此它沒辦法知道你的任務應該存在哪里,以何種形式存儲?因此,這個過程就需要讓消息隊列使用者自己去實現。

  • saveTask:持久化當前任務隊列中的任務

    這也是個抽象函數,若要使用這個消息隊列,也必須實現這個函數。

    當使用者調用消息隊列的stop函數時,它會被執行,用于存儲當前消息隊列中尚未被執行的任務,并且在下次啟動消息隊列的時候通過loadTask函數再次加載進任務隊列,這樣能確保所有任務不會被遺漏。

  • addTask:向任務隊列添加一個任務

  • stop:停止所有工作線程
  • initWorkThread:初始化所有工作線程
    這是一個私有函數,當初始化整個消息隊列的時候被init函數調用。

工作線程類WorkThread

工作線程會不斷地檢查任務隊列中是否有任務,若有任務,就會取一個任務執行;若沒有任務,就會等待一定時間后再次檢查。

它是MsgQueue的一個內部類。因為WorkThread的行為完全由MsgQueue管理,外界不需要知道它的存在。

任務類Task

它是一個接口,并且只有一個函數run,用于封裝任務具體的執行過程。

附上代碼

以下代碼還沒將消息隊列單獨抽象出來,相當于是一個專門用于IP向省市縣轉化的消息隊列,有空把它整一下。

代碼中有詳細的注釋來解釋線程安全性問題。

  • 消息隊列主控程序
package com.sdata.foundation.web.filter;

import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.Map;

import javax.servlet.FilterConfig; import javax.servlet.ServletContext;

import org.apache.log4j.Logger; import org.springframework.web.context.support.WebApplicationContextUtils; import org.springframework.web.context.support.XmlWebApplicationContext;

import com.sdata.foundation.web.service.util.DelDataUtilService; import com.sdata.foundation.web.service.util.InsertDataUtilService; import com.sdata.foundation.web.service.util.QueryDataUtilService; import com.thinkgem.jeesite.modules.sys.service.LogService;

/**

  • 記錄請求IP的消息隊列
  • @author Chai / public class RecordLocationMQ { // 工作線程的個數 private static int MaxWorkThread; // 工作線程隊列 private static List<WorkThread> workThreadQueue = new ArrayList<WorkThread>(); // 任務隊列(存放等待執行的任務) private static List<RecordLocationTask> msgQueue = new ArrayList<RecordLocationTask>(); // 控制所有工作線程的運行與否 private static boolean isRunning = true; private static LogService LogService; // 數據庫查詢的service(用于任務的持久化) private static QueryDataUtilService QueryService; // 數據庫刪除的service(用于任務的持久化) private static DelDataUtilService DelService; // 數據庫插入的service(用于任務的持久化) private static InsertDataUtilService InsertService; // 日志 private static final Logger logger = Logger.getLogger(RecordLocationMQ.class);

    // 一些常量 private static final int SUCCESS = 1; private static final int FAIL = 0;

/**
 * 本消息隊列的初始化函數
 * @param config 用于獲取數據庫操作的service
 */
public static void init (  FilterConfig config ) {
    RecordLocationMQ.init( 10, config );
}


/**
 * 本消息隊列的初始化函數
 * @param MaxWorkThread 工作線程的個數
 * @param config 用于獲取數據庫操作的service
 */
public static void init ( int MaxWorkThread, FilterConfig config ) {

    RecordLocationMQ.MaxWorkThread = MaxWorkThread;

    // 初始化LogService
    if (null == RecordLocationMQ.LogService) {
        ServletContext sc = config.getServletContext();
        XmlWebApplicationContext cxt = (XmlWebApplicationContext) WebApplicationContextUtils.getWebApplicationContext(sc);
        if (cxt != null && cxt.getBean("logService") != null) {
            RecordLocationMQ.LogService = (LogService) cxt.getBean("logService");
        }
    }

    // 初始化QueryService
    if (null == RecordLocationMQ.QueryService) {
        ServletContext sc = config.getServletContext();
        XmlWebApplicationContext cxt = (XmlWebApplicationContext) WebApplicationContextUtils.getWebApplicationContext(sc);
        if (cxt != null && cxt.getBean("queryDataUtilService") != null) {
            RecordLocationMQ.QueryService = (QueryDataUtilService) cxt.getBean("queryDataUtilService");
        }
    }

    // 初始化DelService
    if (null == RecordLocationMQ.DelService) {
        ServletContext sc = config.getServletContext();
        XmlWebApplicationContext cxt = (XmlWebApplicationContext) WebApplicationContextUtils.getWebApplicationContext(sc);
        if (cxt != null && cxt.getBean("delDataUtilService") != null) {
            RecordLocationMQ.DelService = (DelDataUtilService) cxt.getBean("delDataUtilService");
        }
    }

    // 初始化InsertService
    if (null == RecordLocationMQ.InsertService) {
        ServletContext sc = config.getServletContext();
        XmlWebApplicationContext cxt = (XmlWebApplicationContext) WebApplicationContextUtils.getWebApplicationContext(sc);
        if (cxt != null && cxt.getBean("insertDataUtilService") != null) {
            RecordLocationMQ.InsertService = (InsertDataUtilService) cxt.getBean("insertDataUtilService");
        }
    }

    // 從DB中加載尚未完成的任務
    // PS:在新線程中執行,防止tomcat啟動時間過長
    new RecordLocationMQ().new loadTaskThread().start();

    // 初始化工作線程,并開始工作
    initWorkThread( MaxWorkThread, workThreadQueue );
}


/**
 * 初始化工作線程,并開始工作
 * @param maxWorkThread 工作線程數量
 * @param workThreadQueue 工作線程隊列
 */
private static void initWorkThread(int maxWorkThread, List<WorkThread> workThreadQueue) {
    for ( int i=0; i<maxWorkThread; i++ ) {
        WorkThread workThread = new RecordLocationMQ().new WorkThread("WorkThread"+(i+1));
        workThreadQueue.add( workThread );
        workThread.start();
        System.out.println("已開啟線程:WorkThread"+(i+1));
    }
}


/**
 * 從DB中加載尚未完成的任務
 * 并插入傳入的消息隊列中
 * @param msgQueue
 * @param logger 
 * @param logService 
 */
private static void loadTask ( List<RecordLocationTask> msgQueue, QueryDataUtilService QueryService, DelDataUtilService DelService ) {

    String querySQL = "select * from sys_log_temp";
    String delSQL = "delete from sys_log_temp";

    // 查詢DB中的任務
    try {
        List<Map<String, Object>> queryResultList = QueryService.query( Arrays.asList( querySQL ) );
        for ( Map<String,Object> map : queryResultList ) {

            String ip = map.get("ip").toString();
            String logId = map.get("log_id").toString();

            if ( null!=ip && null!=logId ) {
                RecordLocationMQ.addTask( new RecordLocationTask(ip, logId, LogService ) );
            }
        }
    } 
    // 查詢失敗,不能執行delte操作
    catch (Exception e) {
        e.printStackTrace();
        return;
    }

    // 清空DB中的任務
    DelService.del( Arrays.asList( delSQL ) );
}


/**
 * 持久化當前任務隊列
 */
public static void saveTask () {
    // PS1:為什么要使用同步?
    //    Java不允許在遍歷集合過程中新增/刪除元素, 
    //    因此在遍歷任務隊列前必須要先凍結任務隊列,
    //    防止其他線程新增/刪除元素;
    //    此外,為了凍結任務隊列,就必須使用msgQueue鎖。
    // PS2:為何要先使isRunning為false?
    //     將isRunning設為false能立即停止所有工作線程(見PS3),
    //     從而所有工作線程都將釋放msgQueue鎖,
    //     從而確保這里的同步塊能順利拿到msgQueue鎖。
    //     若不執行isRunning = false的話,
    //     所有工作線程就會繼續執行,
    //     如果任務隊列為空,工作線程就會一直持有msgQueue鎖,并等待任務的到來,
    //     然而添加任務的功能得在當前同步塊執行完成后才會執行,
    //     因此就出現了死鎖。
    // PS3:為何將isRunning設為false就能立即停止所有工作線程?
    //     因為isRunning為工作線程的共享資源,
    //     并且工作線程的運行依賴于它的值;
    //     因此當isRunning設為false后,
    //     工作線程執行完當前任務或發現任務隊列為空后,就會紛紛停止。
    isRunning = false;
    synchronized ( msgQueue ) {
        for ( RecordLocationTask task : msgQueue ) {
            if ( task!=null && !task.persisted ) {
                int result = InsertService.insert( Arrays.asList( "insert into sys_log_temp (id, ip, log_id) values('"+new Date().getTime()+"', '"+task.getIp()+"', '"+task.getLogId()+"')" ) );
                if ( result == SUCCESS ) {
                    task.persisted = true;
                }
            }
        }
        isRunning = true;
    }
}

/**
 * 向任務隊列中添加一個任務
 * @param task 任務對象
 */
public static void addTask ( RecordLocationTask task ) {

    // 添加任務
    // PS:加判斷的原因:由于當前這個類的這個函數是提供給別人使用的,
    //    我們沒辦法保證別人一定會傳入一個非空的task,
    //    因此加個判斷能提高程序的健壯性。
    if ( task!=null ) {
        // PS1:加同步塊的原因:由于msgQueue是ArrayList類型,
        //     ArrayList所有函數都是線程不安全的,
        //     這里加一個同步塊使add函數具有原子性。
        // PS2:千萬不能使用msgQueue作為鎖!
        //     因為工作線程獲取一個任務的過程,使用的鎖就是msgQueue,
        //     并且在這個過程中,如果任務隊列為空就會一直循環等待,
        //     因此在等待的過程中工作線程就一直占用的msgQueue鎖;
        //     然而如果這里添加任務還需要msgQueue鎖,那么就會出現死鎖,
        //     工作線程因為任務隊列為空就一直占用著msgQueue鎖,
        //     而添加任務的進程獲取不到msgQueue鎖就無法添加任務。
        synchronized ( new Object() ) {
            msgQueue.add(task);
            // System.out.println("向消息隊列添加了一條task!");
        }
    }

    // 持久化任務隊列
    // PS:不使用同步的原因:這里對于數據的實時性要求沒那么高。
    if ( msgQueue.size() > 100 ) { 
        saveTask();
    }
}


public static void stop () {
    isRunning = false;
}


/**
 * 工作線程內部類
 */
private class WorkThread extends Thread {

    public WorkThread ( String threadName ) {
        super(threadName);
    }

    @Override
    public void run() {
        RecordLocationTask task = null;

        while ( isRunning ) {
            // 獲取一個任務
            synchronized ( msgQueue ) {
                // 任務隊列為空,則等待
                while ( isRunning && msgQueue.isEmpty() ) {
                    // System.out.println("消息隊列為空!");
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                // 取一個任務
                // PS:加判斷的原因:上述while循環的結束有兩種可能:
                //              1.msgQueue不為空;
                //              2.isRunning為false
                // 因此要加判斷排除msgQueue為空,但isRunning為false的情況,
                // 防止msgQueue.remove時出現空指針!
                if ( !msgQueue.isEmpty() ) {
                    task = msgQueue.remove(0);
                }
                // System.out.println(this.getName() + "取了一個task!");
            }

            // 執行任務
            // PS1:加try-catch的原因:捕獲任務執行過程中發生的一切異常,  
            //   只要發生異常,就說明該任務執行失敗,
            //   因此需要把它重新放進任務隊列等待下一次執行。
            // PS2:加判斷的原因:同上述“取一個任務”加判斷的原因一樣。
            try {
                if ( task!=null ) {
                    task.run();
                }
            } catch (Exception e) {
                // e.printStackTrace();
                RecordLocationMQ.addTask( task ); // 使用addTask函數添加,統一添加的入口
            }
        }
    }
}

/**
 * 從數據庫加載任務的內部類
 */
private class loadTaskThread extends Thread {

    @Override
    public void run() {

        String querySQL = "select * from sys_log_temp";
        String delSQL = "delete from sys_log_temp";

        // 查詢DB中的任務
        try {
            List<Map<String, Object>> queryResultList = QueryService.query( Arrays.asList( querySQL ) );
            for ( Map<String,Object> map : queryResultList ) {

                String ip = map.get("ip").toString();
                String logId = map.get("log_id").toString();

                if ( null!=ip && null!=logId ) {
                    RecordLocationMQ.addTask( new RecordLocationTask(ip, logId, LogService ) );
                }
            }
        } 
        // 查詢失敗,不能執行delte操作
        catch (Exception e) {
            e.printStackTrace();
            return;
        }

        // 清空DB中的任務
        DelService.del( Arrays.asList( delSQL ) );

    }
}

// 禁用構造函數
private RecordLocationMQ () {}

}</code></pre>

  • 任務接口
package com.sdata.foundation.web.filter;

public interface Task { public void run() throws Exception; }</code></pre>

  • 用于IP向省市縣轉化的任務線程
package com.sdata.foundation.web.filter;

import java.util.Date;

import org.apache.log4j.Logger;

import com.thinkgem.jeesite.common.utils.IdGen; import com.thinkgem.jeesite.modules.sys.entity.Log; import com.thinkgem.jeesite.modules.sys.service.LogService;

public class RecordLocationTask implements Task { private static final Logger logger = Logger.getLogger(RecordLocationTask.class); private LogService logService; private String ip; private String logId; // public boolean persisted = false;

public RecordLocationTask(String ip, String logId, LogService logService ) {
    super();
    this.logService = logService;
    this.ip = ip;
    this.logId = logId;
}

@Override
public void run() throws Exception {

    // 查詢IP
    if ( (new Date().getTime() - TransferIPTool.lastOperaTime) < 100 ){
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    String location = TransferIPTool.transferIP(ip);

    // 更新log
    Log log = new Log();
    log.setIsNewRecord(false);
    log.setId(logId);
    log.setLocation(location);
    logService.save(log);

    System.out.println("完成一個task!");

}

public String getIp() {
    return ip;
}

public String getLogId() {
    return logId;
}

}</code></pre>

  • 用于IP向省市縣轉化的工具類
package com.sdata.foundation.web.filter;

import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.List;

import com.alibaba.fastjson.JSONObject; import com.sdata.foundation.web.service.util.HTTPHelper;

/**

  • @author Chai
  • 本類用于將IP轉化為位置信息 */ public class TransferIPTool {

    // 本次請求taobao接口的開始時間 public static long lastOperaTime = new Date().getTime(); // 模擬IP隊列 private static List<String> IPList = Arrays.asList("49.65.250.135","115.28.217.42","114.80.166.240","122.92.218.0","218.28.191.23","218.12.41.179","221.239.16.227","59.108.49.35","124.117.66.0","218.21.128.31","116.52.147.50"); // IP轉換接口 private static final String RequestIP = "

/**
 * 將IP轉化為省份
 * @param ip
 * @return 省份字符串
 * @throws Exception 
 */
public static String transferIP ( String ip ) throws Exception {

    // 記錄本次請求taobao接口的開始時間
    TransferIPTool.lastOperaTime = new Date().getTime();

    // 打亂IPList
    Collections.shuffle( IPList );

    String resultJsonStr = HTTPHelper.executeGet(RequestIP + "?ip=" + IPList.get(0));
    JSONObject resultJsonObj = JSONObject.parseObject( resultJsonStr );
    JSONObject data = resultJsonObj.getJSONObject("data");
    return data.getString("region");

}

}</code></pre>

 

來自:http://blog.csdn.net/u010425776/article/details/53837721

 

 本文由用戶 xjtuxjt 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!