Erlang是如何构建集群的

:: 编程, Erlang

By: David Gao

通过对Erlang/OTP和Erts中的代码进行相应的分析,让读者可以清晰的看到,Erlang是如何 进行节点发现和构建集群的。以及在构建Erlang集群中,怎么样构建集群才能更加稳定。

Erlang的集群

Erlang语言本身定义的时候就支持了分布式特性。其中在Erlang虚拟机中,通过定义数据的 编码方式,Erlang进程的表示方法和大量的基础组件来完成Erlang的分布式。

Erlang集群的特性

每个节点在使用非隐藏模式(在启动的时候没有使用-hidden)加入集群,那么这个节点和 集群中所有的节点都会有一个TCP连接,就是大家所知道的无中心和全互联。 Erlang中的 Erlang进程位置透明,不管Erlang进程在集群中任何一个节点上,其它节点的进程均可以向 它发送消息,就如同该进程和发送消息的进程在同一个节点上一样。

Erlang节点直接使用简单的Cookie机制进行验证,防止错误的接入和非法接入。 Erlang节 点间的数据传输使用普通TCP传输,也可以使用TLS进行传输,从而防止被窃听。

Erlang集群是如何创建的

一般情况下,节点都是会被命名成[email protected][email protected]这种模式。可以通过 net_adm:ping(‘[email protected]’)net_adm:ping(‘[email protected]’)来完成节点的加入工 作。但是真正进行节点建立的是net_kernel,因此本篇将重点分析net_kernel都进行了什么 样的动作。

另一种情况,就是Erlang集群中使用了mnesia集群,当mnesia启动的时候,mnesia会要求 Erlang虚拟机连接其它节点加入集群。

Erlang是如何发现别的节点的

每个Erlang虚拟机在启动的时候都会尝试启动自带的epmd。epmd就如同大家所知到的DNS一 样,它运行在一个约定的端口上,Erlang虚拟机启动后会在epmd上注册一个自己的节点名字 和监听的端口号。当节点A想连接节点B的时候,节点A首先会从[email protected]中取出ip部分, 之后去连接这个ip上的epmd,当能成功连接epmd后节点A就会去查询节点B的端口,并进行连 接。

代码分析

net_kernel

net_kernel是Erlang集群构建中最关键的部分之一,它高屋建瓴的控制Erlang虚拟机和OTP 库中其它模块成集群的建立和维护。net_kernel是一个gen_server,它在启动后,会完成下 面这些功能:

  1. 会建立一个定时器进程用来和别的节点进行心跳,检测其它节点是否离开集群。
  2. 创建连接管理表
  3. 启动可以连接的端口,用来接收别的节点的连接。

net_kernel:connect

当一个节点需要连接另一个节点的时候,就需要使用该函数了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
handle_call({connect, _, Node}, From, State) when Node =:= node() ->
    async_reply({reply, true, State}, From);
