Skip to content

Commit e06a26a

Browse files
committed
Support Direct Reply-To for AMQP 1.0
# What? * Support Direct Reply-To for AMQP 1.0 * Compared to AMQP 0.9.1, this PR allows for multiple volatile queues on a single AMQP 1.0 session. Use case: JMS clients can create multiple temporary queues on the same JMS/AMQP session: * https://jakarta.ee/specifications/messaging/3.1/apidocs/jakarta.messaging/jakarta/jms/session#createTemporaryQueue() * https://jakarta.ee/specifications/messaging/3.1/apidocs/jakarta.messaging/jakarta/jms/jmscontext#createTemporaryQueue() * Huge speed up for Direct Reply-To in AMQP 0.9.1 because prior to this PR sending each RPC reply was bottlenecked by the slowest network link between the RabbitMQ node the responder connected to and all other RabbitMQ nodes. * Fix missing metrics in for Direct Reply-To in AMQP 0.9.1, e.g. `messages_delivered_total` * Fix missing metrics (even without using Direct Reply-To ) in AMQP 0.9.1: If stats level is not `fine`, global metrics `rabbitmq_global_messages_delivered_*` should still be incremented. # Why? * Allow for scalable at-most-once RPC reply delivery Example use case: thousands of requesters connect, send a single request, wait for a single reply, and disconnect. This PR won't create any queue and won't write to the metadata store. * Feature parity with AMQP 0.9.1 # How? This PR extracts the previously channel specific Direct Reply-To code into a new queue type: `rabbit_volatile_queue`. "Volatile" describes the semantics, not a use-case. It signals non-durable, zero-buffer, at-most-once, may-drop, and "not stored in Khepri." This new queue type is then used for AMQP 1.0 and AMQP 0.9.1. Sending to the volatile queue is stateless like previously with Direct Reply-To in AMQP 0.9.1 and like done for the MQTT QoS 0 queue. This allows for use cases where a single responder replies to e.g. 100k different requesters. RabbitMQ will automatically auto grant new link-credit to the responder because the new queue type confirms immediately. The key gets implicitly checked by the channel/session: If the queue name (including the key) doesn’t exist, the `handle_event` callback for this queue isn’t invoked and therefore no delivery will be sent to the responder. This commit supports Direct Reply-To across AMQP 1.0 and 0.9.1. In other words, the requester can be an AMQP 1.0 client while the responder is an AMQP 0.9.1 client or vice versa. RabbitMQ will internally convert between AMQP 0.9.1 `reply_to` and AMQP 1.0 `/queues/<queue>` address. The AMQP 0.9.1 `reply_to` property is expected to contain a queue name. That's in line with the AMQP 0.9.1 spec: > One of the standard message properties is Reply-To, which is designed specifically for carrying the name of reply queues. Compared to AMQP 0.9.1 where the requester sets the `reply_to` property to `amq.rabbitmq.reply-to` and RabbitMQ modifies this field when forwarding the message to the request queue, in AMQP 1.0 the requester learns about the queue name from the broker at link attachment time. The requester has to set the reply-to property to the server generated queue name. That's because the server isn't allowed to modify the bare message. During link attachment time, the client has to set certain fields. These fields are exected to be set by the RabbitMQ client libraries. Here is an Erlang example: ```erl Source = #{address => undefined, durable => none, expiry_policy => <<"link-detach">>, dynamic => true, capabilities => [<<"rabbitmq:volatile-queue">>]}, AttachArgs = #{name => <<"receiver">>, role => {receiver, Source, self()}, snd_settle_mode => settled, rcv_settle_mode => first}, {ok, Receiver} = amqp10_client:attach_link(Session, AttachArgs), AddressReplyQ = receive {amqp10_event, {link, Receiver, {attached, Attach}}} -> #'v1_0.attach'{source = #'v1_0.source'{address = {utf8, Addr}}} = Attach, Addr end, ``` The client then sends the message by setting the reply-to address as follows: ```erl amqp10_client:send_msg( SenderRequester, amqp10_msg:set_properties( #{message_id => <<"my ID">>, reply_to => AddressReplyQ}, amqp10_msg:new(<<"tag">>, <<"request">>))), ``` If the responder attaches to the queue target in the reply-to field, RabbitMQ will check if the requester link is still attached. If the requester detached, the link will be refused. The responder can also attach to the anonymous null target and set the `to` field to the `reply-to` address. It's the requester's reponsiblity to grant sufficient link credit to volatile queue. RabbitMQ will drop replies if the requester ran out of link credit. The following Prometheus metric will be incremented: ``` rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_volatile_queue",dead_letter_strategy="disabled"} 0.0 ``` That's in line with the MQTT QoS 0 queue type. The main difference between the volatile queue and the MQTT QoS 0 queue is that the former isn't written to the metadata store.
1 parent bb7ead2 commit e06a26a

