Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load script on all master nodes in cluster #418

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public static void addMasterOnlyCommand(Command command) {
}).sum()));

addMasterOnlyCommand(WAIT);
addMasterOnlyCommand(SCRIPT);

addMasterOnlyCommand(SUBSCRIBE);
addMasterOnlyCommand(PSUBSCRIBE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@

import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

import static io.vertx.redis.client.Command.ASKING;
import static io.vertx.redis.client.Command.AUTH;
import static io.vertx.redis.client.Command.SCRIPT;
import static io.vertx.redis.client.Request.cmd;

public class RedisClusterConnection implements RedisConnection {
Expand Down Expand Up @@ -138,23 +140,12 @@ public Future<Response> send(Request request) {
case 0:
// can run anywhere
if (REDUCERS.containsKey(cmd)) {
final List<Future<Response>> responses = new ArrayList<>(slots.size());

for (int i = 0; i < slots.size(); i++) {
String[] endpoints = slots.endpointsForSlot(i);

final Promise<Response> p = vertx.promise();
send(selectMasterOrReplicaEndpoint(cmd.isReadOnly(args), endpoints, forceMasterEndpoint), RETRIES, req, p);
responses.add(p.future());
}

Future.all(responses).onComplete(composite -> {
if (composite.failed()) {
// means if one of the operations failed, then we can fail the handler
promise.fail(composite.cause());
} else {
promise.complete(REDUCERS.get(cmd).apply(composite.result().list()));
}
sendToAllSlots(promise, req, cmd, args, forceMasterEndpoint, REDUCERS.get(cmd));
} else if (cmd.equals(SCRIPT) && args.size() > 1 && args.get(0).length == 4 && "LOAD".equalsIgnoreCase(new String(args.get(0)))) {
sendToAllSlots(promise, req, cmd, args, forceMasterEndpoint, responses -> {
// all nodes should compute the same sha
assert responses.stream().map(Response::toString).collect(Collectors.toSet()).size() == 1;
return responses.get(0);
});
} else {
// it doesn't matter which node to use
Expand Down Expand Up @@ -213,6 +204,27 @@ public Future<Response> send(Request request) {
}
}

private void sendToAllSlots(Promise<Response> promise, RequestImpl req, CommandImpl cmd, List<byte[]> args, boolean forceMasterEndpoint, Function<List<Response>, Response> reducer) {
final List<Future<Response>> responses = new ArrayList<>(slots.size());

for (int i = 0; i < slots.size(); i++) {
String[] endpoints = slots.endpointsForSlot(i);

final Promise<Response> p = vertx.promise();
send(selectMasterOrReplicaEndpoint(cmd.isReadOnly(args), endpoints, forceMasterEndpoint), RETRIES, req, p);
responses.add(p.future());
}

Future.all(responses).onComplete(composite -> {
if (composite.failed()) {
// means if one of the operations failed, then we can fail the handler
promise.fail(composite.cause());
} else {
promise.complete(reducer.apply(composite.result().list()));
}
});
}

private Map<Integer, Request> splitRequest(CommandImpl cmd, List<byte[]> args) {
// we will split the request across the slots
final Map<Integer, Request> map = new IdentityHashMap<>();
Expand Down
56 changes: 55 additions & 1 deletion src/test/java/io/vertx/redis/client/test/RedisClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
public class RedisClusterTest {

@ClassRule
public static final GenericContainer<?> redis = new FixedHostPortGenericContainer<>("grokzen/redis-cluster:6.2.0")
public static final GenericContainer<?> redis = new FixedHostPortGenericContainer<>("grokzen/redis-cluster:6.2.11")
.withEnv("IP", "0.0.0.0")
.withEnv("STANDALONE", "true")
.withEnv("SENTINEL", "true")
Expand Down Expand Up @@ -1223,4 +1223,58 @@ public void preservesContext(TestContext should) {
PreservesContext.connectThenSend(client, should);
PreservesContext.connectThenBatch(client, should);
}

/**
* This test runs EVALSHA with two keys hashed to different slots to
* verify that SCRIPT LOAD was run on both nodes
*/
@Test
public void testScriptLoadRunsOnEveryMaster(TestContext should) {
final Async test = should.async();

final String key1 = "{hash_tag}.some-key1";
final String argv1 = "some-value1";

final String key2 = "{other_hash_tag}.other-key1";
final String argv2 = "other-value1";

String script = "return redis.call('GET', KEYS[1])";

client.connect().compose(cluster -> {
cluster.exceptionHandler(should::fail);
Future<@Nullable Response> setFuture1 = cluster.send(cmd(SET).arg(key1).arg(argv1));
Future<@Nullable Response> setFuture2 = cluster.send(cmd(SET).arg(key2).arg(argv2));
Future<@Nullable Response> scriptLoadFuture = cluster.send(cmd(SCRIPT).arg("load").arg(script));

return Future.all(setFuture1, setFuture2, scriptLoadFuture)
.compose(compositeRet -> {
System.out.println("set and script load successful");

return scriptLoadFuture;
})
.compose(scriptLoadResponse -> {
String scriptSha = scriptLoadResponse.toString();

System.out.println("script load successful, sha: "+scriptSha);

return Future.all(
cluster.send(cmd(EVALSHA).arg(scriptSha).arg(1).arg(key1)),
cluster.send(cmd(EVALSHA).arg(scriptSha).arg(1).arg(key2))
);
})
.compose(compositeEval -> {
System.out.println("eval operations successful");
Set<String> evalRet = compositeEval.list().stream().map(Object::toString)
.collect(Collectors.toSet());
should.assertTrue(evalRet.contains(argv1));
should.assertTrue(evalRet.contains(argv2));

test.complete();
return Future.succeededFuture();
});
}).onFailure(throwable -> {
throwable.printStackTrace();
should.fail(throwable);
});
}
}