iOS即時通訊,從入門到“放棄”?

前言
本文會用實例的方式,將iOS各種IM的方案都簡單的實現一遍。并且提供一些選型、實現細節以及優化的建議。
可以打開項目先預覽效果,對照著進行閱讀。
言歸正傳,首先我們來總結一下我們去實現IM的方式
第一種方式,使用第三方IM服務
對于短平快的公司,完全可以采用第三方SDK來實現。國內IM的第三方服務商有很多,類似云信、環信、融云、LeanCloud,當然還有其它的很多,這里就不一一舉例了,感興趣的小伙伴可以自行查閱下。
-
第三方服務商IM底層協議基本上都是TCP。他們的IM方案很成熟,有了它們,我們甚至不需要自己去搭建IM后臺,什么都不需要去考慮。
如果你足夠 懶,甚至連UI都不需要自己做,這些第三方有各自一套IM的UI,拿來就可以直接用。真可謂3分鐘集成...
-
但是缺點也很明顯,定制化程度太高,很多東西我們不可控。當然還有一個最最重要的一點,就是太貴了...作為真正社交為主打的APP,僅此一點,就足以讓我們望而卻步。當然,如果IM對于APP只是一個輔助功能,那么用第三方服務也無可厚非。
另外一種方式,我們自己去實現
我們自己去實現也有很多選擇:
1)首先面臨的就是傳輸協議的選擇,TCP還是UDP?
2)其次是我們需要去選擇使用哪種聊天協議:
-
基于Scoket或者WebScoket或者其他的私有協議、
-
MQTT
-
還是廣為人詬病的XMPP?
3)我們是自己去基于OS底層Socket進行封裝還是在第三方框架的基礎上進行封裝?
4)傳輸數據的格式,我們是用Json、還是XML、還是谷歌推出的ProtocolBuffer?
5)我們還有一些細節問題需要考慮,例如TCP的長連接如何保持,心跳機制,Qos機制,重連機制等等...當然,除此之外,我們還有一些安全問題需要考慮。
一、傳輸協議的選擇
接下來我們可能需要自己考慮去實現IM,首先從傳輸層協議來說,我們有兩種選擇:TCP or UDP?

這里我們直接說結論吧:對于小公司或者技術不那么成熟的公司,IM一定要用TCP來實現,因為如果你要用UDP的話,需要做的事太多。當然QQ就是用的UDP協議,當然不僅僅是UDP,騰訊還用了自己的私有協議,來保證了傳輸的可靠性,杜絕了UDP下各種數據丟包,亂序等等一系列問題。
總之一句話,如果你覺得團隊技術很成熟,那么你用UDP也行,否則還是用TCP為好。
二、我們來看看各種聊天協議
首先我們以實現方式來切入,基本上有以下四種實現方式:
-
基于Scoket原生:代表框架 CocoaAsyncSocket。
-
基于WebScoket:代表框架 SocketRocket。
-
基于MQTT:代表框架 MQTTKit。
-
基于XMPP:代表框架 XMPPFramework。
當然,以上四種方式我們都可以不使用第三方框架,直接基于OS底層Scoket去實現我們的自定義封裝。下面我會給出一個基于Scoket原生而不使用框架的例子,供大家參考一下。
首先需要搞清楚的是,其中MQTT和XMPP為聊天協議,它們是最上層的協議,而WebScoket是傳輸通訊協議,它是基于Socket封裝的一個協議。而通常我們所說的騰訊IM的私有協議, 就是基于WebScoket或者Scoket原生進行封裝的一個聊天協議 。
具體這3種聊天協議的對比優劣如下:

協議優劣對比.png
所以說到底,iOS要做一個真正的IM產品,一般都是基于Scoket或者WebScoket等,再之上加上一些私有協議來保證的。
1.我們先不使用任何框架,直接用OS底層Socket來實現一個簡單的IM。
我們客戶端的實現思路也是很簡單,創建Socket,和服務器的Socket對接上,然后開始傳輸數據就可以了。
-
我們學過c/c++或者java這些語言,我們就知道,往往任何教程,最后一章都是講Socket編程,而Socket是什么呢,簡單的來說,就是我們使用TCP/IP 或者UDP/IP協議的一組編程接口。如下圖所示:

