XMPP eJabberd 的花名册和出席 (3)

DavidAlphaFox · 发布于 2018年01月08日 · 318 次阅读
84794b
本帖已被设为精华帖!

我又回来了

前面的文章中,其中重点介绍了花名册管理出席订阅。花名册通过在服务器存储,解决了用户好友关系在多个机器上漫游的问题。出席订阅机制重点的解决了如何建立好友关系的过程。
本篇将重点介绍下出席通知,那么出席通知主要解决什么问题呢?可能各位读者已经猜测到了,就是好友上线通知。

出席通知

XMPP出席通知是典型地遵循一个"发布-订阅"或"观察者"模型的通知系统。 这里,一个实体发送出席信息给它的服务器,它的服务器接着广播那个信息给所有订阅了该实体的出席信息的联系人。

出席探测

出席探测是一个对某联系人的当前出席信息的请求的操作, 由代表某个用户的服务器代表该用户发送;语法上它是一个type属性值为probe的出席信息节.。在出席信息订阅的上下文中,from地址的值必须是订阅的用户的纯JID(不带资源的JID)而to地址的值必须是被订阅的联系人的纯JID, 因为出席信息订阅是基于纯JID(不带资源的JID)的。

<presence from='[email protected]'
              id='ign291v5'
              to='[email protected]'
              type='probe'/>

用户上线后,虽然会立刻接收服务器发来的花名册,但是花名册并不携带用户好友的出席信息,因此需要服务器帮助探测所有的好友出席信息。

出席通知

一个客户端,在完成XMPP的RFC6120 Extensible Messaging and Presence Protocol (XMPP): Core 中所有规定动作,就需要发送出席通知给服务器,告知服务器客户端已经上线,可以进行常规通讯了。这个出席通知非常简单,是一个没有任何属性的节。

<presence/>

虽然该节不包含任何属性,但是该节可以包含元素, 元素, 一个一个或多个元素实例。
这个出席通知虽然很简单,但是会在服务器上触发一系列动作。

ejabberd如何实现

ejabberd在实现XMPP中presence的部分非常完善,在关于代表客户端操作的方法都集中在ejabberd_c2s中。如前面文章所说的,出席订阅部分集中在presence_track这部分,而关于出席通知这部分,重点集中在 presence_update部分。

下面是比较重要的代码

%% 更新出席信息
%% @doc User updates his presence (non-directed presence packet)
-spec presence_update(Acc :: mongoose_acc:t(),
                      From :: 'undefined' | ejabberd:jid(),
                      State :: state()) -> {mongoose_acc:t(), state()}.
