Skip to content

Commit aecf0e2

Browse files
committed
ref 2
1 parent 846db26 commit aecf0e2

File tree

2 files changed

+30
-32
lines changed

2 files changed

+30
-32
lines changed

deps/rabbit/src/rabbit_volatile_queue.erl

Lines changed: 28 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,12 @@
2020
is/1,
2121
key_from_name/1,
2222
pid_from_name/2,
23-
local_cast/2,
24-
local_call/2,
2523
exists/1,
26-
ff_enabled/0]).
24+
ff_enabled/0,
25+
local_cast/2,
26+
local_call/2]).
2727

28+
%% rabbit_queue_type callbacks
2829
-export([declare/2,
2930
supports_stateful_delivery/0,
3031
deliver/3,
@@ -79,28 +80,25 @@
7980
-spec new(rabbit_amqqueue:name()) ->
8081
amqqueue:amqqueue() | error.
8182
new(#resource{virtual_host = Vhost,
82-
name = <<"amq.rabbitmq.reply-to">>} = QName) ->
83-
new0(QName, self(), Vhost);
83+
name = <<"amq.rabbitmq.reply-to">>} = Name) ->
84+
new0(Name, self(), Vhost);
8485
new(#resource{virtual_host = Vhost,
85-
name = QNameBin} = QName) ->
86-
case pid_from_name(QNameBin, nodes_with_hashes()) of
86+
name = NameBin} = Name) ->
87+
case pid_from_name(NameBin, nodes_with_hashes()) of
8788
{ok, Pid} when is_pid(Pid) ->
88-
new0(QName, Pid, Vhost);
89+
new0(Name, Pid, Vhost);
8990
_ ->
9091
error
9192
end.
9293

93-
new0(QName, Pid, Vhost) ->
94-
amqqueue:new(QName, Pid, false, true, none, [], Vhost, #{}, ?MODULE).
95-
96-
nodes_with_hashes() ->
97-
#{erlang:phash2(Node) => Node || Node <- rabbit_nodes:list_members()}.
94+
new0(Name, Pid, Vhost) ->
95+
amqqueue:new(Name, Pid, false, true, none, [], Vhost, #{}, ?MODULE).
9896

9997
-spec is(rabbit_misc:resource_name()) ->
10098
boolean().
10199
is(<<?PREFIX, _/binary>>) ->
102100
true;
103-
is(_) ->
101+
is(Name) when is_binary(Name) ->
104102
false.
105103

106104
init(Q) ->
@@ -121,15 +119,15 @@ consume(_Q, Spec, State) ->
121119
credit = Credit}}.
122120

123121
declare(Q, _Node) ->
124-
#resource{name = QNameBin} = QName = amqqueue:get_name(Q),
125-
case QNameBin of
122+
#resource{name = NameBin} = Name = amqqueue:get_name(Q),
123+
case NameBin of
126124
<<"amq.rabbitmq.reply-to">> ->
127125
{existing, Q};
128126
_ ->
129-
case exists(QName) of
127+
case exists(Name) of
130128
true ->
131129
{existing, Q};
132-
_ ->
130+
false ->
133131
{absent, Q, stopped}
134132
end
135133
end.
@@ -142,7 +140,6 @@ exists(#resource{kind = queue,
142140
case ff_enabled() of
143141
true ->
144142
Request = {has_state, QName, ?MODULE},
145-
%% We use delegate instead of erpc to guarantee that order is preserved.
146143
MFA = {?MODULE, local_call, [Request]},
147144
try delegate:invoke(Pid, MFA)
148145
catch _:_ -> false
@@ -187,7 +184,6 @@ deliver0(Q, Msg) ->
187184
case ff_enabled() of
188185
true ->
189186
Request = {queue_event, QName, {deliver, Msg}},
190-
%% We use delegate instead of erpc to guarantee that order is preserved.
191187
MFA = {?MODULE, local_cast, [Request]},
192188
delegate:invoke_no_result(QPid, MFA);
193189
false ->
@@ -253,20 +249,23 @@ credit(_QName, CTag, DeliveryCountRcv, LinkCreditRcv, Drain,
253249
end,
254250
{State#?STATE{delivery_count = DeliveryCount,
255251
credit = Credit},
256-
[{credit_reply, CTag, DeliveryCount, Credit, 0, Drain}]}.
252+
[{credit_reply, CTag, DeliveryCount, Credit, _Available = 0, Drain}]}.
257253

258254
close(#?STATE{}) ->
259255
ok.
260256

261-
update(_, State) ->
257+
update(_, #?STATE{} = State) ->
262258
State.
263259

264-
cancel(_, _, State) ->
260+
cancel(_, _, #?STATE{} = State) ->
265261
{ok, State}.
266262

267263
is_enabled() ->
268264
true.
269265

266+
ff_enabled() ->
267+
rabbit_feature_flags:is_enabled('rabbitmq_4.2.0').
268+
270269
is_compatible(_, _, _) ->
271270
true.
272271

@@ -322,18 +321,18 @@ delete(_, _, _, _) ->
322321
recover(_, _) ->
323322
{[], []}.
324323

325-
settle(_, _, _, _, State) ->
324+
settle(_, _, _, _, #?STATE{} = State) ->
326325
{State, []}.
327326

328-
credit_v1(_, _, _, _, State) ->
327+
credit_v1(_, _, _, _, #?STATE{} = State) ->
329328
{State, []}.
330329

331330
dequeue(_, _, _, _, #?STATE{name = Name}) ->
332331
{protocol_error, not_implemented,
333332
"basic.get not supported by volatile ~ts",
334333
[rabbit_misc:rs(Name)]}.
335334

336-
state_info(_) ->
335+
state_info(#?STATE{}) ->
337336
#{}.
338337

339338
info(_, _) ->
@@ -342,9 +341,6 @@ info(_, _) ->
342341
policy_apply_to_name() ->
343342
<<>>.
344343

345-
ff_enabled() ->
346-
rabbit_feature_flags:is_enabled('rabbitmq_4.2.0').
347-
348344
-spec new_name() ->
349345
rabbit_misc:resource_name().
350346
new_name() ->
@@ -408,3 +404,6 @@ key_from_name(<<?PREFIX, Suffix/binary>>) ->
408404
end;
409405
key_from_name(_) ->
410406
error.
407+
408+
nodes_with_hashes() ->
409+
#{erlang:phash2(Node) => Node || Node <- rabbit_nodes:list_members()}.

deps/rabbit_common/src/delegate.erl

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,8 @@
1818
%% consistent route, to prevent them being reordered. In fact all
1919
%% AMQP-ish things (such as queue declaration results and basic.get)
2020
%% must take the same route as well, to ensure that clients see causal
21-
%% ordering correctly. Therefore we have a rather generic mechanism
22-
%% here rather than just a message-reflector. That's also why we pick
23-
%% the delegate process to use based on a hash of the source pid.
21+
%% ordering correctly. Therefore we can't use erpc. That's also why we
22+
%% pick the delegate process to use based on a hash of the source pid.
2423
%%
2524
%% When a function is invoked using delegate:invoke/2,
2625
%% or delegate:invoke_no_result/2 on a group of pids, the pids are first split

0 commit comments

Comments
 (0)