Erlang Erlang 集群互联

DavidAlphaFox · 发布于 2017年09月23日 · 438 次阅读
84794b
本帖已被设为精华帖!

Erlang的集群

Erlang语言本身定义的时候就支持了分布式特性。其中在Erlang虚拟机中,通过定义数据的编码方式,Erlang进程的表示方法和大量的基础组件来完成Erlang的分布式。

Erlang集群的特性

每个节点在使用非隐藏模式(在启动的时候没有使用-hidden)加入集群,那么这个节点和集群中所有的节点都会有一个TCP连接,就是大家所知道的无中心和全互联。就如下面的图片所示一样: Erlang中的Erlang进程位置透明,不管Erlang进程在集群中任何一个节点上,其它节点的进程均可以向它发送消息,就如同该进程和发送消息的进程在同一个节点上一样。

Erlang节点直接使用简单的Cookie机制进行验证,防止错误的接入和非法接入。 Erlang节点间的数据传输使用普通TCP传输,也可以使用TLS进行传输,从而防止被窃听。

Erlang集群是如何创建的

一般情况下,节点都是会被命名成[email protected][email protected]这种模式。可以通过net_adm:ping('[email protected]')或net_adm:ping('[email protected]')来完成节点的加入工作。但是真正进行节点建立的是net_kernel,因此本篇将重点分析net_kernel都进行了什么样的动作。

令一种情况,就是Erlang集群中使用了mnesia集群,当mnesia启动的时候,mnesia会要求Erlang虚拟机连接其它节点加入集群。

Erlang是如何发现别的节点的

每个Erlang虚拟机在启动的时候都会尝试启动自带的epmd。epmd就如同大家所知到的DNS一样,它运行在一个约定的端口上,Erlang虚拟机启动后会在epmd上注册一个自己的节点名字和监听的端口号。当节点A想连接节点B的时候,节点A首先会从[email protected]中取出ip部分,之后去连接这个ip上的epmd,当能成功连接epmd后节点A就会去查询节点B的端口,并进行连接。

代码分析

net_kernel

net_kernel是Erlang集群构建中最关键的部分之一,它高屋建瓴的控制Erlang虚拟机和OTP库中其它模块成集群的建立和维护。net_kernel是一个gen_server,它在启动后,会完成下面这些功能:

1.会建立一个定时器进程用来和别的节点进行心跳,检测其它节点是否离开集群。 2.创建连接管理表 3.启动可以连接的端口,用来接收别的节点的连接。

net_kernel:connect

当一个节点需要连接另一个节点的时候,就需要使用该函数了。

handle_call({connect, _, Node}, From, State) when Node =:= node() ->
    async_reply({reply, true, State}, From);
