Skip to content

Commit ca96adb

Browse files
authored
Merge pull request #232 from bosch-io/bugfix/reconnecting-state-still-true-after-successful-reconnect
re-check reconnecting state before emitting
2 parents a193eb4 + 83b32b5 commit ca96adb

File tree

1 file changed

+50
-34
lines changed

1 file changed

+50
-34
lines changed

java/src/main/java/org/eclipse/ditto/client/messaging/internal/WebSocketMessagingProvider.java

Lines changed: 50 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,7 @@
2121
import java.util.List;
2222
import java.util.Map;
2323
import java.util.Optional;
24-
import java.util.concurrent.Callable;
25-
import java.util.concurrent.CompletableFuture;
26-
import java.util.concurrent.CompletionException;
27-
import java.util.concurrent.CompletionStage;
28-
import java.util.concurrent.ConcurrentHashMap;
29-
import java.util.concurrent.CountDownLatch;
30-
import java.util.concurrent.ExecutionException;
31-
import java.util.concurrent.ExecutorService;
32-
import java.util.concurrent.Executors;
33-
import java.util.concurrent.ScheduledExecutorService;
34-
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.*;
3525
import java.util.concurrent.atomic.AtomicBoolean;
3626
import java.util.concurrent.atomic.AtomicReference;
3727
import java.util.function.Consumer;
@@ -83,6 +73,9 @@ public final class WebSocketMessagingProvider extends WebSocketAdapter implement
8373
private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketMessagingProvider.class);
8474
private static final int CONNECTION_TIMEOUT_MS = 5000;
8575
private static final int RECONNECTION_TIMEOUT_SECONDS = 5;
76+
private static final long INITIAL_CHECK_DELAY = 0;
77+
private static final long RETRY_CHECK_PERIOD = 20;
78+
private static final int MIN_RECONNECTING_CHECK_TRIES = 4;
8679

8780
private final AdaptableBus adaptableBus;
8881
private final MessagingConfiguration messagingConfiguration;
@@ -104,21 +97,22 @@ public final class WebSocketMessagingProvider extends WebSocketAdapter implement
10497
private final AtomicBoolean manuallyPerformReconnect = new AtomicBoolean(false);
10598

10699
private Runnable channelCloser;
107-
@Nullable private Throwable lastReceivedDittoProtocolError = null;
100+
@Nullable
101+
private Throwable lastReceivedDittoProtocolError = null;
108102
private CountDownLatch lastReceivedDittoProtocolErrorLatch = new CountDownLatch(1);
109103

