Mina實現自定義協議的通信

jopen 12年前發布 | 114K 次閱讀 網絡工具包 Apache MINA MINA

網絡的傳輸使用需要遵循一定的規則,這些規則我們稱為協議。如在互聯網請求HTML頁面的時候,我們要遵循HTTP協議,HTTP頭的格式就是我們要遵守的規則:

Request Headers
Accept:
text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8
Accept-Charset:
GBK,utf-8;q=0.7,*;q=0.3
Accept-Encoding:
gzip,deflate,sdch
Accept-Language:
zh-CN,zh;q=0.8
Cache-Control:
max-age=0

比如我們在做一些第三方開發的時候,經常會按照固定的格式向服務器端發送請求,服務器驗證消息后,返回相應的結果。在Mina的開發中,我們也會用到這種模式。首先我們先來描述下這種方式的使用情形:

這樣的方式,目前,我所在的項目組中主要用于通信和傳輸。為了將文件從客戶端傳到云端,我們先在客戶端將數據根據一定的規則切片,然后通過一種特定的格式傳輸到服務器端。Mina,作為這中間的橋梁,秉承了框架很好的優點,快速開發。由于我們的服務器端采用的是C寫的,所以我只給出客戶端的編碼過程,解碼過程原理和開發都一樣。

 用Mina執行一定協議的傳輸,主要有以下幾個步驟:

1.         設計通信協議;

2.         編寫請求(返回)對象和業務對象;

3.         編寫解碼(編碼)器;

4.         編寫客戶端、服務器端。

Mina實現自定義協議的通信

下面根據代碼,一步步介紹這個過程,由于是前提的測試代碼,所以沒有考慮線程安全等問題,只是一個思路:

首先是通信的協議,根據mina的構建規則,將協議設計成抽象類,由請求對象和返回對象分別去繼承,協議格式如下:

package com.a2.desktop.example5.mina.potocol;


import org.apache.mina.core.buffer.IoBuffer;

/**
 * 通信協議
 * @author Chen.Hui
 * 
 */
public abstract class AbsMessage {

    /**
     * 協議格式:
     * 
     * tag | header length | Filename | File length | offset | checksum | temps | data
     * 
     */

    /** 請求或訪問類型 請求Tag:0x00 返回Tag:0x01 共 8 bit */
    public abstract byte getTag();

    /** 頭文件長度 共 2^16 可表示 65535 */
    public abstract short getHeaderlen();

    /** 根據UUID生成文件唯一標識,共 8*36=288 bit */
    public abstract byte[] getFilename();//需要設計一個算法

    /** 獲取文件長度 2^32=4GB 共 32 bit */
    public abstract int getFileLen();

    /** 獲取文件的偏移量offset 共 32 bit */
    public abstract int getOffset();

    /** 獲取文件的MD5校驗碼 共 32 bit */
    public abstract byte[] getChecksum();

    /** 預留字段 長度不超過 128 bit */
    public abstract byte[] getTmp();

    /**data 方式傳輸內容 不超過1024bit*/
    public abstract IoBuffer getData();

}

下面是請求對象(返回)和業務對象,這里的業務對象主要功能是將文件切片:

package com.a2.desktop.example5.mina.potocol;

import java.io.IOException;
import java.nio.charset.Charset;

import org.apache.mina.core.buffer.IoBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 請求對象
 * 
 * @author Chen.Hui
 * 
 */
public class InfoRequest extends AbsMessage {

    Logger logger = LoggerFactory.getLogger(InfoRequest.class);

    FilePiece piece;

    Charset charset;

    public InfoRequest(FilePiece piece) {
        this.piece = piece;
    }

    public InfoRequest(){
        //empty
    }

    @Override
    public byte getTag() {// 0x01 請求包
        return (byte) 0x01;
    }

    @Override
    public short getHeaderlen() {
        if (getTmp() == null) {
            short len = (short) (1 + 2 + 36 + 4 + 4 + 4 );
            return len;
        } else {
            short len = (short) (1 + 2 + 36 + 4 + 4 + 4 + (short) getTmp().length);
            return len;
        }
    }

    @Override
    public int getFileLen() {// 文件總長度

        try {
            return (int) piece.getFc().size();

        } catch (IOException e) {
            e.printStackTrace();
        }
        return 0;
    }

    @Override
    public int getOffset() {// 傳輸 偏移量

        return piece.getOffset();

    }

