diff --git a/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java b/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java index 07fbacf77a..502ce4a767 100644 --- a/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java +++ b/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java @@ -195,9 +195,9 @@ public RedisCommand write(RedisCommand command) { } if (autoFlushCommands) { - - if (isConnected()) { - writeToChannelAndFlush(command); + Channel channel = this.channel; + if (isConnected(channel)) { + writeToChannelAndFlush(channel, command); } else { writeToDisconnectedBuffer(command); } @@ -236,9 +236,9 @@ public RedisCommand write(RedisCommand command) { } if (autoFlushCommands) { - - if (isConnected()) { - writeToChannelAndFlush(commands); + Channel channel = this.channel; + if (isConnected(channel)) { + writeToChannelAndFlush(channel, commands); } else { writeToDisconnectedBuffer(commands); } @@ -288,10 +288,9 @@ private RedisException validateWrite(int commands) { return new RedisException("Connection is closed"); } + final boolean connected = isConnected(this.channel); if (usesBoundedQueues()) { - boolean connected = isConnected(); - if (QUEUE_SIZE.get(this) + commands > clientOptions.getRequestQueueSize()) { return new RedisException("Request queue size exceeded: " + clientOptions.getRequestQueueSize() + ". Commands are not accepted until the queue size drops."); @@ -308,7 +307,7 @@ private RedisException validateWrite(int commands) { } } - if (!isConnected() && rejectCommandsWhileDisconnected) { + if (!connected && rejectCommandsWhileDisconnected) { return new RedisException("Currently not connected. Commands are rejected."); } @@ -370,11 +369,11 @@ private void writeToDisconnectedBuffer(RedisCommand command) { commandBuffer.add(command); } - private void writeToChannelAndFlush(RedisCommand command) { + private void writeToChannelAndFlush(Channel channel, RedisCommand command) { QUEUE_SIZE.incrementAndGet(this); - ChannelFuture channelFuture = channelWriteAndFlush(command); + ChannelFuture channelFuture = channelWriteAndFlush(channel, command); if (reliability == Reliability.AT_MOST_ONCE) { // cancel on exceptions and remove from queue, because there is no housekeeping @@ -387,7 +386,7 @@ private void writeToChannelAndFlush(RedisCommand command) { } } - private void writeToChannelAndFlush(Collection> commands) { + private void writeToChannelAndFlush(Channel channel, Collection> commands) { QUEUE_SIZE.addAndGet(this, commands.size()); @@ -395,7 +394,7 @@ private void writeToChannelAndFlush(Collection> // cancel on exceptions and remove from queue, because there is no housekeeping for (RedisCommand command : commands) { - channelWrite(command).addListener(AtMostOnceWriteListener.newInstance(this, command)); + channelWrite(channel, command).addListener(AtMostOnceWriteListener.newInstance(this, command)); } } @@ -403,14 +402,14 @@ private void writeToChannelAndFlush(Collection> // commands are ok to stay within the queue, reconnect will retrigger them for (RedisCommand command : commands) { - channelWrite(command).addListener(RetryListener.newInstance(this, command)); + channelWrite(channel, command).addListener(RetryListener.newInstance(this, command)); } } - channelFlush(); + channelFlush(channel); } - private void channelFlush() { + private void channelFlush(Channel channel) { if (debugEnabled) { logger.debug("{} write() channelFlush", logPrefix()); @@ -419,7 +418,7 @@ private void channelFlush() { channel.flush(); } - private ChannelFuture channelWrite(RedisCommand command) { + private ChannelFuture channelWrite(Channel channel, RedisCommand command) { if (debugEnabled) { logger.debug("{} write() channelWrite command {}", logPrefix(), command); @@ -428,7 +427,7 @@ private ChannelFuture channelWrite(RedisCommand command) { return channel.write(command); } - private ChannelFuture channelWriteAndFlush(RedisCommand command) { + private ChannelFuture channelWriteAndFlush(Channel channel, RedisCommand command) { if (debugEnabled) { logger.debug("{} write() writeAndFlush command {}", logPrefix(), command); @@ -441,7 +440,6 @@ private ChannelFuture channelWriteAndFlush(RedisCommand command) { public void notifyChannelActive(Channel channel) { this.logPrefix = null; - this.channel = channel; this.connectionError = null; if (isClosed()) { @@ -456,6 +454,7 @@ public void notifyChannelActive(Channel channel) { } sharedLock.doExclusive(() -> { + this.channel = channel; try { // Move queued commands to buffer before issuing any commands because of connection activation. @@ -478,7 +477,7 @@ public void notifyChannelActive(Channel channel) { inActivation = false; } - flushCommands(disconnectedBuffer); + flushCommands(channel, disconnectedBuffer); } catch (Exception e) { if (debugEnabled) { @@ -525,7 +524,7 @@ public void notifyException(Throwable t) { doExclusive(this::drainCommands).forEach(cmd -> cmd.completeExceptionally(t)); } - if (!isConnected()) { + if (!isConnected(this.channel)) { connectionError = t; } } @@ -538,16 +537,16 @@ public void registerConnectionWatchdog(ConnectionWatchdog connectionWatchdog) { @Override @SuppressWarnings({ "rawtypes", "unchecked" }) public void flushCommands() { - flushCommands(commandBuffer); + flushCommands(this.channel, commandBuffer); } - private void flushCommands(Queue> queue) { + private void flushCommands(Channel channel, Queue> queue) { if (debugEnabled) { logger.debug("{} flushCommands()", logPrefix()); } - if (isConnected()) { + if (isConnected(channel)) { List> commands = sharedLock.doExclusive(() -> { @@ -563,7 +562,7 @@ private void flushCommands(Queue> queue) { } if (!commands.isEmpty()) { - writeToChannelAndFlush(commands); + writeToChannelAndFlush(channel, commands); } } } @@ -626,10 +625,10 @@ public void disconnect() { private Channel getOpenChannel() { - Channel currentChannel = this.channel; + Channel channel = this.channel; - if (currentChannel != null) { - return currentChannel; + if (channel != null /* && channel.isOpen() is this deliberately omitted? */) { + return channel; } return null; @@ -646,6 +645,7 @@ public void reset() { logger.debug("{} reset()", logPrefix()); } + Channel channel = this.channel; if (channel != null) { channel.pipeline().fireUserEventTriggered(new ConnectionEvents.Reset()); } @@ -718,9 +718,7 @@ public void notifyDrainQueuedCommands(HasQueuedCommands queuedCommands) { } } - if (isConnected()) { - flushCommands(disconnectedBuffer); - } + flushCommands(this.channel, disconnectedBuffer); }); } @@ -802,9 +800,7 @@ private void cancelCommands(String message, Iterable(CommandType.SELECT, new StatusOutput<>(StringCodec.UTF8)))); + + sut.registerConnectionWatchdog(connectionWatchdog); + doAnswer(i -> sut.write(new Command<>(CommandType.AUTH, new StatusOutput<>(StringCodec.UTF8)))).when(connectionWatchdog) + .arm(); + when(channel.isActive()).thenReturn(true); + + sut.notifyChannelActive(channel); + + DefaultChannelPromise promise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE); + + when(channel.writeAndFlush(any())).thenAnswer(invocation -> { + if (invocation.getArguments()[0] instanceof RedisCommand) { + queue.add((RedisCommand) invocation.getArguments()[0]); + } + + if (invocation.getArguments()[0] instanceof Collection) { + queue.addAll((Collection) invocation.getArguments()[0]); + } + return promise; + }); + + assertThat(queue).hasSize(2).first().hasFieldOrPropertyWithValue("type", CommandType.SELECT); + assertThat(queue).hasSize(2).last().hasFieldOrPropertyWithValue("type", CommandType.AUTH); + } + @Test void writeConnectedShouldWriteCommandToChannel() {