presence_update(Acc, From, StateData) ->
    Packet = mongoose_acc:get(element, Acc),
    case mongoose_acc:get(type, Acc) of
        <<"unavailable">> ->
            Status = case xml:get_subtag(Packet, <<"status">>) of
                         false ->
                             <<>>;
                         StatusTag ->
                             xml:get_tag_cdata(StatusTag)
                     end,
            Info = [{ip, StateData#state.ip}, {conn, StateData#state.conn},
                    {auth_module, StateData#state.auth_module}],
            Acc1 = ejabberd_sm:unset_presence(Acc,
                                              StateData#state.sid,
                                              StateData#state.user,
                                              StateData#state.server,
                                              StateData#state.resource,
                                              Status,
                                              Info),
            Acc2 = presence_broadcast(Acc1, StateData#state.pres_a, StateData),
            Acc3 = presence_broadcast(Acc2, StateData#state.pres_i, StateData),
            % and here we reach the end
            {Acc3, StateData#state{pres_last = undefined,
                                   pres_timestamp = undefined,
                                   pres_a = gb_sets:new(),
                                   pres_i = gb_sets:new(),
                                   pres_invis = false}};
        <<"invisible">> ->
            NewPriority = get_priority_from_presence(Packet),
            Acc0 = update_priority(Acc, NewPriority, Packet, StateData),
            case StateData#state.pres_invis of
                false ->
                    Acc1 = presence_broadcast(Acc0,
                                              StateData#state.pres_a,
                                              StateData),
                    Acc2 = presence_broadcast(Acc1,
                                              StateData#state.pres_i,
                                              StateData),
                    S1 = StateData#state{pres_last = undefined,
                                         pres_timestamp = undefined,
                                         pres_a = gb_sets:new(),
                                         pres_i = gb_sets:new(),
                                         pres_invis = true},
                    presence_broadcast_first(Acc2, From, S1, Packet);
                true ->
                    {Acc0, StateData}
            end;
        <<"error">> ->
            {Acc, StateData};
        <<"probe">> ->
            {Acc, StateData};
        <<"subscribe">> ->
            {Acc, StateData};
        <<"subscribed">> ->
            {Acc, StateData};
        <<"unsubscribe">> ->
            {Acc, StateData};
        <<"unsubscribed">> ->
            {Acc, StateData};
        _ ->
            presence_update_to_available(Acc, From, Packet, StateData)
    end.

presence_update_to_available(true, Acc, _, NewPriority, From, Packet, StateData) ->
    Acc2 = ejabberd_hooks:run_fold(user_available_hook,
                                   StateData#state.server,
                                   Acc,
                                   [StateData#state.jid]),
    Res = case NewPriority >= 0 of
              true ->
                  Acc3 = ejabberd_hooks:run_fold(roster_get_subscription_lists,
                                                 StateData#state.server,
                                                 Acc2,
                                                 [StateData#state.user,
                                                 StateData#state.server]),
                  {_, _, Pending} = mongoose_acc:get(subscription_lists, Acc3, {[], [], []}),
                  Acc4 = resend_offline_messages(Acc3, StateData),
                  resend_subscription_requests(Acc4,
                                               StateData#state{pending_invitations = Pending});
              false ->
                  {Acc2, StateData}
              end,
    {Accum, NewStateData1} = Res,
    %% 得到订阅者的信息,全局广播
    presence_broadcast_first(Accum, From, NewStateData1, Packet);

其中,ejabberd会根据type进行判断,该进行哪些操作,接着presence_update_to_available会在客户端首次上线和从离线状态变为上线状态的时候进行出席通知。正如前面所描述的XMPP出席通知是典型地遵循一个"发布-订阅"或"观察者"模型的通知系统,因此出席通知会通知所有在出席订阅过程中订阅的用户也就是用户的好友。

另一方面,eJabberd借助了Erlang的特性,会在客户端断开连接后通知用户的好友用户离线了。

-spec terminate(Reason :: any(), statename(), state()) -> ok.
terminate(_Reason, StateName, StateData) ->
    case {should_close_session(StateName), StateData#state.authenticated} of
        {false, _} ->
            ok;
        %% if we are in an state wich have a session established
        {_, replaced} ->
            ?INFO_MSG("(~w) Replaced session for ~s",
                      [StateData#state.socket,
                       jid:to_binary(StateData#state.jid)]),
            From = StateData#state.jid,
            StatusEl = #xmlel{name = <<"status">>,
                              children = [#xmlcdata{content = <<"Replaced by new connection">>}]},
            Packet = #xmlel{name = <<"presence">>,
                            attrs = [{<<"type">>, <<"unavailable">>}],
                            children = [StatusEl]},
            Acc0 = mongoose_acc:from_element(Packet),
            Acc = mongoose_acc:put(from_jid, From, Acc0),
            ejabberd_sm:close_session_unset_presence(
              StateData#state.sid,
              StateData#state.user,
              StateData#state.server,
              StateData#state.resource,
              <<"Replaced by new connection">>,
              replaced),
            Acc1 = presence_broadcast(Acc, StateData#state.pres_a, StateData),
            presence_broadcast(Acc1, StateData#state.pres_i, StateData),
            reroute_unacked_messages(StateData);
        {_, resumed} ->
            ?INFO_MSG("(~w) Stream ~p resumed for ~s",
                      [StateData#state.socket,
                       StateData#state.stream_mgmt_id,
                       jid:to_binary(StateData#state.jid)]);
        _ ->
            ?INFO_MSG("(~w) Close session for ~s",
                      [StateData#state.socket,
                       jid:to_binary(StateData#state.jid)]),

            EmptySet = gb_sets:new(),
            case StateData of
                #state{pres_last = undefined,
                       pres_a = EmptySet,
                       pres_i = EmptySet,
                       pres_invis = false} ->
                    ejabberd_sm:close_session(StateData#state.sid,
                                              StateData#state.user,
                                              StateData#state.server,
                                              StateData#state.resource,
                                              normal);
                _ ->
                    From = StateData#state.jid,
                    Packet = #xmlel{name = <<"presence">>,
                                    attrs = [{<<"type">>, <<"unavailable">>}]},
                    Acc0 = mongoose_acc:from_element(Packet),
                    Acc = mongoose_acc:put(from_jid, From, Acc0),
                    ejabberd_sm:close_session_unset_presence(
                      StateData#state.sid,
                      StateData#state.user,
                      StateData#state.server,
                      StateData#state.resource,
                      <<"">>,
                      normal),
                    Acc1 = presence_broadcast(Acc, StateData#state.pres_a, StateData),
                    presence_broadcast(Acc1, StateData#state.pres_i, StateData)
            end,
            reroute_unacked_messages(StateData)
    end,
    (StateData#state.sockmod):close(StateData#state.socket),
    ok.

不管是上线,还是离线,从代码中都可以看到这里面的两个函数presence_broadcastpresence_broadcast_first的命名上都含有broadcast,也就是说这两个函数会进行广播操作。

必须知道的问题

XMPP的花名册和出席机制是非常完美的,就是因为它很完美,这就给实际生产使用带来了一些弊端。

花名册

XMPP的花名册现在已经通过版本机制来避免每次全量同步花名册的操作,所以在客户端开发的时候,一定要注意相关功能是否被启用。否则会被移动用户抱怨飞速的消耗流量。

出席通知

  1. 在客户端首次上线的时候,会通过presence_broadcast_first来进行广播,从presence_broadcast_first 的代码中可以看到,服务器会给我们所有的订阅用户发送出席探测的消息,同时给所有订阅用户发送用户出席的信息,如果一个用户有超级多的好友的情况下,用户的ejabberd_c2s进程会长时间无法处理别的信息。
  2. 异常离线的时候,也会给所有用户的好友发送离线出席信息的广播,所以在网络环境不好的情况,对用户的好友的流量会产生极大的负担,同时会加重服务器的负担。
  3. 考虑到XMPP的分布式特性,XMPP服务器本身并不集中保存用户的出席状态,而是通过出席探测的方式进行逐个探测,这个虽然很准确。但是会给用户用户和好友造成大量的流量负担。

如何解决

对于流量负担,是在所难免的,但是可以通过自己定制出席通知的方式,建立出席信息集中存储来降低用户流量的负担。

总结

至此,已经带各位读者了解了部分ejabberd中是如何实现花名册和出席相关的知识,省下的部分就需要读者们进一步深入到源代码和RFC相关规范中,带着需求去阅读源代码,从而从根本上解决自己业务上的需求。

共收到 0 条回复
84794b DavidAlphaFox 将本帖设为了精华贴 01月10日 10:03
需要 登录/注册 后方可回复, 如果你还没有账号请点击这里 注册