    @Override
    public byte[] getFilename() {// 文件名稱

        /** check the bits of name */
        byte[] name = new byte[36];
        name = piece.getFilename().getBytes();

        return name;

    }

    @Override
    public byte[] getChecksum() {// checksum

        byte[] checksum = new byte[4];
        checksum = piece.getChecksum().getBytes();  
        return checksum;
    }

    @Override
    public byte[] getTmp() {
        byte[] b=new byte[5];
        return b;
    }

    @Override
    public IoBuffer getData() {
        return piece.getBuf();
    }
}
業務對象代碼,RandomAccessFile可用于隨機讀寫,用于文件的切片,這里還用了管道,主要目的是為了加開讀寫速度:



package com.a2.desktop.example5.mina.potocol;

import java.io.File;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.UUID;

import org.apache.mina.core.buffer.IoBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 分片文件操作類
 * 
 * @author Chen.Hui
 * 
 */
public class FilePiece {

    Logger logger = LoggerFactory.getLogger(FilePiece.class);

    private ByteBuffer[] dsts;

    private IoBuffer buf;

    private String filename;

    private FileChannel fc;

    private RandomAccessFile raf;

    private int offset;

    private String checksum;

    /** 構建文件的基本信息 */
    public FilePiece(String path, int offset) throws Exception {

        raf = new RandomAccessFile(new File(path), "rw");
        fc = raf.getChannel();

        this.offset = offset;

        dsts = new ByteBuffer[1024];

        for (int i = 0; i < dsts.length; i++) {
            dsts[i] = ByteBuffer.allocate(1024);
        }

        fc.read(dsts, offset, 1024);


        buf=IoBuffer.allocate(1024);

        filename = UUID.randomUUID().toString();
        logger.info("has built:" + filename + " filename size"
                + filename.length());

    }
    /**這個方法還有點兒問題,數據取的不對*/
    public IoBuffer getBuf(){
        dsts[0].flip();
        while(dsts[0].hasRemaining()){
            buf.putChar(dsts[0].getChar());
        }
        buf.flip();
        return buf;
    }

    public String getFilename() {
        return filename;
    }

    public FileChannel getFc() {
        return fc;
    }

    public RandomAccessFile getRaf() {
        return raf;
    }

    public int getOffset() {
        return offset;
    }

    public String getChecksum() {
        // TODO checksum algorithems
        return "aaaa";
    }
}
再接下來是編碼器 ,編碼器的作用就是講數據裝換成用于傳輸的流,在Mina中這種流就是IoBuffer
package com.a2.desktop.example5.mina.potocol;

import java.nio.charset.Charset;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import org.apache.mina.filter.codec.demux.MessageEncoder;

/**
 * 編碼器
 * @author Chen.Hui
 *
 */
public class InfoEncoder implements MessageEncoder{

    private Charset charset;

    public InfoEncoder(Charset charset){
        this.charset=charset;
    }

    @Override
    public void encode(IoSession session, AbsMessage message,
            ProtocolEncoderOutput out) throws Exception {

        IoBuffer buf=IoBuffer.allocate(1024).setAutoExpand(true);

        if(message instanceof InfoRequest){

            InfoRequest req=(InfoRequest) message;
            buf.put(req.getTag());
            buf.putShort((short)req.getHeaderlen());
            buf.put(req.getFilename());
            buf.putInt(req.getFileLen());
            buf.putInt(req.getOffset());
            buf.put(req.getChecksum());
            buf.put(req.getTmp());
            buf.put(req.getData());

        }else if(message instanceof InfoResponse){
            //TODO
        }

        buf.flip();

        out.write(buf);
    }
}

解碼器與之類似,解碼器在這里的作用主要用戶服務器端解碼:

package com.a2.desktop.example5.mina.potocol;

import java.nio.charset.Charset;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.mina.filter.codec.demux.MessageDecoder;
import org.apache.mina.filter.codec.demux.MessageDecoderResult;
/**
 * 解碼器
 * @author ChenHui
 *
 */
public class InfoDecoder implements MessageDecoder {

    private Charset charset;

    public InfoDecoder(Charset charset) {
        this.charset = charset;
    }

    @Override
    public MessageDecoderResult decodable(IoSession session, IoBuffer in) {

        //System.out.println("package size:"+in.remaining());
        // 報頭長度<56
        if (in.remaining() < 56) {
            return MessageDecoderResult.NEED_DATA;
        }

        byte tag = in.get();
        short head_len=in.getShort();

        if (tag == (short) 0x01) {
            System.out.println("請求標識符:"+tag+" head length:"+head_len);
        }else{
            //System.out.println("未知標識符...");
            return MessageDecoderResult.NOT_OK;
        }

        return MessageDecoderResult.OK;
    }