32 files changed

+1738
-453
lines changed

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@
9292
-type max_message_size() :: undefined | non_neg_integer().
9393
-type footer_opt() :: crc32 | adler32.
9494

95-
-type attach_args() :: #{name => binary(),
96-
role => attach_role(),
95+
-type attach_args() :: #{name := binary(),
96+
role := attach_role(),
9797
snd_settle_mode => snd_settle_mode(),
9898
rcv_settle_mode => rcv_settle_mode(),
9999
filter => filter(),
@@ -739,13 +739,19 @@ build_frames(Channel, Trf, Payload, MaxPayloadSize, Acc) ->
739739

740740
make_source(#{role := {sender, _}}) ->
741741
#'v1_0.source'{};
742-
make_source(#{role := {receiver, Source, _Pid},
743-
filter := Filter}) ->
742+
make_source(#{role := {receiver, Source, _Pid}} = AttachArgs) ->
744743
Durable = translate_terminus_durability(maps:get(durable, Source, none)),
744+
ExpiryPolicy = case Source of
745+
#{expiry_policy := Policy} when is_binary(Policy) ->
746+
{symbol, Policy};
747+
_ ->
748+
undefined
749+
end,
745750
Dynamic = maps:get(dynamic, Source, false),
746-
TranslatedFilter = translate_filters(Filter),
751+
TranslatedFilter = translate_filters(maps:get(filter, AttachArgs, #{})),
747752
#'v1_0.source'{address = make_address(Source),
748753
durable = {uint, Durable},
754+
expiry_policy = ExpiryPolicy,
749755
dynamic = Dynamic,
750756
filter = TranslatedFilter,
751757
capabilities = make_capabilities(Source)}.

deps/rabbit/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ endef
258258

259259
PARALLEL_CT_SET_1_A = unit_rabbit_ssl unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking
260260
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_filter_prop amqp_filter_sql amqp_filter_sql_unit amqp_dotnet amqp_jms signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
261-
PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack amqpl_direct_reply_to backing_queue bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange rabbit_direct_reply_to_prop cluster_limit cluster_minority term_to_binary_compat_prop topic_permission transactions unicode unit_access_control
261+
PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack direct_reply_to_amqp direct_reply_to_amqpl backing_queue bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange rabbit_direct_reply_to_prop cluster_limit cluster_minority term_to_binary_compat_prop topic_permission transactions unicode unit_access_control
262262
PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit
263263

264264
PARALLEL_CT_SET_2_A = cluster confirms_rejects consumer_timeout rabbit_access_control rabbit_confirms rabbit_core_metrics_gc rabbit_cuttlefish rabbit_db_binding rabbit_db_exchange

deps/rabbit/ct.test.spec

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121
, amqp_dotnet_SUITE
2222
, amqp_jms_SUITE
2323
, amqpl_consumer_ack_SUITE
24-
, amqpl_direct_reply_to_SUITE
24+
, direct_reply_to_amqpl_SUITE
25+
, direct_reply_to_amqp_SUITE
2526
, amqqueue_backward_compatibility_SUITE
2627
, backing_queue_SUITE
2728
, bindings_SUITE

deps/rabbit/src/mc_amqpl.erl

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,14 @@ convert_from(mc_amqp, Sections, Env) ->
228228
%% drop it, what else can we do?
229229
undefined
230230
end,
231-
231+
ReplyTo = case unwrap_shortstr(ReplyTo0) of
232+
<<"/queues/", Queue/binary>> ->
233+
try cow_uri:urldecode(Queue)
234+
catch error:_ -> undefined
235+
end;
236+
Other ->
237+
Other
238+
end,
232239
BP = #'P_basic'{message_id = MsgId091,
233240
delivery_mode = DelMode,
234241
expiration = Expiration,
@@ -237,7 +244,7 @@ convert_from(mc_amqp, Sections, Env) ->
237244
[] -> undefined;
238245
AllHeaders -> AllHeaders
239246
end,
240-
reply_to = unwrap_shortstr(ReplyTo0),
247+
reply_to = ReplyTo,
241248
type = Type,
242249
app_id = unwrap_shortstr(GroupId),
243250
priority = Priority,
@@ -349,7 +356,7 @@ convert_to(mc_amqp, #content{payload_fragments_rev = PFR} = Content, Env) ->
349356
delivery_mode = DelMode,
350357
headers = Headers0,
351358
user_id = UserId,
352-
reply_to = ReplyTo,
359+
reply_to = ReplyTo0,
353360
type = Type,
354361
priority = Priority,
355362
app_id = AppId,
@@ -382,25 +389,32 @@ convert_to(mc_amqp, #content{payload_fragments_rev = PFR} = Content, Env) ->
382389
ttl = wrap(uint, Ttl),
383390
%% TODO: check Priority is a ubyte?
384391
priority = wrap(ubyte, Priority)},
392+
ReplyTo = case ReplyTo0 of
393+
undefined ->
394+
undefined;
395+
_ ->
396+
Queue = uri_string:quote(ReplyTo0),
397+
{utf8, <<"/queues/", Queue/binary>>}
398+
end,
385399
CorrId = case mc_util:urn_string_to_uuid(CorrId0) of
386400
{ok, CorrUUID} ->
387401
{uuid, CorrUUID};
388402
_ ->
389403
wrap(utf8, CorrId0)
390404
end,
391405
MsgId = case mc_util:urn_string_to_uuid(MsgId0) of
392-
{ok, MsgUUID} ->
393-
{uuid, MsgUUID};
394-
_ ->
395-
wrap(utf8, MsgId0)
396-
end,
406+
{ok, MsgUUID} ->
407+
{uuid, MsgUUID};
408+
_ ->
409+
wrap(utf8, MsgId0)
410+
end,
397411
P = case amqp10_section_header(?AMQP10_PROPERTIES_HEADER, Headers) of
398412
undefined ->
399413
#'v1_0.properties'{message_id = MsgId,
400414
user_id = wrap(binary, UserId),
401415
to = undefined,
402416
% subject = wrap(utf8, RKey),
403-
reply_to = wrap(utf8, ReplyTo),
417+
reply_to = ReplyTo,
404418
correlation_id = CorrId,
405419
content_type = wrap(symbol, ContentType),
406420
content_encoding = wrap(symbol, ContentEncoding),

deps/rabbit/src/pid_recomposition.erl

Lines changed: 0 additions & 60 deletions
This file was deleted.

deps/rabbit/src/rabbit.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1002,6 +1002,7 @@ start(normal, []) ->
10021002
end,
10031003

10041004
persist_static_configuration(),
1005+
persistent_term:put(cp_dot, binary:compile_pattern(<<".">>)),
10051006

10061007
?LOG_DEBUG(""),
10071008
?LOG_DEBUG("== Boot steps =="),
@@ -1157,7 +1158,6 @@ pg_local_amqp_connection() ->
11571158
pg_local_scope(Prefix) ->
11581159
list_to_atom(io_lib:format("~s_~s", [Prefix, node()])).
11591160

1160-
11611161
-spec update_cluster_tags() -> 'ok'.
11621162

11631163
update_cluster_tags() ->

0 commit comments

Comments
 (0)