Apache Mina工具類

jopen 11年前發布 | 43K 次閱讀 網絡工具包 Apache MINA

MINA(Multipurpose Infrastructure for Network Applications)是用于開發高性能和高可用性的網絡應用程序的基礎框架。通過使用MINA框架可以可以省下處理底層I/O和線程并發等復雜工作,開發人員能夠把更多的精力投入到業務設計和開發當中。MINA框架的應用比較廣泛,應用的開源項目有Apache Directory、AsyncWeb、Apache Qpid、QuickFIX/J、Openfire、SubEthaSTMP、red5等。

MINA框架的特點有:基于java NIO類庫開發;采用非阻塞方式的異步傳輸;事件驅動;支持批量數據傳輸;支持TCP、UDP協議;控制反轉的設計模式(支持Spring);采用優雅的松耦合架構;可靈活的加載過濾器機制;單元測試更容易實現;可自定義線程的數量,以提高運行于多處理器上的性能;采用回調的方式完成調用,線程的使用更容易。

import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.ReadFuture;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.session.AttributeKey;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

/**

  • 使用Mina2.x發送報文的工具類
  • @see 詳細說明請參閱我的博客http://blog.csdn.net/jadyer/article/details/8088068
  • @version v1.5
  • @history v1.1-->編碼器和解碼器中的字符處理,升級為Mina2.x提供的<code>putString()</code>方法來處理
  • @history v1.2-->解碼器采用CumulativeProtocolDecoder實現,以適應應答報文被拆分多次后發送Client的情況
  • @history v1.3-->修復BUG:請求報文有誤時,Server可能返回非約定報文,此時會拋java.lang.NumberFormatException
  • @history v1.4-->增加全局異常捕獲
  • @history v1.5-->由于本工具類的作用是同步的客戶端,故取消IoHandler設置,但注意必須setUseReadOperation(true)
  • @update Jul 27, 2013 10:21:01 AM
  • @create Oct 3, 2012 12:42:21 PM
  • @author 玄玉<http://blog.csdn.net/jadyer&gt; */
    public class MinaUtil {
    private MinaUtil(){}

    /**

    • 發送TCP消息
    • @see 當通信發生異常時,如Fail to get session....返回<code>"MINA_SERVER_ERROR"</code>字符串
    • @param message 待發送報文的中文字符串形式
    • @param ipAddress 遠程主機的IP地址
    • @param port 遠程主機的端口號
    • @param charset 該方法與遠程主機間通信報文為編碼字符集(編碼為byte[]發送到Server)
    • @return 遠程主機響應報文的字符串形式 */
      public static String sendTCPMessage(String message, String ipAddress, int port, String charset){
      IoConnector connector = new NioSocketConnector();
      connector.setConnectTimeoutMillis(1000);
      connector.getSessionConfig().setUseReadOperation(true); //同步的客戶端,必須設置此項,其默認為false
      connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ClientProtocolEncoder(charset), new ClientProtocolDecode(charset)));
      //connector.setHandler(this); //作為同步的客戶端,可以不需要IoHandler,Mina會自動添加一個默認的IoHandler實現,即AbstractIoConnector
      IoSession session = null;
      Object respData = null;
      try{
       ConnectFuture connectFuture = connector.connect(new InetSocketAddress(ipAddress, port));  
       connectFuture.awaitUninterruptibly();          //等待連接成功,相當于將異步執行轉為同步執行  
       session = connectFuture.getSession();          //獲取連接成功后的會話對象  
       session.write(message).awaitUninterruptibly(); //由于上面已經設置setUseReadOperation(true),故IoSession.read()方法才可用  
       ReadFuture readFuture = session.read();        //因其內部使用BlockingQueue,故Server端用之可能會內存泄漏,但Client端可適當用之  
       if(readFuture.awaitUninterruptibly(90, TimeUnit.SECONDS)){ //Wait until the message is received  
           respData = readFuture.getMessage();                    //Get the received message  
       }else{  
           LogUtil.getLogger().info("讀取[/" + ipAddress + ":" + port + "]超時");  
       }  
      
      }catch(Exception e){
       LogUtil.getLogger().error("請求通信[/" + ipAddress + ":" + port + "]偶遇異常,堆棧軌跡如下", e);  
      
      }finally{
       if(session != null){  
           //關閉IoSession,該操作是異步的,true為立即關閉,false為所有寫操作都flush后關閉  
           //這里僅僅是關閉了TCP的連接通道,并未關閉Client端程序  
           session.close(true);  
           //客戶端發起連接時,會請求系統分配相關的文件句柄,而在連接失敗時記得釋放資源,否則會造成文件句柄泄露  
           //當總的文件句柄數超過系統設置值時[ulimit -n],則拋異常"java.io.IOException: Too many open files",導致新連接無法創建,服務器掛掉  
           //所以,若不關閉的話,其運行一段時間后可能拋出too many open files異常,導致無法連接  
           session.getService().dispose();  
       }  
      
      }
      return respData==null ? "MINA_SERVER_ERROR" : respData.toString();
      }
/** 
 * 客戶端編碼器 
 * @see 將Client的報文編碼后發送到Server 
 */  
private static class ClientProtocolEncoder extends ProtocolEncoderAdapter {  
    private final String charset;  
    public ClientProtocolEncoder(String charset){  
        this.charset = charset;  
    }  
    @Override  
    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {  
        IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);  
        //二者等效--><code>buffer.put(message.toString().getBytes(charset))</code>  
        buffer.putString(message.toString(), Charset.forName(charset).newEncoder());  
        buffer.flip();  
        out.write(buffer);  
    }  
}  


/** 
 * 客戶端解碼器 
 * @see 解碼Server的響應報文給Client 
 * @see 樣例報文[000064100030010000120121101210419100000000000028`18622233125`10`] 
 */  
private static class ClientProtocolDecode extends CumulativeProtocolDecoder {  
    private final String charset;  
    //注意這里使用了Mina自帶的AttributeKey類來定義保存在IoSession中對象的鍵值,其可有效防止鍵值重復  
    //通過查詢AttributeKey類源碼發現,它的構造方法采用的是"類名+鍵名+AttributeKey的hashCode"的方式  
    private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context");  
    public ClientProtocolDecode(String charset){  
        this.charset = charset;  
    }  
    private Context getContext(IoSession session){  
        Context context = (Context)session.getAttribute(CONTEXT);  
        if(null == context){  
            context = new Context();  
            session.setAttribute(CONTEXT, context);  
        }  
        return context;  
    }  
    @Override  
    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {  
        Context ctx = this.getContext(session);  
        IoBuffer buffer = ctx.innerBuffer;  
        int messageCount = ctx.getMessageCount();  
        while(in.hasRemaining()){    //判斷position和limit之間是否有元素  
            buffer.put(in.get());    //get()讀取buffer的position的字節,然后position+1  
            if(messageCount++ == 5){ //約定:報文的前6個字符串表示報文總長度,不足6位則左側補0  
                buffer.flip();       //Set limit=position and position=0 and mark=-1  
                //當Server的響應報文中含0x00時,Mina2.x的buffer.getString(fieldSize, decoder)方法會break  
                //該方法的處理細節,詳見org.apache.mina.core.buffer.AbstractIoBuffer類的第1718行源碼,其說明如下  
                //Reads a NUL-terminated string from this buffer using the specified decoder and returns it  
                //ctx.setMessageLength(Integer.parseInt(buffer.getString(6, decoder)));  
                byte[] messageLength = new byte[6];  
                buffer.get(messageLength);  
                try{  
                    //請求報文有誤時,Server可能返回非約定報文,此時會拋java.lang.NumberFormatException  
                    ctx.setMessageLength(Integer.parseInt(new String(messageLength, charset)));  
                }catch(NumberFormatException e){  
                    ctx.setMessageLength(in.limit());  
                }  
                buffer.limit(in.limit()); //讓兩個IoBuffer的limit相等  
            }  
        }  
        ctx.setMessageCount(messageCount);  
        if(ctx.getMessageLength() == buffer.position()){  
            buffer.flip();  
            byte[] message = new byte[buffer.limit()];  
            buffer.get(message);  
            out.write(new String(message, charset));  
            ctx.reset();  
            return true;  
        }else{  
            return false;  
        }  
    }  
    private class Context{  
        private final IoBuffer innerBuffer; //用于累積數據的IoBuffer  
        private int messageCount;           //記錄已讀取的報文字節數  
        private int messageLength;          //記錄已讀取的報文頭標識的報文長度  
        public Context(){  
            innerBuffer = IoBuffer.allocate(100).setAutoExpand(true);  
        }  
        public int getMessageCount() {  
            return messageCount;  
        }  
        public void setMessageCount(int messageCount) {  
            this.messageCount = messageCount;  
        }  
        public int getMessageLength() {  
            return messageLength;  
        }  
        public void setMessageLength(int messageLength) {  
            this.messageLength = messageLength;  
        }  
        public void reset(){  
            this.innerBuffer.clear(); //Set limit=capacity and position=0 and mark=-1  
            this.messageCount = 0;  
            this.messageLength = 0;  
        }  
    }  
}  

} </pre>

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!