Mina實現自定義協議的通信
網絡的傳輸使用需要遵循一定的規則,這些規則我們稱為協議。如在互聯網請求HTML頁面的時候,我們要遵循HTTP協議,HTTP頭的格式就是我們要遵守的規則:
Request Headers Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8 Accept-Charset: GBK,utf-8;q=0.7,*;q=0.3 Accept-Encoding: gzip,deflate,sdch Accept-Language: zh-CN,zh;q=0.8 Cache-Control: max-age=0
比如我們在做一些第三方開發的時候,經常會按照固定的格式向服務器端發送請求,服務器驗證消息后,返回相應的結果。在Mina的開發中,我們也會用到這種模式。首先我們先來描述下這種方式的使用情形:
這樣的方式,目前,我所在的項目組中主要用于通信和傳輸。為了將文件從客戶端傳到云端,我們先在客戶端將數據根據一定的規則切片,然后通過一種特定的格式傳輸到服務器端。Mina,作為這中間的橋梁,秉承了框架很好的優點,快速開發。由于我們的服務器端采用的是C寫的,所以我只給出客戶端的編碼過程,解碼過程原理和開發都一樣。
用Mina執行一定協議的傳輸,主要有以下幾個步驟:
1. 設計通信協議;
2. 編寫請求(返回)對象和業務對象;
3. 編寫解碼(編碼)器;
4. 編寫客戶端、服務器端。