handle_call({connect, Type, Node}, From, State) ->
    verbose({connect, Type, Node}, 1, State),
    case ets:lookup(sys_dist, Node) of
    [Conn] when Conn#connection.state =:= up ->
        async_reply({reply, true, State}, From);
    [Conn] when Conn#connection.state =:= pending ->
        Waiting = Conn#connection.waiting,
        ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}),
        {noreply, State};
    [Conn] when Conn#connection.state =:= up_pending ->
        Waiting = Conn#connection.waiting,
        ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}),
        {noreply, State};
    _ ->
        case setup(Node,Type,From,State) of
        {ok, SetupPid} ->
            Owners = [{SetupPid, Node} | State#state.conn_owners],
            {noreply,State#state{conn_owners=Owners}};
        _  ->
            ?connect_failure(Node, {setup_call, failed}),
            async_reply({reply, false, State}, From)
        end
    end;

从中可以看出,如果目标节点是自身,那么直接就忽略掉,返回成功。 如果目标节点不是自身,先看一下ets中是否有向远程节点连接的进程。当这进行连接的进 程状态是up,则直接返回true,否则将请求进程加入连接等待队列中。如果我们没有向远程 节点进行连接的进程,则调用setup函数来建立一个。在setup函数中,会先找出连接远程节 点所使用的模块名称,一般情况下是inet_tcp_dist这个模块。下面先假定是使用 inet_tcp_dist这个模块,这个时候net_kernel会调用inet_tcp_dist:setup,并将成功后的 Erlang进程PID放入sys_dist这个ets中。

net_kernel的心跳

创建的ticker进程,它专门负责发心跳给net_kernel进程,然后net_kernel进程会遍历所有 远程连接的进程,让其进行一次心跳。当需要改变节点的心跳时间的时候,net_kernel会开 启一个aux_ticker进程帮助我们进行过度,直到所有其它节点都知道了该节点改变了心跳周 期为止,当所有其它节点都知道了这个节点的心跳周期发生了变化,这个aux_ticker进程也 就结束了它的历史性任务,安静的退出了。

当节点之间心跳发生异常了,就会发生TCP数据传输故障。当TCP传输发生异常的时候, Ports会按照约定好的规则进行清理,这个可参见dist.c中的erts_do_net_exits。

inet_tcp_dist

inet_tcp_dist模块在整个集群建立当中,提供了协议的支持和连接接入这些细节操作。

inet_tcp_dist:setup

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
setup(Node, Type, MyNode, LongOrShortNames,SetupTime) ->
    spawn_opt(?MODULE, do_setup, 
          [self(), Node, Type, MyNode, LongOrShortNames, SetupTime],
          [link, {priority, max}]).

do_setup(Kernel, Node, Type, MyNode, LongOrShortNames,SetupTime) ->
    ?trace("~p~n",[{inet_tcp_dist,self(),setup,Node}]),
    [Name, Address] = splitnode(Node, LongOrShortNames),
    case inet:getaddr(Address, inet) of
    {ok, Ip} ->
        Timer = dist_util:start_timer(SetupTime),
        %用epmd协议获得远程节点的端口
        case erl_epmd:port_please(Name, Ip) of
        {port, TcpPort, Version} ->
            ?trace("port_please(~p) -> version ~p~n", 
               [Node,Version]),
            dist_util:reset_timer(Timer),
                %连接远程节点
            case inet_tcp:connect(Ip, TcpPort, 
                      [{active, false}, 
                       {packet,2}]) of
            %拿到Socket之后,定义各种回调函数,状态以及状态机函数
            {ok, Socket} ->
                HSData = #hs_data{
                  kernel_pid = Kernel,
                  other_node = Node,
                  this_node = MyNode,
                  socket = Socket,
                  timer = Timer,
                  this_flags = 0,
                  other_version = Version,
                  f_send = fun inet_tcp:send/2,
                  f_recv = fun inet_tcp:recv/3,
                  f_setopts_pre_nodeup = 
                  fun(S) ->
                      inet:setopts
                    (S, 
                     [{active, false},
                      {packet, 4},
                      nodelay()])
                  end,
                  f_setopts_post_nodeup = 
                  fun(S) ->
                      inet:setopts
                    (S, 
                     [{active, true},
                      {deliver, port},
                      {packet, 4},
                      nodelay()])
                  end,
                  f_getll = fun inet:getll/1,
                  f_address = 
                  fun(_,_) ->
                      #net_address{
                   address = {Ip,TcpPort},
                   host = Address,
                   protocol = tcp,
                   family = inet}
                  end,
                  mf_tick = fun ?MODULE:tick/1,
                  mf_getstat = fun ?MODULE:getstat/1,
                  request_type = Type
                 },
                %进行握手
                dist_util:handshake_we_started(HSData);
            _ ->
                %% Other Node may have closed since 
                %% port_please !
                ?trace("other node (~p) "
                   "closed since port_please.~n", 
                   [Node]),
                ?shutdown(Node)
            end;
        _ ->
            ?trace("port_please (~p) "
               "failed.~n", [Node]),
            ?shutdown(Node)
        end;
    _Other ->
        ?trace("inet_getaddr(~p) "
           "failed (~p).~n", [Node,_Other]),
        ?shutdown(Node)
    end.

在这函数当中,可以看到,Erlang每次对外建立连接的时候都需要去对方的epmd上进行查询。 inet_tcp_dist主要注重流程和协议,将TCP传输细节交给了inet这个模块来进行,这样大大 的减少了相应的代码量。在handshake_we_started和远程节点进行一次验证。这个验证过程 非常简单,步骤如下:

  1. 远程节点生成一个随机数,然后将这个随机数发给当前节点。
  2. 当前节点用它所知道的远程节点的cookie加上这个随机数生成一个MD5,并将这个MD5返 回给远程节点。

当完成了验证,会使用do_setnode,告诉Erlang虚拟机该节点已经和目标节点的连接上了。 同时通知net_kernel已经完成远程节点的连接,需要它改变sys_dist的ets状态和进行后续 的操作。

