reject write at leader if conflict
This should prevent spurious intra-cluster conflicts most of the
time. It is not true consistency, however; spurious conflicts are
still possible whenever the nodes in the cluster disagree on the
current set of live nodes.
rnewson committed Jan 10, 2025
1 parent d3eb273 commit fdf909c
Showing 1 changed file with 141 additions and 44 deletions.
185 changes: 141 additions & 44 deletions src/fabric/src/fabric_doc_update.erl
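
In outline, the change replaces the flat fan-out with a two-step write per shard range: one deterministically chosen copy (the leader) is written first, and the remaining copies (the followers) are only contacted once the leader has accepted, so a conflict detected at the leader stops the write before it reaches any other copy. A rough sketch of the idea in plain Erlang (not the committed code; the #shard record and the SendFun callback are illustrative stand-ins for mem3's shard record and the rexi RPC):

-module(leader_write_sketch).
-export([write/3]).

%% Illustrative stand-in for mem3's #shard record (sketch only).
-record(shard, {name, node, range, ref}).

%% Copies are the copies of one shard range; SendFun stands in for the
%% RPC to a single copy and returns ok | conflict.
write(Doc, Copies, SendFun) ->
    %% deterministic leader: the copy on the lowest node wins
    [Leader | Followers] = lists:keysort(#shard.node, Copies),
    case SendFun(Leader, Doc) of
        conflict ->
            %% rejected at the leader; followers are never asked, so no
            %% divergent copies of Doc are introduced inside the cluster
            conflict;
        ok ->
            %% leader accepted; fan the write out to the remaining copies
            lists:foreach(fun(F) -> SendFun(F, Doc) end, Followers),
            ok
    end.

The caveat in the commit message follows directly: if two coordinators disagree about which nodes are live, they sort different node sets, can elect different leaders for the same range, and conflicting revisions can still slip in.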
@@ -22,7 +22,10 @@
     doc_count,
     w,
     grouped_docs,
-    reply
+    reply,
+    update_options,
+    leaders = [],
+    started = []
 }).
 
 go(_, [], _) ->
@@ -33,25 +36,25 @@ go(DbName, AllDocs0, Opts) ->
     validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, Opts)),
     Options = lists:delete(all_or_nothing, Opts),
     GroupedDocs = lists:map(
-        fun({#shard{name = Name, node = Node} = Shard, Docs}) ->
-            Docs1 = untag_docs(Docs),
-            Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs1, Options]}),
-            {Shard#shard{ref = Ref}, Docs}
+        fun({#shard{} = Shard, Docs}) ->
+            {Shard#shard{ref = make_ref()}, Docs}
         end,
         group_docs_by_shard(DbName, AllDocs)
     ),
     {Workers, _} = lists:unzip(GroupedDocs),
     RexiMon = fabric_util:create_monitors(Workers),
     W = couch_util:get_value(w, Options, integer_to_list(mem3:quorum(DbName))),
     Acc0 = #acc{
+        update_options = Options,
         waiting_count = length(Workers),
         doc_count = length(AllDocs),
         w = list_to_integer(W),
         grouped_docs = GroupedDocs,
         reply = dict:new()
     },
     Timeout = fabric_util:request_timeout(),
-    try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc0, infinity, Timeout) of
+    Acc1 = start_leaders(Acc0),
+    try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc1, infinity, Timeout) of
         {ok, {Health, Results}} when
             Health =:= ok; Health =:= accepted; Health =:= error
         ->
@@ -72,61 +75,78 @@ go(DbName, AllDocs0, Opts)
         rexi_monitor:stop(RexiMon)
     end.
 
-handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Worker, #acc{} = Acc0) ->
+handle_message({rexi_DOWN, _, {_, NodeRef}, _}, Worker, #acc{} = Acc0) ->
     #acc{grouped_docs = GroupedDocs} = Acc0,
     NewGrpDocs = [X || {#shard{node = N}, _} = X <- GroupedDocs, N =/= NodeRef],
-    skip_message(Acc0#acc{waiting_count = length(NewGrpDocs), grouped_docs = NewGrpDocs});
+    Acc1 = Acc0#acc{waiting_count = length(NewGrpDocs), grouped_docs = NewGrpDocs},
+    Acc2 = start_followers(Worker, Acc1),
+    skip_message(Acc2);
 handle_message({rexi_EXIT, _}, Worker, #acc{} = Acc0) ->
     #acc{waiting_count = WC, grouped_docs = GrpDocs} = Acc0,
     NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs),
-    skip_message(Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs});
+    Acc1 = Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs},
+    Acc2 = start_followers(Worker, Acc1),
+    skip_message(Acc2);
 handle_message({error, all_dbs_active}, Worker, #acc{} = Acc0) ->
     % treat it like rexi_EXIT, in the hope that at least one copy returns successfully
     #acc{waiting_count = WC, grouped_docs = GrpDocs} = Acc0,
     NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs),
-    skip_message(Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs});
+    Acc1 = Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs},
+    Acc2 = start_followers(Worker, Acc1),
+    skip_message(Acc2);
 handle_message(internal_server_error, Worker, #acc{} = Acc0) ->
     % happens when we fail to load validation functions in an RPC worker
     #acc{waiting_count = WC, grouped_docs = GrpDocs} = Acc0,
     NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs),
-    skip_message(Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs});
+    Acc1 = Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs},
+    Acc2 = start_followers(Worker, Acc1),
+    skip_message(Acc2);
 handle_message(attachment_chunk_received, _Worker, #acc{} = Acc0) ->
     {ok, Acc0};
 handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
     #acc{
         waiting_count = WaitingCount,
         doc_count = DocCount,
         w = W,
         grouped_docs = GroupedDocs,
         reply = DocReplyDict0
     } = Acc0,
-    {value, {_, Docs}, NewGrpDocs} = lists:keytake(Worker, 1, GroupedDocs),
-    DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0),
-    case {WaitingCount, dict:size(DocReplyDict)} of
-        {1, _} ->
-            % last message has arrived, we need to conclude things
-            {Health, W, Reply} = dict:fold(
-                fun force_reply/3,
-                {ok, W, []},
-                DocReplyDict
-            ),
-            {stop, {Health, Reply}};
-        {_, DocCount} ->
-            % we've got at least one reply for each document, let's take a look
-            case dict:fold(fun maybe_reply/3, {stop, W, []}, DocReplyDict) of
-                continue ->
-                    {ok, Acc0#acc{
-                        waiting_count = WaitingCount - 1,
-                        grouped_docs = NewGrpDocs,
-                        reply = DocReplyDict
-                    }};
-                {stop, W, FinalReplies} ->
-                    {stop, {ok, FinalReplies}}
-            end;
-        _ ->
-            {ok, Acc0#acc{
-                waiting_count = WaitingCount - 1, grouped_docs = NewGrpDocs, reply = DocReplyDict
-            }}
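+    %% Record this worker's replies first (a conflict from a leader is
+    %% expanded into W conflict replies), prune conflicted docs from
+    %% workers that have not replied yet, then start the followers of
+    %% this worker's shard range.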
+    {value, {_, Docs}, NewGrpDocs0} = lists:keytake(Worker, 1, GroupedDocs),
+    IsLeader = lists:member(Worker#shard.ref, Acc0#acc.leaders),
+    DocReplyDict = append_update_replies(Docs, Replies, W, IsLeader, DocReplyDict0),
+    Acc1 = Acc0#acc{grouped_docs = NewGrpDocs0, reply = DocReplyDict},
+    Acc2 = remove_conflicts(Docs, Replies, Acc1),
+    NewGrpDocs = Acc2#acc.grouped_docs,
+    case skip_message(Acc2) of
+        {stop, Msg} ->
+            {stop, Msg};
+        {ok, Acc3} ->
+            Acc4 = start_followers(Worker, Acc3),
+            case {Acc4#acc.waiting_count, dict:size(DocReplyDict)} of
+                {1, _} ->
+                    % last message has arrived, we need to conclude things
+                    {Health, W, Reply} = dict:fold(
+                        fun force_reply/3,
+                        {ok, W, []},
+                        DocReplyDict
+                    ),
+                    {stop, {Health, Reply}};
+                {_, DocCount} ->
+                    % we've got at least one reply for each document, let's take a look
+                    case dict:fold(fun maybe_reply/3, {stop, W, []}, DocReplyDict) of
+                        continue ->
+                            {ok, Acc4#acc{
+                                waiting_count = Acc4#acc.waiting_count - 1,
+                                grouped_docs = NewGrpDocs
+                            }};
+                        {stop, W, FinalReplies} ->
+                            {stop, {ok, FinalReplies}}
+                    end;
+                _ ->
+                    {ok, Acc4#acc{
+                        waiting_count = Acc4#acc.waiting_count - 1,
+                        grouped_docs = NewGrpDocs
+                    }}
+            end
     end;
 handle_message({missing_stub, Stub}, _, _) ->
     throw({missing_stub, Stub});
@@ -318,13 +338,90 @@ group_docs_by_shard(DbName, Docs) ->
         )
     ).
 
-append_update_replies([], [], DocReplyDict) ->
+%% use 'lowest' node that hosts this shard range as leader
+is_leader(Worker, Workers) ->
+    Worker#shard.node == lists:min([W#shard.node || W <- Workers, W#shard.range == Worker#shard.range]).
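+%% e.g. copies of a range on 'node1@host' and 'node2@host' always elect
+%% 'node1@host' (atoms compare by name), so every coordinator that shares
+%% this membership view picks the same leader.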

+start_leaders(#acc{} = Acc0) ->
+    #acc{grouped_docs = GroupedDocs} = Acc0,
+    {Workers, _} = lists:unzip(GroupedDocs),
+    LeaderRefs = lists:foldl(
+        fun({Worker, Docs}, RefAcc) ->
+            case is_leader(Worker, Workers) of
+                true ->
+                    start_worker(Worker, Docs, Acc0),
+                    [Worker#shard.ref | RefAcc];
+                false ->
+                    RefAcc
+            end
+        end,
+        [],
+        GroupedDocs
+    ),
+    Acc0#acc{leaders = LeaderRefs, started = LeaderRefs}.
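+
+%% Followers of a range are started only once its leader has been heard
+%% from (or lost); 'started' already holds the leader refs, so a leader
+%% is never re-sent its own docs.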

+start_followers(#shard{} = Leader, #acc{} = Acc0) ->
+    Followers = [
+        {Worker, Docs}
+     || {Worker, Docs} <- Acc0#acc.grouped_docs,
+        Worker#shard.range == Leader#shard.range,
+        not lists:member(Worker#shard.ref, Acc0#acc.started)
+    ],
+    lists:foreach(
+        fun({Worker, Docs}) ->
+            start_worker(Worker, Docs, Acc0)
+        end,
+        Followers
+    ),
+    Started = [Ref || {#shard{ref = Ref}, _Docs} <- Followers],
+    Acc0#acc{started = lists:append([Started, Acc0#acc.started])}.
+
+start_worker(#shard{ref = Ref} = Worker, Docs, #acc{} = Acc0) when is_reference(Ref) ->
+    #shard{name = Name, node = Node} = Worker,
+    #acc{update_options = UpdateOptions} = Acc0,
+    rexi:cast_ref(Ref, Node, {fabric_rpc, update_docs, [Name, untag_docs(Docs), UpdateOptions]}),
+    ok;
+start_worker(#shard{ref = undefined}, _Docs, #acc{}) ->
+    % for unit tests below.
+    ok.

+append_update_replies([], [], _W, _IsLeader, DocReplyDict) ->
     DocReplyDict;
-append_update_replies([Doc | Rest], [], Dict0) ->
+append_update_replies([Doc | Rest], [], W, IsLeader, Dict0) ->
     % icky, if replicated_changes only errors show up in result
-    append_update_replies(Rest, [], dict:append(Doc, noreply, Dict0));
-append_update_replies([Doc | Rest1], [Reply | Rest2], Dict0) ->
-    append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)).
+    append_update_replies(Rest, [], W, IsLeader, dict:append(Doc, noreply, Dict0));
+append_update_replies([Doc | Rest1], [conflict | Rest2], W, true, Dict0) ->
+    %% fake conflict replies from followers as we won't ask them
+    append_update_replies(
+        Rest1, Rest2, W, true, dict:append_list(Doc, lists:duplicate(W, conflict), Dict0)
+    );
+append_update_replies([Doc | Rest1], [Reply | Rest2], W, IsLeader, Dict0) ->
+    append_update_replies(Rest1, Rest2, W, IsLeader, dict:append(Doc, Reply, Dict0)).
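+%% e.g. with w = 3, one conflict reply from the leader is recorded as
+%% [conflict, conflict, conflict], enough for force_reply/maybe_reply to
+%% conclude the doc as a conflict without waiting on followers.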

+%% leader found a conflict, remove that doc from the other (follower) workers,
+%% removing the worker entirely if no docs remain.
+remove_conflicts([], [], #acc{} = Acc0) ->
+    Acc0;
+remove_conflicts([Doc | DocRest], [conflict | ReplyRest], #acc{} = Acc0) ->
+    #acc{grouped_docs = GroupedDocs0, waiting_count = WaitingCount0} = Acc0,
+    {GroupedDocs1, WaitingCount1} = lists:foldl(
+        fun({Worker, Docs}, {GrpAcc, WCAcc}) ->
+            case lists:delete(Doc, Docs) of
+                [] ->
+                    %% worker has nothing left to do, drop it
+                    {GrpAcc, WCAcc - 1};
+                Rest ->
+                    {[{Worker, Rest} | GrpAcc], WCAcc}
+            end
+        end,
+        {[], WaitingCount0},
+        GroupedDocs0
+    ),
+    Acc1 = Acc0#acc{grouped_docs = GroupedDocs1, waiting_count = WaitingCount1},
+    remove_conflicts(DocRest, ReplyRest, Acc1);
+remove_conflicts([_Doc | DocRest], [_Reply | ReplyRest], #acc{} = Acc0) ->
+    remove_conflicts(DocRest, ReplyRest, Acc0);
+remove_conflicts([_Doc | DocRest], [], #acc{} = Acc0) ->
+    remove_conflicts(DocRest, [], Acc0).

 skip_message(#acc{waiting_count = 0, w = W, reply = DocReplyDict}) ->
     {Health, W, Reply} = dict:fold(fun force_reply/3, {ok, W, []}, DocReplyDict),