下面根據代碼,一步步介紹這個過程,由于是前提的測試代碼,所以沒有考慮線程安全等問題,只是一個思路:
首先是通信的協議,根據mina的構建規則,將協議設計成抽象類,由請求對象和返回對象分別去繼承,協議格式如下:
package com.a2.desktop.example5.mina.potocol; import org.apache.mina.core.buffer.IoBuffer; /** * 通信協議 * @author Chen.Hui * */ public abstract class AbsMessage { /** * 協議格式: * * tag | header length | Filename | File length | offset | checksum | temps | data * */ /** 請求或訪問類型 請求Tag:0x00 返回Tag:0x01 共 8 bit */ public abstract byte getTag(); /** 頭文件長度 共 2^16 可表示 65535 */ public abstract short getHeaderlen(); /** 根據UUID生成文件唯一標識,共 8*36=288 bit */ public abstract byte[] getFilename();//需要設計一個算法 /** 獲取文件長度 2^32=4GB 共 32 bit */ public abstract int getFileLen(); /** 獲取文件的偏移量offset 共 32 bit */ public abstract int getOffset(); /** 獲取文件的MD5校驗碼 共 32 bit */ public abstract byte[] getChecksum(); /** 預留字段 長度不超過 128 bit */ public abstract byte[] getTmp(); /**data 方式傳輸內容 不超過1024bit*/ public abstract IoBuffer getData(); }
下面是請求對象(返回)和業務對象,這里的業務對象主要功能是將文件切片:
package com.a2.desktop.example5.mina.potocol; import java.io.IOException; import java.nio.charset.Charset; import org.apache.mina.core.buffer.IoBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 請求對象 * * @author Chen.Hui * */ public class InfoRequest extends AbsMessage { Logger logger = LoggerFactory.getLogger(InfoRequest.class); FilePiece piece; Charset charset; public InfoRequest(FilePiece piece) { this.piece = piece; } public InfoRequest(){ //empty } @Override public byte getTag() {// 0x01 請求包 return (byte) 0x01; } @Override public short getHeaderlen() { if (getTmp() == null) { short len = (short) (1 + 2 + 36 + 4 + 4 + 4 ); return len; } else { short len = (short) (1 + 2 + 36 + 4 + 4 + 4 + (short) getTmp().length); return len; } } @Override public int getFileLen() {// 文件總長度 try { return (int) piece.getFc().size(); } catch (IOException e) { e.printStackTrace(); } return 0; } @Override public int getOffset() {// 傳輸 偏移量 return piece.getOffset(); } @Override public byte[] getFilename() {// 文件名稱 /** check the bits of name */ byte[] name = new byte[36]; name = piece.getFilename().getBytes(); return name; } @Override public byte[] getChecksum() {// checksum byte[] checksum = new byte[4]; checksum = piece.getChecksum().getBytes(); return checksum; } @Override public byte[] getTmp() { byte[] b=new byte[5]; return b; } @Override public IoBuffer getData() { return piece.getBuf(); } }業務對象代碼,RandomAccessFile可用于隨機讀寫,用于文件的切片,這里還用了管道,主要目的是為了加開讀寫速度:
package com.a2.desktop.example5.mina.potocol; import java.io.File; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.UUID; import org.apache.mina.core.buffer.IoBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 分片文件操作類 * * @author Chen.Hui * */ public class FilePiece { Logger logger = LoggerFactory.getLogger(FilePiece.class); private ByteBuffer[] dsts; private IoBuffer buf; private String filename; private FileChannel fc; private RandomAccessFile raf; private int offset; private String checksum; /** 構建文件的基本信息 */ public FilePiece(String path, int offset) throws Exception { raf = new RandomAccessFile(new File(path), "rw"); fc = raf.getChannel(); this.offset = offset; dsts = new ByteBuffer[1024]; for (int i = 0; i < dsts.length; i++) { dsts[i] = ByteBuffer.allocate(1024); } fc.read(dsts, offset, 1024); buf=IoBuffer.allocate(1024); filename = UUID.randomUUID().toString(); logger.info("has built:" + filename + " filename size" + filename.length()); } /**這個方法還有點兒問題,數據取的不對*/ public IoBuffer getBuf(){ dsts[0].flip(); while(dsts[0].hasRemaining()){ buf.putChar(dsts[0].getChar()); } buf.flip(); return buf; } public String getFilename() { return filename; } public FileChannel getFc() { return fc; } public RandomAccessFile getRaf() { return raf; } public int getOffset() { return offset; } public String getChecksum() { // TODO checksum algorithems return "aaaa"; } }再接下來是編碼器 ,編碼器的作用就是講數據裝換成用于傳輸的流,在Mina中這種流就是IoBuffer:
package com.a2.desktop.example5.mina.potocol; import java.nio.charset.Charset; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolEncoderOutput; import org.apache.mina.filter.codec.demux.MessageEncoder; /** * 編碼器 * @author Chen.Hui * */ public class InfoEncoder implements MessageEncoder{ private Charset charset; public InfoEncoder(Charset charset){ this.charset=charset; } @Override public void encode(IoSession session, AbsMessage message, ProtocolEncoderOutput out) throws Exception { IoBuffer buf=IoBuffer.allocate(1024).setAutoExpand(true); if(message instanceof InfoRequest){ InfoRequest req=(InfoRequest) message; buf.put(req.getTag()); buf.putShort((short)req.getHeaderlen()); buf.put(req.getFilename()); buf.putInt(req.getFileLen()); buf.putInt(req.getOffset()); buf.put(req.getChecksum()); buf.put(req.getTmp()); buf.put(req.getData()); }else if(message instanceof InfoResponse){ //TODO } buf.flip(); out.write(buf); } }
解碼器與之類似,解碼器在這里的作用主要用戶服務器端解碼:
package com.a2.desktop.example5.mina.potocol; import java.nio.charset.Charset; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolDecoderOutput; import org.apache.mina.filter.codec.demux.MessageDecoder; import org.apache.mina.filter.codec.demux.MessageDecoderResult; /** * 解碼器 * @author ChenHui * */ public class InfoDecoder implements MessageDecoder { private Charset charset; public InfoDecoder(Charset charset) { this.charset = charset; } @Override public MessageDecoderResult decodable(IoSession session, IoBuffer in) { //System.out.println("package size:"+in.remaining()); // 報頭長度<56 if (in.remaining() < 56) { return MessageDecoderResult.NEED_DATA; } byte tag = in.get(); short head_len=in.getShort(); if (tag == (short) 0x01) { System.out.println("請求標識符:"+tag+" head length:"+head_len); }else{ //System.out.println("未知標識符..."); return MessageDecoderResult.NOT_OK; } return MessageDecoderResult.OK; } @Override public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { byte tag=in.get(); if(tag==0x01){ InfoReqContainer irc=new InfoReqContainer(); irc.setTag(tag); irc.setHeadlen(in.getShort()); irc.setFilename(in.getString(36, charset.newDecoder())); irc.setFilelen(in.getInt()); irc.setOffset(in.getInt()); irc.setChecksum(in.getString(4, charset.newDecoder())); irc.setTemp(in.getString(5, charset.newDecoder()));//應該用head len-53 irc.setData(in); out.write(irc); } return MessageDecoderResult.OK; } @Override public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception { // TODO Auto-generated method stub } }為了解碼方便,我這里設計了一個輔助類InfoReqContainer,主要用戶輔助操作:
package com.a2.desktop.example5.mina.potocol; import org.apache.mina.core.buffer.IoBuffer; /** * 請求對象解析類 * * @author Chen.Hui * */ public class InfoReqContainer { private byte tag; private short headlen; private String filename; private int filelen; private int offset; private String temp; private String checksum; private IoBuffer data; public byte getTag() { return tag; } public void setTag(byte tag) { this.tag = tag; } public short getHeadlen() { return headlen; } public void setHeadlen(short headlen) { this.headlen = headlen; } public String getFilename() { return filename; } public void setFilename(String filename) { this.filename = filename; } public int getFilelen() { return filelen; } public void setFilelen(int filelen) { this.filelen = filelen; } public int getOffset() { return offset; } public void setOffset(int offset) { this.offset = offset; } public String getTemp() { return temp; } public void setTemp(String temp) { this.temp = temp; } public String getChecksum() { return checksum; } public void setChecksum(String checksum) { this.checksum = checksum; } public IoBuffer getData() { return data; } public void setData(IoBuffer data) { this.data = data; } }
在mina中,要將解碼器和編碼器綁定到協議工廠類中,才能被過濾器使用:
package com.a2.desktop.example5.mina.potocol; import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory; import org.apache.mina.filter.codec.demux.MessageDecoder; import org.apache.mina.filter.codec.demux.MessageEncoder; public class InfoCodecFactory extends DemuxingProtocolCodecFactory { private MessageDecoder decoder; private MessageEncoderencoder; public InfoCodecFactory(MessageDecoder decoder, MessageEncoder encoder) { this.decoder = decoder; this.encoder = encoder; addMessageDecoder(this.decoder); addMessageEncoder(AbsMessage.class, this.encoder); } }
做完了這些,編碼解碼的工作都完成了,最后就是寫客戶端和服務器端進行測試,注意文件位置,這里沒有做提示:
package com.a2.desktop.example5.mina.potocol; import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.service.IoConnector; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.transport.socket.nio.NioSocketConnector; public class TestClient { private static String HOST = "127.0.0.1"; private static int PORT = 8082; public static void main(String[] args) { // 創建一個非阻塞的客戶端程序 IoConnector connector = new NioSocketConnector(); // 設置鏈接超時時間 connector.setConnectTimeout(30000); // 添加過濾器 connector.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new InfoCodecFactory( new InfoDecoder(Charset.forName("utf-8")), new InfoEncoder(Charset.forName("utf-8"))))); // 添加業務邏輯處理器類 connector.setHandler(new ClientHandler()); IoSession session = null; try { ConnectFuture future = connector.connect(new InetSocketAddress( HOST, PORT));// 創建連接 future.awaitUninterruptibly();// 等待連接創建完成 session = future.getSession();// 獲得session FilePiece piece = new FilePiece( "D:\\Develop Libs Tar\\apache-mina-2.0.7-bin.zip", 0); InfoRequest ir = new InfoRequest(piece); session.write(ir);// 發送消息 } catch (Exception e) { e.printStackTrace(); System.out.println("客戶端鏈接異常..."); } session.getCloseFuture().awaitUninterruptibly();// 等待連接斷開 connector.dispose(); } }
客戶端的Handler沒有做任何處理:
package com.a2.desktop.example5.mina.potocol; import org.apache.mina.core.service.IoHandler; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession; public class ClientHandler implements IoHandler { @Override public void sessionCreated(IoSession session) throws Exception { // TODO Auto-generated method stub } @Override public void sessionOpened(IoSession session) throws Exception { // TODO Auto-generated method stub } @Override public void sessionClosed(IoSession session) throws Exception { // TODO Auto-generated method stub } @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { // TODO Auto-generated method stub } @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { // TODO Auto-generated method stub } @Override public void messageReceived(IoSession session, Object message) throws Exception { //System.out.println(message.toString()); } @Override public void messageSent(IoSession session, Object message) throws Exception { // TODO Auto-generated method stub } }服務器端:
package com.a2.desktop.example5.mina.potocol; import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.mina.core.service.IoAcceptor; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSessionConfig; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.logging.LogLevel; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; public class TestServer{ private static int PORT = 8082; public static void main(String[] args) { IoAcceptor acceptor = null; try { // 創建一個非阻塞的server端的Socket acceptor = new NioSocketAcceptor(); // 設置過濾器(添加自帶的編解碼器) acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new InfoCodecFactory( new InfoDecoder(Charset.forName("utf-8")), new InfoEncoder(Charset.forName("utf-8"))))); // 設置日志過濾器 LoggingFilter lf = new LoggingFilter(); lf.setMessageReceivedLogLevel(LogLevel.DEBUG); acceptor.getFilterChain().addLast("logger", lf); // 獲得IoSessionConfig對象 IoSessionConfig cfg = acceptor.getSessionConfig(); // 讀寫通道10秒內無操作進入空閑狀態 cfg.setIdleTime(IdleStatus.BOTH_IDLE, 100); // 綁定邏輯處理器 acceptor.setHandler(new ServerHandler()); // 綁定端口 acceptor.bind(new InetSocketAddress(PORT)); System.out.println("成功開啟服務器端..."); } catch (Exception e) { e.printStackTrace(); } } }
服務器端的Handler:
package com.a2.desktop.example5.mina.potocol; import org.apache.mina.core.service.IoHandler; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession; public class ServerHandler implements IoHandler { @Override public void sessionCreated(IoSession session) throws Exception { // TODO Auto-generated method stub } @Override public void sessionOpened(IoSession session) throws Exception { // TODO Auto-generated method stub } @Override public void sessionClosed(IoSession session) throws Exception { // TODO Auto-generated method stub } @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { // TODO Auto-generated method stub } @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { // TODO Auto-generated method stub } @Override public void messageReceived(IoSession session, Object message) throws Exception { if (message instanceof InfoReqContainer) { InfoReqContainer irc = (InfoReqContainer) message; System.out.println("服務器端 獲取成功 Tag:" + irc.getTag() + "\r\n " + "head len:" + irc.getHeadlen() + "\r\nfilename: " + irc.getFilename() + "\r\nfile len:"+irc.getFilelen()+"\r\noffset:"+irc.getOffset()+"\r\nchecksum:"+irc.getChecksum()+"\r\ndata:"+irc.getData().toString()); session.write("success rescive"); } else { System.out.println("獲取失敗"); } } @Override public void messageSent(IoSession session, Object message) throws Exception { // TODO Auto-generated method stub } }主要還是提供一個思路,并沒用很多業務的東西,也沒有做事務和并發處理,當然這在項目中一定會處理。Mina很好用,用原生的NIO也同樣能實現,原理都一樣。Mina的異步通信原理網上一大堆,我就不寫了。
謝謝觀賞。
本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!