Federated MQTT topic exchange with MQTT 5.0 subscribers #13040

Open
ansd opened this issue Jan 8, 2025 · 9 comments · May be fixed by #13115

Comments

@ansd
Member

ansd commented Jan 8, 2025

Describe the bug

Federation crashes if the MQTT topic exchange (amq.topic by default) is federated and MQTT 5.0 clients are subscribed. That's because bindings are sent from downstream to upstream as described in https://www.rabbitmq.com/docs/federated-exchanges#details, and binding arguments containing the Erlang record mqtt_subscription_opts cannot be encoded in AMQP 0.9.1.
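
To make the shape problem concrete, here is a minimal sketch of a check that tells the two kinds of binding arguments apart. The module name, the function names, and the local record definition are assumptions for illustration (the field names come from this thread, the defaults are guesses), and the check only looks at the `{Name, Type, Value}` shape, not at whether the type tag is one the AMQP 0.9.1 encoder accepts.

```erlang
-module(binding_args_shape).
-export([is_amqp091_table/1, demo/0]).

%% Local stand-in for the MQTT plugin's record. Field names are taken from
%% this thread; the default values are assumptions.
-record(mqtt_subscription_opts, {qos = 0,
                                 no_local = false,
                                 retain_as_published = false,
                                 retain_handling = 0,
                                 id}).

%% True only if every element is a {Name, Type, Value} triple with a binary
%% name and an atom type tag, which is the shape of an AMQP 0.9.1 field table.
is_amqp091_table(Args) when is_list(Args) ->
    lists:all(fun({Name, Type, _Value}) when is_binary(Name), is_atom(Type) -> true;
                 (_) -> false
              end, Args).

%% Binding args with a raw record fail the check; plain triples pass.
demo() ->
    WithRecord = [#mqtt_subscription_opts{qos = 1},
                  {<<"x-binding-key">>, longstr, <<"a.b">>}],
    TriplesOnly = [{<<"x-binding-key">>, longstr, <<"a.b">>}],
    {is_amqp091_table(WithRecord), is_amqp091_table(TriplesOnly)}.
```

The record is a six-element tuple tagged with an atom, so it never matches the triple pattern, which is why a list containing it is not a valid field table.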

Reproduction steps

See #13033 (comment)

Expected behavior

Federation should work.

Additional context

Exporting definitions in JSON format also omits the mqtt_subscription_opts record from the binding arguments.

One solution is to encode the MQTT subscription options as an AMQP 0.9.1 table in the binding arguments when storing the binding in the database.

@sergio-aguilar-tuhh

Hi @ansd, Could you share more details on how to implement that?

Any guidance or partial patch would be helpful.

Thanks again for your time.

@michaelklishin
Member

@sergio-aguilar-tuhh as explained in Discussions, this is not a trivial change and we do not know how exactly such non-standard bindings should be propagated.

You are welcome to investigate what the options are. Like I said, the binding propagation approach federation uses has been stable for 15 years. We certainly won't rush any possible solution.

Converting MQTT subscription details into an AMQP 0-9-1 table is trivial. The question is primarily how to transfer and apply them on the other side of the link without breaking backwards compatibility. Federation is fairly commonly used between clusters that run different versions.

rabbit_federation_exchange_link is the relevant module in the federation plugin.

@sergio-aguilar-tuhh

sergio-aguilar-tuhh commented Jan 9, 2025

Could you let me know how I could help to fix this issue?

I’m eager to contribute in any way possible to help resolve this problem.

@michaelklishin
Member

@sergio-aguilar-tuhh take a look at the functions linked to above and try to come up with a design that would allow such non-AMQP 0-9-1 bindings to be propagated without a lot of special casing and complexity, and with backwards compatibility with older versions when such bindings are not present.

@ansd
Member Author

ansd commented Jan 10, 2025

Hi @sergio-aguilar-tuhh

One solution is to encode the MQTT subscription options as an AMQP 0.9.1 table in the binding arguments when storing the binding in the database.

This encoding/mapping should happen here. As @michaelklishin wrote, mixed version clusters need to work, i.e. RabbitMQ nodes containing the new code clustered with RabbitMQ nodes containing the old code (during a rolling update to the new version). Also, subscriptions (with subscription options) created before the rolling update need to be present (with the same subscription option semantics) after the rolling update.

@sergio-aguilar-tuhh

Hi @michaelklishin and @ansd,

After applying the proposed changes to the binding_args_for_proto_ver/3 function, MQTT v5 communication now works between different modules:

Original Implementation:

binding_args_for_proto_ver(?MQTT_PROTO_V5, TopicFilter, SubOpts) ->
    BindingKey = mqtt_to_amqp(TopicFilter),
    [SubOpts, {<<"x-binding-key">>, longstr, BindingKey}].

Modified Implementation:

binding_args_for_proto_ver(?MQTT_PROTO_V5, TopicFilter,
                           #mqtt_subscription_opts{qos = QoS,
                                                   no_local = NoLocal,
                                                   retain_as_published = Rap,
                                                   retain_handling = Rh,
                                                   id = Id}) ->
    %% The record is only destructured here; binding it to a variable as well
    %% would trigger an "unused variable" compiler warning.

    %% Convert the topic filter to an AMQP routing key:
    BindingKey = mqtt_to_amqp(TopicFilter),

    %% Build an AMQP table for MQTT subscription options:
    %% - 'signedint' vs. 'long' vs. 'short' depends on your desired type
    %% - Filter out or skip fields if they are `undefined`
    BaseSubOptsTable = [
        {<<"qos">>,                 signedint, QoS},
        {<<"no_local">>,            bool,      NoLocal},
        {<<"retain_as_published">>, bool,      Rap},
        {<<"retain_handling">>,     signedint, Rh}
    ],

    %% Add subscription_id only if the client provided one:
    SubOptsTable =
        case Id of
            undefined ->
                BaseSubOptsTable;
            _ ->
                BaseSubOptsTable ++ [{<<"subscription_id">>, signedint, Id}]
        end,

    %% Combine with the binding key. Federation sees a table instead of a raw record.
    [
        {<<"mqtt_subscription_opts">>, table,   SubOptsTable},
        {<<"x-binding-key">>,          longstr, BindingKey}
    ].
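
For completeness, the table built above can be mapped back into the record by whichever side reads the binding. The following is a minimal sketch under stated assumptions: the module and function names are hypothetical, the record definition is a local stand-in with guessed defaults, and the key names are the ones used in the modified implementation above, not necessarily the ones RabbitMQ ends up using.

```erlang
-module(sub_opts_decode).
-export([from_binding_args/1]).

%% Local stand-in for the MQTT plugin's record; defaults are assumptions.
-record(mqtt_subscription_opts, {qos = 0,
                                 no_local = false,
                                 retain_as_published = false,
                                 retain_handling = 0,
                                 id}).

%% Rebuild the record from table-encoded binding arguments.
%% Returns not_found if the arguments carry no such table.
from_binding_args(Args) ->
    case lists:keyfind(<<"mqtt_subscription_opts">>, 1, Args) of
        {_, table, Table} ->
            {ok, #mqtt_subscription_opts{
                    qos = field(<<"qos">>, Table, 0),
                    no_local = field(<<"no_local">>, Table, false),
                    retain_as_published = field(<<"retain_as_published">>, Table, false),
                    retain_handling = field(<<"retain_handling">>, Table, 0),
                    id = field(<<"subscription_id">>, Table, undefined)}};
        _ ->
            not_found
    end.

%% Look up one table field, falling back to Default when it is absent.
field(Key, Table, Default) ->
    case lists:keyfind(Key, 1, Table) of
        {_, _Type, Value} -> Value;
        false -> Default
    end.
```

Falling back to defaults for missing fields keeps the decoder tolerant of tables written without an optional subscription identifier.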

@michaelklishin
Member

@sergio-aguilar-tuhh yes but does the opposite end create a correct binding? To avoid an exception you can use term_to_binary/1 to convert any Erlang data structure into a binary, which can be sent as a single field table.

The question is: will the opposite end know what to do with it? Older versions won't, so another question is: is this acceptable? What would be the consequences?
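
As a rough illustration of the `term_to_binary/1` idea, the sketch below wraps an arbitrary term in a single field and reads it back. The module name, function names, and the field key are hypothetical, and the choice of the `longstr` type tag for the binary is an assumption. It only shows that the value survives a round trip; a node that does not know the key would still have no way to interpret it, which is the compatibility question raised here.

```erlang
-module(sub_opts_opaque).
-export([encode/1, decode/1]).

%% Hypothetical field name, used only for this sketch.
-define(KEY, <<"x-opaque-sub-opts">>).

%% Serialize any Erlang term into one table field.
encode(Term) ->
    {?KEY, longstr, term_to_binary(Term)}.

%% Read the term back from binding arguments, if the field is present.
decode(Args) ->
    case lists:keyfind(?KEY, 1, Args) of
        {_, longstr, Bin} when is_binary(Bin) ->
            try
                %% [safe] refuses input that would create new atoms.
                {ok, binary_to_term(Bin, [safe])}
            catch
                error:badarg -> {error, invalid_term}
            end;
        _ ->
            not_found
    end.
```

Decoding with the `safe` option is a defensive choice, since the binary arrives from another cluster over the federation link.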

@sergio-aguilar-tuhh

Hi @michaelklishin,

I'm unsure how to handle the binding arguments on the receiving end, especially regarding backward compatibility with older RabbitMQ versions. Could you provide further guidance or suggestions on how to proceed?

@ansd ansd self-assigned this Jan 21, 2025
@ansd ansd linked a pull request Jan 21, 2025 that will close this issue
ansd added a commit that referenced this issue Jan 22, 2025
 ## What?
This commit fixes #13040.

Prior to this commit, exchange federation crashed if the MQTT topic exchange
(`amq.topic` by default) got federated and MQTT 5.0 clients subscribed on the
downstream. That's because the federation plugin sends bindings from downstream
to upstream via AMQP 0.9.1. However, binding arguments containing Erlang record
`mqtt_subscription_opts` (henceforth binding args v1) cannot be encoded in AMQP 0.9.1.

 ## Why?
Federating the MQTT topic exchange could be useful for warm standby use cases.

 ## How?
This commit makes binding arguments a valid AMQP 0.9.1 table (henceforth
binding args v2).

Binding args v2 can only be used if all nodes support it. Hence binding
args v2 comes with feature flag `rabbitmq_4.1.0`. Note that the AMQP over
WebSocket [PR](#13071) already introduces this same feature flag. Although
the feature flag subsystem supports plugins to define their own feature
flags, and the MQTT plugin defined its own feature flags in the past,
reusing feature flag `rabbitmq_4.1.0` is simpler.

This commit also avoids database migrations for both Mnesia and Khepri
when feature flag `rabbitmq_4.1.0` gets enabled. Instead, it's simpler to
migrate binding args v1 to binding args v2 at MQTT connection establishment
time if the feature flag is enabled. (If the feature flag is disabled at
connection establishment time, but gets enabled during the connection
lifetime, the connection keeps using binding args v1.)

This commit adds two new suites:
1. `federation_SUITE` which tests that federating the MQTT topic
   exchange works, and
2. `feature_flag_SUITE` which tests the binding args migration from v1 to v2.
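
The connection-time migration described in the commit message can be pictured with the sketch below. It is not the code from #13115: the module and function names are hypothetical, the boolean argument stands in for a feature flag lookup made when the connection is established, and the table key and field names are borrowed from the earlier comment in this thread rather than from the actual fix. It also only transforms the argument list and leaves out replacing the stored binding.

```erlang
-module(binding_args_migration).
-export([maybe_migrate/2]).

%% Local stand-in for the MQTT plugin's record; defaults are assumptions.
-record(mqtt_subscription_opts, {qos = 0,
                                 no_local = false,
                                 retain_as_published = false,
                                 retain_handling = 0,
                                 id}).

%% FlagEnabled is an assumed stand-in for the feature flag state at
%% connection establishment. With the flag off, v1 args are kept as they are.
maybe_migrate(false, Args) ->
    Args;
maybe_migrate(true, Args) ->
    lists:map(fun(#mqtt_subscription_opts{} = Opts) -> to_table_entry(Opts);
                 (Other) -> Other
              end, Args).

%% Turn the record (v1) into a table field (v2-style).
to_table_entry(#mqtt_subscription_opts{qos = Qos,
                                       no_local = NoLocal,
                                       retain_as_published = Rap,
                                       retain_handling = Rh,
                                       id = Id}) ->
    Base = [{<<"qos">>, signedint, Qos},
            {<<"no_local">>, bool, NoLocal},
            {<<"retain_as_published">>, bool, Rap},
            {<<"retain_handling">>, signedint, Rh}],
    Table = case Id of
                undefined -> Base;
                _ -> Base ++ [{<<"subscription_id">>, signedint, Id}]
            end,
    {<<"mqtt_subscription_opts">>, table, Table}.
```

Arguments that are already table-encoded pass through unchanged, so running the function again on migrated arguments is harmless.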
@ansd
Member Author

ansd commented Jan 22, 2025

@sergio-aguilar-tuhh I implemented a fix in #13115. Can you please try this PR and confirm that this PR fixes your use case? (Make sure that the new feature flag 'rabbitmq_4.1.0' is enabled in your tests).
