diff --git a/src/riak_dt_orswot.erl b/src/riak_dt_orswot.erl index 970e399..7db8701 100644 --- a/src/riak_dt_orswot.erl +++ b/src/riak_dt_orswot.erl @@ -279,6 +279,7 @@ remove_all([Elem | Rest], Actor, ORSet, Ctx) -> {ok, ORSet2} = update({remove, Elem}, Actor, ORSet, Ctx), remove_all(Rest, Actor, ORSet2, Ctx). + -spec merge(orswot(), orswot()) -> orswot(). merge({_LHSC, LHSE, _LHSD}=LHS, {_RHSC, RHSE, _RHSD}=RHS) when is_list(LHSE); is_list(RHSE) -> @@ -287,59 +288,92 @@ merge({Clock, Entries, Deferred}, {Clock, Entries, Deferred}) -> {Clock, Entries, Deferred}; merge({LHSClock, LHSEntries, LHSDeferred}, {RHSClock, RHSEntries, RHSDeferred}) -> Clock = riak_dt_vclock:merge([LHSClock, RHSClock]), + RHSEmpty = + dict:is_empty(RHSEntries) andalso + RHSClock == [], + Entries = + case RHSEmpty of + true -> + LHSEntries; + false -> + really_merge({LHSClock, LHSEntries}, {RHSClock, RHSEntries}) + end, + Deffered = merge_deferred(LHSDeferred, RHSDeferred), + apply_deferred(Clock, Entries, Deffered). + + +really_merge({LHSClock, LHSEntries}, {RHSClock, RHSEntries}) -> {Keep, RHSElems} = - ?DICT:fold(fun(Elem, Dots, {Acc, RHSRemaining}) -> - case ?DICT:find(Elem, RHSEntries) of - error -> - %% Only on left, trim dots and keep surviving - case riak_dt_vclock:subtract_dots(Dots, RHSClock) of - [] -> - %% Removed - {Acc, RHSRemaining}; - NewDots -> - {?DICT:store(Elem, NewDots, Acc), RHSRemaining} - end; - {ok, RHSDots} -> - %% On both sides - CommonDots = ordsets:intersection( - ordsets:from_list(Dots), - ordsets:from_list(RHSDots)), - LHSUnique = ordsets:to_list( - ordsets:subtract(ordsets:from_list(Dots), - CommonDots)), - RHSUnique = ordsets:to_list( - ordsets:subtract(ordsets:from_list(RHSDots), - CommonDots)), - LHSKeep = riak_dt_vclock:subtract_dots(LHSUnique, RHSClock), - RHSKeep = riak_dt_vclock:subtract_dots(RHSUnique, LHSClock), - V = riak_dt_vclock:merge([ordsets:to_list(CommonDots), LHSKeep, RHSKeep]), - %% Perfectly possible that an item in both sets should be dropped - case V of - [] -> - %% Removed from both sides - {Acc, ?DICT:erase(Elem, RHSRemaining)}; - _ -> - {?DICT:store(Elem, V, Acc), ?DICT:erase(Elem, RHSRemaining)} - end - end - end, - {?DICT:new(), RHSEntries}, - LHSEntries), + ?DICT:fold( + fun(Elem, Dots, {Acc, RHSRemaining}) -> + case ?DICT:find(Elem, RHSEntries) of + error -> + %% Only on left, trim dots and keep surviving + case riak_dt_vclock:subtract_dots(Dots, RHSClock) of + [] -> + %% Removed + {Acc, RHSRemaining}; + NewDots -> + {?DICT:store(Elem, NewDots, Acc), RHSRemaining} + end; + {ok, RHSDots} -> + %% On both sides + CommonDots = + ordsets:intersection( + ordsets:from_list(Dots), + ordsets:from_list(RHSDots) + ), + LHSUnique = + ordsets:to_list( + ordsets:subtract( + ordsets:from_list(Dots), + CommonDots + ) + ), + RHSUnique = + ordsets:to_list( + ordsets:subtract( + ordsets:from_list(RHSDots), + CommonDots + ) + ), + LHSKeep = + riak_dt_vclock:subtract_dots(LHSUnique, RHSClock), + RHSKeep = + riak_dt_vclock:subtract_dots(RHSUnique, LHSClock), + V = + riak_dt_vclock:merge( + [ordsets:to_list(CommonDots), LHSKeep, RHSKeep] + ), + %% Perfectly possible that an item in both sets should be dropped + case V of + [] -> + %% Removed from both sides + {Acc, ?DICT:erase(Elem, RHSRemaining)}; + _ -> + { + ?DICT:store(Elem,V, Acc), + ?DICT:erase(Elem, RHSRemaining) + } + end + end + end, + {?DICT:new(), RHSEntries}, + LHSEntries), %%Now what about the stuff left from the right hand side? Do the same to that! - Entries = ?DICT:fold(fun(Elem, Dots, Acc) -> - case riak_dt_vclock:subtract_dots(Dots, LHSClock) of - [] -> - %% Removed - Acc; - NewDots -> - ?DICT:store(Elem, NewDots, Acc) - end - end, - Keep, - RHSElems), - Deffered = merge_deferred(LHSDeferred, RHSDeferred), + ?DICT:fold( + fun(Elem, Dots, Acc) -> + case riak_dt_vclock:subtract_dots(Dots, LHSClock) of + [] -> + %% Removed + Acc; + NewDots -> + ?DICT:store(Elem, NewDots, Acc) + end + end, + Keep, + RHSElems). - apply_deferred(Clock, Entries, Deffered). %% @private merge the deffered operations for both sets. -spec merge_deferred(deferred(), deferred()) -> deferred().