110104
/**
111105
* Constructs a new {@code WsMessagingProvider}.
112106
*
113-
* @param adaptableBus the bus to publish all messages to.
107+
* @param adaptableBus the bus to publish all messages to.
114108
* @param messagingConfiguration the specific configuration to apply.
115109
* @param authenticationProvider provider for the authentication method with which to open the websocket.
116-
* @param callbackExecutor the executor service to run callbacks with.
110+
* @param callbackExecutor the executor service to run callbacks with.
117111
*/
118112
private WebSocketMessagingProvider(final AdaptableBus adaptableBus,
119-
final MessagingConfiguration messagingConfiguration,
120-
final AuthenticationProvider<WebSocket> authenticationProvider,
121-
final ExecutorService callbackExecutor) {
113+
final MessagingConfiguration messagingConfiguration,
114+
final AuthenticationProvider<WebSocket> authenticationProvider,
115+
final ExecutorService callbackExecutor) {
122116
this.adaptableBus = adaptableBus;
123117
this.messagingConfiguration = messagingConfiguration;
124118
this.authenticationProvider = authenticationProvider;
@@ -129,7 +123,8 @@ private WebSocketMessagingProvider(final AdaptableBus adaptableBus,
129123
subscriptionMessages = new ConcurrentHashMap<>();
130124
webSocket = new AtomicReference<>();
131125

132-
channelCloser = () -> {};
126+
channelCloser = () -> {
127+
};
133128
disconnectionHandler = new DisconnectedContext.DisconnectionHandler() {
134129

135130
@Override
@@ -171,14 +166,14 @@ private static ScheduledExecutorService createConnectExecutor(final String sessi
171166
*
172167
* @param messagingConfiguration configuration of messaging.
173168
* @param authenticationProvider provides authentication.
174-
* @param defaultExecutor the executor for messages.
175-
* @param scheduledExecutor the scheduled executor for scheduling tasks.
169+
* @param defaultExecutor the executor for messages.
170+
* @param scheduledExecutor the scheduled executor for scheduling tasks.
176171
* @return the provider.
177172
*/
178173
public static WebSocketMessagingProvider newInstance(final MessagingConfiguration messagingConfiguration,
179-
final AuthenticationProvider<WebSocket> authenticationProvider,
180-
final ExecutorService defaultExecutor,
181-
final ScheduledExecutorService scheduledExecutor) {
174+
final AuthenticationProvider<WebSocket> authenticationProvider,
175+
final ExecutorService defaultExecutor,
176+
final ScheduledExecutorService scheduledExecutor) {
182177
checkNotNull(messagingConfiguration, "messagingConfiguration");
183178
checkNotNull(authenticationProvider, "authenticationProvider");
184179
checkNotNull(defaultExecutor, "defaultExecutor");
@@ -364,15 +359,37 @@ public void onConnected(final WebSocket websocket, final Map<String, List<String
364359
if (!subscriptionMessages.isEmpty()) {
365360
LOGGER.info("Client <{}>: Subscribing again for messages from backend after reconnection",
366361
sessionId);
367-
subscriptionMessages.values().forEach(this::emit);
362+
final CompletableFuture<Boolean> isReconnecting = new CompletableFuture<>();
363+
final Runnable checkTask = () -> {
364+
if (!reconnecting.get()) {
365+
isReconnecting.complete(true); // Complete the future if flag is true
366+
}
367+
};
368+
final ScheduledFuture<?> fixedRateChecker = connectExecutor.scheduleAtFixedRate(checkTask, INITIAL_CHECK_DELAY, RETRY_CHECK_PERIOD, TimeUnit.MILLISECONDS);
369+
try {
370+
if (Boolean.TRUE.equals(isReconnecting.get(RETRY_CHECK_PERIOD * MIN_RECONNECTING_CHECK_TRIES, TimeUnit.MILLISECONDS))) { // Ensures 4 retries of the scheduleAtFixedRate method.
371+
fixedRateChecker.cancel(true);
372+
LOGGER.debug("Reconnecting is completed -> emitting subscriptionMessages: {}", subscriptionMessages);
373+
subscriptionMessages.values().forEach(this::emit);
374+
}
375+
} catch (TimeoutException | ExecutionException e) {
376+
isReconnecting.complete(false);
377+
fixedRateChecker.cancel(true);
378+
LOGGER.error("Reconnecting failed: {}", e.getMessage());
379+
} catch (InterruptedException e) {
380+
isReconnecting.complete(false);
381+
fixedRateChecker.cancel(true);
382+
LOGGER.error("Reconnecting failed due to thread being interrupted: {}", e.getMessage());
383+
Thread.currentThread().interrupt();
384+
}
368385
}
369386
});
370387
}
371388

372389
@Override
373390
public void onDisconnected(final WebSocket websocket, final WebSocketFrame serverCloseFrame,
374-
final WebSocketFrame clientCloseFrame,
375-
final boolean closedByServer) {
391+
final WebSocketFrame clientCloseFrame,
392+
final boolean closedByServer) {
376393

377394
callbackExecutor.execute(() -> {
378395
if (closedByServer) {
@@ -390,8 +407,7 @@ public void onDisconnected(final WebSocket websocket, final WebSocketFrame serve
390407
sessionId, messagingConfiguration.getEndpointUri());
391408
awaitLastReceivedDittoProtocolError();
392409
handleReconnectionIfEnabled(DisconnectedContext.Source.CLIENT, lastReceivedDittoProtocolError);
393-
}
394-
else {
410+
} else {
395411
// only when close() was called we should end here
396412
LOGGER.info("Client <{}>: WebSocket connection to endpoint <{}> was closed by user",
397413
sessionId, messagingConfiguration.getEndpointUri());
@@ -427,15 +443,15 @@ public void onError(final WebSocket websocket, final WebSocketException cause) {
427443
}
428444

429445
private CompletionStage<WebSocket> connectWithPotentialRetries(final String actionName,
430-
final Supplier<WebSocket> webSocket,
431-
final CompletableFuture<WebSocket> future,
432-
final boolean retry) {
446+
final Supplier<WebSocket> webSocket,
447+
final CompletableFuture<WebSocket> future,
448+
final boolean retry) {
433449

434450
try {
435451
final Predicate<Throwable> isRecoverable =
436452
retry ? WebSocketMessagingProvider::isRecoverable : exception -> false;
437453
return Retry.retryTo(actionName,
438-
() -> initiateConnection(webSocket.get()))
454+
() -> initiateConnection(webSocket.get()))
439455
.inClientSession(sessionId)
440456
.withExecutors(connectExecutor, callbackExecutor)
441457
.notifyOnError(messagingConfiguration.getConnectionErrorHandler().orElse(null))
@@ -448,7 +464,7 @@ private CompletionStage<WebSocket> connectWithPotentialRetries(final String acti
448464
}
449465

450466
private void handleReconnectionIfEnabled(final DisconnectedContext.Source disconnectionSource,
451-
@Nullable final Throwable throwableSupplier) {
467+
@Nullable final Throwable throwableSupplier) {
452468

453469
final Optional<Consumer<DisconnectedContext>> disconnectedListener =
454470
messagingConfiguration.getDisconnectedListener();
@@ -492,7 +508,7 @@ private void doReconnect() {
492508

493509
private void reconnectWithRetries() {
494510
this.connectWithPotentialRetries("reconnect WebSocket", this::recreateWebSocket, new CompletableFuture<>(),
495-
messagingConfiguration.isReconnectEnabled() || manuallyPerformReconnect.get())
511+
messagingConfiguration.isReconnectEnabled() || manuallyPerformReconnect.get())
496512
.thenAccept(reconnectedWebSocket -> {
497513
setWebSocket(reconnectedWebSocket);
498514
reconnecting.set(false);

0 commit comments

Comments
 (0)