dist.c

Erlang虚拟机中,负责管理节点互联的部分,是用纯C实现的。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
BIF_RETTYPE setnode_3(BIF_ALIST_3)
{
    BIF_RETTYPE ret;
    Uint flags;
    unsigned long version;
    Eterm ic, oc;
    Eterm *tp;
    DistEntry *dep = NULL;
    Port *pp = NULL;

    /* Prepare for success */
    ERTS_BIF_PREP_RET(ret, am_true);

    /*
     * Check and pick out arguments
     */

    if (!is_node_name_atom(BIF_ARG_1) ||
        is_not_internal_port(BIF_ARG_2) ||
        (erts_this_node->sysname == am_Noname)) {
         goto badarg;
    }

    if (!is_tuple(BIF_ARG_3))
         goto badarg;
    tp = tuple_val(BIF_ARG_3);
    if (*tp++ != make_arityval(4))
         goto badarg;
    if (!is_small(*tp))
         goto badarg;
    flags = unsigned_val(*tp++);
    if (!is_small(*tp) || (version = unsigned_val(*tp)) == 0)
         goto badarg;
    ic = *(++tp);
    oc = *(++tp);
    if (!is_atom(ic) || !is_atom(oc))
         goto badarg;

    /* DFLAG_EXTENDED_REFERENCES is compulsory from R9 and forward */
    if (!(DFLAG_EXTENDED_REFERENCES & flags)) {
         erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
         erts_dsprintf(dsbufp, "%T", BIF_P->common.id);
         if (BIF_P->common.u.alive.reg)
              erts_dsprintf(dsbufp, " (%T)", BIF_P->common.u.alive.reg->name);
         erts_dsprintf(dsbufp,
                       " attempted to enable connection to node %T "
                       "which is not able to handle extended references.\n",
                       BIF_ARG_1);
         erts_send_error_to_logger(BIF_P->group_leader, dsbufp);
         goto badarg;
    }

    /*
     * Arguments seem to be in order.
     */

    /* get dist_entry */
    dep = erts_find_or_insert_dist_entry(BIF_ARG_1);
    if (dep == erts_this_dist_entry)
         goto badarg;
    else if (!dep)
         goto system_limit; /* Should never happen!!! */
//通过Port的ID获取Port的结构
    pp = erts_id2port_sflgs(BIF_ARG_2,
                BIF_P,
                ERTS_PROC_LOCK_MAIN,
                ERTS_PORT_SFLGS_INVALID_LOOKUP);
    erts_smp_de_rwlock(dep);

    if (!pp || (erts_atomic32_read_nob(&pp->state)
        & ERTS_PORT_SFLG_EXITING))
         goto badarg;

    if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0)
         goto badarg;
//如果当前cid和传入的Port的ID相同,且port的sist_entry和找到的dep相同
//那么直接进入结束阶段
    if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep)
         goto done; /* Already set */

    if (dep->status & ERTS_DE_SFLG_EXITING) {
         /* Suspend on dist entry waiting for the exit to finish */
         ErtsProcList *plp = erts_proclist_create(BIF_P);
         plp->next = NULL;
         erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL);
         erts_smp_mtx_lock(&dep->qlock);
         erts_proclist_store_last(&dep->suspended, plp);
         erts_smp_mtx_unlock(&dep->qlock);
         goto yield;
    }

    ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING));

    if (pp->dist_entry || is_not_nil(dep->cid))
         goto badarg;

    erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION);

    /*
     * Dist-ports do not use the "busy port message queue" functionality, but
     * instead use "busy dist entry" functionality.
     */
    {
         ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED;
         erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL);
    }
//更新Port所关联的dist
    pp->dist_entry = dep;

    dep->version = version;
    dep->creation = 0;

    ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output);

#if 1
    dep->send = (pp->drv_ptr->outputv
         ? dist_port_commandv
         : dist_port_command);
#else
    dep->send = dist_port_command;
#endif
    ASSERT(dep->send);

#ifdef DEBUG
    erts_smp_mtx_lock(&dep->qlock);
    ASSERT(dep->qsize == 0);
    erts_smp_mtx_unlock(&dep->qlock);
#endif
//更新dist_entry的cid
    erts_set_dist_entry_connected(dep, BIF_ARG_2, flags);

    if (flags & DFLAG_DIST_HDR_ATOM_CACHE)
         create_cache(dep);

    erts_smp_de_rwunlock(dep);
    dep = NULL; /* inc of refc transferred to port (dist_entry field) */