我們在應用層,使用socket,輕易的實現了進程之間的通信(跨網絡的)。想想,如果沒有socket,我們要直面TCP/IP協議,我們需要去寫多少繁瑣而又重復的代碼。
我們接著可以開始著手去實現IM了,首先我們不基于任何框架,直接去調用OS底層-基于C的BSD Socket去實現,它提供了這樣一組接口:
//socket 創建并初始化 socket,返回該 socket 的文件描述符,如果描述符為 -1 表示創建失敗。 int socket(int addressFamily, int type,int protocol) //關閉socket連接 int close(int socketFileDescriptor) //將 socket 與特定主機地址與端口號綁定,成功綁定返回0,失敗返回 -1。 int bind(int socketFileDescriptor,sockaddr *addressToBind,int addressStructLength) //接受客戶端連接請求并將客戶端的網絡地址信息保存到 clientAddress 中。 int accept(int socketFileDescriptor,sockaddr *clientAddress, int clientAddressStructLength) //客戶端向特定網絡地址的服務器發送連接請求,連接成功返回0,失敗返回 -1。 int connect(int socketFileDescriptor,sockaddr *serverAddress, int serverAddressLength) //使用 DNS 查找特定主機名字對應的 IP 地址。如果找不到對應的 IP 地址則返回 NULL。 hostent* gethostbyname(char *hostname) //通過 socket 發送數據,發送成功返回成功發送的字節數,否則返回 -1。 int send(int socketFileDescriptor, char *buffer, int bufferLength, int flags) //從 socket 中讀取數據,讀取成功返回成功讀取的字節數,否則返回 -1。 int receive(int socketFileDescriptor,char *buffer, int bufferLength, int flags) //通過UDP socket 發送數據到特定的網絡地址,發送成功返回成功發送的字節數,否則返回 -1。 int sendto(int socketFileDescriptor,char *buffer, int bufferLength, int flags, sockaddr *destinationAddress, int destinationAddressLength) //從UDP socket 中讀取數據,并保存發送者的網絡地址信息,讀取成功返回成功讀取的字節數,否則返回 -1 。 int recvfrom(int socketFileDescriptor,char *buffer, int bufferLength, int flags, sockaddr *fromAddress, int *fromAddressLength)
讓我們可以對socket進行各種操作,首先我們來用它寫個客戶端。總結一下,簡單的IM客戶端需要做如下4件事:
-
客戶端調用 socket(...) 創建socket;
-
客戶端調用 connect(...) 向服務器發起連接請求以建立連接;
-
客戶端與服務器建立連接之后,就可以通過send(...)/receive(...)向客戶端發送或從客戶端接收數據;
-
客戶端調用 close 關閉 socket;
根據上面4條大綱,我們封裝了一個名為TYHSocketManager的單例,來對socket相關方法進行調用:
TYHSocketManager.h
#import (Foundation/Foundation.h)(因識別問題,此處用圓括號替換尖括號) @interface TYHSocketManager : NSObject + (instancetype)share; - (void)connect; - (void)disConnect; - (void)sendMsg:(NSString *)msg; @end
TYHSocketManager.m
#import "TYHSocketManager.h"
#import (sys/types.h)(因識別問題,此處用圓括號替換尖括號)
#import (sys/socket.h)(因識別問題,此處用圓括號替換尖括號)
#import (netinet/in.h)(因識別問題,此處用圓括號替換尖括號)
#import (arpa/inet.h)(因識別問題,此處用圓括號替換尖括號)
@interface TYHSocketManager()
@property (nonatomic,assign)int clientScoket;
1
@end
@implementation TYHSocketManager
+ (instancetype)share
{
static dispatch_once_t onceToken;
static TYHSocketManager *instance = nil;
dispatch_once(&onceToken, ^{
instance = [[self alloc]init];
[instance initScoket];
[instance pullMsg];
});
return instance;
}
- (void)initScoket
{
//每次連接前,先斷開連接
if (_clientScoket != 0) {
[self disConnect];
_clientScoket = 0;
}
//創建客戶端socket
_clientScoket = CreateClinetSocket();
//服務器Ip
const char * server_ip="127.0.0.1";
//服務器端口
short server_port=6969;
//等于0說明連接失敗
if (ConnectionToServer(_clientScoket,server_ip, server_port)==0) {
printf("Connect to server error\n");
return ;
}
//走到這說明連接成功
printf("Connect to server ok\n");
}
static int CreateClinetSocket()
{
int ClinetSocket = 0;
//創建一個socket,返回值為Int。(注scoket其實就是Int類型)
//第一個參數addressFamily IPv4(AF_INET) 或 IPv6(AF_INET6)。
//第二個參數 type 表示 socket 的類型,通常是流stream(SOCK_STREAM) 或數據報文datagram(SOCK_DGRAM)
//第三個參數 protocol 參數通常設置為0,以便讓系統自動為選擇我們合適的協議,對于 stream socket 來說會是 TCP 協議(IPPROTO_TCP),而對于 datagram來說會是 UDP 協議(IPPROTO_UDP)。
ClinetSocket = socket(AF_INET, SOCK_STREAM, 0);
return ClinetSocket;
}
static int ConnectionToServer(int client_socket,const char * server_ip,unsigned short port)
{
//生成一個sockaddr_in類型結構體
struct sockaddr_in sAddr={0};
sAddr.sin_len=sizeof(sAddr);
//設置IPv4
sAddr.sin_family=AF_INET;
//inet_aton是一個改進的方法來將一個字符串IP地址轉換為一個32位的網絡序列IP地址
//如果這個函數成功,函數的返回值非零,如果輸入地址不正確則會返回零。
inet_aton(server_ip, &sAddr.sin_addr);
//htons是將整型變量從主機字節順序轉變成網絡字節順序,賦值端口號
sAddr.sin_port=htons(port);
//用scoket和服務端地址,發起連接。
//客戶端向特定網絡地址的服務器發送連接請求,連接成功返回0,失敗返回 -1。
//注意:該接口調用會阻塞當前線程,直到服務器返回。
if (connect(client_socket, (struct sockaddr *)&sAddr, sizeof(sAddr))==0) {
return client_socket;
}
return 0;
}
#pragma mark - 新線程來接收消息
- (void)pullMsg
{
NSThread *thread = [[NSThread alloc]initWithTarget:self selector:@selector(recieveAction) object:nil];
[thread start];
}
#pragma mark - 對外邏輯
- (void)connect
{
[self initScoket];
}
- (void)disConnect
{
//關閉連接
close(self.clientScoket);
}
//發送消息
- (void)sendMsg:(NSString *)msg
{
const char *send_Message = [msg UTF8String];
send(self.clientScoket,send_Message,strlen(send_Message)+1,0);
}
//收取服務端發送的消息
- (void)recieveAction{
while (1) {
char recv_Message[1024] = {0};
recv(self.clientScoket, recv_Message, sizeof(recv_Message), 0);
printf("%s\n",recv_Message);
}
}
如上所示:
-
我們調用了initScoket方法,利用CreateClinetSocket方法了一個scoket,就是就是調用了socket函數:
ClinetSocket = socket(AF_INET, SOCK_STREAM, 0);
-
然后調用了ConnectionToServer函數與服務器連接,IP地址為127.0.0.1也就是本機localhost和端口6969相連。在該函數中,我們綁定了一個sockaddr_in類型的結構體,該結構體內容如下:
struct sockaddr_in {
__uint8_t sin_len;
sa_family_t sin_family;
in_port_t sin_port;
struct in_addr sin_addr;
char sin_zero[8];
};
里面包含了一些,我們需要連接的服務端的scoket的一些基本參數,具體賦值細節可以見注釋。
-
連接成功之后,我們就可以調用send函數和recv函數進行消息收發了,在這里,我新開辟了一個常駐線程,在這個線程中一個死循環里去不停的調用recv函數,這樣服務端有消息發送過來,第一時間便能被接收到。
就這樣客戶端便簡單的可以用了,接著我們來看看服務端的實現。
一樣,我們首先對服務端需要做的工作簡單的總結下:
-
服務器調用 socket(...) 創建socket;
-
服務器調用 listen(...) 設置緩沖區;
-
服務器通過 accept(...)接受客戶端請求建立連接;
-
服務器與客戶端建立連接之后,就可以通過 send(...)/receive(...)向客戶端發送或從客戶端接收數據;
-
服務器調用 close 關閉 socket;
接著我們就可以具體去實現了
OS底層的函數是支持我們去實現服務端的,但是我們一般不會用iOS去這么做(試問真正的應用場景,有誰用iOS做scoket服務器么...)
在這里我用node.js去搭了一個簡單的scoket服務器。源碼如下:
var net = require('net');
var HOST = '127.0.0.1';
var PORT = 6969;
// 創建一個TCP服務器實例,調用listen函數開始監聽指定端口
// 傳入net.createServer()的回調函數將作為”connection“事件的處理函數
// 在每一個“connection”事件中,該回調函數接收到的socket對象是唯一的
net.createServer(function(sock) {
// 我們獲得一個連接 - 該連接自動關聯一個socket對象
console.log('CONNECTED: ' +
sock.remoteAddress + ':' + sock.remotePort);
sock.write('服務端發出:連接成功');
// 為這個socket實例添加一個"data"事件處理函數
sock.on('data', function(data) {
console.log('DATA ' + sock.remoteAddress + ': ' + data);
// 回發該數據,客戶端將收到來自服務端的數據
sock.write('You said "' + data + '"');
});
// 為這個socket實例添加一個"close"事件處理函數
sock.on('close', function(data) {
console.log('CLOSED: ' +
sock.remoteAddress + ' ' + sock.remotePort);
});
}).listen(PORT, HOST);
console.log('Server listening on ' + HOST +':'+ PORT);
看到這不懂node.js的朋友也不用著急,在這里你可以使用任意語言c/c++/java/oc等等去實現后臺,這里node.js僅僅是樓主的一個選擇,為了讓我們來驗證之前寫的客戶端scoket的效果。如果你不懂node.js也沒關系,你只需要把上述樓主寫的相關代碼復制粘貼,如果你本機有node的解釋器,那么直接在終端進入該源代碼文件目錄中輸入:
node fileName
即可運行該腳本(fileName為保存源代碼的文件名)。
我們來看看運行效果:

