Skip to content

Commit

Permalink
Terminate request queue on connection error first before terminating …
Browse files Browse the repository at this point in the history
…response handlers.

Terminating the request queue will ensure that no new requests will be accepted and thus that we could miss a response handler to be terminated.

[#245]

Signed-off-by: Mark Paluch <[email protected]>
  • Loading branch information
mp911de committed Nov 20, 2023
1 parent 9e802b7 commit 3449c84
Showing 1 changed file with 5 additions and 14 deletions.
19 changes: 5 additions & 14 deletions src/main/java/io/r2dbc/mssql/client/ReactorNettyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,7 @@
import io.r2dbc.mssql.message.header.PacketIdProvider;
import io.r2dbc.mssql.message.tds.ProtocolException;
import io.r2dbc.mssql.message.tds.Redirect;
import io.r2dbc.mssql.message.token.AbstractDoneToken;
import io.r2dbc.mssql.message.token.AbstractInfoToken;
import io.r2dbc.mssql.message.token.Attention;
import io.r2dbc.mssql.message.token.EnvChangeToken;
import io.r2dbc.mssql.message.token.FeatureExtAckToken;
import io.r2dbc.mssql.message.token.LoginAckToken;
import io.r2dbc.mssql.message.token.*;
import io.r2dbc.mssql.message.type.Collation;
import io.r2dbc.mssql.util.Assert;
import io.r2dbc.spi.R2dbcException;
Expand All @@ -50,11 +45,7 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.SynchronousSink;
import reactor.core.publisher.*;
import reactor.netty.Connection;
import reactor.netty.NettyOutbound;
import reactor.netty.resources.ConnectionProvider;
Expand Down Expand Up @@ -367,7 +358,8 @@ private Object encodeForSend(ClientMessage message) {
@SuppressWarnings("unchecked")
private <T> Mono<T> resumeError(Throwable throwable) {

handleConnectionError(throwable);
logger.error(this.context.getMessage("Error: {}"), throwable.getMessage(), throwable);

this.requestSink.emitComplete((signalType, emitResult) -> {

if (emitResult.isFailure()) {
Expand All @@ -377,8 +369,7 @@ private <T> Mono<T> resumeError(Throwable throwable) {
return false;
});

logger.error(this.context.getMessage("Error: {}"), throwable.getMessage(), throwable);

handleConnectionError(throwable);
return (Mono<T>) close();
}

Expand Down

0 comments on commit 3449c84

Please sign in to comment.