Skip to content

Commit

Permalink
GH-9112: Workaround for Paho stopReconnectCycle
Browse files Browse the repository at this point in the history
Fixes: #9112

`Mqttv5ClientManager` hangs in `stop()` if never was connected.
The scheduled reconnect timer in the client is never cancelled.

* Call `stopReconnectCycle()` on the client via reflection when we disconnect
from the client in Spring Integration MQTT components

(cherry picked from commit 937da13)
  • Loading branch information
artembilan authored and spring-builds committed May 21, 2024
1 parent ace6cfe commit f11de7f
Show file tree
Hide file tree
Showing 11 changed files with 151 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.mqtt.aot;

import java.util.stream.Stream;

import org.springframework.aot.hint.ExecutableMode;
import org.springframework.aot.hint.ReflectionHints;
import org.springframework.aot.hint.RuntimeHints;
import org.springframework.aot.hint.RuntimeHintsRegistrar;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;

/**
* {@link RuntimeHintsRegistrar} for Spring Integration MQTT module.
*
* @author Artem Bilan
*
* @since 6.1.9
*/
class MqttRuntimeHints implements RuntimeHintsRegistrar {

@Override
public void registerHints(RuntimeHints hints, ClassLoader classLoader) {
ReflectionHints reflectionHints = hints.reflection();
// TODO until the real fix in Paho library.
Stream.of("org.eclipse.paho.client.mqttv3.MqttAsyncClient", "org.eclipse.paho.mqttv5.client.MqttAsyncClient")
.filter((typeName) -> ClassUtils.isPresent(typeName, classLoader))
.map((typeName) -> loadClassByName(typeName, classLoader))
.flatMap((type) -> Stream.ofNullable(ReflectionUtils.findMethod(type, "stopReconnectCycle")))
.forEach(method -> reflectionHints.registerMethod(method, ExecutableMode.INVOKE));
}

private static Class<?> loadClassByName(String typeName, ClassLoader classLoader) {
try {
return ClassUtils.forName(typeName, classLoader);
}
catch (ClassNotFoundException ex) {
throw new IllegalArgumentException(ex);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
/**
* Provides classes to support Spring AOT.
*/
@org.springframework.lang.NonNullApi
@org.springframework.lang.NonNullFields
package org.springframework.integration.mqtt.aot;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 the original author or authors.
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@
import org.eclipse.paho.client.mqttv3.MqttMessage;

import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.support.MqttUtils;
import org.springframework.util.Assert;

/**
Expand Down Expand Up @@ -149,6 +150,9 @@ public void stop() {
}
try {
client.disconnectForcibly(getDisconnectCompletionTimeout());
if (getConnectionInfo().isAutomaticReconnect()) {
MqttUtils.stopClientReconnectCycle(client);
}
}
catch (MqttException e) {
logger.error("Could not disconnect from the client", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 the original author or authors.
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,7 @@
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;

import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.support.MqttUtils;
import org.springframework.util.Assert;

/**
Expand Down Expand Up @@ -151,6 +152,9 @@ public void stop() {

try {
client.disconnectForcibly(getDisconnectCompletionTimeout());
if (getConnectionInfo().isAutomaticReconnect()) {
MqttUtils.stopClientReconnectCycle(client);
}
}
catch (MqttException e) {
logger.error("Could not disconnect from the client", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ protected void doStop() {

try {
this.client.disconnectForcibly(getDisconnectCompletionTimeout());
if (getConnectionInfo().isAutomaticReconnect()) {
MqttUtils.stopClientReconnectCycle(this.client);
}
}
catch (MqttException ex) {
logger.error(ex, "Exception while disconnecting");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.springframework.integration.mqtt.support.MqttHeaderMapper;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.integration.mqtt.support.MqttMessageConverter;
import org.springframework.integration.mqtt.support.MqttUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
Expand Down Expand Up @@ -247,6 +248,9 @@ protected void doStop() {
}
if (getClientManager() == null) {
this.mqttClient.disconnectForcibly(getDisconnectCompletionTimeout());
if (getConnectionInfo().isAutomaticReconnect()) {
MqttUtils.stopClientReconnectCycle(this.mqttClient);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -176,6 +176,9 @@ protected void doStop() {
IMqttAsyncClient theClient = this.client;
if (theClient != null) {
theClient.disconnect().waitForCompletion(getDisconnectCompletionTimeout());
if (getConnectionInfo().isAutomaticReconnect()) {
MqttUtils.stopClientReconnectCycle(theClient);
}
theClient.close();
this.client = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.springframework.integration.mqtt.event.MqttProtocolErrorEvent;
import org.springframework.integration.mqtt.support.MqttHeaderMapper;
import org.springframework.integration.mqtt.support.MqttMessageConverter;
import org.springframework.integration.mqtt.support.MqttUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;
Expand Down Expand Up @@ -185,6 +186,9 @@ protected void doStop() {
try {
if (getClientManager() == null) {
this.mqttClient.disconnect().waitForCompletion(getDisconnectCompletionTimeout());
if (getConnectionInfo().isAutomaticReconnect()) {
MqttUtils.stopClientReconnectCycle(this.mqttClient);
}
}
}
catch (MqttException ex) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 the original author or authors.
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,9 +16,13 @@

package org.springframework.integration.mqtt.support;

import java.lang.reflect.Method;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

import org.springframework.beans.BeanUtils;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;

/**
* MQTT Utilities.
Expand All @@ -30,6 +34,38 @@
*/
public final class MqttUtils {

private static final boolean PAHO_MQTTV3_PRESENT =
ClassUtils.isPresent("org.eclipse.paho.client.mqttv3.MqttAsyncClient", null);

private static final boolean PAHO_MQTTV5_PRESENT =
ClassUtils.isPresent("org.eclipse.paho.mqttv5.client.MqttAsyncClient", null);

private static final Method V3_STOP_RECONNECT_CYCLE_METHOD;

private static final Method V5_STOP_RECONNECT_CYCLE_METHOD;

static {
if (PAHO_MQTTV3_PRESENT) {
V3_STOP_RECONNECT_CYCLE_METHOD =
ReflectionUtils.findMethod(org.eclipse.paho.client.mqttv3.MqttAsyncClient.class,
"stopReconnectCycle");
ReflectionUtils.makeAccessible(V3_STOP_RECONNECT_CYCLE_METHOD);
}
else {
V3_STOP_RECONNECT_CYCLE_METHOD = null;
}

if (PAHO_MQTTV5_PRESENT) {
V5_STOP_RECONNECT_CYCLE_METHOD =
ReflectionUtils.findMethod(org.eclipse.paho.mqttv5.client.MqttAsyncClient.class,
"stopReconnectCycle");
ReflectionUtils.makeAccessible(V5_STOP_RECONNECT_CYCLE_METHOD);
}
else {
V5_STOP_RECONNECT_CYCLE_METHOD = null;
}
}

private MqttUtils() {
}

Expand All @@ -47,4 +83,26 @@ public static MqttConnectOptions cloneConnectOptions(MqttConnectOptions options)
return options2;
}

/**
* Perform a {@code stopReconnectCycle()} (via reflection) method on the provided client
* to clean up resources on client stop.
* TODO until the real fix in Paho library.
* @param client the MQTTv3 Paho client instance.
* @since 6.1.9
*/
public static void stopClientReconnectCycle(org.eclipse.paho.client.mqttv3.IMqttAsyncClient client) {
ReflectionUtils.invokeMethod(V3_STOP_RECONNECT_CYCLE_METHOD, client);
}

/**
* Perform a {@code stopReconnectCycle()} (via reflection) method on the provided client
* to clean up resources on client stop.
* TODO until the real fix in Paho library.
* @param client the MQTTv5 Paho client instance.
* @since 6.1.9
*/
public static void stopClientReconnectCycle(org.eclipse.paho.mqttv5.client.IMqttAsyncClient client) {
ReflectionUtils.invokeMethod(V5_STOP_RECONNECT_CYCLE_METHOD, client);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.springframework.aot.hint.RuntimeHintsRegistrar=org.springframework.integration.mqtt.aot.MqttRuntimeHints
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -488,8 +488,8 @@ public void testDifferentQos() throws Exception {
given(token.getGrantedQos()).willReturn(new int[] {2, 0});
willReturn(token).given(client).subscribe(any(String[].class), any(int[].class), any());

MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("foo", "bar", factory,
"baz", "fix");
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("tcp://mqtt.host", "bar", factory, "baz", "fix");
AtomicReference<Method> method = new AtomicReference<>();
ReflectionUtils.doWithMethods(MqttPahoMessageDrivenChannelAdapter.class, m -> {
m.setAccessible(true);
Expand Down

0 comments on commit f11de7f

Please sign in to comment.