-
Notifications
You must be signed in to change notification settings - Fork 4k
Support Direct Reply-To for AMQP 1.0 #14474
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
ansd
wants to merge
1
commit into
main
Choose a base branch
from
amqp-direct-reply-to
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
5bdc738
to
9b34aef
Compare
acogoluegnes
added a commit
to rabbitmq/rabbitmq-amqp-java-client
that referenced
this pull request
Sep 1, 2025
7299e82
to
e06a26a
Compare
acogoluegnes
added a commit
to rabbitmq/rabbitmq-amqp-java-client
that referenced
this pull request
Sep 2, 2025
References rabbitmq/rabbitmq-server#14474 Conflicts: src/test/java/com/rabbitmq/client/amqp/impl/ClientTest.java
9166ab1
to
f203803
Compare
# What? * Support Direct Reply-To for AMQP 1.0 * Compared to AMQP 0.9.1, this PR allows for multiple volatile queues on a single AMQP 1.0 session. Use case: JMS clients can create multiple temporary queues on the same JMS/AMQP session: * https://jakarta.ee/specifications/messaging/3.1/apidocs/jakarta.messaging/jakarta/jms/session#createTemporaryQueue() * https://jakarta.ee/specifications/messaging/3.1/apidocs/jakarta.messaging/jakarta/jms/jmscontext#createTemporaryQueue() * Fix missing metrics in for Direct Reply-To in AMQP 0.9.1, e.g. `messages_delivered_total` * Fix missing metrics (even without using Direct Reply-To ) in AMQP 0.9.1: If stats level is not `fine`, global metrics `rabbitmq_global_messages_delivered_*` should still be incremented. # Why? * Allow for scalable at-most-once RPC reply delivery Example use case: thousands of requesters connect, send a single request, wait for a single reply, and disconnect. This PR won't create any queue and won't write to the metadata store. * Feature parity with AMQP 0.9.1 # How? This PR extracts the previously channel specific Direct Reply-To code into a new queue type: `rabbit_volatile_queue`. "Volatile" describes the semantics, not a use-case. It signals non-durable, zero-buffer, at-most-once, may-drop, and "not stored in Khepri." This new queue type is then used for AMQP 1.0 and AMQP 0.9.1. Sending to the volatile queue is stateless like previously with Direct Reply-To in AMQP 0.9.1 and like done for the MQTT QoS 0 queue. This allows for use cases where a single responder replies to e.g. 100k different requesters. RabbitMQ will automatically auto grant new link-credit to the responder because the new queue type confirms immediately. The key gets implicitly checked by the channel/session: If the queue name (including the key) doesn’t exist, the `handle_event` callback for this queue isn’t invoked and therefore no delivery will be sent to the responder. This commit supports Direct Reply-To across AMQP 1.0 and 0.9.1. In other words, the requester can be an AMQP 1.0 client while the responder is an AMQP 0.9.1 client or vice versa. RabbitMQ will internally convert between AMQP 0.9.1 `reply_to` and AMQP 1.0 `/queues/<queue>` address. The AMQP 0.9.1 `reply_to` property is expected to contain a queue name. That's in line with the AMQP 0.9.1 spec: > One of the standard message properties is Reply-To, which is designed specifically for carrying the name of reply queues. Compared to AMQP 0.9.1 where the requester sets the `reply_to` property to `amq.rabbitmq.reply-to` and RabbitMQ modifies this field when forwarding the message to the request queue, in AMQP 1.0 the requester learns about the queue name from the broker at link attachment time. The requester has to set the reply-to property to the server generated queue name. That's because the server isn't allowed to modify the bare message. During link attachment time, the client has to set certain fields. These fields are exected to be set by the RabbitMQ client libraries. Here is an Erlang example: ```erl Source = #{address => undefined, durable => none, expiry_policy => <<"link-detach">>, dynamic => true, capabilities => [<<"rabbitmq:volatile-queue">>]}, AttachArgs = #{name => <<"receiver">>, role => {receiver, Source, self()}, snd_settle_mode => settled, rcv_settle_mode => first}, {ok, Receiver} = amqp10_client:attach_link(Session, AttachArgs), AddressReplyQ = receive {amqp10_event, {link, Receiver, {attached, Attach}}} -> #'v1_0.attach'{source = #'v1_0.source'{address = {utf8, Addr}}} = Attach, Addr end, ``` The client then sends the message by setting the reply-to address as follows: ```erl amqp10_client:send_msg( SenderRequester, amqp10_msg:set_properties( #{message_id => <<"my ID">>, reply_to => AddressReplyQ}, amqp10_msg:new(<<"tag">>, <<"request">>))), ``` If the responder attaches to the queue target in the reply-to field, RabbitMQ will check if the requester link is still attached. If the requester detached, the link will be refused. The responder can also attach to the anonymous null target and set the `to` field to the `reply-to` address. It's the requester's reponsiblity to grant sufficient link credit to volatile queue. RabbitMQ will drop replies if the requester ran out of link credit. The following Prometheus metric will be incremented: ``` rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_volatile_queue",dead_letter_strategy="disabled"} 0.0 ``` That's in line with the MQTT QoS 0 queue type. The main difference between the volatile queue and the MQTT QoS 0 queue is that the former isn't written to the metadata store. # Breaking Change Prior to this PR the following [documented caveat](https://www.rabbitmq.com/docs/4.0/direct-reply-to#limitations) applied: > If the RPC server publishes with the mandatory flag set then `amq.rabbitmq.reply-to.*` is treated as **not** a queue; i.e. if the server only publishes to this name then the message will be considered "not routed"; a `basic.return` will be sent if the mandatory flag was set. This PR removes this caveat. This PR introduces the following new behaviour: > If the RPC server publishes with the mandatory flag set, then `amq.rabbitmq.reply-to.*` is treated as a queue (assuming this queue name is encoded correctly). However, whether the requester is still there to consume the reply is not checked at routing time. In other words, if the RPC server only publishes to this name, then the message will be considered "routed" and RabbitMQ will therefore not send a `basic.return`.
aecf0e2
to
f4f7bec
Compare
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
What?
messages_delivered_total
fine
, global metricsrabbitmq_global_messages_delivered_*
should still be incremented.This PR supersedes #5103
Why?
Example use case: thousands of requesters connect, send a single request, wait for a single reply, and disconnect. This PR won't create any queue and won't write to the metadata store.
How?
This PR extracts the previously channel specific Direct Reply-To code into a new queue type:
rabbit_volatile_queue
."Volatile" describes the semantics, not a use-case. It signals non-durable, zero-buffer, at-most-once, may-drop, and "not stored in Khepri."
This new queue type is then used for AMQP 1.0 and AMQP 0.9.1.
Sending to the volatile queue is stateless like previously with Direct Reply-To in AMQP 0.9.1 and like done for the MQTT QoS 0 queue.
This allows for use cases where a single responder replies to e.g. 100k different requesters.
RabbitMQ will automatically auto grant new link-credit to the responder because the new queue type confirms immediately.
The key gets implicitly checked by the channel/session: If the queue name (including the key) doesn’t exist, the
handle_event
callback for this queue isn’t invoked and therefore no delivery will be sent to the responder.This commit supports Direct Reply-To across AMQP 1.0 and 0.9.1. In other words, the requester can be an AMQP 1.0 client while the responder is an AMQP 0.9.1 client or vice versa.
RabbitMQ will internally convert between AMQP 0.9.1
reply_to
and AMQP 1.0/queues/<queue>
address. The AMQP 0.9.1reply_to
property is expected to contain a queue name. That's in line with the AMQP 0.9.1 spec:Compared to AMQP 0.9.1 where the requester sets the
reply_to
property toamq.rabbitmq.reply-to
and RabbitMQ modifies this field when forwarding the message to the request queue, in AMQP 1.0 the requester learns about the queue name from the broker at link attachment time. The requester has to set the reply-to property to the server generated queue name. That's because the server isn't allowed to modify the bare message.During link attachment time, the client has to set certain fields. These fields are exected to be set by the RabbitMQ client libraries. Here is an Erlang example:
The client then sends the message by setting the reply-to address as follows:
If the responder attaches to the queue target in the reply-to field, RabbitMQ will check if the requester link is still attached. If the requester detached, the link will be refused.
In addition, the responder can check at any time whether the requester is still there by doing an HTTP GET (over AMQP) on the
/queues/<queue>
endpoint. If the HTTP return code is 404, the requester is gone. The motivation is similar to what our docs state for AMQP 0.9.1:This existence check works across both AMQP versions.
The responder can also attach to the anonymous null target and set the
to
field to thereply-to
address.It's the requester's reponsiblity to grant sufficient link credit to volatile queue. RabbitMQ will drop replies if the requester ran out of link credit. The following Prometheus metric will be incremented:
That's in line with the MQTT QoS 0 queue type.
The main difference between the volatile queue and the MQTT QoS 0 queue is that the former isn't written to the metadata store.
Breaking Change
Prior to this PR the following documented caveat applied:
This PR fixes this caveat.
This PR introduces the following new behaviour: