Turning Socket into RxSocket

Posted by dmsa4941 8 years ago | 12K reads | Socket, network technology

0. Overview

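Our project at work recently needed a refactor, and I had just started playing with Rx, which looked very appealing. The existing code was callback hell from top to bottom. Since the project is built on a long-lived Socket connection, I decided to start from the lowest layer and turn the plain Socket into an Rx one.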

1. Design pattern

First, there should be exactly one RxSocket for the whole application, so it is designed as a singleton.

public static RxSocket getInstance() {
    // Read the volatile field once into a local variable.
    RxSocket rxSocket = defaultInstance;
    if (rxSocket == null) {
        synchronized (RxSocket.class) {
            // Check again: another thread may have created the instance meanwhile.
            rxSocket = defaultInstance;
            if (rxSocket == null) {
                rxSocket = new RxSocket();
                defaultInstance = rxSocket;
            }
        }
    }
    return rxSocket;
}

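This is a double-checked locking singleton. For it to be safe, the defaultInstance field has to be declared volatile, as it is in the full class in section 3.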
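As an aside, the same lazy and thread-safe initialization can be had without explicit locking, using the initialization-on-demand holder idiom. The sketch below is not the code from this article: `HolderSingleton` is a hypothetical stand-in for RxSocket, and only the singleton plumbing is shown.

```java
// Hypothetical stand-in for RxSocket; illustrates the holder idiom only.
final class HolderSingleton {

    private HolderSingleton() { }

    // The JVM initializes this nested class lazily, on first access,
    // and class initialization is guaranteed to be thread-safe.
    private static final class Holder {
        static final HolderSingleton INSTANCE = new HolderSingleton();
    }

    static HolderSingleton getInstance() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) {
        // Both calls return the same object.
        System.out.println(getInstance() == getInstance()); // prints "true"
    }
}
```

Either approach works here. Double-checked locking is the one to keep if the instance ever needs to be reset or rebuilt at runtime, since the holder idiom initializes exactly once.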

2. Public interface / methods

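With a socket, the first things that come to mind are connecting, reading, and writing. From the outside, all a caller wants to know is whether a write succeeded, whether the connection succeeded, and what data was read. So the interface is defined as: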

public Observable<Boolean> connectRx(String ip, int port);
public Observable<Boolean> disConnect();
public Observable<byte[]> read();
public Observable<Boolean> write(ByteBuffer buffer);

One more thing: there should be a method that lets callers observe the state of the socket, i.e. a listener method:

public Observable<SocketStatus> socketStatusListener ();

3. Implementation

package chestnut.RxSocket;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;

import chestnut.utils.LogUtils;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

/**
 * Created by Chestnut on 2016/12/18.
 */

public class RxSocket {

// Constants
private static final String TAG = "RxSocket";
private static final boolean OpenLog = true;
private static final long WRITE_TIME_OUT = 3000;    // milliseconds
private static final long CONNECT_TIME_OUT = 3000;  // milliseconds

// Singleton
private Subject<byte[], byte[]> readSubject;
private Subject<SocketStatus, SocketStatus> connectStatus;
private static volatile RxSocket defaultInstance;
private RxSocket() {
    // SerializedSubject makes onNext safe to call from more than one thread.
    readSubject = new SerializedSubject<>(PublishSubject.<byte[]>create());
    connectStatus = new SerializedSubject<>(PublishSubject.<SocketStatus>create());
}
public static RxSocket getInstance() {
    // Double-checked locking: read the volatile field once into a local.
    RxSocket rxSocket = defaultInstance;
    if (rxSocket == null) {
        synchronized (RxSocket.class) {
            rxSocket = defaultInstance;
            if (rxSocket == null) {
                rxSocket = new RxSocket();
                defaultInstance = rxSocket;
            }
        }
    }
    return rxSocket;
}

// Fields
// The status, the alive flag and the callback are shared between the caller's
// threads and ReadThread, so they are volatile.
private volatile SocketStatus socketStatus = SocketStatus.DIS_CONNECT;
private Selector selector = null;
private SocketChannel socketChannel = null;
private SelectionKey selectionKey = null;
private ReadThread readThread = null;
private volatile boolean isReadThreadAlive = true;
private volatile SocketReconnectCallback socketReconnectCallback = null;

// Methods

/**
 * Observe the state of the socket.
 * @return an Observable that emits every status change
 */
public Observable<SocketStatus> socketStatusListener () {
    return connectStatus;
}

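/**
 * Open the socket connection. Only a single attempt is made.
 * @param ip    IP address or domain name
 * @param port  port number
 * @return  an Observable that emits true on success, false otherwise
 */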
public Observable<Boolean> connectRx(String ip, int port) {
    return Observable
            .create(new Observable.OnSubscribe<Boolean>() {
                @Override
                public void call(Subscriber<? super Boolean> subscriber) {

                    // A connection attempt is already in progress
                    if (socketStatus == SocketStatus.CONNECTING) {
                        subscriber.onNext(false);
                        subscriber.onCompleted();
                        return;
                    }

                    // Not connected, or already connected: close the current socket first
                    socketStatus = SocketStatus.DIS_CONNECT;
                    isReadThreadAlive = false;
                    readThread = null;
                    if (selector!=null)
                        try {
                            selector.close();
                        } catch (Exception e) {
                            LogUtils.i(OpenLog,TAG,"selector.close");
                        }
                    if (selectionKey!=null)
                        try {
                            selectionKey.cancel();
                        } catch (Exception e) {
                            LogUtils.i(OpenLog,TAG,"selectionKey.cancel");
                        }
                    if (socketChannel!=null)
                        try {
                            socketChannel.close();
                        } catch (Exception e) {
                            LogUtils.i(OpenLog,TAG,"socketChannel.close");
                        }

                    // Restart the socket. The callback is installed before the thread
                    // starts, so that an early result from ReadThread is not missed.
                    socketReconnectCallback = new SocketReconnectCallback() {
                        @Override
                        public void onSuccess() {
                            subscriber.onNext(true);
                            subscriber.onCompleted();
                        }

                        @Override
                        public void onFail(String msg) {
                            LogUtils.i(OpenLog,TAG,"connectRx:"+msg);
                            subscriber.onNext(false);
                            subscriber.onCompleted();
                        }
                    };
                    isReadThreadAlive = true;
                    readThread = new ReadThread(ip,port);
                    readThread.start();
                }
            })
            .subscribeOn(Schedulers.newThread())
            .timeout(CONNECT_TIME_OUT, TimeUnit.MILLISECONDS, Observable.just(false));
}

/**
 * Close the current socket.
 * @return an Observable that emits true on success, false otherwise
 */
public Observable<Boolean> disConnect() {
    return Observable.create(new Observable.OnSubscribe<Boolean>() {
        @Override
        public void call(Subscriber<? super Boolean> subscriber) {
            try {
                if (socketStatus == SocketStatus.DIS_CONNECT) {
                    subscriber.onNext(true);
                    subscriber.onCompleted();
                }
                else {
                    socketStatus = SocketStatus.DIS_CONNECT;
                    isReadThreadAlive = false;
                    readThread = null;
                    if (selector!=null)
                        try {
                            selector.close();
                        } catch (Exception e) {
                            LogUtils.i(OpenLog,TAG,"selector.close");
                        }
                    if (selectionKey!=null)
                        try {
                            selectionKey.cancel();
                        } catch (Exception e) {
                            LogUtils.i(OpenLog,TAG,"selectionKey.cancel");
                        }
                    if (socketChannel!=null)
                        try {
                            socketChannel.close();
                        } catch (Exception e) {
                            LogUtils.i(OpenLog,TAG,"socketChannel.close");
                        }
                    subscriber.onNext(true);
                    subscriber.onCompleted();
                }
            } catch (Exception e) {
                subscriber.onNext(false);
                subscriber.onCompleted();
            }
        }
    });
}

/**
 * Read messages from the socket.
 * @return  an Observable that either signals an error (not connected) or emits the received data
 */
public Observable<byte[]> read() {
    if (socketStatus != SocketStatus.CONNECTED)
        // Fail right away when the socket is not connected.
        return Observable.<byte[]>error(new IllegalStateException("Socket Dis Connect"));
    else
        return readSubject;
}

/**
 * Write a message to the socket.
 * @param buffer    the data packet
 * @return  an Observable that emits true on success, false otherwise
 */
public Observable<Boolean> write(ByteBuffer buffer) {
    return Observable
            .create(new Observable.OnSubscribe<Boolean>() {
                @Override
                public void call(Subscriber<? super Boolean> subscriber) {
                    if (socketStatus != SocketStatus.CONNECTED) {
                        subscriber.onNext(false);
                        subscriber.onCompleted();
                    }
                    else {
                        if (socketChannel!=null && socketChannel.isConnected()) {
                            try {
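                                // A non-blocking write may send only part of the buffer, or
                                // nothing at all (it returns 0) while the send buffer is full.
                                // Keep writing until the buffer is drained or the timeout
                                // has unsubscribed this subscriber.
                                while (buffer.hasRemaining() && !subscriber.isUnsubscribed()) {
                                    socketChannel.write(buffer);
                                }
                                subscriber.onNext(!buffer.hasRemaining());
                                subscriber.onCompleted();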
                            } catch (Exception e) {
                                LogUtils.i(OpenLog,TAG,"write."+e.getMessage());
                                subscriber.onNext(false);
                                subscriber.onCompleted();
                            }
                        }
                        else {
                            LogUtils.i(OpenLog,TAG,"write."+"close");
                            subscriber.onNext(false);
                            subscriber.onCompleted();
                        }
                    }
                }
            })
            .subscribeOn(Schedulers.newThread())
            .timeout(WRITE_TIME_OUT, TimeUnit.MILLISECONDS, Observable.just(false));
}

// Inner class
private class ReadThread extends Thread {
    private String ip;
    private int port;
    ReadThread(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }
    @Override
    public void run() {
        LogUtils.i(OpenLog,TAG,"ReadThread:"+"start");
        while (isReadThreadAlive) {
            // Connect
            if (socketStatus == SocketStatus.DIS_CONNECT) {
                try {
                    if (selectionKey != null) selectionKey.cancel();
                    socketChannel = SocketChannel.open();
                    socketChannel.configureBlocking(false);
                    selector = Selector.open();
                    socketChannel.connect(new InetSocketAddress(ip, port));
                    selectionKey = socketChannel.register(selector, SelectionKey.OP_CONNECT);
                    socketStatus = SocketStatus.CONNECTING;
                    connectStatus.onNext(SocketStatus.CONNECTING);
                } catch (Exception e) {
                    isReadThreadAlive = false;
                    socketStatus = SocketStatus.DIS_CONNECT;
                    connectStatus.onNext(SocketStatus.DIS_CONNECT);
                    LogUtils.e(OpenLog, TAG, "ReadThread:init:" + e.getMessage());
                    if (socketReconnectCallback!=null)
                        socketReconnectCallback.onFail("SocketConnectFail1");
                }
            }
            // Read
            else if (socketStatus == SocketStatus.CONNECTING || socketStatus  == SocketStatus.CONNECTED) {
                try {
                    selector.select();
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        // Remove every handled key; the selector never clears this set itself.
                        it.remove();
                        if (key.isConnectable()) {
                            if (socketChannel.isConnectionPending()) {
                                try {
                                    socketChannel.finishConnect();
                                    socketStatus = SocketStatus.CONNECTED;
                                    connectStatus.onNext(SocketStatus.CONNECTED);
                                    socketChannel.configureBlocking(false);
                                    socketChannel.register(selector, SelectionKey.OP_READ);
                                    if (socketReconnectCallback!=null)
                                        socketReconnectCallback.onSuccess();
                                } catch (Exception e) {
                                    isReadThreadAlive = false;
                                    socketStatus = SocketStatus.DIS_CONNECT;
                                    connectStatus.onNext(SocketStatus.DIS_CONNECT);
                                    LogUtils.e(OpenLog, TAG, "ReadThread:finish:" + e.getMessage());
                                    if (socketReconnectCallback!=null)
                                        socketReconnectCallback.onFail("SocketConnectFail2");
                                }
                            }
                        } else if (key.isReadable()) {
                            ByteBuffer buf = ByteBuffer.allocate(10000);
                            int length = socketChannel.read(buf);
                            if (length < 0) {
                                // read() returns -1 once the server has closed the connection.
                                LogUtils.e(OpenLog, TAG, "The server closed the connection!");
                                socketChannel.close();
                                isReadThreadAlive = false;
                                socketStatus = SocketStatus.DIS_CONNECT;
                                connectStatus.onNext(SocketStatus.DIS_CONNECT);
                                break;
                            } else if (length > 0) {
                                // Copy exactly the bytes that were read.
                                byte[] bytes = new byte[length];
                                buf.flip();
                                buf.get(bytes);
                                readSubject.onNext(bytes);
                            }
                        }
                    }
                } catch (Exception e) {
                    isReadThreadAlive = false;
                    socketStatus = SocketStatus.DIS_CONNECT;
                    connectStatus.onNext(SocketStatus.DIS_CONNECT);
                    LogUtils.e(OpenLog, TAG, "ReadThread:read:" + e.getMessage());
                    if (socketReconnectCallback!=null)
                        socketReconnectCallback.onFail("SocketConnectFail3");
                }
            }
        }
    }
}

// Enum && interface
public enum SocketStatus {
    DIS_CONNECT,
    CONNECTING,
    CONNECTED,
}

private interface SocketReconnectCallback {
    void onSuccess();
    void onFail(String msg);
}

}

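Er, that turned out a bit long. The socket used here is the one from the NIO package (SocketChannel), and the class starts only one thread of its own, the read thread.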
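The non-blocking flow inside ReadThread (connect, wait on the selector, finish the connection, then read) is easier to see in isolation. The following is a minimal, JDK-only sketch of that flow, not the article's class: `NioConnectSketch`, `connectAndReadOnce` and the throwaway loopback server `startGreetingServer` are hypothetical names made up for this illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

final class NioConnectSketch {

    /** Connects without blocking and returns the first chunk received, or null on EOF. */
    static String connectAndReadOnce(String host, int port) {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);
            // connect() may complete immediately for local connections.
            boolean connected = channel.connect(new InetSocketAddress(host, port));
            channel.register(selector,
                    connected ? SelectionKey.OP_READ : SelectionKey.OP_CONNECT);
            while (true) {
                selector.select(); // blocks until a registered event is ready
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove(); // handled keys must be removed by hand
                    if (key.isConnectable()) {
                        if (channel.finishConnect()) {
                            key.interestOps(SelectionKey.OP_READ);
                        }
                    } else if (key.isReadable()) {
                        ByteBuffer buf = ByteBuffer.allocate(1024);
                        int n = channel.read(buf);
                        if (n < 0) return null; // the peer closed the connection
                        if (n == 0) continue;   // nothing to read yet
                        buf.flip();
                        return StandardCharsets.UTF_8.decode(buf).toString();
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Starts a one-shot loopback server that sends greeting to its first client. */
    static int startGreetingServer(String greeting) {
        try {
            ServerSocketChannel server = ServerSocketChannel.open();
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // 0 = any free port
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            Thread t = new Thread(() -> {
                try (ServerSocketChannel s = server; SocketChannel client = s.accept()) {
                    ByteBuffer out = ByteBuffer.wrap(greeting.getBytes(StandardCharsets.UTF_8));
                    while (out.hasRemaining()) client.write(out);
                } catch (IOException ignored) {
                    // The demo server simply stops on I/O errors.
                }
            });
            t.setDaemon(true);
            t.start();
            return port;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int port = startGreetingServer("hello");
        System.out.println(connectAndReadOnce("127.0.0.1", port));
    }
}
```

Two details carry over to any selector loop: each key has to be removed from the selected-key set after it is handled, and a read result of -1 means the peer has closed the connection, while 0 only means that no data was available.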

4. Notes

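  • The reason for exposing a status listener is that a socket can be disconnected after it has connected, so some reconnect strategy is needed. That strategy depends on the project's requirements, which is why the status is exposed: subscribe to it and implement your own reconnect logic.

  • When using RxSocket's read method, remember to unsubscribe at the appropriate time. The same goes for the socket status listener.

  • Finally, if anything here is unreasonable, I would be grateful for pointers from the experts.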
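The article leaves the reconnect strategy to each project. One common choice, shown here purely as an assumption and not as part of RxSocket, is capped exponential backoff: wait a little longer after each failed attempt, up to a maximum. `ReconnectBackoff` and `delayMillis` are hypothetical names for this sketch.

```java
// Hypothetical helper, not part of RxSocket: computes how long to wait
// before the next reconnect attempt.
final class ReconnectBackoff {

    /**
     * Delay before retry number attempt (0-based): baseMillis doubled once per
     * previous attempt, capped at maxMillis. Assumes small, positive arguments
     * (delays of seconds or minutes), so overflow is not a concern.
     */
    static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        long delay = baseMillis;
        // Stop doubling as soon as the cap has been reached.
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt <= 5; attempt++) {
            System.out.println(attempt + " -> " + delayMillis(attempt, 500, 8000) + " ms");
        }
    }
}
```

A subscriber of socketStatusListener() could, on receiving DIS_CONNECT, wait delayMillis(attempt, ...) and then call connectRx again, resetting the attempt counter once CONNECTED arrives. Whether to retry forever or give up after some number of attempts remains a per-project decision.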

 

Source: http://www.jianshu.com/p/27e4f714cfa3

 
