日期:2014-05-16  浏览次数:20417 次

mnesia的普通transaction写过程(五)事务提交

上一篇博文介绍了mnesia的事务提交准备过程,为每个事务参与结点构造了其提交结构commit,下面将进入到提交过程中,此后将继续分析。

?

mnesia_tm.erl

?

multi_commit(sym_trans, _Maj = [], Tid, CR, Store) ->

? ? {DiscNs, RamNs} = commit_nodes(CR, [], []),

? ? Pending = mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),

? ? ?ets_insert(Store, Pending),

?

? ? {WaitFor, Local} = ask_commit(sym_trans, Tid, CR, DiscNs, RamNs),

? ? {Outcome, []} = rec_all(WaitFor, Tid, do_commit, []),

? ? ?eval_debug_fun({?MODULE, multi_commit_sym},

? ?[{tid, Tid}, {outcome, Outcome}]),

? ? rpc:abcast(DiscNs -- [node()], ?MODULE, {Tid, Outcome}),

? ? rpc:abcast(RamNs -- [node()], ?MODULE, {Tid, Outcome}),

? ? case Outcome of

do_commit ->

? ?mnesia_recover:note_decision(Tid, committed),

? ?do_dirty(Tid, Local),

? ?mnesia_locker:release_tid(Tid),

? ??MODULE ! {delete_transaction, Tid};

{do_abort, _Reason} ->

? ?mnesia_recover:note_decision(Tid, aborted)

? ? end,

? ? ?eval_debug_fun({?MODULE, multi_commit_sym, post},

? ?[{tid, Tid}, {outcome, Outcome}]),

Outcome;

commit_nodes([C | Tail], AccD, AccR)

? ? ? ? when C#commit.disc_copies == [],

? ? ? ? ? ? ?C#commit.disc_only_copies ?== [],

? ? ? ? ? ? ?C#commit.schema_ops == [] ->

? ? commit_nodes(Tail, AccD, [C#commit.node | AccR]);

commit_nodes([C | Tail], AccD, AccR) ->

? ? commit_nodes(Tail, [C#commit.node | AccD], AccR);

commit_nodes([], AccD, AccR) ->

? ? {AccD, AccR}.

取出所有参与事务的结点,这实质上等同于where_to_commit属性记录的所有结点。

ask_commit(Protocol, Tid, CR, DiscNs, RamNs) ->
? ? ask_commit(Protocol, Tid, CR, DiscNs, RamNs, [], no_local).
ask_commit(Protocol, Tid, [Head | Tail], DiscNs, RamNs, WaitFor, Local) ->
? ? Node = Head#commit.node,
? ? if
Node == node() ->
? ?ask_commit(Protocol, Tid, Tail, DiscNs, RamNs, WaitFor, Head);
true ->
? ?Bin = opt_term_to_binary(Protocol, Head, DiscNs++RamNs),
? ?Msg = {ask_commit, Protocol, Tid, Bin, DiscNs, RamNs},
? ?{?MODULE, Node} ! {self(), Msg},
? ?ask_commit(Protocol, Tid, Tail, DiscNs, RamNs, [Node | WaitFor], Local)
? ? end;
ask_commit(_Protocol, _Tid, [], _DiscNs, _RamNs, WaitFor, Local) ->
? ? {WaitFor, Local}.
第一阶段提交,这次是同步过程,由ask_commit和rec_all组成,向每个结点的事务管理器通知事务属性,包括事务tid和事务协议等,及该结点的commit结构,此处使用的事务协议是sym_trans,为异步同构协议,异步表示该事务一旦在所有参与结点完成提交,则立即完成,不必等待每个结点都将事务日志落盘;同构表示所有事务参与结点的表结构都相同。
同时观察各个结点的事务管理器的执行过程:
doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor=Sup}=State) ->
{From, {ask_commit, Protocol, Tid, Commit, DiscNs, RamNs}} ->
? ??eval_debug_fun({?MODULE, doit_ask_commit},
? ?[{tid, Tid}, {prot, Protocol}]),<