Tech Talk 让技术发出声音
RSS

EMQ的Session 管理

EMQ的session定义

EMQ是使用链接进程(emqtt_client)和session进程(emqtt_session)分开的策略,其中emqtt_session负责管理EMQ的客户端的会话。
因此可以看出emqtt_session做为会话的管理者和网络socket没有任何关系。
emqtt_session主要管理客户端离线后的消息,高QoS 2消息的确认管理,管理订阅以及packet identify的管理。

emqtt_session的创建

emqtt_client进程确认客户端的合法性后,会使用emqtt_sm中的函数去创建emqtt_session进程。

从代码中可以清晰的看出,emqtt_client并没有使用spawn_link机制来直接创建emqtt_session, 而是使用emqttd_session_sup这个监督者来创建session进程。但是在emqtt_session的进程中, emqtt_session会主动的去关联emqtt_client进程,同时emqtt_session还会建立一个monitor去监控emqtt_client进程。

create_session(CleanSess, {ClientId, Username}, ClientPid) ->                                                                     
    case emqttd_session_sup:start_session(CleanSess, {ClientId, Username}, ClientPid) of   
        {ok, SessPid} ->       
            Session = #mqtt_session{
                      client_id = ClientId, 
                     sess_pid = SessPid, 
                     clean_sess = CleanSess},                            
            case insert_session(Session) of                                                                                       
                {aborted, {conflict, ConflictPid}} ->                                                                             
                    %% Conflict with othe node? 同名ID同时上线了                                                                  
                    lager:error("SM(~s): Conflict with ~p", [ClientId, ConflictPid]),                                             
                    {error, mnesia_conflict};                                                                                     
                {atomic, ok} ->                                                                                                   
                    {ok, SessPid}                                                                                                 
            end;                                                                                                                  
        {error, Error} ->                                                                                                         
            {error, Error}                                                                                                        
    end.

为什么要这样做呢?是因为以下几点原因:

  1. emqtt_session进程退出后,emqtt_client必须跟着退出
  2. emqtt_client进程退出后,根据情况需要保留emqtt_session进程,继续服务

emqtt_session恢复

当MQTT客户端在CONNECT包中将,clean session设置为false的时候,emqtt_session进程会在emqtt_session进程退出之后, 继续接收一段时间消息,这个时间段可以通过mqtt.session.expiry_interval来进行配置。
当然,在订阅某主题,并持续有消息广播的情况下,emqtt_session进程在失去emqtt_session进程后维持越久,所消耗的内存将会越多。


handle_cast({resume, ClientId, ClientPid},      
            State = #state{client_id       = ClientId,  
                           client_pid      = OldClientPid,    
                           clean_sess      = CleanSess,    
                           retry_timer     = RetryTimer,     
                           await_rel_timer = AwaitTimer,  
                           expiry_timer    = ExpireTimer}) ->                
    ?LOG(debug, "Resumed by ~p", [ClientPid], State),           
    %% Cancel Timers                                                                                                              
    lists:foreach(fun emqttd_misc:cancel_timer/1,         
                  [RetryTimer, AwaitTimer, ExpireTimer]),                                                                         
    %% 踢掉老的客户端                                                                                                             
    case kick(ClientId, OldClientPid, ClientPid) of  
        ok -> ?LOG(warning, "~p kickout ~p", [ClientPid, OldClientPid], State);               
        ignore -> ok                          
    end,    
    true = link(ClientPid),                                      
    State1 = State#state{client_pid      = ClientPid,           
                         binding         = binding(ClientPid),          
                         old_client_pid  = OldClientPid,           
                         clean_sess      = false,                           
                         retry_timer     = undefined,    
                         awaiting_rel    = #{},   
                         await_rel_timer = undefined,      
                         expiry_timer    = undefined},   
    if  
       CleanSess =:= true ->
             ?LOG(error, "CleanSess changed to false.", [], State1),
              emqttd_sm:register_session(ClientId, false, info(State1)); 
       CleanSess =:= false -> ok
    end, 
    hibernate(emit_stats(dequeue(retry_delivery(true, State1)))); 

从这些代码上,不难看出在emqtt_session在恢复过程中,会做下面几件事情:

  1. 踢掉老的客户端进程,和新客户端进程建立关联
  2. 检测clean session的变化,并根据情况重新注册session
  3. 重发所有堆积的消息

总结

使用链接进程(emqtt_client)和session进程(emqtt_session)分开的策略,充分的利用了Erlang的actor模型,简化了代码的编写同时利用Erlang的调度机制提高了消息的实时性(CPU数量充足)。

但是这个设计并不是无懈可击的,当使用集群和session超时和clean session设置不当的时候,会出内网流量暴增以及某些节点内存暴增。同时因为happens before这种可能性,在下面场景会引起QoS 0的消息丢失。
emqtt_session因某些未知原因退出,同时emqtt_client进程的socket已经接收完数据,但未发送给emqtt_client进程,这个时候link机制的退出消息已经推送给emqtt_client进程。接着emqtt_client进程被Erlang调度器调度,这时候,emqtt_client直接进行退出操作,忽略所有已经收到的socket数据。

当然该会话管理模型还有很多优点和缺点,望读者们自己逐步去发现。