Skip to content

Commit f4f7bec

Browse files
committed
Support Direct Reply-To for AMQP 1.0
# What? * Support Direct Reply-To for AMQP 1.0 * Compared to AMQP 0.9.1, this PR allows for multiple volatile queues on a single AMQP 1.0 session. Use case: JMS clients can create multiple temporary queues on the same JMS/AMQP session: * https://jakarta.ee/specifications/messaging/3.1/apidocs/jakarta.messaging/jakarta/jms/session#createTemporaryQueue() * https://jakarta.ee/specifications/messaging/3.1/apidocs/jakarta.messaging/jakarta/jms/jmscontext#createTemporaryQueue() * Fix missing metrics in for Direct Reply-To in AMQP 0.9.1, e.g. `messages_delivered_total` * Fix missing metrics (even without using Direct Reply-To ) in AMQP 0.9.1: If stats level is not `fine`, global metrics `rabbitmq_global_messages_delivered_*` should still be incremented. # Why? * Allow for scalable at-most-once RPC reply delivery Example use case: thousands of requesters connect, send a single request, wait for a single reply, and disconnect. This PR won't create any queue and won't write to the metadata store. * Feature parity with AMQP 0.9.1 # How? This PR extracts the previously channel specific Direct Reply-To code into a new queue type: `rabbit_volatile_queue`. "Volatile" describes the semantics, not a use-case. It signals non-durable, zero-buffer, at-most-once, may-drop, and "not stored in Khepri." This new queue type is then used for AMQP 1.0 and AMQP 0.9.1. Sending to the volatile queue is stateless like previously with Direct Reply-To in AMQP 0.9.1 and like done for the MQTT QoS 0 queue. This allows for use cases where a single responder replies to e.g. 100k different requesters. RabbitMQ will automatically auto grant new link-credit to the responder because the new queue type confirms immediately. The key gets implicitly checked by the channel/session: If the queue name (including the key) doesn’t exist, the `handle_event` callback for this queue isn’t invoked and therefore no delivery will be sent to the responder. This commit supports Direct Reply-To across AMQP 1.0 and 0.9.1. In other words, the requester can be an AMQP 1.0 client while the responder is an AMQP 0.9.1 client or vice versa. RabbitMQ will internally convert between AMQP 0.9.1 `reply_to` and AMQP 1.0 `/queues/<queue>` address. The AMQP 0.9.1 `reply_to` property is expected to contain a queue name. That's in line with the AMQP 0.9.1 spec: > One of the standard message properties is Reply-To, which is designed specifically for carrying the name of reply queues. Compared to AMQP 0.9.1 where the requester sets the `reply_to` property to `amq.rabbitmq.reply-to` and RabbitMQ modifies this field when forwarding the message to the request queue, in AMQP 1.0 the requester learns about the queue name from the broker at link attachment time. The requester has to set the reply-to property to the server generated queue name. That's because the server isn't allowed to modify the bare message. During link attachment time, the client has to set certain fields. These fields are exected to be set by the RabbitMQ client libraries. Here is an Erlang example: ```erl Source = #{address => undefined, durable => none, expiry_policy => <<"link-detach">>, dynamic => true, capabilities => [<<"rabbitmq:volatile-queue">>]}, AttachArgs = #{name => <<"receiver">>, role => {receiver, Source, self()}, snd_settle_mode => settled, rcv_settle_mode => first}, {ok, Receiver} = amqp10_client:attach_link(Session, AttachArgs), AddressReplyQ = receive {amqp10_event, {link, Receiver, {attached, Attach}}} -> #'v1_0.attach'{source = #'v1_0.source'{address = {utf8, Addr}}} = Attach, Addr end, ``` The client then sends the message by setting the reply-to address as follows: ```erl amqp10_client:send_msg( SenderRequester, amqp10_msg:set_properties( #{message_id => <<"my ID">>, reply_to => AddressReplyQ}, amqp10_msg:new(<<"tag">>, <<"request">>))), ``` If the responder attaches to the queue target in the reply-to field, RabbitMQ will check if the requester link is still attached. If the requester detached, the link will be refused. The responder can also attach to the anonymous null target and set the `to` field to the `reply-to` address. It's the requester's reponsiblity to grant sufficient link credit to volatile queue. RabbitMQ will drop replies if the requester ran out of link credit. The following Prometheus metric will be incremented: ``` rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_volatile_queue",dead_letter_strategy="disabled"} 0.0 ``` That's in line with the MQTT QoS 0 queue type. The main difference between the volatile queue and the MQTT QoS 0 queue is that the former isn't written to the metadata store. # Breaking Change Prior to this PR the following [documented caveat](https://www.rabbitmq.com/docs/4.0/direct-reply-to#limitations) applied: > If the RPC server publishes with the mandatory flag set then `amq.rabbitmq.reply-to.*` is treated as **not** a queue; i.e. if the server only publishes to this name then the message will be considered "not routed"; a `basic.return` will be sent if the mandatory flag was set. This PR removes this caveat. This PR introduces the following new behaviour: > If the RPC server publishes with the mandatory flag set, then `amq.rabbitmq.reply-to.*` is treated as a queue (assuming this queue name is encoded correctly). However, whether the requester is still there to consume the reply is not checked at routing time. In other words, if the RPC server only publishes to this name, then the message will be considered "routed" and RabbitMQ will therefore not send a `basic.return`.
1 parent 41337bb commit f4f7bec