handle2.gif
服務器運行起來了,并且監聽著6969端口。
接著我們用之前寫的iOS端的例子。客戶端打印顯示連接成功,而我們運行的服務器也打印了連接成功。接著我們發了一條消息,服務端成功的接收到了消息后,把該消息再發送回客戶端,繞了一圈客戶端又收到了這條消息。至此我們用OS底層scoket實現了簡單的IM。
大家看到這是不是覺得太過簡單了?
當然簡單,我們僅僅是實現了Scoket的連接,信息的發送與接收,除此之外我們什么都沒有做,現實中,我們需要做的處理遠不止于此,我們先接著往下看。接下來,我們就一起看看第三方框架是如何實現IM的。

分割圖.png
2.我們接著來看看基于Socket原生的CocoaAsyncSocket:
這個框架實現了兩種傳輸協議TCP和UDP,分別對應GCDAsyncSocket類和GCDAsyncUdpSocket,這里我們重點講GCDAsyncSocket。
這里Socket服務器延續上一個例子,因為同樣是基于原生Scoket的框架,所以之前的Node.js的服務端,該例仍然試用。這里我們就只需要去封裝客戶端的實例,我們還是創建一個TYHSocketManager單例。
TYHSocketManager.h
#import (Foundation/Foundation.h)(因識別問題,此處圓括號替換尖括號) @interface TYHSocketManager : NSObject + (instancetype)share; - (BOOL)connect; - (void)disConnect; - (void)sendMsg:(NSString *)msg; - (void)pullTheMsg; @end
TYHSocketManager.m
#import "TYHSocketManager.h"
#import "GCDAsyncSocket.h" // for TCP
static NSString * Khost = @"127.0.0.1";
static const uint16_t Kport = 6969;
@interface TYHSocketManager()(GCDAsyncSocketDelegate)(因識別問題,此處圓括號替換尖括號)
{
GCDAsyncSocket *gcdSocket;
}
@end
@implementation TYHSocketManager
+ (instancetype)share
{
static dispatch_once_t onceToken;
static TYHSocketManager *instance = nil;
dispatch_once(&onceToken, ^{
instance = [[self alloc]init];
[instance initSocket];
});
return instance;
}
- (void)initSocket
{
gcdSocket = [[GCDAsyncSocket alloc] initWithDelegate:self delegateQueue:dispatch_get_main_queue()];
}
#pragma mark - 對外的一些接口
//建立連接
- (BOOL)connect
{
return [gcdSocket connectToHost:Khost onPort:Kport error:nil];
}
//斷開連接
- (void)disConnect
{
[gcdSocket disconnect];
}
//發送消息
- (void)sendMsg:(NSString *)msg
{
NSData *data = [msg dataUsingEncoding:NSUTF8StringEncoding];
//第二個參數,請求超時時間
[gcdSocket writeData:data withTimeout:-1 tag:110];
}
//監聽最新的消息
- (void)pullTheMsg
{
//監聽讀數據的代理 -1永遠監聽,不超時,但是只收一次消息,
//所以每次接受到消息還得調用一次
[gcdSocket readDataWithTimeout:-1 tag:110];
}
#pragma mark - GCDAsyncSocketDelegate
//連接成功調用
- (void)socket:(GCDAsyncSocket *)sock didConnectToHost:(NSString *)host port:(uint16_t)port
{
NSLog(@"連接成功,host:%@,port:%d",host,port);
[self pullTheMsg];
//心跳寫在這...
}
//斷開連接的時候調用
- (void)socketDidDisconnect:(GCDAsyncSocket *)sock withError:(nullable NSError *)err
{
NSLog(@"斷開連接,host:%@,port:%d",sock.localHost,sock.localPort);
//斷線重連寫在這...
}
//寫成功的回調
- (void)socket:(GCDAsyncSocket*)sock didWriteDataWithTag:(long)tag
{
// NSLog(@"寫的回調,tag:%ld",tag);
}
//收到消息的回調
- (void)socket:(GCDAsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag
{
NSString *msg = [[NSString alloc]initWithData:data encoding:NSUTF8StringEncoding];
NSLog(@"收到消息:%@",msg);
[self pullTheMsg];
}
//分段去獲取消息的回調
//- (void)socket:(GCDAsyncSocket *)sock didReadPartialDataOfLength:(NSUInteger)partialLength tag:(long)tag
//{
//
// NSLog(@"讀的回調,length:%ld,tag:%ld",partialLength,tag);
//
//}
//為上一次設置的讀取數據代理續時 (如果設置超時為-1,則永遠不會調用到)
//-(NSTimeInterval)socket:(GCDAsyncSocket *)sock shouldTimeoutReadWithTag:(long)tag elapsed:(NSTimeInterval)elapsed bytesDone:(NSUInteger)length
//{
// NSLog(@"來延時,tag:%ld,elapsed:%f,length:%ld",tag,elapsed,length);
// return 10;
//}
@end
這個框架使用起來也十分簡單,它基于Scoket往上進行了一層封裝,提供了OC的接口給我們使用。至于使用方法,大家看看注釋應該就能明白,這里唯一需要說的一點就是這個方法:
[gcdSocket readDataWithTimeout:-1 tag:110];
這個方法的作用就是去讀取當前消息隊列中的未讀消息。 記住,這里不調用這個方法,消息回調的代理是永遠不會被觸發的 。而且必須是tag相同,如果tag不同,這個收到消息的代理也不會被處罰。
我們調用一次這個方法,只能觸發一次讀取消息的代理,如果我們調用的時候沒有未讀消息,它就會等在那,直到消息來了被觸發。一旦被觸發一次代理后,我們必須再次調用這個方法,否則,之后的消息到了仍舊無法觸發我們讀取消息的代理。就像我們在例子中使用的那樣,在每次讀取到消息之后我們都去調用:
//收到消息的回調
- (void)socket:(GCDAsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag
{
NSString *msg = [[NSString alloc]initWithData:data encoding:NSUTF8StringEncoding];
NSLog(@"收到消息:%@",msg);
[self pullTheMsg];
}
//監聽最新的消息
- (void)pullTheMsg
{
//監聽讀數據的代理,只能監聽10秒,10秒過后調用代理方法 -1永遠監聽,不超時,但是只收一次消息,
//所以每次接受到消息還得調用一次
[gcdSocket readDataWithTimeout:-1 tag:110];
}
除此之外,我們還需要說的是這個超時timeout
這里如果設置10秒,那么就只能監聽10秒,10秒過后調用是否續時的代理方法:
-(NSTimeInterval)socket:(GCDAsyncSocket *)sock shouldTimeoutReadWithTag:(long)tag elapsed:(NSTimeInterval)elapsed bytesDone:(NSUInteger)length
如果我們選擇不續時,那么10秒到了還沒收到消息,那么Scoket會自動斷開連接。看到這里有些小伙伴要吐槽了,怎么一個方法設計的這么麻煩,當然這里這么設計是有它的應用場景的,我們后面再來細講。
我們同樣來運行看看效果:

至此我們也用CocoaAsyncSocket這個框架實現了一個簡單的IM。

分割圖.png
3.接著我們繼續來看看基于webScoket的IM:
這個例子我們會把心跳,斷線重連,以及PingPong機制進行簡單的封裝,所以我們先來談談這三個概念:
首先我們來談談什么是心跳
簡單的來說, 心跳就是用來檢測TCP連接的雙方是否可用 。那又會有人要問了,TCP不是本身就自帶一個KeepAlive機制嗎?
這里我們需要說明的是TCP的KeepAlive機制只能保證連接的存在,但是并不能保證客戶端以及服務端的可用性 .比如會有以下一種情況:
“某臺服務器因為某些原因導致負載超高,CPU 100%,無法響應任何業務請求,但是使用 TCP 探針則仍舊能夠確定連接狀態,這就是典型的連接活著但業務提供方已死的狀態。
這個時候心跳機制就起到作用了:
-
我們客戶端發起心跳Ping(一般都是客戶端),假如設置在10秒后如果沒有收到回調,那么說明服務器或者客戶端某一方出現問題,這時候我們需要主動斷開連接。
-
服務端也是一樣,會維護一個socket的心跳間隔,當約定時間內,沒有收到客戶端發來的心跳,我們會知道該連接已經失效,然后主動斷開連接。
其實做過IM的小伙伴們都知道,我們真正需要心跳機制的原因其實主要是在于國內運營商NAT超時。
那么究竟什么是NAT超時呢?
原來這是因為IPV4引起的,我們上網很可能會處在一個NAT設備(無線路由器之類)之后。
NAT設備會在IP封包通過設備時修改源/目的IP地址. 對于家用路由器來說, 使用的是網絡地址端口轉換(NAPT), 它不僅改IP, 還修改TCP和UDP協議的端口號, 這樣就能讓內網中的設備共用同一個外網IP. 舉個例子, NAPT維護一個類似下表的NAT表:

NAT映射
NAT設備會根據NAT表對出去和進來的數據做修改, 比如將192.168.0.3:8888發出去的封包改成120.132.92.21:9202, 外部就認為他們是在和120.132.92.21:9202通信. 同時NAT設備會將120.132.92.21:9202收到的封包的IP和端口改成192.168.0.3:8888, 再發給內網的主機, 這樣內部和外部就能雙向通信了, 但如果其中192.168.0.3:8888 == 120.132.92.21:9202這一映射因為某些原因被NAT設備淘汰了, 那么外部設備就無法直接與192.168.0.3:8888通信了。
我們的設備經常是處在NAT設備的后面, 比如在大學里的校園網, 查一下自己分配到的IP, 其實是內網IP, 表明我們在NAT設備后面, 如果我們在寢室再接個路由器, 那么我們發出的數據包會多經過一次NAT.
國內移動無線網絡運營商在鏈路上一段時間內沒有數據通訊后, 會淘汰NAT表中的對應項, 造成鏈路中斷。
而國內的運營商一般NAT超時的時間為5分鐘,所以通常我們心跳設置的時間間隔為3-5分鐘。
接著我們來講講PingPong機制:
很多小伙伴可能又會感覺到疑惑了,那么我們在這心跳間隔的3-5分鐘如果連接假在線(例如在地鐵電梯這種環境下)。那么我們豈不是無法保證消息的即時性么?這顯然是我們無法接受的,所以業內的解決方案是采用雙向的PingPong機制。

當服務端發出一個Ping,客戶端沒有在約定的時間內返回響應的ack,則認為客戶端已經不在線,這時我們Server端會主動斷開Scoket連接,并且改由APNS推送的方式發送消息。
同樣的是,當客戶端去發送一個消息,因為我們遲遲無法收到服務端的響應ack包,則表明客戶端或者服務端已不在線,我們也會顯示消息發送失敗,并且斷開Scoket連接。
還記得我們之前CocoaSyncSockt的例子所講的獲取消息超時就斷開嗎?其實它就是一個PingPong機制的客戶端實現。我們每次可以在發送消息成功后,調用這個超時讀取的方法,如果一段時間沒收到服務器的響應,那么說明連接不可用,則斷開Scoket連接
最后就是重連機制:
理論上,我們自己主動去斷開的Scoket連接(例如退出賬號,APP退出到后臺等等),不需要重連。其他的連接斷開,我們都需要進行斷線重連。
一般解決方案是嘗試重連幾次,如果仍舊無法重連成功,那么不再進行重連。
接下來的WebScoket的例子,我會封裝一個重連時間指數級增長的一個重連方式,可以作為一個參考。
言歸正傳,我們看完上述三個概念之后,我們來講一個WebScoket最具代表性的一個第三方框架SocketRocket。
我們首先來看看它對外封裝的一些方法:
@interface SRWebSocket : NSObject (NSStreamDelegate)(因識別問題,此處用圓括號替換尖括號) @property (nonatomic, weak) id delegate; @property (nonatomic, readonly) SRReadyState readyState; @property (nonatomic, readonly, retain) NSURL *url; @property (nonatomic, readonly) CFHTTPMessageRef receivedHTTPHeaders; // Optional array of cookies (NSHTTPCookie objects) to apply to the connections @property (nonatomic, readwrite) NSArray * requestCookies; // This returns the negotiated protocol. // It will be nil until after the handshake completes. @property (nonatomic, readonly, copy) NSString *protocol; // Protocols should be an array of strings that turn into Sec-WebSocket-Protocol. - (id)initWithURLRequest:(NSURLRequest *)request protocols:(NSArray *)protocols allowsUntrustedSSLCertificates:(BOOL)allowsUntrustedSSLCertificates; - (id)initWithURLRequest:(NSURLRequest *)request protocols:(NSArray *)protocols; - (id)initWithURLRequest:(NSURLRequest *)request; // Some helper constructors. - (id)initWithURL:(NSURL *)url protocols:(NSArray *)protocols allowsUntrustedSSLCertificates:(BOOL)allowsUntrustedSSLCertificates; - (id)initWithURL:(NSURL *)url protocols:(NSArray *)protocols; - (id)initWithURL:(NSURL *)url; // Delegate queue will be dispatch_main_queue by default. // You cannot set both OperationQueue and dispatch_queue. - (void)setDelegateOperationQueue:(NSOperationQueue*) queue; - (void)setDelegateDispatchQueue:(dispatch_queue_t) queue; // By default, it will schedule itself on +[NSRunLoop SR_networkRunLoop] using defaultModes. - (void)scheduleInRunLoop:(NSRunLoop *)aRunLoop forMode:(NSString *)mode; - (void)unscheduleFromRunLoop:(NSRunLoop *)aRunLoop forMode:(NSString *)mode; // SRWebSockets are intended for one-time-use only. Open should be called once and only once. - (void)open; - (void)close; - (void)closeWithCode:(NSInteger)code reason:(NSString *)reason; // Send a UTF8 String or Data. - (void)send:(id)data; // Send Data (can be nil) in a ping message. - (void)sendPing:(NSData *)data; @end #pragma mark - SRWebSocketDelegate @protocol SRWebSocketDelegate 3 // message will either be an NSString if the server is using text // or NSData if the server is using binary. - (void)webSocket:(SRWebSocket *)webSocket didReceiveMessage:(id)message; @optional - (void)webSocketDidOpen:(SRWebSocket *)webSocket; - (void)webSocket:(SRWebSocket *)webSocket didFailWithError:(NSError *)error; - (void)webSocket:(SRWebSocket *)webSocket didCloseWithCode:(NSInteger)code reason:(NSString *)reason wasClean:(BOOL)wasClean; - (void)webSocket:(SRWebSocket *)webSocket didReceivePong:(NSData *)pongPayload; // Return YES to convert messages sent as Text to an NSString. Return NO to skip NSData -> NSString conversion for Text messages. Defaults to YES. - (BOOL)webSocketShouldConvertTextFrameToString:(SRWebSocket *)webSocket; @end
方法也很簡單,分為兩個部分:
-
一部分為SRWebSocket的初始化,以及連接,關閉連接,發送消息等方法。
-
另一部分為SRWebSocketDelegate,其中包括一些回調:
收到消息的回調,連接失敗的回調,關閉連接的回調,收到pong的回調,是否需要把data消息轉換成string的代理方法。
接著我們還是舉個例子來實現以下,首先來封裝一個TYHSocketManager單例:
TYHSocketManager.h
#import (Foundation/Foundation.h)(因識別問題,此處用圓括號替換尖括號)
typedef enum : NSUInteger {
disConnectByUser ,
disConnectByServer,
} DisConnectType;
@interface TYHSocketManager : NSObject
+ (instancetype)share;
- (void)connect;
- (void)disConnect;
- (void)sendMsg:(NSString *)msg;
- (void)ping;
@end
TYHSocketManager.m
#import "TYHSocketManager.h"
#import "SocketRocket.h"
#define dispatch_main_async_safe(block)\
if ([NSThread isMainThread]) {\
block();\
} else {\
dispatch_async(dispatch_get_main_queue(), block);\
}
static NSString * Khost = @"127.0.0.1";
static const uint16_t Kport = 6969;
@interface TYHSocketManager()(SRWebSocketDelegate)(因識別問題,此處用圓括號替換尖括號)
{
SRWebSocket *webSocket;
NSTimer *heartBeat;
NSTimeInterval reConnectTime;
}
@end
@implementation TYHSocketManager
+ (instancetype)share
{
static dispatch_once_t onceToken;
static TYHSocketManager *instance = nil;
dispatch_once(&onceToken, ^{
instance = [[self alloc]init];
[instance initSocket];
});
return instance;
}
//初始化連接
- (void)initSocket
{
if (webSocket) {
return;
}
webSocket = [[SRWebSocket alloc]initWithURL:[NSURL URLWithString:[NSString stringWithFormat:@"ws://%@:%d", Khost, Kport]]];
webSocket.delegate = self;
//設置代理線程queue
NSOperationQueue *queue = [[NSOperationQueue alloc]init];
queue.maxConcurrentOperationCount = 1;
[webSocket setDelegateOperationQueue:queue];
//連接
[webSocket open];
}
//初始化心跳
- (void)initHeartBeat
{
dispatch_main_async_safe(^{
[self destoryHeartBeat];
__weak typeof(self) weakSelf = self;
//心跳設置為3分鐘,NAT超時一般為5分鐘
heartBeat = [NSTimer scheduledTimerWithTimeInterval:3*60 repeats:YES block:^(NSTimer * _Nonnull timer) {
NSLog(@"heart");
//和服務端約定好發送什么作為心跳標識,盡可能的減小心跳包大小
[weakSelf sendMsg:@"heart"];
}];
[[NSRunLoop currentRunLoop]addTimer:heartBeat forMode:NSRunLoopCommonModes];
})
}
//取消心跳
- (void)destoryHeartBeat
{
dispatch_main_async_safe(^{
if (heartBeat) {
[heartBeat invalidate];
heartBeat = nil;
}
})
}
#pragma mark - 對外的一些接口
//建立連接
- (void)connect
{
[self initSocket];
//每次正常連接的時候清零重連時間
reConnectTime = 0;
}
//斷開連接
- (void)disConnect
{
if (webSocket) {
[webSocket close];
webSocket = nil;
}
}
//發送消息
- (void)sendMsg:(NSString *)msg
{
[webSocket send:msg];
}
//重連機制
- (void)reConnect
{
[self disConnect];
//超過一分鐘就不再重連 所以只會重連5次 2^5 = 64
if (reConnectTime > 64) {
return;
}
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(reConnectTime * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
webSocket = nil;
[self initSocket];
});
//重連時間2的指數級增長
if (reConnectTime == 0) {
reConnectTime = 2;
}else{
reConnectTime *= 2;
}
}
//pingPong
- (void)ping{
[webSocket sendPing:nil];
}
#pragma mark - SRWebSocketDelegate
- (void)webSocket:(SRWebSocket *)webSocket didReceiveMessage:(id)message
{
NSLog(@"服務器返回收到消息:%@",message);
}
- (void)webSocketDidOpen:(SRWebSocket *)webSocket
{
NSLog(@"連接成功");
//連接成功了開始發送心跳
[self initHeartBeat];
}
//open失敗的時候調用
- (void)webSocket:(SRWebSocket *)webSocket didFailWithError:(NSError *)error
{
NSLog(@"連接失敗.....\n%@",error);
//失敗了就去重連
[self reConnect];
}
//網絡連接中斷被調用
- (void)webSocket:(SRWebSocket *)webSocket didCloseWithCode:(NSInteger)code reason:(NSString *)reason wasClean:(BOOL)wasClean
{
NSLog(@"被關閉連接,code:%ld,reason:%@,wasClean:%d",code,reason,wasClean);
//如果是被用戶自己中斷的那么直接斷開連接,否則開始重連
if (code == disConnectByUser) {
[self disConnect];
}else{
[self reConnect];
}
//斷開連接時銷毀心跳
[self destoryHeartBeat];
}
//sendPing的時候,如果網絡通的話,則會收到回調,但是必須保證ScoketOpen,否則會crash
- (void)webSocket:(SRWebSocket *)webSocket didReceivePong:(NSData *)pongPayload
{
NSLog(@"收到pong回調");
}
//將收到的消息,是否需要把data轉換為NSString,每次收到消息都會被調用,默認YES
//- (BOOL)webSocketShouldConvertTextFrameToString:(SRWebSocket *)webSocket
//{
// NSLog(@"webSocketShouldConvertTextFrameToString");
//
// return NO;
//}
.m文件有點長,大家可以參照github中的demo進行閱讀,這回我們添加了一些細節的東西了,包括一個簡單的心跳,重連機制,還有webScoket封裝好的一個pingpong機制。
代碼非常簡單,大家可以配合著注釋讀一讀,應該很容易理解。
需要說一下的是這個心跳機制是一個定時的間隔,往往我們可能會有更復雜實現,比如我們正在發送消息的時候,可能就不需要心跳。當不在發送的時候在開啟心跳之類的。
還有一點需要說的就是這個重連機制,demo中我采用的是2的指數級別增長,第一次立刻重連,第二次2秒,第三次4秒,第四次8秒...直到大于64秒就不再重連。而任意的一次成功的連接,都會重置這個重連時間。
最后一點需要說的是,這個框架給我們封裝的webscoket在調用它的sendPing方法之前,一定要判斷當前scoket是否連接,如果不是連接狀態,程序則會crash。
客戶端的實現就大致如此,接著同樣我們需要實現一個服務端,來看看實際通訊效果。
webScoket服務端實現
在這里我們無法沿用之前的node.js例子了,因為這并不是一個原生的scoket,這是webScoket,所以我們服務端同樣需要遵守webScoket協議,兩者才能實現通信。
其實這里實現也很簡單,我采用了node.js的ws模塊,只需要用npm去安裝ws即可。
什么是npm呢?舉個例子,npm之于Node.js相當于cocospod至于iOS,它就是一個拓展模塊的一個管理工具。如果不知道怎么用的可以看看這篇文章: npm的使用
我們進入當前腳本目錄,輸入終端命令,即可安裝ws模塊:
$ npm install ws
大家如果懶得去看npm的小伙伴也沒關系,直接下載github中的 WSServer.js這個文件運行即可。
該源文件代碼如下:
var WebSocketServer = require('ws').Server,
wss = new WebSocketServer({ port: 6969 });
wss.on('connection', function (ws) {
console.log('client connected');
ws.send('你是第' + wss.clients.length + '位');
//收到消息回調
ws.on('message', function (message) {
console.log(message);
ws.send('收到:'+message);
});
// 退出聊天
ws.on('close', function(close) {
console.log('退出連接了');
});
});
console.log('開始監聽6969端口');
代碼沒幾行,理解起來很簡單。
就是監聽了本機6969端口,如果客戶端連接了,打印lient connected,并且向客戶端發送:你是第幾位。
如果收到客戶端消息后,打印消息,并且向客戶端發送這條收到的消息。
接著我們同樣來運行一下看看效果:

運行我們可以看到,主動去斷開的連接,沒有去重連,而server端斷開的,我們開啟了重連。感興趣的朋友可以下載demo實際運行一下。

分割圖.png
4.我們接著來看看MQTT:
MQTT是一個聊天協議,它比webScoket更上層,屬于應用層。
它的基本模式是簡單的發布訂閱,也就是說當一條消息發出去的時候,誰訂閱了誰就會受到。其實它并不適合IM的場景,例如用來實現有些簡單IM場景,卻需要很大量的、復雜的處理。
比較適合它的場景為訂閱發布這種模式的,例如微信的實時共享位置,滴滴的地圖上小車的移動、客戶端推送等功能。
首先我們來看看基于MQTT協議的框架-MQTTKit:
這個框架是c來寫的,把一些方法公開在MQTTKit類中,對外用OC來調用,我們來看看這個類:
@interface MQTTClient : NSObject {
struct mosquitto *mosq;
}
@property (readwrite, copy) NSString *clientID;
@property (readwrite, copy) NSString *host;
@property (readwrite, assign) unsigned short port;
@property (readwrite, copy) NSString *username;
@property (readwrite, copy) NSString *password;
@property (readwrite, assign) unsigned short keepAlive;
@property (readwrite, assign) BOOL cleanSession;
@property (nonatomic, copy) MQTTMessageHandler messageHandler;
+ (void) initialize;
+ (NSString*) version;
- (MQTTClient*) initWithClientId: (NSString *)clientId;
- (void) setMessageRetry: (NSUInteger)seconds;
#pragma mark - Connection
- (void) connectWithCompletionHandler:(void (^)(MQTTConnectionReturnCode code))completionHandler;
- (void) connectToHost: (NSString*)host
completionHandler:(void (^)(MQTTConnectionReturnCode code))completionHandler;
- (void) disconnectWithCompletionHandler:(void (^)(NSUInteger code))completionHandler;
- (void) reconnect;
- (void)setWillData:(NSData *)payload
toTopic:(NSString *)willTopic
withQos:(MQTTQualityOfService)willQos
retain:(BOOL)retain;
- (void)setWill:(NSString *)payload
toTopic:(NSString *)willTopic
withQos:(MQTTQualityOfService)willQos
retain:(BOOL)retain;
- (void)clearWill;
#pragma mark - Publish
- (void)publishData:(NSData *)payload
toTopic:(NSString *)topic
withQos:(MQTTQualityOfService)qos
retain:(BOOL)retain
completionHandler:(void (^)(int mid))completionHandler;
- (void)publishString:(NSString *)payload
toTopic:(NSString *)topic
withQos:(MQTTQualityOfService)qos
retain:(BOOL)retain
completionHandler:(void (^)(int mid))completionHandler;
#pragma mark - Subscribe
- (void)subscribe:(NSString *)topic
withCompletionHandler:(MQTTSubscriptionCompletionHandler)completionHandler;
- (void)subscribe:(NSString *)topic
withQos:(MQTTQualityOfService)qos
completionHandler:(MQTTSubscriptionCompletionHandler)completionHandler;
- (void)unsubscribe: (NSString *)topic
withCompletionHandler:(void (^)(void))completionHandler;
這個類一共分為4個部分:初始化、連接、發布、訂閱,具體方法的作用可以先看看方法名理解下,我們接著來用這個框架封裝一個實例。
同樣,我們封裝了一個單例MQTTManager。
MQTTManager.h
#import (Foundation/Foundation.h)(因識別問題,此處用圓括號替換尖括號) @interface MQTTManager : NSObject + (instancetype)share; - (void)connect; - (void)disConnect; - (void)sendMsg:(NSString *)msg; @end
MQTTManager.m
#import "MQTTManager.h"
#import "MQTTKit.h"
static NSString * Khost = @"127.0.0.1";
static const uint16_t Kport = 6969;
static NSString * KClientID = @"tuyaohui";
@interface MQTTManager()
{
MQTTClient *client;
}
@end
@implementation MQTTManager
+ (instancetype)share
{
static dispatch_once_t onceToken;
static MQTTManager *instance = nil;
dispatch_once(&onceToken, ^{
instance = [[self alloc]init];
});
return instance;
}
//初始化連接
- (void)initSocket
{
if (client) {
[self disConnect];
}
client = [[MQTTClient alloc] initWithClientId:KClientID];
client.port = Kport;
[client setMessageHandler:^(MQTTMessage *message)
{
//收到消息的回調,前提是得先訂閱
NSString *msg = [[NSString alloc]initWithData:message.payload encoding:NSUTF8StringEncoding];
NSLog(@"收到服務端消息:%@",msg);
}];
[client connectToHost:Khost completionHandler:^(MQTTConnectionReturnCode code) {
switch (code) {
case ConnectionAccepted:
NSLog(@"MQTT連接成功");
//訂閱自己ID的消息,這樣收到消息就能回調
[client subscribe:client.clientID withCompletionHandler:^(NSArray *grantedQos) {
NSLog(@"訂閱tuyaohui成功");
}];
break;
case ConnectionRefusedBadUserNameOrPassword:
NSLog(@"錯誤的用戶名密碼");
//....
default:
NSLog(@"MQTT連接失敗");
break;
}
}];
}
#pragma mark - 對外的一些接口
//建立連接
- (void)connect
{
[self initSocket];
}
//斷開連接
- (void)disConnect
{
if (client) {
//取消訂閱
[client unsubscribe:client.clientID withCompletionHandler:^{
NSLog(@"取消訂閱tuyaohui成功");
}];
//斷開連接
[client disconnectWithCompletionHandler:^(NSUInteger code) {
NSLog(@"斷開MQTT成功");
}];
client = nil;
}
}
//發送消息
- (void)sendMsg:(NSString *)msg
{
//發送一條消息,發送給自己訂閱的主題
[client publishString:msg toTopic:KClientID withQos:ExactlyOnce retain:YES completionHandler:^(int mid) {
}];
}
@end
實現代碼很簡單,需要說一下的是:
1)當我們連接成功了,我們需要去訂閱自己clientID的消息,這樣才能收到發給自己的消息。
2)其次是這個框架為我們實現了一個QOS機制,那么什么是QOS呢?
QoS(Quality of Service, 服務質量 )指一個網絡能夠利用各種基礎技術,為指定的 網絡通信 提供更好的服務能力, 是網絡的一種安全機制, 是用來解決網絡延遲和阻塞等問題的一種技術。
在這里,它提供了三個選項:
typedef enum MQTTQualityOfService : NSUInteger {
AtMostOnce,
AtLeastOnce,
ExactlyOnce
} MQTTQualityOfService;
分別對應最多發送一次,至少發送一次,精確只發送一次。
-
QOS(0),最多發送一次:如果消息沒有發送過去,那么就直接丟失。
-
QOS(1),至少發送一次:保證消息一定發送過去,但是發幾次不確定。
-
QOS(2),精確只發送一次:它內部會有一個很復雜的發送機制,確保消息送到,而且只發送一次。
同樣的我們需要一個用MQTT協議實現的服務端,我們還是node.js來實現,這次我們還是需要用npm來新增一個模塊mosca。
我們來看看服務端代碼:
MQTTServer.js
var mosca = require('mosca');
var MqttServer = new mosca.Server({
port: 6969
});
MqttServer.on('clientConnected', function(client){
console.log('收到客戶端連接,連接ID:', client.id);
});
/**
* 監聽MQTT主題消息
**/
MqttServer.on('published', function(packet, client) {
var topic = packet.topic;
console.log('有消息來了','topic為:'+topic+',message為:'+ packet.payload.toString());
});
MqttServer.on('ready', function(){
console.log('mqtt服務器開啟,監聽6969端口');
});
服務端代碼沒幾行,開啟了一個服務,并且監聽本機6969端口。并且監聽了客戶端連接、發布消息等狀態。
接著我們同樣來運行一下看看效果:

