Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: make sure FIFO order between write and notify channel active #2597

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 30 additions & 34 deletions src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,9 @@ public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
}

if (autoFlushCommands) {

if (isConnected()) {
writeToChannelAndFlush(command);
Channel channel = this.channel;
if (isConnected(channel)) {
writeToChannelAndFlush(channel, command);
} else {
writeToDisconnectedBuffer(command);
}
Expand Down Expand Up @@ -236,9 +236,9 @@ public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
}

if (autoFlushCommands) {

if (isConnected()) {
writeToChannelAndFlush(commands);
Channel channel = this.channel;
if (isConnected(channel)) {
writeToChannelAndFlush(channel, commands);
} else {
writeToDisconnectedBuffer(commands);
}
Expand Down Expand Up @@ -288,10 +288,9 @@ private RedisException validateWrite(int commands) {
return new RedisException("Connection is closed");
}

final boolean connected = isConnected(this.channel);
if (usesBoundedQueues()) {

boolean connected = isConnected();

if (QUEUE_SIZE.get(this) + commands > clientOptions.getRequestQueueSize()) {
return new RedisException("Request queue size exceeded: " + clientOptions.getRequestQueueSize()
+ ". Commands are not accepted until the queue size drops.");
Expand All @@ -308,7 +307,7 @@ private RedisException validateWrite(int commands) {
}
}

if (!isConnected() && rejectCommandsWhileDisconnected) {
if (!connected && rejectCommandsWhileDisconnected) {
return new RedisException("Currently not connected. Commands are rejected.");
}

Expand Down Expand Up @@ -370,11 +369,11 @@ private void writeToDisconnectedBuffer(RedisCommand<?, ?, ?> command) {
commandBuffer.add(command);
}

private void writeToChannelAndFlush(RedisCommand<?, ?, ?> command) {
private void writeToChannelAndFlush(Channel channel, RedisCommand<?, ?, ?> command) {

QUEUE_SIZE.incrementAndGet(this);

ChannelFuture channelFuture = channelWriteAndFlush(command);
ChannelFuture channelFuture = channelWriteAndFlush(channel, command);

if (reliability == Reliability.AT_MOST_ONCE) {
// cancel on exceptions and remove from queue, because there is no housekeeping
Expand All @@ -387,30 +386,30 @@ private void writeToChannelAndFlush(RedisCommand<?, ?, ?> command) {
}
}

private void writeToChannelAndFlush(Collection<? extends RedisCommand<?, ?, ?>> commands) {
private void writeToChannelAndFlush(Channel channel, Collection<? extends RedisCommand<?, ?, ?>> commands) {

QUEUE_SIZE.addAndGet(this, commands.size());

if (reliability == Reliability.AT_MOST_ONCE) {

// cancel on exceptions and remove from queue, because there is no housekeeping
for (RedisCommand<?, ?, ?> command : commands) {
channelWrite(command).addListener(AtMostOnceWriteListener.newInstance(this, command));
channelWrite(channel, command).addListener(AtMostOnceWriteListener.newInstance(this, command));
}
}

if (reliability == Reliability.AT_LEAST_ONCE) {

// commands are ok to stay within the queue, reconnect will retrigger them
for (RedisCommand<?, ?, ?> command : commands) {
channelWrite(command).addListener(RetryListener.newInstance(this, command));
channelWrite(channel, command).addListener(RetryListener.newInstance(this, command));
}
}

channelFlush();
channelFlush(channel);
}

private void channelFlush() {
private void channelFlush(Channel channel) {

if (debugEnabled) {
logger.debug("{} write() channelFlush", logPrefix());
Expand All @@ -419,7 +418,7 @@ private void channelFlush() {
channel.flush();
}

private ChannelFuture channelWrite(RedisCommand<?, ?, ?> command) {
private ChannelFuture channelWrite(Channel channel, RedisCommand<?, ?, ?> command) {

if (debugEnabled) {
logger.debug("{} write() channelWrite command {}", logPrefix(), command);
Expand All @@ -428,7 +427,7 @@ private ChannelFuture channelWrite(RedisCommand<?, ?, ?> command) {
return channel.write(command);
}

private ChannelFuture channelWriteAndFlush(RedisCommand<?, ?, ?> command) {
private ChannelFuture channelWriteAndFlush(Channel channel, RedisCommand<?, ?, ?> command) {

if (debugEnabled) {
logger.debug("{} write() writeAndFlush command {}", logPrefix(), command);
Expand All @@ -441,7 +440,6 @@ private ChannelFuture channelWriteAndFlush(RedisCommand<?, ?, ?> command) {
public void notifyChannelActive(Channel channel) {

this.logPrefix = null;
this.channel = channel;
this.connectionError = null;

if (isClosed()) {
Expand All @@ -456,6 +454,7 @@ public void notifyChannelActive(Channel channel) {
}

sharedLock.doExclusive(() -> {
this.channel = channel;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you roll-back all the other changes and leave out only this one then your test case still succeeds.

Can you elaborate on why you need them too?


try {
// Move queued commands to buffer before issuing any commands because of connection activation.
Expand All @@ -478,7 +477,7 @@ public void notifyChannelActive(Channel channel) {
inActivation = false;
}

flushCommands(disconnectedBuffer);
flushCommands(channel, disconnectedBuffer);
} catch (Exception e) {

if (debugEnabled) {
Expand Down Expand Up @@ -525,7 +524,7 @@ public void notifyException(Throwable t) {
doExclusive(this::drainCommands).forEach(cmd -> cmd.completeExceptionally(t));
}

if (!isConnected()) {
if (!isConnected(this.channel)) {
connectionError = t;
}
}
Expand All @@ -538,16 +537,16 @@ public void registerConnectionWatchdog(ConnectionWatchdog connectionWatchdog) {
@Override
@SuppressWarnings({ "rawtypes", "unchecked" })
public void flushCommands() {
flushCommands(commandBuffer);
flushCommands(this.channel, commandBuffer);
}

private void flushCommands(Queue<RedisCommand<?, ?, ?>> queue) {
private void flushCommands(Channel channel, Queue<RedisCommand<?, ?, ?>> queue) {

if (debugEnabled) {
logger.debug("{} flushCommands()", logPrefix());
}

if (isConnected()) {
if (isConnected(channel)) {

List<RedisCommand<?, ?, ?>> commands = sharedLock.doExclusive(() -> {

Expand All @@ -563,7 +562,7 @@ private void flushCommands(Queue<RedisCommand<?, ?, ?>> queue) {
}

if (!commands.isEmpty()) {
writeToChannelAndFlush(commands);
writeToChannelAndFlush(channel, commands);
}
}
}
Expand Down Expand Up @@ -626,10 +625,10 @@ public void disconnect() {

private Channel getOpenChannel() {

Channel currentChannel = this.channel;
Channel channel = this.channel;

if (currentChannel != null) {
return currentChannel;
if (channel != null /* && channel.isOpen() is this deliberately omitted? */) {
return channel;
}

return null;
Expand All @@ -646,6 +645,7 @@ public void reset() {
logger.debug("{} reset()", logPrefix());
}

Channel channel = this.channel;
if (channel != null) {
channel.pipeline().fireUserEventTriggered(new ConnectionEvents.Reset());
}
Expand Down Expand Up @@ -718,9 +718,7 @@ public void notifyDrainQueuedCommands(HasQueuedCommands queuedCommands) {
}
}

if (isConnected()) {
flushCommands(disconnectedBuffer);
}
flushCommands(this.channel, disconnectedBuffer);
});
}

Expand Down Expand Up @@ -802,9 +800,7 @@ private void cancelCommands(String message, Iterable<? extends RedisCommand<?, ?
}
}

private boolean isConnected() {

Channel channel = this.channel;
private boolean isConnected(Channel channel) {
return channel != null && channel.isActive();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,34 @@ void before() {
sut.setConnectionFacade(connectionFacade);
}

@Test
void writeShouldGuaranteeFIFOOrder() {
sut.write(Collections.singletonList(new Command<>(CommandType.SELECT, new StatusOutput<>(StringCodec.UTF8))));

sut.registerConnectionWatchdog(connectionWatchdog);
doAnswer(i -> sut.write(new Command<>(CommandType.AUTH, new StatusOutput<>(StringCodec.UTF8)))).when(connectionWatchdog)
.arm();
when(channel.isActive()).thenReturn(true);

sut.notifyChannelActive(channel);

DefaultChannelPromise promise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);

when(channel.writeAndFlush(any())).thenAnswer(invocation -> {
if (invocation.getArguments()[0] instanceof RedisCommand) {
queue.add((RedisCommand) invocation.getArguments()[0]);
}

if (invocation.getArguments()[0] instanceof Collection) {
queue.addAll((Collection) invocation.getArguments()[0]);
}
return promise;
});

assertThat(queue).hasSize(2).first().hasFieldOrPropertyWithValue("type", CommandType.SELECT);
assertThat(queue).hasSize(2).last().hasFieldOrPropertyWithValue("type", CommandType.AUTH);
}

@Test
void writeConnectedShouldWriteCommandToChannel() {

Expand Down
Loading