35 files changed

+2083
-482
lines changed

.github/workflows/test-make-tests.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ jobs:
2828
- parallel-ct-set-2
2929
- parallel-ct-set-3
3030
- parallel-ct-set-4
31+
- parallel-ct-set-5
3132
- ct-amqp_client
3233
- ct-clustering_management
3334
- eunit ct-dead_lettering

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@
9292
-type max_message_size() :: undefined | non_neg_integer().
9393
-type footer_opt() :: crc32 | adler32.
9494

95-
-type attach_args() :: #{name => binary(),
96-
role => attach_role(),
95+
-type attach_args() :: #{name := binary(),
96+
role := attach_role(),
9797
snd_settle_mode => snd_settle_mode(),
9898
rcv_settle_mode => rcv_settle_mode(),
9999
filter => filter(),
@@ -739,13 +739,19 @@ build_frames(Channel, Trf, Payload, MaxPayloadSize, Acc) ->
739739

740740
make_source(#{role := {sender, _}}) ->
741741
#'v1_0.source'{};
742-
make_source(#{role := {receiver, Source, _Pid},
743-
filter := Filter}) ->
742+
make_source(#{role := {receiver, Source, _Pid}} = AttachArgs) ->
744743
Durable = translate_terminus_durability(maps:get(durable, Source, none)),
744+
ExpiryPolicy = case Source of
745+
#{expiry_policy := Policy} when is_binary(Policy) ->
746+
{symbol, Policy};
747+
_ ->
748+
undefined
749+
end,
745750
Dynamic = maps:get(dynamic, Source, false),
746-
TranslatedFilter = translate_filters(Filter),
751+
TranslatedFilter = translate_filters(maps:get(filter, AttachArgs, #{})),
747752
#'v1_0.source'{address = make_address(Source),
748753
durable = {uint, Durable},
754+
expiry_policy = ExpiryPolicy,
749755
dynamic = Dynamic,
750756
filter = TranslatedFilter,
751757
capabilities = make_capabilities(Source)}.

deps/rabbit/Makefile

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -236,15 +236,19 @@ define ct_master.erl
236236
{ok, Pid2, _} = peer:start(StartOpts#{name => "rabbit_shard2"}),
237237
{ok, Pid3, _} = peer:start(StartOpts#{name => "rabbit_shard3"}),
238238
{ok, Pid4, _} = peer:start(StartOpts#{name => "rabbit_shard4"}),
239+
{ok, Pid5, _} = peer:start(StartOpts#{name => "rabbit_shard5"}),
239240
peer:call(Pid1, net_kernel, set_net_ticktime, [5]),
240241
peer:call(Pid2, net_kernel, set_net_ticktime, [5]),
241242
peer:call(Pid3, net_kernel, set_net_ticktime, [5]),
242243
peer:call(Pid4, net_kernel, set_net_ticktime, [5]),
244+
peer:call(Pid5, net_kernel, set_net_ticktime, [5]),
243245
peer:call(Pid1, persistent_term, put, [rabbit_ct_tcp_port_base, 16000]),
244246
peer:call(Pid2, persistent_term, put, [rabbit_ct_tcp_port_base, 20000]),
245247
peer:call(Pid3, persistent_term, put, [rabbit_ct_tcp_port_base, 24000]),
246248
peer:call(Pid4, persistent_term, put, [rabbit_ct_tcp_port_base, 28000]),
249+
peer:call(Pid5, persistent_term, put, [rabbit_ct_tcp_port_base, 32000]),
247250
[{[_], {ok, Results}}] = ct_master_fork:run("$1"),
251+
peer:stop(Pid5),
248252
peer:stop(Pid4),
249253
peer:stop(Pid3),
250254
peer:stop(Pid2),
@@ -258,7 +262,7 @@ endef
258262

259263
PARALLEL_CT_SET_1_A = unit_rabbit_ssl unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking
260264
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_filter_prop amqp_filter_sql amqp_filter_sql_unit amqp_dotnet amqp_jms signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
261-
PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack amqpl_direct_reply_to backing_queue bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange rabbit_direct_reply_to_prop cluster_limit cluster_minority term_to_binary_compat_prop topic_permission transactions unicode unit_access_control
265+
PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack backing_queue bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange cluster_limit cluster_minority term_to_binary_compat_prop topic_permission transactions unicode unit_access_control
262266
PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit
263267

264268
PARALLEL_CT_SET_2_A = cluster confirms_rejects consumer_timeout rabbit_access_control rabbit_confirms rabbit_core_metrics_gc rabbit_cuttlefish rabbit_db_binding rabbit_db_exchange
@@ -276,13 +280,16 @@ PARALLEL_CT_SET_4_B = per_user_connection_tracking per_vhost_connection_limit ra
276280
PARALLEL_CT_SET_4_C = msg_size_metrics unit_msg_size_metrics per_vhost_msg_store per_vhost_queue_limit priority_queue upgrade_preparation vhost
277281
PARALLEL_CT_SET_4_D = per_user_connection_channel_tracking product_info publisher_confirms_parallel queue_type rabbitmq_queues_cli_integration rabbitmqctl_integration rabbitmqctl_shutdown routing rabbit_amqqueue
278282

283+
PARALLEL_CT_SET_5_A = rabbit_direct_reply_to_prop direct_reply_to_amqpl direct_reply_to_amqp
284+
279285
PARALLEL_CT_SET_1 = $(sort $(PARALLEL_CT_SET_1_A) $(PARALLEL_CT_SET_1_B) $(PARALLEL_CT_SET_1_C) $(PARALLEL_CT_SET_1_D))
280286
PARALLEL_CT_SET_2 = $(sort $(PARALLEL_CT_SET_2_A) $(PARALLEL_CT_SET_2_B) $(PARALLEL_CT_SET_2_C) $(PARALLEL_CT_SET_2_D))
281287
PARALLEL_CT_SET_3 = $(sort $(PARALLEL_CT_SET_3_A) $(PARALLEL_CT_SET_3_B) $(PARALLEL_CT_SET_3_C) $(PARALLEL_CT_SET_3_D))
282288
PARALLEL_CT_SET_4 = $(sort $(PARALLEL_CT_SET_4_A) $(PARALLEL_CT_SET_4_B) $(PARALLEL_CT_SET_4_C) $(PARALLEL_CT_SET_4_D))
289+
PARALLEL_CT_SET_5 = $(PARALLEL_CT_SET_5_A)
283290

284291
SEQUENTIAL_CT_SUITES = amqp_client clustering_management dead_lettering feature_flags metadata_store_clustering quorum_queue rabbit_stream_queue rabbit_fifo_prop
285-
PARALLEL_CT_SUITES = $(PARALLEL_CT_SET_1) $(PARALLEL_CT_SET_2) $(PARALLEL_CT_SET_3) $(PARALLEL_CT_SET_4)
292+
PARALLEL_CT_SUITES = $(PARALLEL_CT_SET_1) $(PARALLEL_CT_SET_2) $(PARALLEL_CT_SET_3) $(PARALLEL_CT_SET_4) $(PARALLEL_CT_SET_5)
286293

287294
ifeq ($(filter-out $(SEQUENTIAL_CT_SUITES) $(PARALLEL_CT_SUITES),$(CT_SUITES)),)
288295
parallel-ct-sanity-check:
@@ -308,16 +315,19 @@ define tpl_parallel_ct_test_spec
308315
{node, shard2, 'rabbit_shard2@localhost'}.
309316
{node, shard3, 'rabbit_shard3@localhost'}.
310317
{node, shard4, 'rabbit_shard4@localhost'}.
318+
{node, shard5, 'rabbit_shard5@localhost'}.
311319

312320
{define, 'Set1', [$(call comma_list,$(addsuffix _SUITE,$1))]}.
313321
{define, 'Set2', [$(call comma_list,$(addsuffix _SUITE,$2))]}.
314322
{define, 'Set3', [$(call comma_list,$(addsuffix _SUITE,$3))]}.
315323
{define, 'Set4', [$(call comma_list,$(addsuffix _SUITE,$4))]}.
324+
{define, 'Set5', [$(call comma_list,$(addsuffix _SUITE,$5))]}.
316325

317326
{suites, shard1, "test/", 'Set1'}.
318327
{suites, shard2, "test/", 'Set2'}.
319328
{suites, shard3, "test/", 'Set3'}.
320329
{suites, shard4, "test/", 'Set4'}.
330+
{suites, shard5, "test/", 'Set5'}.
321331
endef
322332

323333
define parallel_ct_set_target
@@ -330,7 +340,7 @@ parallel-ct-set-$(1): test-build
330340
$$(call erlang,$$(call ct_master.erl,ct.set-$(1).spec),-sname parallel_ct_$(PROJECT)@localhost -hidden -kernel net_ticktime 5)
331341
endef
332342

333-
$(foreach set,1 2 3 4,$(eval $(call parallel_ct_set_target,$(set))))
343+
$(foreach set,1 2 3 4 5,$(eval $(call parallel_ct_set_target,$(set))))
334344

335345
# --------------------------------------------------------------------
336346
# Compilation.

deps/rabbit/src/mc_amqpl.erl

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,14 @@ convert_from(mc_amqp, Sections, Env) ->
228228
%% drop it, what else can we do?
229229
undefined
230230
end,
231-
231+
ReplyTo = case unwrap_shortstr(ReplyTo0) of
232+
<<"/queues/", Queue/binary>> ->
233+
try cow_uri:urldecode(Queue)
234+
catch error:_ -> undefined
235+
end;
236+
Other ->
237+
Other
238+
end,
232239
BP = #'P_basic'{message_id = MsgId091,
233240
delivery_mode = DelMode,
234241
expiration = Expiration,
@@ -237,7 +244,7 @@ convert_from(mc_amqp, Sections, Env) ->
237244
[] -> undefined;
238245
AllHeaders -> AllHeaders
239246
end,
240-
reply_to = unwrap_shortstr(ReplyTo0),
247+
reply_to = ReplyTo,
241248
type = Type,
242249
app_id = unwrap_shortstr(GroupId),
243250
priority = Priority,
@@ -349,7 +356,7 @@ convert_to(mc_amqp, #content{payload_fragments_rev = PFR} = Content, Env) ->
349356
delivery_mode = DelMode,
350357
headers = Headers0,
351358
user_id = UserId,
352-
reply_to = ReplyTo,
359+
reply_to = ReplyTo0,
353360
type = Type,
354361
priority = Priority,
355362
app_id = AppId,
@@ -382,25 +389,32 @@ convert_to(mc_amqp, #content{payload_fragments_rev = PFR} = Content, Env) ->
382389
ttl = wrap(uint, Ttl),
383390
%% TODO: check Priority is a ubyte?
384391
priority = wrap(ubyte, Priority)},
392+
ReplyTo = case ReplyTo0 of
393+
undefined ->
394+
undefined;
395+
_ ->
396+
Queue = uri_string:quote(ReplyTo0),
397+
{utf8, <<"/queues/", Queue/binary>>}
398+
end,
385399
CorrId = case mc_util:urn_string_to_uuid(CorrId0) of
386400
{ok, CorrUUID} ->
387401
{uuid, CorrUUID};
388402
_ ->
389403
wrap(utf8, CorrId0)
390404
end,
391405
MsgId = case mc_util:urn_string_to_uuid(MsgId0) of
392-
{ok, MsgUUID} ->
393-
{uuid, MsgUUID};
394-
_ ->
395-
wrap(utf8, MsgId0)
396-
end,
406+
{ok, MsgUUID} ->
407+
{uuid, MsgUUID};
408+
_ ->
409+
wrap(utf8, MsgId0)
410+
end,
397411
P = case amqp10_section_header(?AMQP10_PROPERTIES_HEADER, Headers) of
398412
undefined ->
399413
#'v1_0.properties'{message_id = MsgId,
400414
user_id = wrap(binary, UserId),
401415
to = undefined,
402416
% subject = wrap(utf8, RKey),
403-
reply_to = wrap(utf8, ReplyTo),
417+
reply_to = ReplyTo,
404418
correlation_id = CorrId,
405419
content_type = wrap(symbol, ContentType),
406420
content_encoding = wrap(symbol, ContentEncoding),

deps/rabbit/src/pid_recomposition.erl

Lines changed: 0 additions & 60 deletions
This file was deleted.

deps/rabbit/src/rabbit.erl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1157,7 +1157,6 @@ pg_local_amqp_connection() ->
11571157
pg_local_scope(Prefix) ->
11581158
list_to_atom(io_lib:format("~s_~s", [Prefix, node()])).
11591159

1160-
11611160
-spec update_cluster_tags() -> 'ok'.
11621161

11631162
update_cluster_tags() ->

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,14 @@ handle_http_req(<<"GET">>,
8787
QName,
8888
fun(Q) ->
8989
{ok, NumMsgs, NumConsumers} = rabbit_amqqueue:stat(Q),
90-
RespPayload = encode_queue(Q, NumMsgs, NumConsumers),
91-
{ok, {<<"200">>, RespPayload, PermCaches}}
90+
case rabbit_volatile_queue:is(QNameBin) andalso
91+
not rabbit_volatile_queue:exists(QName) of
92+
true ->
93+
{error, not_found};
94+
false ->
95+
RespPayload = encode_queue(Q, NumMsgs, NumConsumers),
96+
{ok, {<<"200">>, RespPayload, PermCaches}}
97+
end
9298
end) of
9399
{ok, Result} ->
94100
Result;

0 commit comments

Comments
 (0)