    @Override
    public MessageDecoderResult decode(IoSession session, IoBuffer in,
            ProtocolDecoderOutput out) throws Exception {
        byte tag=in.get();

        if(tag==0x01){
            InfoReqContainer irc=new InfoReqContainer();
            irc.setTag(tag);
            irc.setHeadlen(in.getShort());
            irc.setFilename(in.getString(36, charset.newDecoder()));
            irc.setFilelen(in.getInt());
            irc.setOffset(in.getInt());
            irc.setChecksum(in.getString(4, charset.newDecoder()));
            irc.setTemp(in.getString(5, charset.newDecoder()));//應該用head len-53
            irc.setData(in);

            out.write(irc);
        }

        return MessageDecoderResult.OK;
    }

    @Override
    public void finishDecode(IoSession session, ProtocolDecoderOutput out)
            throws Exception {
        // TODO Auto-generated method stub

    }

}
為了解碼方便,我這里設計了一個輔助類InfoReqContainer,主要用戶輔助操作:

package com.a2.desktop.example5.mina.potocol;

import org.apache.mina.core.buffer.IoBuffer;

/**
 * 請求對象解析類
 * 
 * @author Chen.Hui
 * 
 */
public class InfoReqContainer {
    private byte tag;
    private short headlen;
    private String filename;
    private int filelen;
    private int offset;
    private String temp;
    private String checksum;
    private IoBuffer data;

    public byte getTag() {
        return tag;
    }

    public void setTag(byte tag) {
        this.tag = tag;
    }

    public short getHeadlen() {
        return headlen;
    }

    public void setHeadlen(short headlen) {
        this.headlen = headlen;
    }

    public String getFilename() {
        return filename;
    }

    public void setFilename(String filename) {
        this.filename = filename;
    }

    public int getFilelen() {
        return filelen;
    }

    public void setFilelen(int filelen) {
        this.filelen = filelen;
    }

    public int getOffset() {
        return offset;
    }

    public void setOffset(int offset) {
        this.offset = offset;
    }

    public String getTemp() {
        return temp;
    }

    public void setTemp(String temp) {
        this.temp = temp;
    }

    public String getChecksum() {
        return checksum;
    }

    public void setChecksum(String checksum) {
        this.checksum = checksum;
    }

    public IoBuffer getData() {
        return data;
    }

    public void setData(IoBuffer data) {
        this.data = data;
    }

}

mina中,要將解碼器和編碼器綁定到協議工廠類中,才能被過濾器使用:

package com.a2.desktop.example5.mina.potocol;

import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory;
import org.apache.mina.filter.codec.demux.MessageDecoder;
import org.apache.mina.filter.codec.demux.MessageEncoder;

public class InfoCodecFactory extends DemuxingProtocolCodecFactory {
    private MessageDecoder decoder;

    private MessageEncoder encoder;

    public InfoCodecFactory(MessageDecoder decoder,
            MessageEncoder encoder) {
        this.decoder = decoder;
        this.encoder = encoder;
        addMessageDecoder(this.decoder);
        addMessageEncoder(AbsMessage.class, this.encoder);
    }
}

做完了這些,編碼解碼的工作都完成了,最后就是寫客戶端和服務器端進行測試,注意文件位置,這里沒有做提示:

package com.a2.desktop.example5.mina.potocol;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

public class TestClient {

    private static String HOST = "127.0.0.1";

    private static int PORT = 8082;

    public static void main(String[] args) {
        // 創建一個非阻塞的客戶端程序
        IoConnector connector = new NioSocketConnector();
        // 設置鏈接超時時間
        connector.setConnectTimeout(30000);
        // 添加過濾器
        connector.getFilterChain().addLast(
                "codec",
                new ProtocolCodecFilter(new InfoCodecFactory(
                        new InfoDecoder(Charset.forName("utf-8")),
                        new InfoEncoder(Charset.forName("utf-8")))));
        // 添加業務邏輯處理器類
        connector.setHandler(new ClientHandler());
        IoSession session = null;
        try {
            ConnectFuture future = connector.connect(new InetSocketAddress(
                    HOST, PORT));// 創建連接
            future.awaitUninterruptibly();// 等待連接創建完成
            session = future.getSession();// 獲得session

            FilePiece piece = new FilePiece(
                    "D:\\Develop Libs Tar\\apache-mina-2.0.7-bin.zip", 0);

            InfoRequest ir = new InfoRequest(piece);

            session.write(ir);// 發送消息


        } catch (Exception e) {
e.printStackTrace();
            System.out.println("客戶端鏈接異常...");
        }

        session.getCloseFuture().awaitUninterruptibly();// 等待連接斷開
        connector.dispose();
    }

}

