Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 84 additions & 50 deletions src/riak_dt_orswot.erl
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ remove_all([Elem | Rest], Actor, ORSet, Ctx) ->
{ok, ORSet2} = update({remove, Elem}, Actor, ORSet, Ctx),
remove_all(Rest, Actor, ORSet2, Ctx).


-spec merge(orswot(), orswot()) -> orswot().
merge({_LHSC, LHSE, _LHSD}=LHS, {_RHSC, RHSE, _RHSD}=RHS) when is_list(LHSE);
is_list(RHSE) ->
Expand All @@ -287,59 +288,92 @@ merge({Clock, Entries, Deferred}, {Clock, Entries, Deferred}) ->
{Clock, Entries, Deferred};
merge({LHSClock, LHSEntries, LHSDeferred}, {RHSClock, RHSEntries, RHSDeferred}) ->
Clock = riak_dt_vclock:merge([LHSClock, RHSClock]),
RHSEmpty =
dict:is_empty(RHSEntries) andalso
RHSClock == [],
Entries =
case RHSEmpty of
true ->
LHSEntries;
false ->
really_merge({LHSClock, LHSEntries}, {RHSClock, RHSEntries})
end,
Deffered = merge_deferred(LHSDeferred, RHSDeferred),
apply_deferred(Clock, Entries, Deffered).


really_merge({LHSClock, LHSEntries}, {RHSClock, RHSEntries}) ->
{Keep, RHSElems} =
?DICT:fold(fun(Elem, Dots, {Acc, RHSRemaining}) ->
case ?DICT:find(Elem, RHSEntries) of
error ->
%% Only on left, trim dots and keep surviving
case riak_dt_vclock:subtract_dots(Dots, RHSClock) of
[] ->
%% Removed
{Acc, RHSRemaining};
NewDots ->
{?DICT:store(Elem, NewDots, Acc), RHSRemaining}
end;
{ok, RHSDots} ->
%% On both sides
CommonDots = ordsets:intersection(
ordsets:from_list(Dots),
ordsets:from_list(RHSDots)),
LHSUnique = ordsets:to_list(
ordsets:subtract(ordsets:from_list(Dots),
CommonDots)),
RHSUnique = ordsets:to_list(
ordsets:subtract(ordsets:from_list(RHSDots),
CommonDots)),
LHSKeep = riak_dt_vclock:subtract_dots(LHSUnique, RHSClock),
RHSKeep = riak_dt_vclock:subtract_dots(RHSUnique, LHSClock),
V = riak_dt_vclock:merge([ordsets:to_list(CommonDots), LHSKeep, RHSKeep]),
%% Perfectly possible that an item in both sets should be dropped
case V of
[] ->
%% Removed from both sides
{Acc, ?DICT:erase(Elem, RHSRemaining)};
_ ->
{?DICT:store(Elem, V, Acc), ?DICT:erase(Elem, RHSRemaining)}
end
end
end,
{?DICT:new(), RHSEntries},
LHSEntries),
?DICT:fold(
fun(Elem, Dots, {Acc, RHSRemaining}) ->
case ?DICT:find(Elem, RHSEntries) of
error ->
%% Only on left, trim dots and keep surviving
case riak_dt_vclock:subtract_dots(Dots, RHSClock) of
[] ->
%% Removed
{Acc, RHSRemaining};
NewDots ->
{?DICT:store(Elem, NewDots, Acc), RHSRemaining}
end;
{ok, RHSDots} ->
%% On both sides
CommonDots =
ordsets:intersection(
ordsets:from_list(Dots),
ordsets:from_list(RHSDots)
),
LHSUnique =
ordsets:to_list(
ordsets:subtract(
ordsets:from_list(Dots),
CommonDots
)
),
RHSUnique =
ordsets:to_list(
ordsets:subtract(
ordsets:from_list(RHSDots),
CommonDots
)
),
LHSKeep =
riak_dt_vclock:subtract_dots(LHSUnique, RHSClock),
RHSKeep =
riak_dt_vclock:subtract_dots(RHSUnique, LHSClock),
V =
riak_dt_vclock:merge(
[ordsets:to_list(CommonDots), LHSKeep, RHSKeep]
),
%% Perfectly possible that an item in both sets should be dropped
case V of
[] ->
%% Removed from both sides
{Acc, ?DICT:erase(Elem, RHSRemaining)};
_ ->
{
?DICT:store(Elem,V, Acc),
?DICT:erase(Elem, RHSRemaining)
}
end
end
end,
{?DICT:new(), RHSEntries},
LHSEntries),
%%Now what about the stuff left from the right hand side? Do the same to that!
Entries = ?DICT:fold(fun(Elem, Dots, Acc) ->
case riak_dt_vclock:subtract_dots(Dots, LHSClock) of
[] ->
%% Removed
Acc;
NewDots ->
?DICT:store(Elem, NewDots, Acc)
end
end,
Keep,
RHSElems),
Deffered = merge_deferred(LHSDeferred, RHSDeferred),
?DICT:fold(
fun(Elem, Dots, Acc) ->
case riak_dt_vclock:subtract_dots(Dots, LHSClock) of
[] ->
%% Removed
Acc;
NewDots ->
?DICT:store(Elem, NewDots, Acc)
end
end,
Keep,
RHSElems).

apply_deferred(Clock, Entries, Deffered).

%% @private merge the deffered operations for both sets.
-spec merge_deferred(deferred(), deferred()) -> deferred().
Expand Down