Mina實現自定義協議的通信
網絡的傳輸使用需要遵循一定的規則,這些規則我們稱為協議。如在互聯網請求HTML頁面的時候,我們要遵循HTTP協議,HTTP頭的格式就是我們要遵守的規則:
Request Headers Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8 Accept-Charset: GBK,utf-8;q=0.7,*;q=0.3 Accept-Encoding: gzip,deflate,sdch Accept-Language: zh-CN,zh;q=0.8 Cache-Control: max-age=0
比如我們在做一些第三方開發的時候,經常會按照固定的格式向服務器端發送請求,服務器驗證消息后,返回相應的結果。在Mina的開發中,我們也會用到這種模式。首先我們先來描述下這種方式的使用情形:
這樣的方式,目前,我所在的項目組中主要用于通信和傳輸。為了將文件從客戶端傳到云端,我們先在客戶端將數據根據一定的規則切片,然后通過一種特定的格式傳輸到服務器端。Mina,作為這中間的橋梁,秉承了框架很好的優點,快速開發。由于我們的服務器端采用的是C寫的,所以我只給出客戶端的編碼過程,解碼過程原理和開發都一樣。
用Mina執行一定協議的傳輸,主要有以下幾個步驟:
1. 設計通信協議;
2. 編寫請求(返回)對象和業務對象;
3. 編寫解碼(編碼)器;
4. 編寫客戶端、服務器端。
下面根據代碼,一步步介紹這個過程,由于是前提的測試代碼,所以沒有考慮線程安全等問題,只是一個思路:
首先是通信的協議,根據mina的構建規則,將協議設計成抽象類,由請求對象和返回對象分別去繼承,協議格式如下:
package com.a2.desktop.example5.mina.potocol;
import org.apache.mina.core.buffer.IoBuffer;
/**
* 通信協議
* @author Chen.Hui
*
*/
public abstract class AbsMessage {
/**
* 協議格式:
*
* tag | header length | Filename | File length | offset | checksum | temps | data
*
*/
/** 請求或訪問類型 請求Tag:0x00 返回Tag:0x01 共 8 bit */
public abstract byte getTag();
/** 頭文件長度 共 2^16 可表示 65535 */
public abstract short getHeaderlen();
/** 根據UUID生成文件唯一標識,共 8*36=288 bit */
public abstract byte[] getFilename();//需要設計一個算法
/** 獲取文件長度 2^32=4GB 共 32 bit */
public abstract int getFileLen();
/** 獲取文件的偏移量offset 共 32 bit */
public abstract int getOffset();
/** 獲取文件的MD5校驗碼 共 32 bit */
public abstract byte[] getChecksum();
/** 預留字段 長度不超過 128 bit */
public abstract byte[] getTmp();
/**data 方式傳輸內容 不超過1024bit*/
public abstract IoBuffer getData();
} 下面是請求對象(返回)和業務對象,這里的業務對象主要功能是將文件切片:
package com.a2.desktop.example5.mina.potocol;
import java.io.IOException;
import java.nio.charset.Charset;
import org.apache.mina.core.buffer.IoBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 請求對象
*
* @author Chen.Hui
*
*/
public class InfoRequest extends AbsMessage {
Logger logger = LoggerFactory.getLogger(InfoRequest.class);
FilePiece piece;
Charset charset;
public InfoRequest(FilePiece piece) {
this.piece = piece;
}
public InfoRequest(){
//empty
}
@Override
public byte getTag() {// 0x01 請求包
return (byte) 0x01;
}
@Override
public short getHeaderlen() {
if (getTmp() == null) {
short len = (short) (1 + 2 + 36 + 4 + 4 + 4 );
return len;
} else {
short len = (short) (1 + 2 + 36 + 4 + 4 + 4 + (short) getTmp().length);
return len;
}
}
@Override
public int getFileLen() {// 文件總長度
try {
return (int) piece.getFc().size();
} catch (IOException e) {
e.printStackTrace();
}
return 0;
}
@Override
public int getOffset() {// 傳輸 偏移量
return piece.getOffset();
}
@Override
public byte[] getFilename() {// 文件名稱
/** check the bits of name */
byte[] name = new byte[36];
name = piece.getFilename().getBytes();
return name;
}
@Override
public byte[] getChecksum() {// checksum
byte[] checksum = new byte[4];
checksum = piece.getChecksum().getBytes();
return checksum;
}
@Override
public byte[] getTmp() {
byte[] b=new byte[5];
return b;
}
@Override
public IoBuffer getData() {
return piece.getBuf();
}
}業務對象代碼,RandomAccessFile可用于隨機讀寫,用于文件的切片,這里還用了管道,主要目的是為了加開讀寫速度:
package com.a2.desktop.example5.mina.potocol;
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.UUID;
import org.apache.mina.core.buffer.IoBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 分片文件操作類
*
* @author Chen.Hui
*
*/
public class FilePiece {
Logger logger = LoggerFactory.getLogger(FilePiece.class);
private ByteBuffer[] dsts;
private IoBuffer buf;
private String filename;
private FileChannel fc;
private RandomAccessFile raf;
private int offset;
private String checksum;
/** 構建文件的基本信息 */
public FilePiece(String path, int offset) throws Exception {
raf = new RandomAccessFile(new File(path), "rw");
fc = raf.getChannel();
this.offset = offset;
dsts = new ByteBuffer[1024];
for (int i = 0; i < dsts.length; i++) {
dsts[i] = ByteBuffer.allocate(1024);
}
fc.read(dsts, offset, 1024);
buf=IoBuffer.allocate(1024);
filename = UUID.randomUUID().toString();
logger.info("has built:" + filename + " filename size"
+ filename.length());
}
/**這個方法還有點兒問題,數據取的不對*/
public IoBuffer getBuf(){
dsts[0].flip();
while(dsts[0].hasRemaining()){
buf.putChar(dsts[0].getChar());
}
buf.flip();
return buf;
}
public String getFilename() {
return filename;
}
public FileChannel getFc() {
return fc;
}
public RandomAccessFile getRaf() {
return raf;
}
public int getOffset() {
return offset;
}
public String getChecksum() {
// TODO checksum algorithems
return "aaaa";
}
}再接下來是編碼器 ,編碼器的作用就是講數據裝換成用于傳輸的流,在Mina中這種流就是IoBuffer: package com.a2.desktop.example5.mina.potocol; import java.nio.charset.Charset; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolEncoderOutput; import org.apache.mina.filter.codec.demux.MessageEncoder; /** * 編碼器 * @author Chen.Hui * */ public class InfoEncoder implements MessageEncoder{ private Charset charset; public InfoEncoder(Charset charset){ this.charset=charset; } @Override public void encode(IoSession session, AbsMessage message, ProtocolEncoderOutput out) throws Exception { IoBuffer buf=IoBuffer.allocate(1024).setAutoExpand(true); if(message instanceof InfoRequest){ InfoRequest req=(InfoRequest) message; buf.put(req.getTag()); buf.putShort((short)req.getHeaderlen()); buf.put(req.getFilename()); buf.putInt(req.getFileLen()); buf.putInt(req.getOffset()); buf.put(req.getChecksum()); buf.put(req.getTmp()); buf.put(req.getData()); }else if(message instanceof InfoResponse){ //TODO } buf.flip(); out.write(buf); } }
解碼器與之類似,解碼器在這里的作用主要用戶服務器端解碼:
package com.a2.desktop.example5.mina.potocol;
import java.nio.charset.Charset;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.mina.filter.codec.demux.MessageDecoder;
import org.apache.mina.filter.codec.demux.MessageDecoderResult;
/**
* 解碼器
* @author ChenHui
*
*/
public class InfoDecoder implements MessageDecoder {
private Charset charset;
public InfoDecoder(Charset charset) {
this.charset = charset;
}
@Override
public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
//System.out.println("package size:"+in.remaining());
// 報頭長度<56
if (in.remaining() < 56) {
return MessageDecoderResult.NEED_DATA;
}
byte tag = in.get();
short head_len=in.getShort();
if (tag == (short) 0x01) {
System.out.println("請求標識符:"+tag+" head length:"+head_len);
}else{
//System.out.println("未知標識符...");
return MessageDecoderResult.NOT_OK;
}
return MessageDecoderResult.OK;
}
@Override
public MessageDecoderResult decode(IoSession session, IoBuffer in,
ProtocolDecoderOutput out) throws Exception {
byte tag=in.get();
if(tag==0x01){
InfoReqContainer irc=new InfoReqContainer();
irc.setTag(tag);
irc.setHeadlen(in.getShort());
irc.setFilename(in.getString(36, charset.newDecoder()));
irc.setFilelen(in.getInt());
irc.setOffset(in.getInt());
irc.setChecksum(in.getString(4, charset.newDecoder()));
irc.setTemp(in.getString(5, charset.newDecoder()));//應該用head len-53
irc.setData(in);
out.write(irc);
}
return MessageDecoderResult.OK;
}
@Override
public void finishDecode(IoSession session, ProtocolDecoderOutput out)
throws Exception {
// TODO Auto-generated method stub
}
}為了解碼方便,我這里設計了一個輔助類InfoReqContainer,主要用戶輔助操作:
package com.a2.desktop.example5.mina.potocol;
import org.apache.mina.core.buffer.IoBuffer;
/**
* 請求對象解析類
*
* @author Chen.Hui
*
*/
public class InfoReqContainer {
private byte tag;
private short headlen;
private String filename;
private int filelen;
private int offset;
private String temp;
private String checksum;
private IoBuffer data;
public byte getTag() {
return tag;
}
public void setTag(byte tag) {
this.tag = tag;
}
public short getHeadlen() {
return headlen;
}
public void setHeadlen(short headlen) {
this.headlen = headlen;
}
public String getFilename() {
return filename;
}
public void setFilename(String filename) {
this.filename = filename;
}
public int getFilelen() {
return filelen;
}
public void setFilelen(int filelen) {
this.filelen = filelen;
}
public int getOffset() {
return offset;
}
public void setOffset(int offset) {
this.offset = offset;
}
public String getTemp() {
return temp;
}
public void setTemp(String temp) {
this.temp = temp;
}
public String getChecksum() {
return checksum;
}
public void setChecksum(String checksum) {
this.checksum = checksum;
}
public IoBuffer getData() {
return data;
}
public void setData(IoBuffer data) {
this.data = data;
}
} 在mina中,要將解碼器和編碼器綁定到協議工廠類中,才能被過濾器使用:
package com.a2.desktop.example5.mina.potocol;
import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory;
import org.apache.mina.filter.codec.demux.MessageDecoder;
import org.apache.mina.filter.codec.demux.MessageEncoder;
public class InfoCodecFactory extends DemuxingProtocolCodecFactory {
private MessageDecoder decoder;
private MessageEncoder encoder;
public InfoCodecFactory(MessageDecoder decoder,
MessageEncoder encoder) {
this.decoder = decoder;
this.encoder = encoder;
addMessageDecoder(this.decoder);
addMessageEncoder(AbsMessage.class, this.encoder);
}
} 做完了這些,編碼解碼的工作都完成了,最后就是寫客戶端和服務器端進行測試,注意文件位置,這里沒有做提示:
package com.a2.desktop.example5.mina.potocol;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
public class TestClient {
private static String HOST = "127.0.0.1";
private static int PORT = 8082;
public static void main(String[] args) {
// 創建一個非阻塞的客戶端程序
IoConnector connector = new NioSocketConnector();
// 設置鏈接超時時間
connector.setConnectTimeout(30000);
// 添加過濾器
connector.getFilterChain().addLast(
"codec",
new ProtocolCodecFilter(new InfoCodecFactory(
new InfoDecoder(Charset.forName("utf-8")),
new InfoEncoder(Charset.forName("utf-8")))));
// 添加業務邏輯處理器類
connector.setHandler(new ClientHandler());
IoSession session = null;
try {
ConnectFuture future = connector.connect(new InetSocketAddress(
HOST, PORT));// 創建連接
future.awaitUninterruptibly();// 等待連接創建完成
session = future.getSession();// 獲得session
FilePiece piece = new FilePiece(
"D:\\Develop Libs Tar\\apache-mina-2.0.7-bin.zip", 0);
InfoRequest ir = new InfoRequest(piece);
session.write(ir);// 發送消息
} catch (Exception e) {
e.printStackTrace();
System.out.println("客戶端鏈接異常...");
}
session.getCloseFuture().awaitUninterruptibly();// 等待連接斷開
connector.dispose();
}
} 客戶端的Handler沒有做任何處理:
package com.a2.desktop.example5.mina.potocol;
import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
public class ClientHandler implements IoHandler {
@Override
public void sessionCreated(IoSession session) throws Exception {
// TODO Auto-generated method stub
}
@Override
public void sessionOpened(IoSession session) throws Exception {
// TODO Auto-generated method stub
}
@Override
public void sessionClosed(IoSession session) throws Exception {
// TODO Auto-generated method stub
}
@Override
public void sessionIdle(IoSession session, IdleStatus status)
throws Exception {
// TODO Auto-generated method stub
}
@Override
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception {
// TODO Auto-generated method stub
}
@Override
public void messageReceived(IoSession session, Object message)
throws Exception {
//System.out.println(message.toString());
}
@Override
public void messageSent(IoSession session, Object message) throws Exception {
// TODO Auto-generated method stub
}
}服務器端: package com.a2.desktop.example5.mina.potocol;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSessionConfig;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LogLevel;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
public class TestServer{
private static int PORT = 8082;
public static void main(String[] args) {
IoAcceptor acceptor = null;
try {
// 創建一個非阻塞的server端的Socket
acceptor = new NioSocketAcceptor();
// 設置過濾器(添加自帶的編解碼器)
acceptor.getFilterChain().addLast(
"codec",
new ProtocolCodecFilter(new InfoCodecFactory(
new InfoDecoder(Charset.forName("utf-8")),
new InfoEncoder(Charset.forName("utf-8")))));
// 設置日志過濾器
LoggingFilter lf = new LoggingFilter();
lf.setMessageReceivedLogLevel(LogLevel.DEBUG);
acceptor.getFilterChain().addLast("logger", lf);
// 獲得IoSessionConfig對象
IoSessionConfig cfg = acceptor.getSessionConfig();
// 讀寫通道10秒內無操作進入空閑狀態
cfg.setIdleTime(IdleStatus.BOTH_IDLE, 100);
// 綁定邏輯處理器
acceptor.setHandler(new ServerHandler());
// 綁定端口
acceptor.bind(new InetSocketAddress(PORT));
System.out.println("成功開啟服務器端...");
} catch (Exception e) {
e.printStackTrace();
}
}
} 服務器端的Handler:
package com.a2.desktop.example5.mina.potocol;
import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
public class ServerHandler implements IoHandler {
@Override
public void sessionCreated(IoSession session) throws Exception {
// TODO Auto-generated method stub
}
@Override
public void sessionOpened(IoSession session) throws Exception {
// TODO Auto-generated method stub
}
@Override
public void sessionClosed(IoSession session) throws Exception {
// TODO Auto-generated method stub
}
@Override
public void sessionIdle(IoSession session, IdleStatus status)
throws Exception {
// TODO Auto-generated method stub
}
@Override
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception {
// TODO Auto-generated method stub
}
@Override
public void messageReceived(IoSession session, Object message)
throws Exception {
if (message instanceof InfoReqContainer) {
InfoReqContainer irc = (InfoReqContainer) message;
System.out.println("服務器端 獲取成功 Tag:" + irc.getTag() + "\r\n " + "head len:"
+ irc.getHeadlen() + "\r\nfilename: " + irc.getFilename()
+ "\r\nfile len:"+irc.getFilelen()+"\r\noffset:"+irc.getOffset()+"\r\nchecksum:"+irc.getChecksum()+"\r\ndata:"+irc.getData().toString());
session.write("success rescive");
} else {
System.out.println("獲取失敗");
}
}
@Override
public void messageSent(IoSession session, Object message) throws Exception {
// TODO Auto-generated method stub
}
}主要還是提供一個思路,并沒用很多業務的東西,也沒有做事務和并發處理,當然這在項目中一定會處理。Mina很好用,用原生的NIO也同樣能實現,原理都一樣。Mina的異步通信原理網上一大堆,我就不寫了。 謝謝觀賞。
本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!