摘要:相較于手機端的消息推送(一般都是以Socket方式實現),WEB端是基于HTTP協議,很難像TCP一樣保持長連接。但隨著技術的發展,出現了WebSocket,Comet等新的技術可以達到類似長連接的效果。
在團隊協同工具 Worktile的使用過程中,你會發現無論是右上角的消息通知,還是在任務面板中拖動任務,還有用戶的在線狀態,都是實時刷新。Worktile中的推送服務是采用的是基于XMPP協議、Erlang語言實現的Ejabberd,并在其源碼基礎上,結合我們的業務,對源碼作了修改以適配我們自身的需求。另外,基于AMQP協議也可以作為實時消息推送的一種選擇,踢踢網就是采用 RabbitMQ+STOMP協議實現的消息推送服務。本文將結合我在Worktile和踢踢網的項目實踐,介紹下消息推送服務的具體實現。
實時推送的幾種實現方式
相較于手機端的消息推送(一般都是以Socket方式實現),WEB端是基于HTTP協議,很難像TCP一樣保持長連接。但隨著技術的發展,出現了WebSocket,Comet等新的技術可以達到類似長連接的效果,這些技術大體可分為以下幾類:
1)短輪詢。頁面端通過JS定時異步刷新,這種方式實時效果較差。
2)長輪詢。頁面端通過JS異步請求服務端,服務端在接收到請求后,如果該次請求沒有數據,則掛起這次請求,直到有數據到達或時間片(服務端設定)到,則返回本次請求,客戶端接著下一次請求。示例如下:
3)WebSocket。瀏覽器通過WebSocket協議連接服務端,實現了瀏覽器和服務器端的全雙工通信。需要服務端和瀏覽器都支持WebSocket協議。
以上幾種方式中,方式1實現較簡單,但效率和實時效果較差。方式2對服務端實現的要求比較高,尤其是并發量大的情況下,對服務端的壓力很大。方式3效率較高,但對較低版本的瀏覽器不支持,另外服務端也需要有支持WebSocket的實現。Worktile的WEB端實時消息推送,采用的是XMPP擴展協議XEP-0124 BOSH( http://xmpp.org/extensions/xep-0124.html),本質是采用方式2長輪詢的方式。踢踢網則采用了WebSocket連接RabbitMQ的方式實現,下面我會具體介紹如何用這兩種方式實現Server Push。
運行時環境準備
服務端的實現中,無論采用Ejabberd還是RabbitMQ,都是基于Erlang語言開發的,所以必須安裝Erlang運行時環境。Erlang是一種函數式語言,具有容錯、高并發的特點,借助OTP的函數庫,很容易構建一個健壯的分布式系統。目前,基于Erlang開發的產品有,數據庫方面:Riak(Dynamo實現)、CouchDB, Webserver方面:Cowboy、Mochiweb, 消息中間件有RabbitMQ等。對于服務端程序員來說,Erlang提供的高并發、容錯、熱部署等特性是其他語言無法達到的。無論在實時通信還是在游戲程序中,用Erlang可以很容易為每一個上線用戶創建一個對應的Process,對一臺4核8個G的服務器來說,承載上百萬個這樣的Process是非常輕松的事。下圖是Erlang程序發起Process的一般性示意圖:
如圖所示,Session Manager(or Gateway)負責為每個用戶(UID)創建相對應的Process, 并把這個對應關系(MAP)存放到數據表中。每個Process則對應用戶數據,并且他們之間可以相互發送消息。Erlang的優勢就是在內存足夠的情況下創建上百萬個這樣的Process,而且它的創建和銷毀比JAVA的Thread要輕量的多,兩者不是一個數量級的。
好了,我們現在開始著手Erlang環境的搭建(實驗的系統為Ubuntu 12.04, 4核8個G內存):
1、依賴庫安裝
[js] view plaincopy在CODE上查看代碼片派生到我的代碼片
sudo apt-get install build-essential
sudo apt-get install libncurses5-dev
sudo apt-get install libssl-dev libyaml-dev
sudo apt-get install m4
sudo apt-get install unixodbc unixodbc-dev
sudo apt-get install freeglut3-dev libwxgtk2.8-dev
sudo apt-get install xsltproc
sudo apt-get install fop tk8.5 libxml2-utils
2、官網下載OTP源碼包( http://www.erlang.org/download.html), 解壓并安裝:
[js] view plaincopy在CODE上查看代碼片派生到我的代碼片
tar zxvf otpsrcR16B01.tar.gz
cd otpsrcR16B01
configure
make & make install
至此,erlang運行環境就完成了。下面將分別介紹rabbitmq和ejabberd構建實時消息服務。
基于RabbitMQ的實時消息服務
RabbitMQ是在業界廣泛應用的消息中間件,也是對AMQP協議實現好的一種中間件。AMQP協議中定義了Producer、 Consumer、MessageQueue、Exchange、Binding、Virtual Host等實體,他們的關系如下圖所示:
消息發布者(Producer)連接交換器(Exchange), 交換器和消息隊列(Message Queue)通過KEY進行Binding,Binding是根據Exchange的類型(分為Fanout、Direct、Topic、Header)分別對消息作不同形式的派發。Message Queue又分為Durable、Temporary、Auto-Delete三種類型,Durable Queue是持久化隊列,不會因為服務ShutDown而消失,Temporary Queue則服務重啟后會消失,Auto-Delete則是在沒有Consumer連接時自動刪除。另外RabbitMQ有很多第三方插件,可以基于AMQP協議基礎之上做出很多擴展的應用。下面我們將介紹WEB STOMP插件構建基于AMQP之上的STOMP文本協議,通過瀏覽器WebSocket達到實時的消息傳輸。系統的結構如圖:
如圖所示,WEB端我們使用STOMP.JS和SockJS.JS與RabbitMQ的WEB STOMP Plugin通信,手機端可以用STOMPj, Gozirra(Android)或者Objc-STOMP(IOS)通過STOMP協議與RabbitMQ收發消息。因為我們是實時消息系統通常都是要與已有的用戶系統結合,RabbitMQ可以通過第三方插件RabbitMQ-AYTH-Backend-HTTP來適配已有的用戶系統,這個插件可以通過HTTP接口完成用戶連接時的認證過程。當然,認證方式還有LDAP等其他方式。下面介紹具體步驟:
從官網( http://rabbitmq.com/download.html)下載新版本的源碼包,解壓并安裝:
[js] view plaincopy在CODE上查看代碼片派生到我的代碼片
tar zxf rabbitmq-server-x.x.x.tar.gz
cd rabbitmq-server-x.x.x
make & make install
為RabbitMQ安裝WEB-STOMP插件
[js] view plaincopy在CODE上查看代碼片派生到我的代碼片
cd /path/to/your/rabbitmq
./sbin/rabbitmq-plugins enable rabbitmq_web_stomp
./sbin/rabbitmq-plugins enable rabbitmq_web_stomp_examples
./sbin/rabbitmqctl stop
./sbin/rabbitmqctl start
./sbin/rabbitmqctl status
將會顯示下圖所示的運行的插件列表
安裝用戶授權插件
[js] view plaincopy在CODE上查看代碼片派生到我的代碼片
cd /path/to/your/rabbitmq/plugins
wget <a href="http://www.rabbitmq.com/community-plugins/v3.3.x/rabbitmq_auth_backend_http-3.3.x-e7ac6289.ez">http://www.rabbitmq.com/community-plugins/v3.3.x/rabbitmq_auth_backend_http-3.3.x-e7ac6289.ez</a>
cd ..
./sbin/rabbitmq-plugins enable rabbitmq_auth_backend_http
編輯RabbitMQ.Config文件(默認存放于/ECT/RabbitMQ/下),添加:
[js] view plaincopy在CODE上查看代碼片派生到我的代碼片
[
...
{rabbit, [{auth_backends, [rabbit_auth_backend_http]}]},
...
{rabbitmq_auth_backend_http,
[{user_path, “http://your-server/auth/user”},
{vhost_path, “http://your-server/auth/vhost”},
{resource_path, “http://your-server/auth/resource”}
]}
...
].
其中,User_Path是根據用戶名密碼進行校驗,VHOST_Path是校驗是否有權限訪問VHOST, Resource_Path是校驗用戶對傳入的Exchange、Queue是否有權限。我下面的代碼是用Node.js實現的這三個接口的示例:
[js] view plaincopy在CODE上查看代碼片派生到我的代碼片
var express = require('express');
var app = express();
app.get('/auth/user', function(req, res){
var name = req.query.username;
var pass = req.query.password;
console.log("name : " + name + ", pass : " + pass);
if(name === 'guest' && pass === "guest"){
console.log("allow");
res.send("allow");
}else{
res.send('deny');
}
});
app.get('/auth/vhost', function(req, res){
console.log("/auth/vhost");
res.send("allow");
});
app.get('/auth/resource', function(req, res){
console.log("/auth/resource");
res.send("allow");
});
app.listen(3000);
瀏覽器端JS實現,示例代碼如下:
[js] view plaincopy在CODE上查看代碼片派生到我的代碼片
......
var ws = new SockJS('http://' + window.location.hostname + ':15674/stomp');
var client = Stomp.over(ws);
// SockJS does not support heart-beat: disable heart-beats
client.heartbeat.outgoing = 0;
client.heartbeat.incoming = 0;
client.debug = pipe('#second');
var print_first = pipe('#first', function(data) {
client.send('/exchange/feed/user_x', {"content-type":"text/plain"}, data);
});
var on_connect = function(x) {
id = client.subscribe("/exchange/feed/user_x", function(d) {
print_first(d.body);
});
};
var on_error = function() {
console.log('error');
};
client.connect('guest1', 'guest1', on_connect, on_error, '/');
......
需要說明的時,在這里我們首先要在RabbitMQ實例中創建Feed這個Exchange,我們用STOMP.JS連接成功后,根據當前登陸用戶的ID(user_x)綁定到這個Exchange,即Subscribe(“/exchange/feed/user_x”, …) 這個操作的行為,這樣在向RabbitMQ中Feed Exchange發送消息并指定用戶ID(user_x)為KEY,頁面端就會通過WEB Socket實時接收到這條消息。
到目前為止,基于RabbitMQ+STOMP實現WEB端消息推送就已經完成,其中很多的細節需要小伙伴們親自去實踐了,這里就不多說了。實踐過程中可以參照官方文檔:
http://rabbitmq.com/stomp.html
http://rabbitmq.com/web-stomp.html
https://github.com/simonmacmullen/rabbitmq-auth-backend-http
以上的實現是我本人在踢踢網時采用的方式,下面接著介紹一下現在在Worktile中如何通過Ejabberd實現消息推送。
基于Ejabberd的實時消息推送
與RabbitMQ不同,Ejabberd是XMPP協議的一種實現,與AMQP相比,XMPP廣泛應用于即時通信領域。XMPP協議的實現有很多種,比如JAVA的OpenFire,但相較其他實現,Ejabberd的并發性能無疑使的。XMPP協議的前身是Jabber協議,早期的Jabber協議主要包括在線狀態(Presence)、好友花名冊(Roster)、IQ(Info/Query)幾個部分。現在Jabber已經成為RFC的官方標準,如RFC2799,RFC4622,RFC6121,以及XMPP的擴展協議(XEP)。Worktile Web端的消息提醒功能就是基于XEP-0124、XEP-0206定義的BOSH擴展協議。
由于自身業務的需要,我們對Ejabberd的用戶認證和好友列表模塊的源碼進行修改,通過Redis保存用戶的在線狀態,而不是Mnesia和MySQL。另外好友這塊我們是從已有的數據庫中(MongoDB)中獲取項目或團隊的成員。Web端通過Strophe.JS來連接(HTTP-BIND),Strophe.JS可以以長輪詢和WebSocket兩種方式來連接,由于Ejabberd還沒有好的WebSocket的實現,就采用了BOSH的方式模擬長連接。整個系統的結構如下:
Web端用Strophe.JS通過HTTP-BIND進行連接Nginx代理,Nginx反向代理EjabberdCluster。iOS用XMPP-FramWork連接, Android可以用Smack直接連Ejabberd服務器集群。這些都是現有的庫,無需對Client進行開發。在線狀態根據用戶UID作為KEY定義了在線、離線、忙等狀態存放于Redis中。好友列表從MongoDB的Project表中獲取。用戶認證直接修改了Ejabberd_Auth_Internal.erl文件,通過MongoDB驅動連接用戶庫,在線狀態等功能是新加了模塊,其部分代碼如下:
[js] view plaincopy在CODE上查看代碼片派生到我的代碼片
-module(wt_mod_proj).
-behaviour(gen_mod).
-behaviour(gen_server).
-include("ejabberd.hrl").
-include("logger.hrl").
-include("jlib.hrl").
-define(SUPERVISOR, ejabberd_sup).
...
-define(ONLINE, 1).
-define(OFFLINE, 0).
-define(BUSY, 2).
-define(LEAVE, 3).
...
%% API
-export([start_link/2, get_proj_online_users/2]).
%% gen_mod callbacks
-export([start/2, stop/1]).
%% gen_server callbacks
-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]).
%% Hook callbacks
-export([user_available/1, unset_presence/3, set_presence/4]).
-export([get_redis/1, remove_online_user/3, append_online_user/3]).
...
-record(state,{host = <<"">>, server_host, rconn, mconn}).
start_link(Host, Opts) ->
Proc = gen_mod:get_module_proc(Host, ?MODULE),
gen_server:start_link({local, Proc}, ?MODULE, [Host, Opts], []).
user_available(New) ->
LUser = New#jid.luser, LServer = New#jid.lserver,
Proc = gen_mod:get_module_proc(LServer, ?MODULE),
gen_server:cast(Proc, {user_available, LUser, LServer}).
append_online_user(Uid, Proj, Host) ->
Proc = gen_mod:get_module_proc(Host, ?MODULE),
gen_server:call(Proc, {append_online_user, Uid, Proj}).
remove_online_user(Uid, Proj, Host) ->
Proc = gen_mod:get_module_proc(Host, ?MODULE),
gen_server:call(Proc, {remove_online_user, Uid, Proj}).
...
set_presence(User, Server, Resource, Packet) ->
Proc = gen_mod:get_module_proc(Server, ?MODULE),
gen_server:cast(Proc, {set_presence, User, Server, Resource, Packet}).
...
start(Host, Opts) ->
Proc = gen_mod:get_module_proc(Host, ?MODULE),
ChildSpec = {Proc, {?MODULE, start_link, [Host, Opts]},
transient, 2000, worker, [?MODULE]},
supervisor:start_child(?SUPERVISOR, ChildSpec).
stop(Host) ->
Proc = gen_mod:get_module_proc(Host, ?MODULE),
gen_server:call(Proc, stop),
supervisor:delete_child(?SUPERVISOR, Proc).
init([Host, Opts]) ->
MyHost = gen_mod:get_opt_host(Host, Opts, <<"wtmuc.@HOST@">>),
RedisHost = gen_mod:get_opt(redis_host, Opts, fun(B) -> B end,?REDIS_HOST),
RedisPort = gen_mod:get_opt(redis_port, Opts, fun(I) when is_integer(I), I>0 -> I end, ?REDIS_PORT),
ejabberd_hooks:add(set_presence_hook, Host, ?MODULE, set_presence, 100),
ejabberd_hooks:add(user_available_hook, Host, ?MODULE, user_available, 50),
ejabberd_hooks:add(sm_remove_connection_hook, Host, ?MODULE, unset_presence, 50),
MongoHost = gen_mod:get_opt(mongo_host, Opts, fun(B) -> binary_to_list(B) end, ?MONGO_HOST),
MongoPort = gen_mod:get_opt(mongo_port, Opts, fun(I) when is_integer(I), I>0 -> I end, ?MONGO_PORT),
{ok, Mongo} = mongo_connection:start_link({MongoHost, MongoPort}),
C = c(RedisHost, RedisPort),
ejabberd_router:register_route(MyHost), {ok, #state{host = Host, server_host = MyHost, rconn = C, mconn = Mongo}}.
terminate(_Reason, #state{host = Host, rconn = C, mconn = Mongo}) ->
ejabberd_hooks:delete(set_presence_hook, Host, ?MODULE, set_presence, 100),
ejabberd_hooks:delete(user_available_hook, Host, ?MODULE, user_available, 50),
ejabberd_hooks:delete(unset_presence_hook, Host, ?MODULE, unset_presence, 50),
eredis:stop(C),
ok.
...
handle_call({append_online_user, Uid, ProjId}, _From, State) ->
C = State#state.rconn,
Key = <<!--?PRE_RPOJ_ONLINE_USERS /binary, ProjId/binary-->>,
Resp = eredis:q(C, ["SADD", Key, Uid]),
{reply, Resp, State};
handle_call({remove_online_user, Uid, ProjId}, _From, State) ->
...
handle_call({get_proj_online_users, ProjId}, _From, State) ->
...
handle_cast({set_presence, User, Server, Resource, Packet}, #state{mconn = Mongo} = State) ->
C = State#state.rconn,
Key = <<!--?USER_PRESENCE /binary, User/binary-->>,
Pids = get_user_projs(User, Mongo),
Cmd = get_proj_key(Pids, ["SUNION"]),
case xml:get_subtag_cdata(Packet, <<"show">>) of
<<"away">> ->
eredis:q(C, ["SET", Key, ?LEAVE]);
<<"offline">> ->
...
handle_cast(_Msg, State) -> {noreply, State}.
handle_info({route, From, To, Packet}, #state{host = Host, server_host = MyHost, rconn = RedisConn, mconn = Mongo} = State) ->
case catch do_route(Host, MyHost, From, To, Packet, RedisConn, Mongo) of
{'EXIT', Reason} ->
?ERROR_MSG("~p", [Reason]);
_ ->
ok
end,
{noreply, State};
handle_info(_Info, State) -> {noreply, State}.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
...
其中,User\_Available\_HOOK和SM\_Remove\_Connection\_HOOK 就是用戶上線和用戶斷開連接觸發的事件,Ejabberd 中正是由于這些HOOK,才能很容易擴展功能。
在用Tsung對Ejabberd進行壓力測試,測試機器為4核心8G內存的普通PC,以3臺客戶機模擬用戶登錄、設置在線狀態、發送一條文本消息、關閉連接操作,在同時在線達到30w時,CPU占用不到3%,內存大概到3個G左右,隨著用戶數增多,主要內存的損耗較大。由于壓力測試比較耗時,再等到有時間的時候,會在做一些更深入的測試。
本站文章版權歸原作者及原出處所有 。內容為作者個人觀點, 并不代表本站贊同其觀點和對其真實性負責,本站只提供參考并不構成任何投資及應用建議。本站是一個個人學習交流的平臺,網站上部分文章為轉載,并不用于任何商業目的,我們已經盡可能的對作者和來源進行了通告,但是能力有限或疏忽,造成漏登,請及時聯系我們,我們將根據著作權人的要求,立即更正或者刪除有關內容。本站擁有對此聲明的最終解釋權。