//增加远程节点的数量
    inc_no_nodes();
//发送监控信息到调用的进程
    send_nodes_mon_msgs(BIF_P,
            am_nodeup,
            BIF_ARG_1,
            flags & DFLAG_PUBLISHED ? am_visible : am_hidden,
            NIL);
 done:

    if (dep && dep != erts_this_dist_entry) {
         erts_smp_de_rwunlock(dep);
         erts_deref_dist_entry(dep);
    }

    if (pp)
         erts_port_release(pp);

    return ret;

 yield:
    ERTS_BIF_PREP_YIELD3(ret, bif_export[BIF_setnode_3], BIF_P,
             BIF_ARG_1, BIF_ARG_2, BIF_ARG_3);
    goto done;

 badarg:
    ERTS_BIF_PREP_ERROR(ret, BIF_P, BADARG);
    goto done;

 system_limit:
    ERTS_BIF_PREP_ERROR(ret, BIF_P, SYSTEM_LIMIT);
    goto done;
}

setnode函数主要完成下面这几个操作:

  1. 将得到的远程节点的名字放入dist的hash表中,并且将这个表项和连接到远程节点的 Port(TCP连接)进行了关联。
  2. 将和远程节点进行连接的Port标记为ERTS_PORT_SFLG_DISTRIBUTION。
  3. 在Erlang虚拟机内广告nodeup消息。

其中给Port设置ERTS_PORT_SFLG_DISTRIBUTION标记是为了下面几个事情:

  1. 让Port出现Busy的时候我们能区分出是普通的Port还是远程连接的Port。
  2. 当Port被销毁的时候,确定是否要调用dist.c中的erts_do_net_exits来告诉Erlang虚拟 机某个节点掉线。

如何提高集群稳定性

epmd

当独立进程epmd发现自己和本地节点的连接断了,那么直接将这个node注册的名字和端口从 自身缓存中删除掉,但是这个删除是有一定延迟的。

但是当empd被不小杀掉了,当empd被再次启动的时候,数据将会全部清除。而且,本地节点 不会自动向epmd重新注册自己的端口等。

dist_port

dist_port负责所有Erlang进程透明调用的数据发送和传输,同时也负责着节点之间存活检 测的任务。由于Erlang的节点检测都是以本节点是否能和对应节点有心跳为视角,如果使用 使用dist_port传输大量的数据,很容易引起dist_port_busy,从而引起节点离线的误判或 性能下降。

Erlang默认会为dist_port设置一个1M的缓存,但是如果在节点之间传输大量的数据很容易 就不够用。如果在设计的时候就意识到自己要传输大量数据,可以使用+zdbbl这个参数来改 变dist_port的缓存。Erlang提供了erlang:system_info函数来查询dist_piort的缓存大小, 同时Erlang还提供erlang:system_monitor函数来监控dist_port_busy。

并且在实际使用中,可以参考Spil Games的架构,对Erlang的集群进行分层, 参考Scaling Distributed Systems

同时需要注意的是,Erlang的集群虽然可以跨越IDC,但是在实际的使用中,并不推荐这样 做,原因如下: 1. IDC间网络延迟偏高,Erlang集群本身对网络延迟敏感。 1. IDC间网络吞吐有限 1. net_kernel并未对高并发连接做优化,很容易被攻击者攻击 1. Erlang集群之间的连接对HA不友好,不如Restful。而IDC之间链路很多时候并不稳定, 需要进行冗余,这对Erlang集群并不友好。

节点重启

这个是非常需要注意的,Erlang节点之间确认存活是需要心跳时间的。Erlang在进行跨节点 操作的时候,都会监控远程节点的状态,尤其是Mnesia数据库很多操作对Erlang节点存活性 是非常敏感的。当一个节点掉线后,不应该立刻重启,一般需要等待该节点心跳事件 * 1.5 的时间。为什么要这样做呢?因为这样做是为了让节点彻底“死掉”,集群中所有的节点都知 道该节点死掉了,这样才不会出现远程操作被锁死的情况(顺便说一句,这个简单问题,在 国内某个著名的通讯SaaS中多次出现,该云的架构师曾经多次提出Erlang不是开箱即用,因 为他根本没搞清楚Erlang的底层原理,甚至可以说分布式系统中的基础原理)。