Erlang/Elixir 深入浅出 Mnesia-schema 创建 (1)

DavidAlphaFox · 发布于 2017年12月14日 · 488 次阅读
84794b

Mnesia是什么

Mnesia是一个分布式数据库管理系统(DBMS),适合于电信和其它需要持续运行和具备软实时特性的Erlang应用,是构建电信应用的控制系统平台开放式电信平台(OTP)的一部分。 从这里可以看出Mnesia是Erlang/OTP平台内置的数据库。开发该数据库的原因是电信应用苛刻的容错和高可靠性需求,这些需求如下:

  1. 实时快速的键/值检索
  2. 非实时的复杂查询主要在运营和维护时进行
  3. 分布式的应用,从而数据也必须分布
  4. 高容错性
  5. 可动态重新配置
  6. 存储复杂的对象数据

如何使用Mnesia

Mnesia作为一个数据库,使用的时候就有一定的要求,相对于其它数据库而言,这些需求是非常简单的。

使用Mnesia需要满足以下需求:

  1. 操作系统可以运行Erlang/OTP平台
  2. 已经创建Mnesia的schema表

满足这两点Mnesia就可以使用了。本文将介绍Mnesia是如何创建schema表的

Mnesia的cstruct结构

-record(cstruct, {name,                    % Atom 表名字
          type = set,                      % set | bag
          ram_copies = [],                 % [Node]
          disc_copies = [],                % [Node]
          disc_only_copies = [],           % [Node]
          load_order = 0,                  % Integer
          access_mode = read_write,        % read_write | read_only
          majority = false,                % true | false
          index = [],                      % [Integer]
          snmp = [],                       % Snmp Ustruct
          local_content = false,           % true | false
          record_name = {bad_record_name}, % Atom (Default = Name) 表中存放的record的名字
          attributes = [key, val],         % [Atom] record中的属性名字
          user_properties = [],            % [Record]
          frag_properties = [],            % [{Key, Val]
          storage_properties = [],         % [{Key, Val]
                  cookie = ?unique_cookie,         % Term
                  version = {{2, 0}, []}}).        % {{Integer, Integer}, [Node]}

Erlang的cstruct非常简明扼要的定义了Mnesia的一张表的属性。对Mnesia来讲,一张表最基本需要包含下面的信息

  1. name,表名字
  2. type,存储模式
  3. access_mode,访问权限
  4. record_name,存储字段的record名称
  5. attributes,字段名称

剩下的字段,更多是和集群,容错以及分片相关的。因为有分片技术的存在,就不要再说Mnesia存储上限是4G啥的了。

schema创建

确认无schema阶段

在mnesia_bup的create_schema中会使用mnesia_schema:ensure_no_schema来确认单节点或集群的所有节点上都没有schema相关的数据。

%尝试读取远程的schema
ensure_no_schema([H|T]) when is_atom(H) ->
    case rpc:call(H, ?MODULE, remote_read_schema, []) of
        {badrpc, Reason} ->
            %% 返回建表失败
            {H, {"All nodes not running", H, Reason}};
        {ok,Source, _} when Source /= default ->
            %% 返回的source是非default的时候,就代表已经存在了schema表
            {H, {already_exists, H}};
        _ ->
            ensure_no_schema(T)
    end;
ensure_no_schema([H|_]) ->
    {error,{badarg, H}};
ensure_no_schema([]) ->
    ok.

ensure_no_schema是通过Erlang/OTP平台的rpc模块来尝试读取所有节点是否存在,如果存在了会告诉发起创建请求的进程already_exists,如果某个节点无法链接,就会报错。如果在这阶段出现异常,会立刻终止创建。

构建临时备份阶段

mnesia_bup会在mnesia数据目录下创建一个节点名+时间戳的临时文件,类似 [email protected] 这种形式。之后会通过make_initial_backup来从0构建一个backup文件,用来创建Mnesia的schema。

make_initial_backup(Ns, Opaque, Mod) ->
    %%获取最开始的元数据表
    %%元数据是cstruct的[{key,value}]形式
    Orig = mnesia_schema:get_initial_schema(disc_copies, Ns),
    %% 删除掉storage_properties和majority这两个字段
    Modded = proplists:delete(storage_properties, proplists:delete(majority, Orig)),
    %% 向schema表中写入表名和cstruct
    Schema = [{schema, schema, Modded}],
    O2 = do_apply(Mod, open_write, [Opaque], Opaque),
    %写入日志头
    %% 包括日志版本,日志类型,mnesia版本,节点名称,生成时间
    %% 这里日志版本1.2  类型 backup_log
    O3 = do_apply(Mod, write, [O2, [mnesia_log:backup_log_header()]], O2),
    %写入schema数据
    O4 = do_apply(Mod, write, [O3, Schema], O3),
    %%生成Opaque所代表的文件
    O5 = do_apply(Mod, commit_write, [O4], O4),
    {ok, O5}.

先通过mnesia_schema:get_initial_schema构建出一个schema的cstruct结构,然后通过mnesia_backup的日志模式,[{schema, schema, Modded}]写入的日志文件中。在写入真实数据前,会先写入一个mnesia_log:backup_log_header()的日志头,用来说明是什么日志和日志的版本。

安装备份阶段

mnesia_bup会使用do_install_fallback来将上一个阶段构建出来的临时备份安装到单节点或集群上。 安装过程可以明确为以下几步:

  1. 初始化安装进程
  2. 初始化安装状态
  3. 在集群个节点上创建fallback_receiver
  4. 从上阶段临时文件中读取数据,并同步到集群各个节点上
install_fallback_master(ClientPid, FA) ->
    %% 捕获退出异常,关联进程崩溃,但是并不真正捕获
    %% 而是防止崩溃后引起当前进程退出,打断元数据创建
    process_flag(trap_exit, true),
    %% 设置状态
    State = {start, FA},
    %% 拿出日志文件
    Opaque = FA#fallback_args.opaque,
    Mod = FA#fallback_args.module,
    Res = (catch iterate(Mod, fun restore_recs/4, Opaque, State)),
    unlink(ClientPid),
    ClientPid ! {self(), Res},
    exit(shutdown).

安装备份文件的时候,会创建一个进程来进行备份文件安装。该进程为了防止请求进程打断安装过程,会进行退出异常捕获。

-spec fallback_receiver(pid(), fallback_args()) -> no_return().
%Master,在此处表示,整个mnesia集群在create_schema的时候的发起者
fallback_receiver(Master, FA) ->
    process_flag(trap_exit, true),
%将自己注册到本地名字库,防止创建出另一个fallback_receiver进程
    case catch register(mnesia_fallback, self()) of
        {'EXIT', _} ->
            Reason = {already_exists, node()},
            local_fallback_error(Master, Reason);
        true ->
            FA2 = check_fallback_dir(Master, FA),
            Bup = FA2#fallback_args.fallback_bup,
            %检查是否有backup
            case mnesia_lib:exists(Bup) of
                true ->
                    %如果有则报错
                    Reason2 = {already_exists, node()},
                    local_fallback_error(Master, Reason2);
                false ->
                    %如果没有,创建新的backup的临时文件
                    Mod = mnesia_backup,
                    %% 删除FALLBACK.TMP文件
                    Tmp = FA2#fallback_args.fallback_tmp,
                    R = #restore{mode = replace,
                                 bup_module = Mod,
                                 bup_data = Tmp},
                    file:delete(Tmp),
                    %开始接收fallback信息
                    case catch fallback_receiver_loop(Master, R, FA2, schema) of
                        {error, Reason} ->
                            local_fallback_error(Master, Reason);
                        Other ->
                            exit(Other)
                    end
            end
    end.

fallback_receiver进程会在集群的每个节点上创建一个,其中的Master就是上面所说的install_fallback_master这个函数所在的进程。fallback_receiver只是做一些基本的防止重入和错误检查,真正的业务是在fallback_receiver_loop函数中处理。

fallback_receiver_loop(Master, R, FA, State) ->
    receive
        {Master, {start, Header, Schema}} when State =:= schema ->
            Dir = FA#fallback_args.mnesia_dir,
            throw_bad_res(ok, mnesia_schema:opt_create_dir(true, Dir)),
            %% 创建FALLBACK.TMP文件
            R2 = safe_apply(R, open_write, [R#restore.bup_data]),
            R3 = safe_apply(R2, write, [R2#restore.bup_data, [Header]]),
            BupSchema = [schema2bup(S) || S <- Schema],
            R4 = safe_apply(R3, write, [R3#restore.bup_data, BupSchema]),
            Master ! {self(), ok},
            %% schema的日志已经写入文件了
            %% 状态切换到接收records
            fallback_receiver_loop(Master, R4, FA, records);

        {Master, {records, Recs}} when State =:= records ->
            R2 = safe_apply(R, write, [R#restore.bup_data, Recs]),
            Master ! {self(), ok},
            fallback_receiver_loop(Master, R2, FA, records);
        %收到swap,进行commit,并将临时文件重命名为backup文件
        {Master, swap} when State =/= schema ->
            ?eval_debug_fun({?MODULE, fallback_receiver_loop, pre_swap}, []),
            safe_apply(R, commit_write, [R#restore.bup_data]),
            Bup = FA#fallback_args.fallback_bup,
            Tmp = FA#fallback_args.fallback_tmp,
            %% 立刻重命名文件,将FALLBACK.TMP重命名为FALLBACK.BUP
            throw_bad_res(ok, file:rename(Tmp, Bup)),
            catch mnesia_lib:set(active_fallback, true),
            ?eval_debug_fun({?MODULE, fallback_receiver_loop, post_swap}, []),
            Master ! {self(), ok},
            fallback_receiver_loop(Master, R, FA, stop);

        {Master, stop} when State =:= stop ->
            stopped;

        Msg ->
            safe_apply(R, abort_write, [R#restore.bup_data]),
            Tmp = FA#fallback_args.fallback_tmp,
            file:delete(Tmp),
            throw({error, "Unexpected msg fallback_receiver_loop", Msg})
    end.

fallback_receiver_loop循环State的初始值为{start,FA},接着不断从发起者出接受schema数据和record数据,并写入FALLBACK.TMP中,当发起者传送完所有数据会要求fallback_receiver进程将FALLBACK.TMP文件重命名为FALLBACK.BUP。

总结

至此,schema创建的第一个阶段已经结束,但是发现mnesia数据目录下并没有生成schema.DAT文件,在后续的文章中将会介绍如何生成schema.DAT文件

共收到 0 条回复
84794b DavidAlphaFox 深入浅出 Mnesia-schema 创建 (2) 中提及了此贴 12月19日 09:18
需要 登录/注册 后方可回复, 如果你还没有账号请点击这里 注册