Skip to content

Commit

Permalink
Send script load command to all slots in cluster
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Kubovic <[email protected]>
  • Loading branch information
MichaelKubovic committed Nov 20, 2023
1 parent 6832aac commit 0244b96
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@

import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

import static io.vertx.redis.client.Command.ASKING;
import static io.vertx.redis.client.Command.AUTH;
import static io.vertx.redis.client.Command.SCRIPT;
import static io.vertx.redis.client.Request.cmd;

public class RedisClusterConnection implements RedisConnection {
Expand Down Expand Up @@ -138,23 +140,12 @@ public Future<Response> send(Request request) {
case 0:
// can run anywhere
if (REDUCERS.containsKey(cmd)) {
final List<Future<Response>> responses = new ArrayList<>(slots.size());

for (int i = 0; i < slots.size(); i++) {
String[] endpoints = slots.endpointsForSlot(i);

final Promise<Response> p = vertx.promise();
send(selectMasterOrReplicaEndpoint(cmd.isReadOnly(args), endpoints, forceMasterEndpoint), RETRIES, req, p);
responses.add(p.future());
}

Future.all(responses).onComplete(composite -> {
if (composite.failed()) {
// means if one of the operations failed, then we can fail the handler
promise.fail(composite.cause());
} else {
promise.complete(REDUCERS.get(cmd).apply(composite.result().list()));
}
sendToAllSlots(promise, req, cmd, args, forceMasterEndpoint, REDUCERS.get(cmd));
} else if(cmd.equals(SCRIPT) && Arrays.equals(args.get(0), "LOAD".getBytes())) {
sendToAllSlots(promise, req, cmd, args, forceMasterEndpoint, responses -> {
// all nodes should compute the same sha
assert responses.stream().map(Response::toString).collect(Collectors.toSet()).size() == 1;
return responses.get(0);
});
} else {
// it doesn't matter which node to use
Expand Down Expand Up @@ -213,6 +204,27 @@ public Future<Response> send(Request request) {
}
}

private void sendToAllSlots(Promise<Response> promise, RequestImpl req, CommandImpl cmd, List<byte[]> args, boolean forceMasterEndpoint, Function<List<Response>, Response> reducer) {
final List<Future<Response>> responses = new ArrayList<>(slots.size());

for (int i = 0; i < slots.size(); i++) {
String[] endpoints = slots.endpointsForSlot(i);

final Promise<Response> p = vertx.promise();
send(selectMasterOrReplicaEndpoint(cmd.isReadOnly(args), endpoints, forceMasterEndpoint), RETRIES, req, p);
responses.add(p.future());
}

Future.all(responses).onComplete(composite -> {
if (composite.failed()) {
// means if one of the operations failed, then we can fail the handler
promise.fail(composite.cause());
} else {
promise.complete(reducer.apply(composite.result().list()));
}
});
}

private Map<Integer, Request> splitRequest(CommandImpl cmd, List<byte[]> args) {
// we will split the request across the slots
final Map<Integer, Request> map = new IdentityHashMap<>();
Expand Down
54 changes: 54 additions & 0 deletions src/test/java/io/vertx/redis/client/test/RedisClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1223,4 +1223,58 @@ public void preservesContext(TestContext should) {
PreservesContext.connectThenSend(client, should);
PreservesContext.connectThenBatch(client, should);
}

/**
* This test runs EVALSHA with two keys hashed to different slots to
* verify that SCRIPT LOAD was run on both nodes
*/
@Test
public void testScriptLoadRunsOnEveryMaster(TestContext should) {
final Async test = should.async();

final String key1 = "{hash_tag}.some-key1";
final String argv1 = "some-value1";

final String key2 = "{other_hash_tag}.other-key1";
final String argv2 = "other-value1";

String script = "return redis.call('GET', KEYS[1])";

client.connect().compose(cluster -> {
cluster.exceptionHandler(should::fail);
Future<@Nullable Response> setFuture1 = cluster.send(cmd(SET).arg(key1).arg(argv1));
Future<@Nullable Response> setFuture2 = cluster.send(cmd(SET).arg(key2).arg(argv2));
Future<@Nullable Response> scriptLoadFuture = cluster.send(cmd(SCRIPT).arg("LOAD").arg(script));

return Future.all(setFuture1, setFuture2, scriptLoadFuture)
.compose(compositeRet -> {
System.out.println("set and script load successful");

return scriptLoadFuture;
})
.compose(scriptLoadResponse -> {
String scriptSha = scriptLoadResponse.toString();

System.out.println("script load successful, sha: "+scriptSha);

return Future.all(
cluster.send(cmd(EVALSHA).arg(scriptSha).arg(1).arg(key1)),
cluster.send(cmd(EVALSHA).arg(scriptSha).arg(1).arg(key2))
);
})
.compose(compositeEval -> {
System.out.println("eval operations successful");
Set<String> evalRet = compositeEval.list().stream().map(Object::toString)
.collect(Collectors.toSet());
should.assertTrue(evalRet.contains(argv1));
should.assertTrue(evalRet.contains(argv2));

test.complete();
return Future.succeededFuture();
});
}).onFailure(throwable -> {
throwable.printStackTrace();
should.fail(throwable);
});
}
}

0 comments on commit 0244b96

Please sign in to comment.