handle_call({connect, Type, Node}, From, State) ->
    verbose({connect, Type, Node}, 1, State),
    case ets:lookup(sys_dist, Node) of
    [Conn] when Conn#connection.state =:= up ->
        async_reply({reply, true, State}, From);
    [Conn] when Conn#connection.state =:= pending ->
        Waiting = Conn#connection.waiting,
        ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}),
        {noreply, State};
    [Conn] when Conn#connection.state =:= up_pending ->
        Waiting = Conn#connection.waiting,
        ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}),
        {noreply, State};
    _ ->
        case setup(Node,Type,From,State) of
        {ok, SetupPid} ->
            Owners = [{SetupPid, Node} | State#state.conn_owners],
            {noreply,State#state{conn_owners=Owners}};
        _  ->
            ?connect_failure(Node, {setup_call, failed}),
            async_reply({reply, false, State}, From)
        end
    end;

从中可以看出,如果目标节点是自身,那么直接就忽略掉,返回成功。

如果目标节点不是自身,先看一下ets中是否有向远程节点连接的进程。当这进行连接的进程状态是up,则直接返回true,否则将请求进程加入连接等待队列中。如果我们没有向远程节点进行连接的进程,则调用setup函数来建立一个。在setup函数中,会先找出连接远程节点所使用的模块名称,一般情况下是inet_tcp_dist这个模块。下面先假定是使用inet_tcp_dist这个模块,这个时候net_kernel会调用inet_tcp_dist:setup,并将成功后的Erlang进程PID放入sys_dist这个ets中。

net_kernel的心跳

创建的ticker进程,它专门负责发心跳给net_kernel进程,然后net_kernel进程会遍历所有远程连接的进程,让其进行一次心跳。当需要改变节点的心跳时间的时候,net_kernel会开启一个aux_ticker进程帮助我们进行过度,直到所有其它节点都知道了该节点改变了心跳周期为止,当所有其它节点都知道了这个节点的心跳周期发生了变化,这个aux_ticker进程也就结束了它的历史性任务,安静的退出了。

当节点之间心跳发生异常了,就会发生TCP数据传输故障。当TCP传输发生异常的时候,Ports会按照约定好的规则进行清理,这个可参见dist.c中的erts_do_net_exits。

inet_tcp_dist

inet_tcp_dist模块在整个集群建立当中,提供了协议的支持和连接接入这些细节操作。

inet_tcp_dist:setup

setup(Node, Type, MyNode, LongOrShortNames,SetupTime) ->
    spawn_opt(?MODULE, do_setup, 
          [self(), Node, Type, MyNode, LongOrShortNames, SetupTime],
          [link, {priority, max}]).

do_setup(Kernel, Node, Type, MyNode, LongOrShortNames,SetupTime) ->
    ?trace("~p~n",[{inet_tcp_dist,self(),setup,Node}]),
    [Name, Address] = splitnode(Node, LongOrShortNames),
    case inet:getaddr(Address, inet) of
    {ok, Ip} ->
        Timer = dist_util:start_timer(SetupTime),
        %用epmd协议获得远程节点的端口
        case erl_epmd:port_please(Name, Ip) of
        {port, TcpPort, Version} ->
            ?trace("port_please(~p) -> version ~p~n", 
               [Node,Version]),
            dist_util:reset_timer(Timer),
                %连接远程节点
            case inet_tcp:connect(Ip, TcpPort, 
                      [{active, false}, 
                       {packet,2}]) of
            %拿到Socket之后,定义各种回调函数,状态以及状态机函数
            {ok, Socket} ->
                HSData = #hs_data{
                  kernel_pid = Kernel,
                  other_node = Node,
                  this_node = MyNode,
                  socket = Socket,
                  timer = Timer,
                  this_flags = 0,
                  other_version = Version,
                  f_send = fun inet_tcp:send/2,
                  f_recv = fun inet_tcp:recv/3,
                  f_setopts_pre_nodeup = 
                  fun(S) ->
                      inet:setopts
                    (S, 
                     [{active, false},
                      {packet, 4},
                      nodelay()])
                  end,
                  f_setopts_post_nodeup = 
                  fun(S) ->
                      inet:setopts
                    (S, 
                     [{active, true},
                      {deliver, port},
                      {packet, 4},
                      nodelay()])
                  end,
                  f_getll = fun inet:getll/1,
                  f_address = 
                  fun(_,_) ->
                      #net_address{
                   address = {Ip,TcpPort},
                   host = Address,
                   protocol = tcp,
                   family = inet}
                  end,
                  mf_tick = fun ?MODULE:tick/1,
                  mf_getstat = fun ?MODULE:getstat/1,
                  request_type = Type
                 },
                %进行握手
                dist_util:handshake_we_started(HSData);
            _ ->
                %% Other Node may have closed since 
                %% port_please !
                ?trace("other node (~p) "
                   "closed since port_please.~n", 
                   [Node]),
                ?shutdown(Node)
            end;
        _ ->
            ?trace("port_please (~p) "
               "failed.~n", [Node]),
            ?shutdown(Node)
        end;
    _Other ->
        ?trace("inet_getaddr(~p) "
           "failed (~p).~n", [Node,_Other]),
        ?shutdown(Node)
    end.

在这函数当中,可以看到,Erlang每次对外建立连接的时候都需要去对方的epmd上进行查询。inet_tcp_dist主要注重流程和协议,将TCP传输细节交给了inet这个模块来进行,这样大大的减少了相应的代码量。在handshake_we_started和远程节点进行一次验证。这个验证过程非常简单,步骤如下:

1.远程节点生成一个随机数,然后将这个随机数发给当前节点。 2.当前节点用它所知道的远程节点的cookie加上这个随机数生成一个MD5,并将这个MD5返回给远程节点。

当完成了验证,会使用do_setnode,告诉Erlang虚拟机该节点已经和目标节点的连接上了。同时通知net_kernel已经完成远程节点的连接,需要它改变sys_dist的ets状态和进行后续的操作。

dist.c

Erlang虚拟机中,负责管理节点互联的部分,是用纯C实现的。