客戶端的Handler沒有做任何處理:

package com.a2.desktop.example5.mina.potocol;

import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;

public class ClientHandler implements IoHandler {

    @Override
    public void sessionCreated(IoSession session) throws Exception {
        // TODO Auto-generated method stub

    }

    @Override
    public void sessionOpened(IoSession session) throws Exception {
        // TODO Auto-generated method stub

    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        // TODO Auto-generated method stub

    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status)
            throws Exception {
        // TODO Auto-generated method stub

    }

    @Override
    public void exceptionCaught(IoSession session, Throwable cause)
            throws Exception {
        // TODO Auto-generated method stub

    }

    @Override
    public void messageReceived(IoSession session, Object message)
            throws Exception {
        //System.out.println(message.toString());
    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        // TODO Auto-generated method stub

    }

}
服務器端:
package com.a2.desktop.example5.mina.potocol;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSessionConfig;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LogLevel;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;



public class TestServer{

    private static int PORT = 8082;

    public static void main(String[] args) {
        IoAcceptor acceptor = null;
        try {
            // 創建一個非阻塞的server端的Socket
            acceptor = new NioSocketAcceptor();

            // 設置過濾器(添加自帶的編解碼器)
            acceptor.getFilterChain().addLast(
                    "codec",
                    new ProtocolCodecFilter(new InfoCodecFactory(
                            new InfoDecoder(Charset.forName("utf-8")),
                            new InfoEncoder(Charset.forName("utf-8")))));
            // 設置日志過濾器
            LoggingFilter lf = new LoggingFilter();
            lf.setMessageReceivedLogLevel(LogLevel.DEBUG);
            acceptor.getFilterChain().addLast("logger", lf);
            // 獲得IoSessionConfig對象
            IoSessionConfig cfg = acceptor.getSessionConfig();
            // 讀寫通道10秒內無操作進入空閑狀態
            cfg.setIdleTime(IdleStatus.BOTH_IDLE, 100);

            // 綁定邏輯處理器
            acceptor.setHandler(new ServerHandler());
            // 綁定端口
            acceptor.bind(new InetSocketAddress(PORT));
            System.out.println("成功開啟服務器端...");

        } catch (Exception e) {

            e.printStackTrace();
        }
    }
}        

服務器端的Handler

package com.a2.desktop.example5.mina.potocol;

import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;

public class ServerHandler implements IoHandler {

    @Override
    public void sessionCreated(IoSession session) throws Exception {
        // TODO Auto-generated method stub

    }

    @Override
    public void sessionOpened(IoSession session) throws Exception {
        // TODO Auto-generated method stub

    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        // TODO Auto-generated method stub

    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status)
            throws Exception {
        // TODO Auto-generated method stub

    }

    @Override
    public void exceptionCaught(IoSession session, Throwable cause)
            throws Exception {
        // TODO Auto-generated method stub

    }

    @Override
    public void messageReceived(IoSession session, Object message)
            throws Exception {
        if (message instanceof InfoReqContainer) {
            InfoReqContainer irc = (InfoReqContainer) message;
            System.out.println("服務器端 獲取成功 Tag:" + irc.getTag() + "\r\n " + "head len:"
                    + irc.getHeadlen() + "\r\nfilename: " + irc.getFilename()
                    + "\r\nfile len:"+irc.getFilelen()+"\r\noffset:"+irc.getOffset()+"\r\nchecksum:"+irc.getChecksum()+"\r\ndata:"+irc.getData().toString());

            session.write("success rescive");

        } else {
            System.out.println("獲取失敗");
        }

    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        // TODO Auto-generated method stub

    }

}
主要還是提供一個思路,并沒用很多業務的東西,也沒有做事務和并發處理,當然這在項目中一定會處理。Mina很好用,用原生的NIO也同樣能實現,原理都一樣。Mina的異步通信原理網上一大堆,我就不寫了。

謝謝觀賞。

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!