MQTT EMQ 中的 session 管理

DavidAlphaFox · 发布于 2018年01月23日 · 469 次阅读
84794b

EMQ的session定义

前一篇EMQ 一个客户链接的资源消耗中,提到了EMQ是使用链接进程(emqtt_client)和session进程(emqtt_session)分开的策略,其中 emqtt_session负责管理EMQ的客户端的会话。
因此可以看出emqtt_session做为会话的管理者和网络socket没有任何关系。emqtt_session主要管理客户端离线后的消息,高QoS 2消息的确认管理,管理订阅以及packet identify的管理。

emqtt_session的创建

emqtt_client进程确认客户端的合法性后,会使用emqtt_sm中的函数去创建emqtt_session进程。

create_session(CleanSess, {ClientId, Username}, ClientPid) ->                                                                     
    case emqttd_session_sup:start_session(CleanSess, {ClientId, Username}, ClientPid) of                                          
        {ok, SessPid} ->                                                                                                          
            Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid, clean_sess = CleanSess},                            
            case insert_session(Session) of                                                                                       
                {aborted, {conflict, ConflictPid}} ->                                                                             
                    %% Conflict with othe node? 同名ID同时上线了                                                                  
                    lager:error("SM(~s): Conflict with ~p", [ClientId, ConflictPid]),                                             
                    {error, mnesia_conflict};                                                                                     
                {atomic, ok} ->                                                                                                   
                    {ok, SessPid}                                                                                                 
            end;                                                                                                                  
        {error, Error} ->                                                                                                         
            {error, Error}                                                                                                        
    end.

从代码中可以清晰的看出,emqtt_client并没有使用spawn_link机制来直接创建emqtt_session,而是使用emqttd_session_sup这个监督者来创建session进程。但是在emqtt_session的进程中,emqtt_session会主动的去关联emqtt_client进程,同时emqtt_session还会建立一个monitor去监控emqtt_client进程。
为什么要这样做呢?是因为以下几点原因:

  1. emqtt_session进程退出后,emqtt_client必须跟着退出
  2. emqtt_client进程退出后,根据情况需要保留emqtt_session进程,继续服务

emqtt_session恢复

当MQTT客户端在CONNECT包中将,clean session设置为false的时候,emqtt_session进程会在emqtt_session进程退出之后,继续接收一段时间消息,这个时间段可以通过mqtt.session.expiry_interval来进行配置。
当然,在订阅某主题,并持续有消息广播的情况下,emqtt_session进程在失去emqtt_session进程后维持越久,所消耗的内存将会越多。

handle_cast({resume, ClientId, ClientPid},                                                                                        
            State = #state{client_id       = ClientId,                                                                            
                           client_pid      = OldClientPid,                                                                        
                           clean_sess      = CleanSess,                                                                           
                           retry_timer     = RetryTimer,                                                                          
                           await_rel_timer = AwaitTimer,                                                                          
                           expiry_timer    = ExpireTimer}) ->                                                                     

    ?LOG(debug, "Resumed by ~p", [ClientPid], State),                                                                             

    %% Cancel Timers                                                                                                              
    lists:foreach(fun emqttd_misc:cancel_timer/1,                                                                                 
                  [RetryTimer, AwaitTimer, ExpireTimer]),                                                                         
    %% 踢掉老的客户端                                                                                                             
    case kick(ClientId, OldClientPid, ClientPid) of                                                                               
        ok -> ?LOG(warning, "~p kickout ~p", [ClientPid, OldClientPid], State);                                                   
        ignore -> ok                                                                                                              
    end,    
    true = link(ClientPid),                                                                                                       

    State1 = State#state{client_pid      = ClientPid,                                                                             
                         binding         = binding(ClientPid),                                                                    
                         old_client_pid  = OldClientPid,                                                                          
                         clean_sess      = false,                                                                                 
                         retry_timer     = undefined,                                                                             
                         awaiting_rel    = #{},                                                                                   
                         await_rel_timer = undefined,                                                                             
                         expiry_timer    = undefined},                                                                            

    %% Clean Session: true -> false?                                                                                              
    if                                                                                                                            
        CleanSess =:= true ->                                                                                                     
            ?LOG(error, "CleanSess changed to false.", [], State1),                                                               
            emqttd_sm:register_session(ClientId, false, info(State1));                                                            
        CleanSess =:= false ->                                                                                                    
            ok                                                                                                                    
    end,                                                                                                                                                                                                                                                            
    %% Replay delivery and Dequeue pending messages                                                                               
    hibernate(emit_stats(dequeue(retry_delivery(true, State1)))); 

从这些代码上,不难看出在emqtt_session在恢复过程中,会做下面几件事情:

  1. 踢掉老的客户端进程,和新客户端进程建立关联
  2. 检测clean session的变化,并根据情况重新注册session
  3. 重发所有堆积的消息

总结

使用链接进程(emqtt_client)和session进程(emqtt_session)分开的策略,充分的利用了Erlang的actor模型,简化了代码的编写同时利用Erlang的调度机制提高了消息的实时性(CPU数量充足)。

但是这个设计并不是无懈可击的,当使用集群和session超时和clean session设置不当的时候,会出内网流量暴增以及某些节点内存暴增。同时因为happens before这种可能性,在下面场景会引起QoS 0的消息丢失。
emqtt_session因某些未知原因退出,同时emqtt_client进程的socket已经接收完数据,但未发送给emqtt_client进程,这个时候link机制的退出消息已经推送给emqtt_client进程。接着emqtt_client进程被Erlang调度器调度,这时候,emqtt_client直接进行退出操作,忽略所有已经收到的socket数据。

当然该会话管理模型还有很多优点和缺点,望读者们自己逐步去发现。

暂无回复。
需要 登录/注册 后方可回复, 如果你还没有账号请点击这里 注册