BIF_RETTYPE setnode_3(BIF_ALIST_3)
{
    BIF_RETTYPE ret;
    Uint flags;
    unsigned long version;
    Eterm ic, oc;
    Eterm *tp;
    DistEntry *dep = NULL;
    Port *pp = NULL;

    /* Prepare for success */
    ERTS_BIF_PREP_RET(ret, am_true);

    /*
     * Check and pick out arguments
     */

    if (!is_node_name_atom(BIF_ARG_1) ||
        is_not_internal_port(BIF_ARG_2) ||
        (erts_this_node->sysname == am_Noname)) {
         goto badarg;
    }

    if (!is_tuple(BIF_ARG_3))
         goto badarg;
    tp = tuple_val(BIF_ARG_3);
    if (*tp++ != make_arityval(4))
         goto badarg;
    if (!is_small(*tp))
         goto badarg;
    flags = unsigned_val(*tp++);
    if (!is_small(*tp) || (version = unsigned_val(*tp)) == 0)
         goto badarg;
    ic = *(++tp);
    oc = *(++tp);
    if (!is_atom(ic) || !is_atom(oc))
         goto badarg;

    /* DFLAG_EXTENDED_REFERENCES is compulsory from R9 and forward */
    if (!(DFLAG_EXTENDED_REFERENCES & flags)) {
         erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
         erts_dsprintf(dsbufp, "%T", BIF_P->common.id);
         if (BIF_P->common.u.alive.reg)
              erts_dsprintf(dsbufp, " (%T)", BIF_P->common.u.alive.reg->name);
         erts_dsprintf(dsbufp,
                       " attempted to enable connection to node %T "
                       "which is not able to handle extended references.\n",
                       BIF_ARG_1);
         erts_send_error_to_logger(BIF_P->group_leader, dsbufp);
         goto badarg;
    }

    /*
     * Arguments seem to be in order.
     */

    /* get dist_entry */
    dep = erts_find_or_insert_dist_entry(BIF_ARG_1);
    if (dep == erts_this_dist_entry)
         goto badarg;
    else if (!dep)
         goto system_limit; /* Should never happen!!! */
//通过Port的ID获取Port的结构
    pp = erts_id2port_sflgs(BIF_ARG_2,
                BIF_P,
                ERTS_PROC_LOCK_MAIN,
                ERTS_PORT_SFLGS_INVALID_LOOKUP);
    erts_smp_de_rwlock(dep);

    if (!pp || (erts_atomic32_read_nob(&pp->state)
        & ERTS_PORT_SFLG_EXITING))
         goto badarg;

    if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0)
         goto badarg;
//如果当前cid和传入的Port的ID相同,且port的sist_entry和找到的dep相同
//那么直接进入结束阶段
    if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep)
         goto done; /* Already set */

    if (dep->status & ERTS_DE_SFLG_EXITING) {
         /* Suspend on dist entry waiting for the exit to finish */
         ErtsProcList *plp = erts_proclist_create(BIF_P);
         plp->next = NULL;
         erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL);
         erts_smp_mtx_lock(&dep->qlock);
         erts_proclist_store_last(&dep->suspended, plp);
         erts_smp_mtx_unlock(&dep->qlock);
         goto yield;
    }

    ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING));

    if (pp->dist_entry || is_not_nil(dep->cid))
         goto badarg;

    erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION);

    /*
     * Dist-ports do not use the "busy port message queue" functionality, but
     * instead use "busy dist entry" functionality.
     */
    {
         ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED;
         erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL);
    }
//更新Port所关联的dist
    pp->dist_entry = dep;

    dep->version = version;
    dep->creation = 0;

    ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output);

#if 1
    dep->send = (pp->drv_ptr->outputv
         ? dist_port_commandv
         : dist_port_command);
#else
    dep->send = dist_port_command;
#endif
    ASSERT(dep->send);

#ifdef DEBUG
    erts_smp_mtx_lock(&dep->qlock);
    ASSERT(dep->qsize == 0);
    erts_smp_mtx_unlock(&dep->qlock);
#endif
//更新dist_entry的cid
    erts_set_dist_entry_connected(dep, BIF_ARG_2, flags);

    if (flags & DFLAG_DIST_HDR_ATOM_CACHE)
         create_cache(dep);

    erts_smp_de_rwunlock(dep);
    dep = NULL; /* inc of refc transferred to port (dist_entry field) */
