From b4cbc7448aeb3ab148e70e2b9dfaa78915a9b4e6 Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Fri, 31 Jan 2025 16:39:11 +0200 Subject: [PATCH] #2108 * add possibility to configure whether server address should be resolved by ditto in mqtt connection * fix retry timeout strategy unable increase retry count and apply effective backoff Signed-off-by: Vasil Vasilev --- .../service/config/DefaultMqttConfig.java | 7 ++++ .../service/config/MqttConfig.java | 7 ++++ .../mqtt/hivemq/MqttClientActor.java | 12 ++---- .../hivemq/client/HiveMqttClientFactory.java | 8 ++-- .../src/main/resources/connectivity.conf | 6 +++ .../client/HiveMqttClientFactoryTest.java | 38 +++++++++++++++++++ 6 files changed, 67 insertions(+), 11 deletions(-) diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/config/DefaultMqttConfig.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/config/DefaultMqttConfig.java index dbc96efd78..2ff38806d5 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/config/DefaultMqttConfig.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/config/DefaultMqttConfig.java @@ -42,6 +42,7 @@ final class DefaultMqttConfig implements MqttConfig { private final int maxQueueSize; private final int eventLoopThreads; private final boolean cleanSession; + private final boolean shouldResolveServerAddress; private final boolean reconnectForRedelivery; private final Duration reconnectForRedeliveryDelay; private final SessionExpiryInterval sessionExpiryInterval; @@ -54,6 +55,7 @@ final class DefaultMqttConfig implements MqttConfig { private DefaultMqttConfig(final ScopedConfig config) { eventLoopThreads = config.getNonNegativeIntOrThrow(MqttConfigValue.EVENT_LOOP_THREADS); cleanSession = config.getBoolean(MqttConfigValue.CLEAN_SESSION.getConfigPath()); + shouldResolveServerAddress = config.getBoolean(MqttConfigValue.SHOULD_RESOLVE_SERVER_ADDRESS.getConfigPath()); reconnectForRedelivery = config.getBoolean(MqttConfigValue.RECONNECT_FOR_REDELIVERY.getConfigPath()); reconnectForRedeliveryDelay = config.getNonNegativeDurationOrThrow(MqttConfigValue.RECONNECT_FOR_REDELIVERY_DELAY); @@ -108,6 +110,11 @@ public boolean isCleanSession() { return cleanSession; } + @Override + public boolean shouldResolveServerAddress() { + return shouldResolveServerAddress; + } + @Override public boolean shouldReconnectForRedelivery() { return reconnectForRedelivery; diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/config/MqttConfig.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/config/MqttConfig.java index e4c0e9d710..46c75d3683 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/config/MqttConfig.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/config/MqttConfig.java @@ -44,6 +44,8 @@ public interface MqttConfig { */ boolean isCleanSession(); + boolean shouldResolveServerAddress(); + /** * Indicates whether the client should reconnect to enforce a redelivery for a failed acknowledgement. * @@ -136,6 +138,11 @@ enum MqttConfigValue implements KnownConfigValue { */ CLEAN_SESSION("clean-session", false), + /** + * Indicates whether the provided connection uri should be resolved in-demand by ditto or on-demand. + */ + SHOULD_RESOLVE_SERVER_ADDRESS("should-resolve-server-address", true), + /** * Indicates whether the client should reconnect to enforce a redelivery for a failed acknowledgement. */ diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/MqttClientActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/MqttClientActor.java index c12de8fcd6..83e08c9467 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/MqttClientActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/MqttClientActor.java @@ -74,9 +74,8 @@ import com.hivemq.client.mqtt.lifecycle.MqttDisconnectSource; import com.typesafe.config.Config; -import scala.concurrent.ExecutionContextExecutor; - import io.reactivex.disposables.Disposable; +import scala.concurrent.ExecutionContextExecutor; /** * Actor for handling connection to an MQTT broker for protocol versions 3 or 5. @@ -89,6 +88,7 @@ public final class MqttClientActor extends BaseClientActor { private final GenericMqttClientFactory genericMqttClientFactory; @Nullable private GenericMqttClient genericMqttClient; private final AtomicBoolean automaticReconnect; + private final RetryTimeoutStrategy retryTimeoutStrategy; @Nullable private ActorRef publishingActorRef; private final List mqttConsumerActorRefs; @Nullable private Disposable unsolicitedPublishesAutoAckSubscription; @@ -105,6 +105,8 @@ private MqttClientActor(final Connection connection, final var connectivityConfig = connectivityConfig(); final var connectionConfig = connectivityConfig.getConnectionConfig(); mqttConfig = connectionConfig.getMqttConfig(); + retryTimeoutStrategy = RetryTimeoutStrategy.newDuplicationRetryTimeoutStrategy( + mqttConfig.getReconnectBackOffConfig().getTimeoutConfig()); mqttSpecificConfig = MqttSpecificConfig.fromConnection(connection, mqttConfig); @@ -312,7 +314,6 @@ private static String getClientId(final ClientRole clientRole, private GenericMqttClientDisconnectedListener getClientDisconnectedListener() { return (context, clientRole) -> { final var mqttClientReconnector = context.getReconnector(); - final var retryTimeoutStrategy = getRetryTimeoutStrategy(); if (0 == mqttClientReconnector.getAttempts()) { retryTimeoutStrategy.reset(); @@ -363,11 +364,6 @@ private GenericMqttClientDisconnectedListener getClientDisconnectedListener() { }; } - private RetryTimeoutStrategy getRetryTimeoutStrategy() { - final var reconnectBackOffConfig = mqttConfig.getReconnectBackOffConfig(); - return RetryTimeoutStrategy.newDuplicationRetryTimeoutStrategy(reconnectBackOffConfig.getTimeoutConfig()); - } - private static boolean isMqttClientInConnectingState(final MqttClientConfig mqttClientConfig) { return MqttClientState.CONNECTING == mqttClientConfig.getState(); } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/HiveMqttClientFactory.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/HiveMqttClientFactory.java index 54e8624be9..980bc2188b 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/HiveMqttClientFactory.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/HiveMqttClientFactory.java @@ -115,7 +115,8 @@ private static MqttClientBuilder getGenericMqttClientBuilder( final var mqttConfig = hiveMqttClientProperties.getMqttConfig(); return MqttClient.builder() - .serverAddress(getInetSocketAddress(getConnectionUri(hiveMqttClientProperties))) + .serverAddress(getInetSocketAddress(getConnectionUri(hiveMqttClientProperties), + hiveMqttClientProperties.getMqttConfig().shouldResolveServerAddress())) .executorConfig(getMqttClientExecutorConfig(mqttConfig.getEventLoopThreads())) .sslConfig(getMqttClientSslConfig(hiveMqttClientProperties).orElse(null)) .addConnectedListener(getConnectedListener( @@ -134,8 +135,9 @@ private static URI getConnectionUri(final HiveMqttClientProperties hiveMqttClien return sshTunnelState.getURI(hiveMqttClientProperties.getMqttConnection()); } - private static InetSocketAddress getInetSocketAddress(final URI connectionUri) { - return new InetSocketAddress(connectionUri.getHost(), connectionUri.getPort()); + private static InetSocketAddress getInetSocketAddress(final URI connectionUri, final boolean shouldResolveServerAddress) { + return shouldResolveServerAddress ? new InetSocketAddress(connectionUri.getHost(), connectionUri.getPort()) : + InetSocketAddress.createUnresolved(connectionUri.getHost(), connectionUri.getPort()); } private static MqttClientExecutorConfig getMqttClientExecutorConfig(final int eventLoopThreadNumber) { diff --git a/connectivity/service/src/main/resources/connectivity.conf b/connectivity/service/src/main/resources/connectivity.conf index ee3434b22a..753844afc9 100644 --- a/connectivity/service/src/main/resources/connectivity.conf +++ b/connectivity/service/src/main/resources/connectivity.conf @@ -438,6 +438,12 @@ ditto { clean-session = false clean-session = ${?CONNECTIVITY_MQTT_CLEAN_SESSION} + # Indicates whether the provided connection uri should be resolved in-demand by ditto or on-demand. + # When true, the address will be resolved before passing it to the used mqtt client. + # When false, the address will be unresolved and will rely on the used mqtt client to resolve it when needed. + should-resolve-server-address = true + should-resolve-server-address = ${?CONNECTIVITY_MQTT_SHOULD_RESOLVE_SERVER_ADDRESS} + # Indicates whether the client should reconnect to enforce a redelivery for a failed acknowledgement. reconnect-for-redelivery = false reconnect-for-redelivery = ${?CONNECTIVITY_MQTT_RECONNECT_FOR_REDELIVERY} diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/HiveMqttClientFactoryTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/HiveMqttClientFactoryTest.java index ef58d32673..3c13a6edfe 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/HiveMqttClientFactoryTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/HiveMqttClientFactoryTest.java @@ -83,6 +83,7 @@ public void before() { Mockito.when(mqttConnection.getPassword()).thenReturn(Optional.of(PASSWORD)); Mockito.when(mqttConfig.getEventLoopThreads()).thenReturn(EVENT_LOOP_THREAD_NUMBER); + Mockito.when(mqttConfig.shouldResolveServerAddress()).thenReturn(Boolean.TRUE); final var connectionConfig = Mockito.mock(ConnectionConfig.class); Mockito.when(connectionConfig.getMqttConfig()).thenReturn(mqttConfig); @@ -396,4 +397,41 @@ public void getMqtt5ClientWithoutLastWillWithSslReturnsExpected() throws NoMqttC }); } + @Test + public void getMqttClientWithShouldResolveServerAddressFalseAddressShouldBeUnresolved() + throws NoMqttConnectionException { + Mockito.when(mqttConfig.shouldResolveServerAddress()).thenReturn(Boolean.FALSE); + final var hiveMqttClientProperties = HiveMqttClientProperties.builder() + .withMqttConnection(mqttConnection) + .withConnectivityConfig(connectivityConfig) + .withMqttSpecificConfig(MqttSpecificConfig.fromConnection(mqttConnection, mqttConfig)) + .withSshTunnelStateSupplier(sshTunnelStateSupplier) + .withConnectionLogger(connectionLogger) + .withActorUuid(ACTOR_UUID) + .withClientConnectedListener(mqttClientConnectedListener) + .withClientDisconnectedListener(mqttClientDisconnectedListener) + .build(); + + final var mqtt3ClientUnderTest = HiveMqttClientFactory.getMqtt3Client(hiveMqttClientProperties, + MQTT_CLIENT_IDENTIFIER, + ClientRole.CONSUMER_PUBLISHER); + + final var mqtt3ClientConfig = mqtt3ClientUnderTest.getConfig(); + softly.assertThat(mqtt3ClientConfig.getTransportConfig()) + .as("transport config") + .satisfies(transportConfig -> { + softly.assertThat(transportConfig.getServerAddress().isUnresolved()).isTrue(); + }); + + final var mqtt5ClientUnderTest = HiveMqttClientFactory.getMqtt3Client(hiveMqttClientProperties, + MQTT_CLIENT_IDENTIFIER, + ClientRole.CONSUMER_PUBLISHER); + + final var mqtt5ClientConfig = mqtt5ClientUnderTest.getConfig(); + softly.assertThat(mqtt5ClientConfig.getTransportConfig()) + .as("transport config") + .satisfies(transportConfig -> { + softly.assertThat(transportConfig.getServerAddress().isUnresolved()).isTrue(); + }); + } } \ No newline at end of file