JGroups入門 - 實現簡單的基于文本的聊天程序
這篇文章的目的就是寫一個簡單的基于文本的聊天程序(SimpleChat),包含如下功能:
- 所有 SimpleChat 的實例通過一個組來查找彼此
- 無需運行一個中央的聊天服務器來供 SimpleChat 連接,因此也就沒有單點故障
- 聊天信息將發送給組中的所有成員
- 當某個實例離開或者加入組或者崩潰時,其他的成員會收到通知
- (可選) 我們維護一個公用的組范圍內的狀態分享,例如聊天歷史記錄。新的實例可從已有的實例處獲取這些歷史記錄
JGroups 概要
JGroups 使用 JChannel 作為連接到組、發送和接收消息的主 API,并可通過 JChannel 注冊用來處理這些事件(成員加入、退出和發送消息)的偵聽器。
而 Messages 是發送的消息,它包含一個字節緩沖區、發送和接受者地址。Addresses 是 org.jgroups.Address 的子類,通常包含一個 IP 地址和端口。
組中的實例列表被成為 View,每個實例包含相同的 View,可通過 View.getMembers() 來獲取所有實例的地址列表。
實例 Instances 只能在加入組后才能發送和接收消息。
當一個實例要離開組時,JChannel.disconnect() 或者 JChannel.close() 方法會被調用,后者實際上會判斷當連接還保持時調用了 disconnect() 方法來關閉通道。
創建并加入組通道
要加入一個組,我需要使用 JChannel。一個 JChannel 的實例可以通過一個配置來創建,配置中定義了通道的屬性。然后通過 connect(String clustername) 來連接到組中。所有通道實例都是調用 connect() 并使用相同的參數來加入相同的組中。下面讓我們實際創建一個 JChannel 并連接到名為 ChatCluster 的組中:
import org.jgroups.JChannel; public class SimpleChat { JChannel channel; String user_name=System.getProperty("user.name", "n/a"); private void start() throws Exception { channel=new JChannel(); // use the default config, udp.xml channel.connect("ChatCluster"); } public static void main(String[] args) throws Exception { new SimpleChat().start(); } }
首先我們使用空的構造器來創建一個 JChannel 實例,該方法使用默認的配置。你也可以傳遞一個 XML 文件來配置這個 JChannel,例如 new JChannel("/home/bela/udp.xml").
connect() 方法加入 ChatCluster 組中。注意我們并不需要事先明確的創建一個組,connect() 方法會判斷如果是組中的第一個實例時自動創建該組。之后其他的實例連接過來就加入了相同的組,例如:
- ch1 joining "cluster-one"
- ch2 joining "cluster-two"
- ch3 joining "cluster-two"
- ch4 joining "cluster-one"
- ch5 joining "cluster-three"
這樣我們就有三個組:"cluster-one" 包含 ch1 和 ch4, "cluster-two" 包含 ch2 和 ch3, 而 "cluster-three" 只有一個 ch5 實例.
主事件循環和發送聊天消息
現在我們運行一個事件循環來從標準控制臺輸入中讀取文本消息,然后發送到組中所有成員。當輸入 exit 或者 quit 命令時,將會退出循環并關閉通道
private void start() throws Exception { channel=new JChannel(); channel.connect("ChatCluster"); eventLoop(); channel.close(); } private void eventLoop() { BufferedReader in=new BufferedReader(new InputStreamReader(System.in)); while(true) { try { System.out.print("> "); System.out.flush(); String line=in.readLine().toLowerCase(); if(line.startsWith("quit") || line.startsWith("exit")) break; line="[" + user_name + "] " + line; Message msg=new Message(null, null, line); channel.send(msg); } catch(Exception e) { } } }
在這里添加了 eventLoop() 的調用,然后關閉通道的方法。
事件處理循環當輸入回車時就會發送消息到組中,這是通過構造一個新的 Message 實例然后調用 Channel.send() 方法來發送。
Message 類構造器的首個參數是目標地址,該參數設置為 null 表示將發送給組中所有成員(非空的參數表示要發送到指定的某個成員)。
第二個參數是發送者的地址,這也是 null,JGroups 會自動使用當前的恰當的地址。
第三個參數是我們從標準控制臺輸入中讀到的鍵盤輸入的內容,這個內容會通過 Java 的序列化機制變成 byte[] 數據并設置為 Message 的內容。注意我們也可以自己來序列化一個對象(而且也推薦這樣做),然后給 Message 構造器第三個參數傳遞 byte[] 值。
到這里應用程序功能就已差不多完整,不過還缺少一樣,沒有對接收到的消息進行提醒和顯示,接下來我們介紹消息的接收。
接收消息并查看更改通知
現在我們要注冊一個 Receiver 來接收消息并查看組中成員的變動。我們可以實現 org.jgroups.Receiver,不過這里我選用直接繼承 ReceiverAdapter 類,因為該類已經有一些默認的實現方法了。然后只需要重載回調函數 receive() 和 viewChange() 即可:
public class SimpleChat extends ReceiverAdapter {
設置接收器的 start() 方法:
private void start() throws Exception { channel=new JChannel(); channel.setReceiver(this); channel.connect("ChatCluster"); eventLoop(); channel.close(); }實現 receive() 和 viewAccepted() 方法:
public void viewAccepted(View new_view) { System.out.println("** view: " + new_view); } public void receive(Message msg) { System.out.println(msg.getSrc() + ": " + msg.getObject()); }
viewAccepted() 回調函數會在新成員加入組中,或者已有成員崩潰了或離開組時被調用。其 toString() 方法會打印 View ID(也就是成員ID)以及當前成員列表。
在 receive() 方法中我們可以得到一個 Message 的參數,只需要讀取其緩沖區內容并輸出到控制臺,同時我們也把發送者的地址打印出來 (Message.getSrc()).
注意我們也可以通過調用 Message.getBuffer() 來獲取消息實體中的 byte[] 數據然后通過自己的反序列化來處理,例如 String line=new String(msg.getBuffer()).
測試 SimpleChat 程序
現在我們這個演示程序的所有功能均已完成,可使用如下命令來運行試試(譯者注:請自行將 jgroup-xxx.jar 添加到類路徑):
[linux]/home/bela$ java SimpleChat ------------------------------------------------------------------- GMS: address=linux-48776, cluster=ChatCluster, physical address=192.168.1.5:42442 ------------------------------------------------------------------- ** view: [linux-48776|0] [linux-48776] >
我們啟動的實例名是 linux-48776 ,其物理地址是 192.168.1.5:42442 (IP address:port). 這個名字是由 JGroups 來生成的(如果用戶沒有設置的話,通常是主機名加一個隨機的數字),該名字一直存在并在整個生命周期中保持不變,同時映射到一個基礎的 UUID,而 UUID 映射到物理地址。
接下來啟動第二個實例:
[linux]/home/bela$ java SimpleChat ------------------------------------------------------------------- GMS: address=linux-37238, cluster=ChatCluster, physical address=192.168.1.5:40710 ------------------------------------------------------------------- ** view: [linux-48776|1] [linux-48776, linux-37238] >
現在組中有兩個成員,包括 [linux-48776, linux-37238], 這里顯示的是加入組中兩個成員的名稱。注意第一個成員 (linux-48776) 同樣也接收到相同的 view,因此兩個成員包含了同樣的成員列表,順序也一致。實例是根據加入組中的先后順序列表的,因此最早加入的成員排在最前面。
只需要輸入消息并按回車鍵就可以發送消息。消息被發送到組中,因此組中所有成員都將接收到該消息,包括發送者自己。
當輸入 exit 或者 quit 并按回車,實例將會退出組。
為了模擬成員崩潰的情況(例如在控制臺中按 Ctrl + C),其他的幸存者將接收到一個新的 view ,而當前的組中只有一個成員。
額外信息:維護共享的組數據
JGroups 另外一個使用場景是可以幫你維護一份可在組中共享的數據。例如,Web 服務器中的所有 HTTP 會話。如果這些會話通過組來復制,那么客戶端就可以訪問組中的任意成員來獲得這些數據,就算某個成員崩潰或者退出了,這些數據依然可用。
對會話的更新也會通過組進行復制,例如某個序列化屬性被修改了,那么其他成員也會得知這個修改,這樣就使得組中所有成員包含了相同的狀態。
可當組中加入一個新成員會怎么樣呢?新加入的成員必須通過某些方法來獲取這些狀態數據,這被稱為是“狀態傳輸”。
JGroups 的狀態傳輸是通過兩個回調函數來實現的 (getState() and setState()) ,我們需要調用 getState() 方法來獲取狀態數據。注意,為了在應用中使用狀態傳輸,協議堆棧必須有一個狀態傳輸協議(我們這個演示程序使用了默認的堆棧)。
我們修改一下 start() 方法,加入 getState() 的調用:
private void start() throws Exception { channel=new JChannel(); channel.setReceiver(this); channel.connect("ChatCluster"); channel.getState(null, 10000); eventLoop(); channel.close(); }getState() 方法第一個參數是目標成員,null 表示首個成員(協調者)。第二個參數是超時時間,這里我們設置了 10 秒鐘的超時時間,以為著狀態傳輸的時間必須在 10 秒內完成,否則將會拋出異常,0 代表沒有超時時間。
ReceiverAdapter 定義了 getState() 回調函數,當組中實例(一般是第一個實例,或者也叫協調者)收到一個已有實例要獲取組狀態時被調用。在我們的示例程序中,我們為聊天會話定義了一個狀態,這是一個簡單的列表,包含最新的幾條聊天信息(這個可能不是一個好的組狀態的例子,因為這個狀態數據一直在增長).
聊天信息列表并定義為實例變量:
final List<String> state=new LinkedList<String>();
我們還需要修改 receive() 方法來將接收到的消息追加到狀態數據中:
public void receive(Message msg) { String line=msg.getSrc() + ": " + msg.getObject(); System.out.println(line); synchronized(state) { state.add(line); } }getState() 回調函數實現如下:
public void getState(OutputStream output) throws Exception { synchronized(state) { Util.objectToStream(state, new DataOutputStream(output)); } }getState() 方法在 “狀態提供者” 里調用,當實例返回狀態數據后會被轉換成輸出流。JGroups 會在狀態數據寫入完畢后關閉流,就算有異常發送也會這樣,因此你不需要自己來關閉流。
因為訪問狀態數據可能是并發的,我們必須做同步控制。然后調用 Util.objectToStream() 這個工具方法來將對象寫入流中。
setState() 方法在“狀態請求者”處調用,也就是調用了 getState() 方法的成員上,其任務就是從輸入流中讀取狀態數據并保存:
public void setState(InputStream input) throws Exception { List<String> list; list=(List<String>)Util.objectFromStream(new DataInputStream(input)); synchronized(state) { state.clear(); state.addAll(list); } System.out.println(list.size() + " messages in chat history):"); for(String str: list) { System.out.println(str); } }
我們再一次調用 JGroups 工具方法 (Util.objectFromStream()) 來從輸入流中創建一個對象,然后對 state 同步并賦值。
在接收完狀態數據后,我們打印了狀態數據中的聊天信息數。注意這里并沒有處理超大列表的情況,可能會發生不可預知的問題。
結論
在這篇教程中,我們向你展示了如何創建、加入和離開組,并給組成員發送和接收消息,獲取組成員的變化情況,同時實現了狀態的傳輸。所有這些都是通過 JGroups 的核心 API —— JChannel 和 Receiver 提供的。
還有兩個關于 JGroups 方面的內容沒有涉及到的,分別是:構建塊 (Building blocks) 和協議堆棧。
構建塊是一些類繼承自 JChannel 提供了更高級別的抽象層,例如請求響應、組范圍內的方法調用、復制哈希等等。
而協議堆棧允許對 JGroups 底層通訊協議進行定制,包括配置、移除、增強或者重寫全新的協議。
SimpleChat 的代碼可從 這里 獲取。
這里還有一些其他 JGroups 相關的資料:
- SimpleChat code: SimpleChat.java
- JGroups web site: http://www.jgroups.org
- Downloads: here
- JIRA bug tracking: http://jira.jboss.com/jira/browse/JGRP
- Mailing lists: http://sourceforge.net/mail/?group_id=6081