//增加远程节点的数量
    inc_no_nodes();
//发送监控信息到调用的进程
    send_nodes_mon_msgs(BIF_P,
            am_nodeup,
            BIF_ARG_1,
            flags & DFLAG_PUBLISHED ? am_visible : am_hidden,
            NIL);
 done:

    if (dep && dep != erts_this_dist_entry) {
         erts_smp_de_rwunlock(dep);
         erts_deref_dist_entry(dep);
    }

    if (pp)
         erts_port_release(pp);

    return ret;

 yield:
    ERTS_BIF_PREP_YIELD3(ret, bif_export[BIF_setnode_3], BIF_P,
             BIF_ARG_1, BIF_ARG_2, BIF_ARG_3);
    goto done;

 badarg:
    ERTS_BIF_PREP_ERROR(ret, BIF_P, BADARG);
    goto done;

 system_limit:
    ERTS_BIF_PREP_ERROR(ret, BIF_P, SYSTEM_LIMIT);
    goto done;
}

setnode函数主要完成下面这几个操作:

1.将得到的远程节点的名字放入dist的hash表中,并且将这个表项和连接到远程节点的Port(TCP连接)进行了关联。 2.将和远程节点进行连接的Port标记为ERTS_PORT_SFLG_DISTRIBUTION。 3.在Erlang虚拟机内广告nodeup消息。

其中给Port设置ERTS_PORT_SFLG_DISTRIBUTION标记是为了下面几个事情:

1.让Port出现Busy的时候我们能区分出是普通的Port还是远程连接的Port。 2.当Port被销毁的时候,确定是否要调用dist.c中的erts_do_net_exits来告诉Erlang虚拟机某个节点掉线。

需要注意的

epmd

当独立进程epmd发现自己和本地节点的连接断了,那么直接将这个node注册的名字和端口从自身缓存中删除掉,但是这个删除是有一定延迟的。

但是当empd被不小杀掉了,当empd被再次启动的时候,数据将会全部清除。而且,本地节点不会自动向epmd重新注册自己的端口等。

dist_port

dist_port负责所有Erlang进程透明调用的数据发送和传输,同时也负责着节点之间存活检测的任务。由于Erlang的节点检测都是以本节点是否能和对应节点有心跳为视角,如果使用使用dist_port传输大量的数据,很容易引起dist_port_busy,从而引起节点离线的误判或性能下降。

Erlang默认会为dist_port设置一个1M的缓存,但是如果在节点之间传输大量的数据很容易就不够用。如果在设计的时候就意识到自己要传输大量数据,可以使用+zdbbl这个参数来改变dist_port的缓存。Erlang提供了erlang:system_info函数来查询dist_piort的缓存大小,同时Erlang还提供erlang:system_monitor函数来监控dist_port_busy。

并且在实际使用中,可以参考Spil Games的架构,对Erlang的集群进行分层: 同时需要注意的是,Erlang的集群虽然可以跨越IDC,但是在实际的使用中,并不推荐这样做,原因如下:

1.IDC间网络延迟偏高,Erlang集群本身对网络延迟敏感。 2.IDC间网络吞吐有限 3.net_kernel并未对高并发连接做优化,很容易被攻击者攻击 4.Erlang集群之间的连接对HA不友好,不如Restful。而IDC之间链路很多时候并不稳定,需要进行冗余,这对Erlang集群并不友好。

节点重启

这个是非常需要注意的,Erlang节点之间确认存活是需要心跳时间的。Erlang在进行跨节点操作的时候,都会监控远程节点的状态,尤其是Mnesia数据库很多操作对Erlang节点存活性是非常敏感的。当一个节点掉线后,不应该立刻重启,一般需要等待该节点心跳事件 * 1.5的时间。为什么要这样做呢?因为这样做是为了让节点彻底“死掉”,集群中所有的节点都知道该节点死掉了,这样才不会出现远程操作被锁死的情况(顺便说一句,这个简单问题,在国内某个著名的通讯SaaS中多次出现,该云的架构师曾经多次提出Erlang不是开箱即用,因为他根本没搞清楚Erlang的底层原理,甚至可以说分布式系统中的基础原理)。

共收到 0 条回复
84794b DavidAlphaFox 将本帖设为了精华贴 09月23日 11:28
需要 登录/注册 后方可回复, 如果你还没有账号请点击这里 注册