把Socket擼成RxSocket
0 .概述
最近,公司的項目需要重構代碼,因為個人最近接觸了Rx這個高大上的東東,
而,項目原來是滿滿的回調地獄...項目是基于Socket的長連接,所以呢,
從最底層開始,把普通的Socket擼成Rx!
1 .設計模式
首先,這個RxSocket是唯一的,也就是,全局唯一咯,
嗯,設計成單例。
public static RxSocket getInstance() {
RxSocket rxSocket = defaultInstance;
if (defaultInstance == null) {
synchronized (RxSocket.class) {
rxSocket = defaultInstance;
if (defaultInstance == null) {
rxSocket = new RxSocket();
defaultInstance = rxSocket;
}
}
}
return rxSocket;
}
雙重加鎖型的單例。
2 .對外的接口/方法
Socket,第一個想到就是連接,讀寫,而,我們外界想知道的,就只是是否
寫,連接是否成功,和讀到啥數據。所以定義:
public Observable<Boolean> connectRx(String ip, int port);
public Observable<Boolean> disConnect();
public Observable<byte[]> read();
public Observable<Boolean> write(ByteBuffer buffer);
還有一點,應該要有一個方法,讓外界知道,這個Socket的狀態,也就是監聽方法:
public Observable<SocketStatus> socketStatusListener ();
3 .具體代碼實現
package chestnut.RxSocket;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import chestnut.utils.LogUtils;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;
/**
- Created by Chestnut on 2016/12/18.
*/
public class RxSocket {
//本類的常量
private static final String TAG = "RxSocket";
private static final boolean OpenLog = true;
private static final long WRITE_TIME_OUT = 3000;
private static final long CONNECT_TIME_OUT = 3000;
//單例
private Subject<Object,byte[]> readSubject;
private Subject<Object,SocketStatus> connectStatus;
private static volatile RxSocket defaultInstance;
private RxSocket() {
readSubject = new SerializedSubject(PublishSubject.create());
connectStatus = new SerializedSubject(PublishSubject.create());
}
public static RxSocket getInstance() {
RxSocket rxSocket = defaultInstance;
if (defaultInstance == null) {
synchronized (RxSocket.class) {
rxSocket = defaultInstance;
if (defaultInstance == null) {
rxSocket = new RxSocket();
defaultInstance = rxSocket;
}
}
}
return rxSocket;
}
//變量
private SocketStatus socketStatus = SocketStatus.DIS_CONNECT;
private Selector selector = null;
private SocketChannel socketChannel = null;
private SelectionKey selectionKey = null;
private ReadThread readThread = null;
private boolean isReadThreadAlive = true;
private SocketReconnectCallback socketReconnectCallback = null;
//方法
/**
* 監聽Socket的狀態
* @return
*/
public Observable<SocketStatus> socketStatusListener () {
return connectStatus;
}
/**
* 建立Socket連接,只是嘗試建立一次
* @param ip IP or 域名
* @param port 端口
* @return Rx true or false
*/
public Observable<Boolean> connectRx(String ip, int port) {
return Observable
.create(new Observable.OnSubscribe<Boolean>() {
@Override
public void call(Subscriber<? super Boolean> subscriber) {
//正在連接
if (socketStatus == SocketStatus.CONNECTING) {
subscriber.onNext(false);
subscriber.onCompleted();
return;
}
//未連接 | 已經連接,關閉Socket
socketStatus = SocketStatus.DIS_CONNECT;
isReadThreadAlive = false;
readThread = null;
if (selector!=null)
try {
selector.close();
} catch (Exception e) {
LogUtils.i(OpenLog,TAG,"selector.close");
}
if (selectionKey!=null)
try {
selectionKey.cancel();
} catch (Exception e) {
LogUtils.i(OpenLog,TAG,"selectionKey.cancel");
}
if (socketChannel!=null)
try {
socketChannel.close();
} catch (Exception e) {
LogUtils.i(OpenLog,TAG,"socketChannel.close");
}
//重啟Socket
isReadThreadAlive = true;
readThread = new ReadThread(ip,port);
readThread.start();
socketReconnectCallback = new SocketReconnectCallback() {
@Override
public void onSuccess() {
subscriber.onNext(true);
subscriber.onCompleted();
}
@Override
public void onFail(String msg) {
LogUtils.i(OpenLog,TAG,"connectRx:"+msg);
subscriber.onNext(false);
subscriber.onCompleted();
}
};
}
})
.subscribeOn(Schedulers.newThread())
.timeout(CONNECT_TIME_OUT, TimeUnit.MILLISECONDS, Observable.just(false));
}
/**
* 斷開當前的Socket
* @return Rx true or false
*/
public Observable<Boolean> disConnect() {
return Observable.create(new Observable.OnSubscribe<Boolean>() {
@Override
public void call(Subscriber<? super Boolean> subscriber) {
try {
if (socketStatus == SocketStatus.DIS_CONNECT) {
subscriber.onNext(true);
subscriber.onCompleted();
}
else {
socketStatus = SocketStatus.DIS_CONNECT;
isReadThreadAlive = false;
readThread = null;
if (selector!=null)
try {
selector.close();
} catch (Exception e) {
LogUtils.i(OpenLog,TAG,"selector.close");
}
if (selectionKey!=null)
try {
selectionKey.cancel();
} catch (Exception e) {
LogUtils.i(OpenLog,TAG,"selectionKey.cancel");
}
if (socketChannel!=null)
try {
socketChannel.close();
} catch (Exception e) {
LogUtils.i(OpenLog,TAG,"socketChannel.close");
}
subscriber.onNext(true);
subscriber.onCompleted();
}
} catch (Exception e) {
subscriber.onNext(false);
subscriber.onCompleted();
}
}
});
}
/**
* 讀取Socket的消息
* @return Rx error 或者 有數據
*/
public Observable<byte[]> read() {
if (socketStatus != SocketStatus.CONNECTED)
return Observable.create(new Observable.OnSubscribe<byte[]>() {
@Override
public void call(Subscriber<? super byte[]> subscriber) {
subscriber.onError(new Throwable("Socket Dis Connect"));
}
});
else
return readSubject;
}
/**
* 向Socket寫消息
* @param buffer 數據包
* @return Rx true or false
*/
public Observable<Boolean> write(ByteBuffer buffer) {
return Observable
.create(new Observable.OnSubscribe<Boolean>() {
@Override
public void call(Subscriber<? super Boolean> subscriber) {
if (socketStatus != SocketStatus.CONNECTED) {
subscriber.onNext(false);
subscriber.onCompleted();
}
else {
if (socketChannel!=null && socketChannel.isConnected()) {
try {
int result = socketChannel.write(buffer);
if (result==0) {
LogUtils.i(OpenLog,TAG,"write."+"服務器斷開鏈接");
}
else if (result<0) {
LogUtils.e(OpenLog, TAG, "write." + "發送出錯");
}
else {
subscriber.onNext(true);
subscriber.onCompleted();
}
} catch (Exception e) {
LogUtils.i(OpenLog,TAG,"write."+e.getMessage());
subscriber.onNext(false);
subscriber.onCompleted();
}
}
else {
LogUtils.i(OpenLog,TAG,"write."+"close");
subscriber.onNext(false);
subscriber.onCompleted();
}
}
}
})
.subscribeOn(Schedulers.newThread())
.timeout(WRITE_TIME_OUT, TimeUnit.MILLISECONDS, Observable.just(false));
}
//類
private class ReadThread extends Thread {
private String ip;
private int port;
ReadThread(String ip, int port) {
this.ip = ip;
this.port = port;
}
@Override
public void run() {
LogUtils.i(OpenLog,TAG,"ReadThread:"+"start");
while (isReadThreadAlive) {
//連接
if (socketStatus == SocketStatus.DIS_CONNECT) {
try {
if (selectionKey != null) selectionKey.cancel();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
selector = Selector.open();
socketChannel.connect(new InetSocketAddress(ip, port));
selectionKey = socketChannel.register(selector, SelectionKey.OP_CONNECT);
socketStatus = SocketStatus.CONNECTING;
connectStatus.onNext(SocketStatus.CONNECTING);
} catch (Exception e) {
isReadThreadAlive = false;
socketStatus = SocketStatus.DIS_CONNECT;
connectStatus.onNext(SocketStatus.DIS_CONNECT);
LogUtils.e(OpenLog, TAG, "ReadThread:init:" + e.getMessage());
if (socketReconnectCallback!=null)
socketReconnectCallback.onFail("SocketConnectFail1");
}
}
//讀取
else if (socketStatus == SocketStatus.CONNECTING || socketStatus == SocketStatus.CONNECTED) {
try {
selector.select();
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
if (key.isConnectable()) {
if (socketChannel.isConnectionPending()) {
try {
socketChannel.finishConnect();
socketStatus = SocketStatus.CONNECTED;
connectStatus.onNext(SocketStatus.CONNECTED);
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
if (socketReconnectCallback!=null)
socketReconnectCallback.onSuccess();
} catch (Exception e) {
isReadThreadAlive = false;
socketStatus = SocketStatus.DIS_CONNECT;
connectStatus.onNext(SocketStatus.DIS_CONNECT);
LogUtils.e(OpenLog, TAG, "ReadThread:finish:" + e.getMessage());
if (socketReconnectCallback!=null)
socketReconnectCallback.onFail("SocketConnectFail2");
}
}
} else if (key.isReadable()) {
ByteBuffer buf = ByteBuffer.allocate(10000);
int length = socketChannel.read(buf);
if (length <= 0) {
LogUtils.e(OpenLog, TAG, "服務器主動斷開鏈接!");
} else {
byte[] bytes = new byte[length];
for (int i = 0; i < length; i++) {
bytes[i] = buf.get(i);
}
readSubject.onNext(bytes);
}
}
}
it.remove();
} catch (Exception e) {
isReadThreadAlive = false;
socketStatus = SocketStatus.DIS_CONNECT;
connectStatus.onNext(SocketStatus.DIS_CONNECT);
LogUtils.e(OpenLog, TAG, "ReadThread:read:" + e.getMessage());
if (socketReconnectCallback!=null)
socketReconnectCallback.onFail("SocketConnectFail3");
}
}
}
}
}
//枚舉 && 接口
public enum SocketStatus {
DIS_CONNECT,
CONNECTING,
CONNECTED,
}
private interface SocketReconnectCallback {
void onSuccess();
void onFail(String msg);
}
}</code></pre>
額,好像有點長,這個Socket是NIO包的Socket,里面只是開啟了一條線程。
4 .注意
-
之所以放出一個監聽方法,我想的是,Socket連接上后,有可能會被斷開,
這樣,就需要做一個重連的策略,當然,這個策略看項目的要求,
因而,我把其對外開放了。你可以監聽這個方法,去做Socket的重連策略。
-
RxSokcet的讀方法,需要注意,要在適當的時候去解除訂閱。
還有,Socket狀態的監聽也是。
-
最后,有哪些不合理的地方,各位大老要好好教導一下小弟~
來自:http://www.jianshu.com/p/27e4f714cfa3