Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQTT: Support wildcards in topic filters matching retained messages #13048

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
fb51c28
test(rabbitmq_mqtt): add wildcard tests for retained messages in ETS …
getlarge Dec 30, 2024
a48599b
feat(rabbitmq_mqtt): add has_wildcards function to check for wildcard…
getlarge Jan 9, 2025
c44392f
feat(rabbitmq_mqtt): add rabbit_globber module for wildcard topic mat…
getlarge Jan 9, 2025
9be537f
test(rabbitmq_mqtt): add unit tests for rabbit_globber module functio…
getlarge Jan 9, 2025
417a3a8
refactor(rabbitmq_mqtt): rename test functions for consistency and cl…
getlarge Jan 10, 2025
d6fb829
chore(rabbitmq_mqtt): update build config
getlarge Jan 10, 2025
b017875
chore: rename rabbit_globber to rabbit_mqtt_topic_matcher
getlarge Jan 13, 2025
e16e709
chore: run `bazel run gazelle`
getlarge Jan 13, 2025
08b1460
refactor(rabbitmq_mqtt): add retained message ets store structure and…
getlarge Jan 17, 2025
055e6bc
refactor(rabbitmq_mqtt): support sending retained messages list
getlarge Jan 18, 2025
a19e1be
test(rabbitmq_mqtt): update publish expectations
getlarge Jan 18, 2025
6d9083b
refactor(rabbitmq_mqtt): remove rabbit_mqtt_topic_matcher and associa…
getlarge Jan 19, 2025
098a7c6
refactor(rabbitmq_mqtt): support retained message list handling
getlarge Jan 19, 2025
cca66b3
fix(rabbitmq_mqtt): enhance ETS table management and recovery process
getlarge Jan 19, 2025
7a6250e
fix(rabbitmq_mqtt): replace ets:delete_object with ets:match_delete
getlarge Jan 19, 2025
54c992d
test(rabbitmq_mqtt): update retained message tests and add recovery s…
getlarge Jan 19, 2025
9bf8665
refactor(rabbitmq_mqtt): add support for wildcard filtering for DETS …
getlarge Jan 20, 2025
7767e62
test(rabbitmq_mqtt): add wildcard filtering tests for DETS store
getlarge Jan 20, 2025
30c9a42
fix(rabbitmq_mqtt): improve topics deletion efficiency
getlarge Jan 20, 2025
092b10d
refactor(rabbitmq_mqtt): improve root node handling and add custom fi…
getlarge Jan 22, 2025
0cdf063
test(rabbitmq_mqtt): ensure DETS tables are unique
getlarge Jan 22, 2025
690f915
feat(rabbitmq_mqtt): add max retained messages count configuration fo…
getlarge Jan 22, 2025
d022993
Merge branch 'main' into rabbitmq-server-8096
getlarge Jan 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -198,17 +198,18 @@ rabbitmq_app(
"syntax_tools",
"xmerl",
"crypto",
"horus",
],
license_files = [":license_files"],
priv = [":priv"],
deps = [
"//deps/amqp10_common:erlang_app",
"//deps/khepri:erlang_app",
"//deps/khepri_mnesia_migration:erlang_app",
"//deps/rabbit_common:erlang_app",
"//deps/rabbitmq_prelaunch:erlang_app",
"@cuttlefish//:erlang_app",
"@gen_batch_server//:erlang_app",
"@khepri//:erlang_app",
"@khepri_mnesia_migration//:erlang_app",
"@observer_cli//:erlang_app",
"@osiris//:erlang_app",
"@ra//:erlang_app",
Expand Down
10 changes: 5 additions & 5 deletions deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,9 @@ def all_beam_files(name = "all_beam_files"):
erlc_opts = "//:erlc_opts",
deps = [
"//deps/amqp10_common:erlang_app",
"//deps/khepri:erlang_app",
"//deps/khepri_mnesia_migration:erlang_app",
"//deps/rabbit_common:erlang_app",
"@khepri//:erlang_app",
"@khepri_mnesia_migration//:erlang_app",
"@ra//:erlang_app",
"@ranch//:erlang_app",
"@stdout_formatter//:erlang_app",
Expand Down Expand Up @@ -507,9 +507,9 @@ def all_test_beam_files(name = "all_test_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = [
"//deps/amqp10_common:erlang_app",
"//deps/khepri:erlang_app",
"//deps/khepri_mnesia_migration:erlang_app",
"//deps/rabbit_common:erlang_app",
"@khepri//:erlang_app",
"@khepri_mnesia_migration//:erlang_app",
"@ra//:erlang_app",
"@ranch//:erlang_app",
"@stdout_formatter//:erlang_app",
Expand Down Expand Up @@ -2021,7 +2021,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
outs = ["test/metadata_store_phase1_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/rabbit_common:erlang_app", "@khepri//:erlang_app"],
deps = ["//deps/khepri:erlang_app", "//deps/rabbit_common:erlang_app"],
)
erlang_bytecode(
name = "mc_unit_SUITE_beam_files",
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbitmq_consistent_hash_exchange/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ rabbitmq_app(
license_files = [":license_files"],
priv = [":priv"],
deps = [
"//deps/khepri:erlang_app",
"//deps/khepri_mnesia_migration:erlang_app",
"//deps/rabbit:erlang_app",
"//deps/rabbit_common:erlang_app",
"@khepri//:erlang_app",
"@khepri_mnesia_migration//:erlang_app",
],
)

Expand Down
8 changes: 4 additions & 4 deletions deps/rabbitmq_consistent_hash_exchange/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ def all_beam_files(name = "all_beam_files"):
dest = "ebin",
erlc_opts = "//:erlc_opts",
deps = [
"//deps/khepri:erlang_app",
"//deps/khepri_mnesia_migration:erlang_app",
"//deps/rabbit:erlang_app",
"//deps/rabbit_common:erlang_app",
"//deps/rabbitmq_cli:erlang_app",
"@khepri//:erlang_app",
"@khepri_mnesia_migration//:erlang_app",
],
)

Expand All @@ -47,11 +47,11 @@ def all_test_beam_files(name = "all_test_beam_files"):
dest = "test",
erlc_opts = "//:test_erlc_opts",
deps = [
"//deps/khepri:erlang_app",
"//deps/khepri_mnesia_migration:erlang_app",
"//deps/rabbit:erlang_app",
"//deps/rabbit_common:erlang_app",
"//deps/rabbitmq_cli:erlang_app",
"@khepri//:erlang_app",
"@khepri_mnesia_migration//:erlang_app",
],
)

Expand Down
1 change: 1 addition & 0 deletions deps/rabbitmq_ct_client_helpers/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ rabbitmq_app(
hdrs = [":public_hdrs"],
app_name = "rabbitmq_ct_client_helpers",
beam_files = [":beam_files"],
extra_apps = ["rabbit_common"],
license_files = [":license_files"],
priv = [":priv"],
deps = [
Expand Down
6 changes: 6 additions & 0 deletions deps/rabbitmq_ct_client_helpers/MODULE.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
###############################################################################
# Bazel now uses Bzlmod by default to manage external dependencies.
# Please consider migrating your external dependencies from WORKSPACE to MODULE.bazel.
#
# For more details, please check https://github.com/bazelbuild/bazel/issues/18958
###############################################################################
6 changes: 6 additions & 0 deletions deps/rabbitmq_ct_helpers/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ rabbitmq_app(
hdrs = [":public_hdrs"],
app_name = "rabbitmq_ct_helpers",
beam_files = [":beam_files"],
extra_apps = [
"common_test",
"eunit",
"inet_tcp_proxy",
"inets",
],
license_files = [":license_files"],
priv = [":priv"],
deps = [
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbitmq_jms_topic_exchange/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ rabbitmq_app(
license_files = [":license_files"],
priv = [":priv"],
deps = [
"//deps/khepri:erlang_app",
"//deps/khepri_mnesia_migration:erlang_app",
"//deps/rabbit:erlang_app",
"//deps/rabbit_common:erlang_app",
"@khepri//:erlang_app",
"@khepri_mnesia_migration//:erlang_app",
],
)

Expand Down
8 changes: 4 additions & 4 deletions deps/rabbitmq_jms_topic_exchange/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ def all_beam_files(name = "all_beam_files"):
dest = "ebin",
erlc_opts = "//:erlc_opts",
deps = [
"//deps/khepri:erlang_app",
"//deps/khepri_mnesia_migration:erlang_app",
"//deps/rabbit:erlang_app",
"//deps/rabbit_common:erlang_app",
"@khepri//:erlang_app",
"@khepri_mnesia_migration//:erlang_app",
],
)

Expand All @@ -46,10 +46,10 @@ def all_test_beam_files(name = "all_test_beam_files"):
dest = "test",
erlc_opts = "//:test_erlc_opts",
deps = [
"//deps/khepri:erlang_app",
"//deps/khepri_mnesia_migration:erlang_app",
"//deps/rabbit:erlang_app",
"//deps/rabbit_common:erlang_app",
"@khepri//:erlang_app",
"@khepri_mnesia_migration//:erlang_app",
],
)

Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_mqtt/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ define ct_master.erl
halt(0)
endef

PARALLEL_CT_SET_1_A = auth retainer
PARALLEL_CT_SET_1_A = auth rabbit_mqtt_retained_msg_store retainer
PARALLEL_CT_SET_1_B = cluster command config config_schema mc_mqtt packet_prop \
processor protocol_interop proxy_protocol rabbit_mqtt_confirms reader util
PARALLEL_CT_SET_1_C = java v5
Expand Down
9 changes: 9 additions & 0 deletions deps/rabbitmq_mqtt/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -329,3 +329,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
)
erlang_bytecode(
name = "rabbit_mqtt_retained_msg_store_SUITE_beam_files",
testonly = True,
srcs = ["test/rabbit_mqtt_retained_msg_store_SUITE.erl"],
outs = ["test/rabbit_mqtt_retained_msg_store_SUITE.beam"],
hdrs = ["include/rabbit_mqtt_packet.hrl"],
app_name = "rabbitmq_mqtt",
erlc_opts = "//:test_erlc_opts",
)
72 changes: 40 additions & 32 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -956,44 +956,52 @@ send_retained_messages(Subscriptions, State) ->

-spec send_retained_message(topic_filter(), qos(), state()) -> state().
send_retained_message(TopicFilter0, SubscribeQos,
State0 = #state{packet_id = PacketId0,
cfg = #cfg{retainer_pid = RPid}}) ->
State0 = #state{cfg = #cfg{retainer_pid = RPid}}) ->
TopicFilter = amqp_to_mqtt(TopicFilter0),
case rabbit_mqtt_retainer:fetch(RPid, TopicFilter) of
undefined ->
State0;
#mqtt_msg{qos = MsgQos,
retain = Retain,
payload = Payload,
props = Props0} ->
Qos = effective_qos(MsgQos, SubscribeQos),
%% Wildcards are currently not supported when fetching retained
%% messages. Therefore, TopicFilter must must be a topic name.
{Topic, Props, State1} = process_topic_alias_outbound(TopicFilter, Props0, State0),
{PacketId, State} = case Qos of
?QOS_0 ->
{undefined, State1};
?QOS_1 ->
{PacketId0,
State1#state{packet_id = increment_packet_id(PacketId0)}}
end,
Packet = #mqtt_packet{
fixed = #mqtt_packet_fixed{
type = ?PUBLISH,
qos = Qos,
dup = false,
retain = Retain
},
variable = #mqtt_packet_publish{
packet_id = PacketId,
topic_name = Topic,
props = Props
},
payload = Payload},
_ = send(Packet, State),
State
Msgs when is_list(Msgs) ->
lists:foldl(
fun(Msg, S) ->
send_retained_message_to_client(Msg, TopicFilter, SubscribeQos, S)
end, State0, Msgs);
#mqtt_msg{} = SingleMsg ->
send_retained_message_to_client(SingleMsg, TopicFilter, SubscribeQos, State0)
end.

send_retained_message_to_client(#mqtt_msg{qos = MsgQos,
retain = Retain,
payload = Payload,
props = Props0},
TopicFilter,
SubscribeQos,
State0 = #state{packet_id = PacketId0}) ->
Qos = effective_qos(MsgQos, SubscribeQos),
{Topic, Props, State1} = process_topic_alias_outbound(TopicFilter, Props0, State0),
{PacketId, State} = case Qos of
?QOS_0 ->
{undefined, State1};
?QOS_1 ->
{PacketId0,
State1#state{packet_id = increment_packet_id(PacketId0)}}
end,
Packet = #mqtt_packet{
fixed = #mqtt_packet_fixed{
type = ?PUBLISH,
qos = Qos,
dup = false,
retain = Retain
},
variable = #mqtt_packet_publish{
packet_id = PacketId,
topic_name = Topic,
props = Props
},
payload = Payload},
_ = send(Packet, State),
State.

clear_will_msg(#state{cfg = #cfg{vhost = Vhost,
client_id = ClientId}} = State) ->
QNameBin = rabbit_mqtt_util:queue_name_bin(ClientId, will),
Expand Down
Loading