實時 Django 終于來了 —— Django Channels 入門指南
當Django剛創建時,那是十多年前,網絡還是一個不太復雜的地方。大部分的網頁都是靜態的。由數據庫支撐的模型/視圖/ 控制器架構的網絡應用還是很新鮮的東西。 Ajax 剛剛開始被使用,只在較少的場景中。
到現在2016年,網絡明顯更加強大。過去的幾年里已經看到了所謂的“實時”網絡應用:在這類應用中客戶端和服務器之間、點對點通信交互非常頻繁。包含很多服務(又名微服務)的應用也變成是常態。新的web技術允許web應用程序走向十年前我們只敢在夢里想象的方向。這些核心技術之一就是 WebSockets :一種新的提供全雙工通信的協議——一個持久的,允許任何時間發送數據的客戶端和服務器之間的連接。
在這個新的世界,Django顯示出了它的老成。在其核心,Django是建立在請求和響應的簡單概念之上的:瀏覽器發出請求,Django調用一個視圖,它返回一個響應并發送回瀏覽器。
這在WebSockets中是行不通的 !視圖的生命周期只在一個請求當中,沒有一種機制能打開一個連接不斷的發送數據到客戶端,而不發送相關請求。
因此: Django Channels 就應運而生了。Channels,簡而言之,取代了Django中的“guts” ——請求/響應周期發送跨通道的消息。Channels允許Django以非常類似于傳統HTTP的方式支持WebSockets。Channels也允許在運行Django的服務器上運行后臺任務。HTTP請求表現以前一樣,但也通過Channels進行路由。因此,在Channels 支持下Django現在看起來像這樣:
如您所見,Django Channels引入了一些新的概念:
Channels基本上就是任務隊列:消息被生產商推到通道,然后傳遞給監聽通道的消費者之一。如果你使用Go語言中的渠道,這個概念應該相當熟悉。主要的區別在于,Django Channels通過網絡工作,使生產者和消費者透明地運行在多臺機器上。這個網絡層稱為通道層。通道設計時使用Redis作為其首選通道層,雖然也支持其他類型(和API來創建自定義通道層)。
現在, 通道作為一個獨立的應用程序 搭配使用Django 1.9使用。計劃是將通道合并到Django1.10版本,今年夏天將會發布。
我認為Channels將是Django的一個非常重要的插件:它們將支撐Django順利進入這個新的web開發的時代。雖然這些api還沒有成為Django的一部分,他們將很快就會是!所以,現在是一個完美的時間開始學習Channels:你可以了解未來的Django。
開始實踐:如何在Django中實現一個實時聊天應用
作為一個例子,我構建了一個簡單的實時聊天應用程序——就像一個非常非常輕量級的Slack。有很多的房間,每個人都在同一個房間里可以聊天,彼此實時交互(使用WebSockets)。
你可以訪問我在 網絡上部署的例子 ,看看在GitHub上的代碼,或點擊這個按鈕來部署自己的。(這需要一個免費的Heroku賬戶,所以得要 先注冊 ):
注意 :你需要在點擊前面的鏈接后,啟動工作進程。使用儀表盤或運行heroku ps:scale web=1:free worker=1:free。
如果你想深入了解這個應用程序是如何工作的——包括你為什么需要worker!——那么請繼續讀下去。我將會一步一步來構建這個應用程序,并突出關鍵位置和概念。
第一步——從Django開始
雖然在實現上有了很大差異,但是這仍舊是我們使用了十年的Django。所以第一步和其他任何Django應用是一樣的(如果你是Django新手,你得看看 如何在Heroku上開始使用 Python 和 Django新手教程 )。創建一個工程后,你可以定義模型來表示一個聊天室和其中的消息( chat/models.py ):
class Room(models.Model):
name = models.TextField()
label = models.SlugField(unique=True)
class Message(models.Model):
room = models.ForeignKey(Room, related_name='messages')
handle = models.TextField()
message = models.TextField()
timestamp = models.DateTimeField(default=timezone.now, db_index=True)
然后創建一個 聊天室視圖 以及相應的 urls.py 和 模板 :
def chat_room(request, label):
# If the room with the given label doesn't exist, automatically create it
# upon first visit (a la etherpad).
room, created = Room.objects.get_or_create(label=label)
# We want to show the last 50 messages, ordered most-recent-last
messages = reversed(room.messages.order_by('-timestamp')[:50])
return render(request, "chat/room.html", {
'room': room,
'messages': messages,
})
現在,我們已經已經有了一個可以運行的Django應用。如果你在標準的Django環境中運行它,你可以看到已經存在的聊天室和聊天記錄,但是聊天室內無法進行交互操作。實時沒有起作用,我們得做工作來處理 WebSockets。
接下來我們做什么
為了搞明白接下來后臺需要做些什么,我們得先看下客戶端的代碼。你可以在 chat.js 中找到,其實也沒做多少工作!首先,創建一個 websocket:
varws_scheme = window.location.protocol == "https:" ? "wss" : "ws";
varchat_socket = new ReconnectingWebSocket(ws_scheme + '://' + window.location.host + "/chat" + window.location.pathname);
注意:
- 像HTTP和 HTTPS一樣,WebSocket協議區分為安全和非安全兩種,我們可以按照需要選擇合適的.
- 因為 Heroku的路由有60秒鐘過期 的問題。 我使用了 一個 瀏覽器WebSocket 小技巧 可以在socket斷開時自動重連。 (感謝Kenneth Reitz,在他的 Flask WebSocket 例子中指出了這一點)。
接下來,我們將加入一個回調函數,當表單提交時,我們就通過WebSocket發送數據(而不是 POST數據):
$('#chatform').on('submit', function(event) {
varmessage = {
handle: $('#handle').val(),
message: $('#message').val(),
}
chat_socket.send(JSON.stringify(message));
return false;
});
我們可以通過WebSocket發送任何想要發送的數據。像眾多的API一樣, JSON 是最容易的,所以我們將要發送的數據打包成JSON格式。
最后,我們需要將回調函數與WebSocket上的新數據接收事件對接起來:
chatsock.onmessage = function(message) {
vardata = JSON.parse(message.data);
$('#chat').append('<tr>'
+ '<td>' + data.timestamp + '</td>'
+ '<td>' + data.handle + '</td>'
+ '<td>' + data.message + ' </td>'
+ '</tr>');
};
簡單提示:從獲取的信息中拉取數據,在會話的表上加上一行。如果現在就運行這個代碼,他是無法運行的,現在還沒有誰監聽WebSocket連接呢,只是簡單的HTTP。現在,讓我們來連接WebSocket。
安裝和創建 Channels
要將這個應用“通道化”,我們需要做三件事情:安裝Channels,建立通道層,定義通道路由,修改我們的工程使其運行在Channels上(而不是WSGI)。
1. 安裝Channels
要安裝Channels,只需要執行pip install channels,然后將 “channels”添加到 INSTALLED_APPS配置項中。安裝Channels后,允許Django以“通道模式”運行,使用上面描述的通道架構來完成請求/響應的循環。(為了向后兼容,你仍可以以 WSGI模式運行Django ,但是在這種模式下WebSockets和Channel的其他特性就不能工作了。)
2. 選擇一個通道層
接下來,我們將定義一個通道層。這是Channels用來在消費者和生產者(消息發送者)之間傳遞消息的交換機制。 這是一種有特定屬性的消息隊列。
我們將使用Redis作為我們的通道層:它是首選的生產型(可用于工程部署)通道層,是部署在Heroku上顯而易見的選擇。 當然也有一些駐留內存和基于數據的通道層,但是它們更適合于本地開發或者低流量情況下使用。
但是首先:因為Redis通道層是在另外的包中實現的,我們需要運行pip安裝 asgi_redis。(我將會在下面稍微介紹點“ASGI”。)然后我們在CHANNEL_LAYERS配置中定義通道層:
CHANNEL_LAYERS = {
"default": {
"BACKEND": "asgi_redis.RedisChannelLayer",
"CONFIG": {
"hosts": [os.environ.get('REDIS_URL', 'redis://localhost:6379')],
},
"ROUTING": "chat.routing.channel_routing",
},
}
要注意的是我們把Redis的連接URL放到環境外面,以適應部署到Heroku的情況。
3. 通道路由
在通道層(CHANNEL_LAYERS),我們已經告訴 Channel去哪里找通道路由——chat.routing.channel_routing。通道路由很類似與URL路由的概念:URL路由將URL映射到視圖函數;通道路由將通道映射到消費者函數。跟 urls.py類似,按照慣例通道路由應該在routing.py里。現在,我們創建一條空路由:
channel_routing = {}
(我們將在后面看到好幾條通道路由信息,當連接WebSocket的時候回用到。)
你會注意到我們的app里有urls.py和routing.py兩個文件:我們使用同一個app處理HTTP請求和WebSockets。這是很典型的做法:Channels應用也是Django應用,所以你想用的所有Django的特性——視圖,表單,模型等等——都可以在Channels應用里使用。
4. 運行
最后,我們需要替換掉Django的基于HTTP/WSGI的請求處理器,而是使用通道。它是一個基于新興標準ASGI(異步服務器網關接口)的, 所以我們將在asgi.py文件里定義處理器:
import os
import channels.asgi
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "chat.settings")
channel_layer = channels.asgi.get_channel_layer()
(將來,Django會自動生成這個文件,就像現在自動生成wsgi.py文件一樣。)
現在,如果一切順利的話,我們應該能在通道上把這個app運行起來。Channels接口服務叫做 Daphne ,我們可以運行如下命令運行這個app:
$ daphnechat.asgi:channel_layer --port 8888
** 如果現在訪問http://localhost:8888/ 我們會看到……什么事情也沒發生。這很讓人困惑,直到你想起Channels將 Django分成了兩部分:前臺接口服務 Daphne ,后臺消息消費者。所以想要處理HTTP 請求,我們得運行一個worker:
$ pythonmanage.pyrunworker
現在請求應該能傳遞過去了。這說明了其中的機制很簡潔:Channels 繼續處理 HTTP(S)請求,但是是以一個完全不同的方式去處理,這與通過Django運行 Celery 沒有太大的不同,那種情況下運行WSGI服務的同時也要運行Celery服務。不過現在,所有的任務——HTTP請求, WebSockets,后臺服務都在worker中運行起來了.
(順便說一句,我們仍然可以通過運行python manage.py runserver命令來做本地測試。當這么做時, Channels只是在同一進程里運行起Daphne和一個worker。)
WebSocket消費者
好了,我們已經完成了安裝;讓我們開始進入最奇妙的部分吧。
Channels 將WebSocket連接映射到三個通道中:
- 一個新的客戶端 (如瀏覽器)第一次通過WebSocket連接上時,一條消息被發送到 websocket.connect 通道。當這發生時,我們記錄這個客戶端當前進入一個已知的聊天室。
- 每條客戶端通過已建立的socket發送的消息都被發送到 websocket.receive通道。(這些都是從瀏覽器接收到的消息;記住通道都是單向的。我們等一會兒會介紹如何將消息發送給客戶端。)當一條消息被接受時,我們將對聊天室里所有其他客戶端進行廣播。
- 最后,當客戶端斷開連接時,一條消息被發送到websocket.disconnect通道。當這發生時,我們將此客戶端從聊天室里移除。
首先,我們得在routing.py文件里對這個三個通道進行hook:
from . import consumers
channel_routing = {
'websocket.connect': consumers.ws_connect,
'websocket.receive': consumers.ws_receive,
'websocket.disconnect': consumers.ws_disconnect,
}
其實很簡單:就是將每個通道連接到對應的處理函數。現在我們來看看這些函數。按照慣例我們會將這些函數放到一個 consumers.py 文件里(但是像視圖一樣,其實也可以放在任何地方)。
首先來看看 ws_connect:
from channelsimport Group
from channels.sessionsimport channel_session
from .modelsimport Room
@channel_session
def ws_connect(message):
prefix, label = message['path'].strip('/').split('/')
room = Room.objects.get(label=label)
Group('chat-' + label).add(message.reply_channel)
message.channel_session['room'] = room.label
(為了清晰起見,我將代碼中的異常處理和日志去掉了。
這里代碼很多,讓我們一行行來看:
7. 客戶端將會連接到一個/chat/{label}/形式的WebSocket,label映射的是一個房間的屬性。因為所有的WebSocket消息(不考慮URL)客戶端都可以在相同的頻道里發送和獲取消息,我們要在哪個房間工作,通過路徑解析就可以。
客戶端解析WebSocket路徑是通過讀取message[‘path’]獲得的,這不同于傳統的URL路由,Django的urls.py的路由是基于path的。如果你有多個WebSocket URL,你會需要路由到你自己定制的不同函數。(這是一個“早期”頻道方面的內容;很可能在未來的版本里Channel將會包含在WebSocket URL 路由中。)
8.現在,我們可以從數據庫中查看Room對象了。
9.這條線是使聊天功能能工作的關鍵。我們需要知道如何把消息發送回這個客戶端。要做到這點,我們將使用消息的應答通道——每條消息都會有一個應答通道屬性(reply_channelattribute),可以用來把消息發送回這個客戶端。(我們不需要去自己創建這個通道;Channels已經創建好了。)
然而,只把消息發送到這一個通道還是遠遠不夠的的;當一個用戶聊天時,我們想把消息送給每一個連接到此聊天室的用戶。要做到這點,我們使用一個通道組( channel group)。一個組是由多個通道連接而成,你可以用他來廣播消息。 所以,我們將這個消息的應答通道加入到這個聊天室的特殊通道組中。
10.最后,后續的消息(接收/斷開)不再包含這個URL(因為連接已經激活)。所以,我們需要一種方式來把一個WebSocket連接映射到哪個聊天室記錄下來。要做到這點,我們可以使用一個通道會話。通道會話很像 Django的會話框架: 它們通過通道消息的屬性message.channel_session把這些信息持久化下來。我們給一個消費者添加修飾屬性 @channel_session,就可以讓會話框架起效。
現在一個客戶端已經連接上來了,讓我們看看ws_receive。WebSocket上每接收一條消息,這個消費者都會被調用:
@channel_session
def ws_receive(message):
label = message.channel_session['room']
room = Room.objects.get(label=label)
data = json.loads(message['text'])
m = room.messages.create(handle=data['handle'], message=data['message'])
Group('chat-'+label).send({'text': json.dumps(m.as_dict())})
(再一次說明,為了清晰起見,我把錯誤處理和日志都去掉了。)
最初的幾行很簡單:從 channel_session中解析出聊天室,在數據庫中查找出來該聊天室,解析JSON消息,將消息作為Message對象存放在數據庫中。然后,我們所要作的就是將這條消息廣播給聊天室里所有的成員,為了做到這點我們可以使用和前面一樣的通道組。Group.send()將會把這條信息發送到加入到本組的所有reply_channel。
然后, ws_disconnect就很簡單了:
@channel_session
def ws_disconnect(message):
label = message.channel_session['room']
Group('chat-'+label).discard(message.reply_channel)
這里,在從channel session里查找到聊天室后,我們從聊天組里斷開了reply_channel,就是這樣!
部署和擴展
現在我們已經把 WebSockets連接起來并開始工作,我們可以像上面一樣運行daphne和worker進行測試,或者運行manage.py runserver)。但是和自己聊天是很寂寞的哦,所以讓我們在Heroku上把它跑起來!
大部分情況下, 一個 Channels 應用和一個Python應用在Heroku上都是一樣的——在requirements.txt中有詳細需求, 在runtime.txt定義Python運行事,通過標準的git推送到heroku上進行部署,等等。 我將重點突出那些Channel應用和標準Django應用不一樣的地方:
1. Procfile 和處理類型
因為Channels應用同時需要 HTTP/WebSocket 服務和一個后臺通道消費者, 所以Procfile需要定義這兩種類型。下面是我們的Procfile:
web: daphnechat.asgi:channel_layer --port $PORT --bind 0.0.0.0 -v2
worker: pythonmanage.pyrunworker -v2
當我們首次部署,我們需要確認兩種處理類型都在運行中(Heroku默認值啟動web進程):
$ herokups:scaleweb=1:freeworker=1:free
(一個簡單的應用將運行在 Heroku的免費或者愛好者層上,不過在實際使用環境中你可能需要升級到產品級來提高吞吐量。)
2. 插件: Postgres和Redis
就像Django的大多數應用,你需要一個數據庫, Heroku的Postgres可以完美的滿足要求。然而,Channels也需要一個 Redis實例作為通道層。所以,我們在首次部署我們的應用時需要創建一個 Heroku Postgres和一個 Heroku Redis:
$ heroku addons:create heroku-postgresql
$ heroku addons:create heroku-redis
3. 擴展
因為Channels實在是太新了,擴展性問題還不是很了解。然而,基于現在的架構和我早前做的一些性能測試,我可以做出一些預測。關鍵點在于Channels 把負責連接的處理進程(daphne)和負責通道消息處理的處理進程(runworker)分開了。這意味著:
- 通道的吞吐量——HTTP請求, WebSocket消息,或者自定義的通道消息——取決于工作者進程的數量。所以,如果你需要處理大量的請求,你可以擴展工作者進程 (比如,heroku上 ps:scale worker=3)。
- 并發水平——當前打開的連接數——將受限于前端web進程的規模。所以,如果你需要處理大量并發的WebSocket連接,你得擴展web進程(比如, heroku 上ps:scale worker=2)。
基于我前期做的測試工作, 在一個Standard-1X進程內Daphne是非常適合處理成百的并發連接的。所以我估計很少有場景需要擴展這個web進程。一個Channels應用中的工作者進程的個數與一個老風格Django應用所需的web進程個數是相當的。
接下來要做些什么呢?
對WebSocket的支持是Django的一項很大的新特性,但是這只粗淺介紹了Channels可以做些什么。你要記住:Channels是一個運行后臺任務的通用工具。因此,很多過去需要 Celery 或者Python-RQ 才能做得事情,都可以用Channels替換。 Channels無法完全替換復雜的任務隊列:他有些很重要的限制,比如只發一次,這并不適合所有的場景。當然, Channels可以使通常的后臺任務更加簡單。比如,你可以很容易的使用Channels完成圖像縮略圖生成,發送郵件、推文或者短信,運行耗時數據計算等等工作。
對于Channels來說:計劃在 Django 1.10中包含Channels ,定于今年夏天發布。這意味著現在是一個很好的時機來嘗試一下并給出反饋:您的反饋將會推動這一重要特性的發展方向。如果你想參與進來,看看這份指導文檔 向Djang貢獻代碼 ,然后到django開發者郵件列表 里分享你的反饋。
來自:http://python.jobbole.com/86861/