至此,我們實現了一個簡單的MQTT封裝。
5.XMPP:XMPPFramework框架
結果就是并沒有XMPP...因為個人感覺XMPP對于IM來說實在是不堪重用。僅僅只能作為一個玩具demo,給大家練練手。網上有太多XMPP的內容了,相當一部分用openfire來做服務端,這一套東西實在是太老了。還記得多年前,樓主初識IM就是用的這一套東西...
三、關于IM傳輸格式的選擇:
引用陳宜龍大神文章( iOS程序犭袁 )中一段:
使用 ProtocolBuffer 減少 Payload
滴滴打車40%;
攜程之前分享過,說是采用新的Protocol Buffer數據格式+Gzip壓縮后的Payload大小降低了15%-45%。數據序列化耗時下降了80%-90%。
采用高效安全的私有協議,支持長連接的復用,穩定省電省流量
【高效】提高網絡請求成功率,消息體越大,失敗幾率隨之增加。
【省流量】流量消耗極少,省流量。一條消息數據用Protobuf序列化后的大小是 JSON 的1/10、XML格式的1/20、是二進制序列化的1/10。同 XML 相比, Protobuf 性能優勢明顯。它以高效的二進制方式存儲,比 XML 小 3 到 10 倍,快 20 到 100 倍。
【省電】省電
【高效心跳包】同時心跳包協議對IM的電量和流量影響很大,對心跳包協議上進行了極簡設計:僅 1 Byte 。
【易于使用】開發人員通過按照一定的語法定義結構化的消息格式,然后送給命令行工具,工具將自動生成相關的類,可以支持java、c++、python、Objective-C等語言環境。通過將這些類包含在項目中,可以很輕松的調用相關方法來完成業務消息的序列化與反序列化工作。語言支持:原生支持c++、java、python、Objective-C等多達10余種語言。 2015-08-27 Protocol Buffers v3.0.0-beta-1中發布了Objective-C(Alpha)版本, 2016-07-28 3.0 Protocol Buffers v3.0.0正式版發布,正式支持 Objective-C。
【可靠】微信和手機 QQ 這樣的主流 IM 應用也早已在使用它(采用的是改造過的Protobuf協議)

如何測試驗證 Protobuf 的高性能?
對數據分別操作100次,1000次,10000次和100000次進行了測試,
縱坐標是完成時間,單位是毫秒,
反序列化
序列化
字節長度



數據來源

數據來自:項目 thrift-protobuf-compare ,測試項為 Total Time,也就是 指一個對象操作的整個時間,包括創建對象,將對象序列化為內存中的字節序列,然后再反序列化的整個過程。從測試結果可以看到 Protobuf 的成績很好.
缺點:
可能會造成 APP 的包體積增大,通過 Google 提供的腳本生成的 Model,會非常“龐大”,Model 一多,包體積也就會跟著變大。
如果 Model 過多,可能導致 APP 打包后的體積驟增,但 IM 服務所使用的 Model 非常少,比如在 ChatKit-OC 中只用到了一個 Protobuf 的 Model:Message對象,對包體積的影響微乎其微。
在使用過程中要合理地權衡包體積以及傳輸效率的問題,據說去哪兒網,就曾經為了減少包體積,進而減少了 Protobuf 的使用。
綜上所述,我們選擇傳輸格式的時候:ProtocolBuffer > Json > XML
四、IM一些其它問題
1.IM的可靠性:
我們之前穿插在例子中提到過:
心跳機制、PingPong機制、斷線重連機制、還有我們后面所說的QOS機制。這些被用來保證連接的可用,消息的即時與準確的送達等等。
上述內容保證了我們IM服務時的可靠性,其實我們能做的還有很多:比如我們在大文件傳輸的時候使用分片上傳、斷點續傳、秒傳技術等來保證文件的傳輸。
2.安全性:
我們通常還需要一些安全機制來保證我們IM通信安全。
例如: 防止 DNS 污染 、帳號安全、第三方服務器鑒權、單點登錄等等
3.一些其他的優化:
類似微信,服務器不做聊天記錄的存儲,只在本機進行緩存,這樣可以減少對服務端數據的請求,一方面減輕了服務器的壓力,另一方面減少客戶端流量的消耗。
我們進行http連接的時候盡量采用上層API,類似NSUrlSession。而網絡框架盡量使用AFNetWorking3。因為這些上層網絡請求都用的是HTTP/2 ,我們請求的時候可以復用這些連接。
五、音視頻通話
IM應用中的實時音視頻技術,幾乎是IM開發中的最后一道高墻。原因在于:實時音視頻技術 = 音視頻處理技術 + 網絡傳輸技術 的橫向技術應用集合體,而公共互聯網不是為了實時通信設計的。
實時音視頻技術上的實現內容主要包括:音視頻的采集、編碼、網絡傳輸、解碼、播放等環節。這么多項并不簡單的技術應用,如果把握不當,將會在在實際開發過程中遇到一個又一個的坑。
來自:http://www.cocoachina.com/ios/20170110/18544.html