Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remember failures when collecting initial information in cluster/replication/sentinel #426

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions src/main/java/io/vertx/redis/client/impl/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -226,24 +226,29 @@ private Future<Slots> getSlots(ContextInternal context) {
if (this.slots.compareAndSet(null, future)) {
LOG.debug("Obtaining hash slot assignment");
// attempt to load the slots from the first good endpoint
getSlots(connectOptions.getEndpoints(), 0, promise);
getSlots(connectOptions.getEndpoints(), 0, ConcurrentHashMap.newKeySet(), promise);
return future;
}
}
}

private void getSlots(List<String> endpoints, int index, Handler<AsyncResult<Slots>> onGotSlots) {
private void getSlots(List<String> endpoints, int index, Set<Throwable> failures, Handler<AsyncResult<Slots>> onGotSlots) {
if (index >= endpoints.size()) {
// stop condition
onGotSlots.handle(Future.failedFuture(new RedisConnectException("Cannot connect to any of the provided endpoints")));
StringBuilder message = new StringBuilder("Cannot connect to any of the provided endpoints");
for (Throwable failure : failures) {
message.append("\n- ").append(failure);
}
onGotSlots.handle(Future.failedFuture(new RedisConnectException(message.toString())));
scheduleCachedSlotsExpiration();
return;
}

connectionManager.getConnection(endpoints.get(index), RedisReplicas.NEVER != connectOptions.getUseReplicas() ? cmd(READONLY) : null)
.onFailure(err -> {
// try with the next endpoint
getSlots(endpoints, index + 1, onGotSlots);
failures.add(err);
getSlots(endpoints, index + 1, failures, onGotSlots);
})
.onSuccess(conn -> {
getSlots(endpoints.get(index), conn).onComplete(result -> {
Expand All @@ -253,7 +258,8 @@ private void getSlots(List<String> endpoints, int index, Handler<AsyncResult<Slo

if (result.failed()) {
// the slots command failed, try with next endpoint
getSlots(endpoints, index + 1, onGotSlots);
failures.add(result.cause());
getSlots(endpoints, index + 1, failures, onGotSlots);
} else {
Slots slots = result.result();
onGotSlots.handle(Future.succeededFuture(slots));
Expand All @@ -269,10 +275,16 @@ private Future<Slots> getSlots(String endpoint, RedisConnection conn) {
.compose(reply -> {
if (reply == null || reply.size() == 0) {
// no slots available we can't really proceed
return Future.failedFuture("SLOTS No slots available in the cluster.");
return Future.failedFuture("CLUSTER SLOTS No slots available in the cluster.");
}

return Future.succeededFuture(new Slots(endpoint, reply));
Slots result;
try {
result = new Slots(endpoint, reply);
} catch (Exception e) {
return Future.failedFuture("CLUSTER SLOTS response invalid: " + e);
}
return Future.succeededFuture(result);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import static io.vertx.redis.client.Command.*;
Expand Down Expand Up @@ -109,21 +110,26 @@ public Future<RedisConnection> connect() {
// make a copy as we may need to mutate the list during discovery
final List<String> endpoints = new LinkedList<>(connectOptions.getEndpoints());
// attempt to discover the topology from the first good endpoint
connect(endpoints, 0, promise);
connect(endpoints, 0, ConcurrentHashMap.newKeySet(), promise);
return promise.future();
}

private void connect(List<String> endpoints, int index, Handler<AsyncResult<RedisConnection>> onConnect) {
private void connect(List<String> endpoints, int index, Set<Throwable> failures, Handler<AsyncResult<RedisConnection>> onConnect) {
if (index >= endpoints.size()) {
// stop condition
onConnect.handle(Future.failedFuture(new RedisConnectException("Cannot connect to any of the provided endpoints")));
StringBuilder message = new StringBuilder("Cannot connect to any of the provided endpoints");
for (Throwable failure : failures) {
message.append("\n- ").append(failure);
}
onConnect.handle(Future.failedFuture(new RedisConnectException(message.toString())));
return;
}

connectionManager.getConnection(endpoints.get(index), null)
.onFailure(err -> {
// failed try with the next endpoint
connect(endpoints, index + 1, onConnect);
failures.add(err);
connect(endpoints, index + 1, failures, onConnect);
})
.onSuccess(conn -> {
// fetch slots from the cluster immediately to ensure slots are correct
Expand All @@ -132,7 +138,8 @@ private void connect(List<String> endpoints, int index, Handler<AsyncResult<Redi
// the slots command failed.
conn.close();
// try with the next one
connect(endpoints, index + 1, onConnect);
failures.add(getNodes.cause());
connect(endpoints, index + 1, failures, onConnect);
return;
}

Expand Down
15 changes: 11 additions & 4 deletions src/main/java/io/vertx/redis/client/impl/RedisSentinelClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import static io.vertx.redis.client.Command.*;
import static io.vertx.redis.client.Request.cmd;
Expand Down Expand Up @@ -158,7 +160,7 @@ private void createConnectionInternal(RedisSentinelConnectOptions options, Redis
private static void resolveClient(final Resolver checkEndpointFn, final RedisSentinelConnectOptions options, final Handler<AsyncResult<RedisURI>> callback) {
// Because finding the master is going to be an async list we will terminate
// when we find one then use promises...
iterate(0, checkEndpointFn, options, iterate -> {
iterate(0, ConcurrentHashMap.newKeySet(), checkEndpointFn, options, iterate -> {
if (iterate.failed()) {
callback.handle(Future.failedFuture(iterate.cause()));
} else {
Expand All @@ -175,12 +177,16 @@ private static void resolveClient(final Resolver checkEndpointFn, final RedisSen
});
}

private static void iterate(final int idx, final Resolver checkEndpointFn, final RedisSentinelConnectOptions argument, final Handler<AsyncResult<Pair<Integer, RedisURI>>> resultHandler) {
private static void iterate(final int idx, final Set<Throwable> failures, final Resolver checkEndpointFn, final RedisSentinelConnectOptions argument, final Handler<AsyncResult<Pair<Integer, RedisURI>>> resultHandler) {
// stop condition
final List<String> endpoints = argument.getEndpoints();

if (idx >= endpoints.size()) {
resultHandler.handle(Future.failedFuture("No more endpoints in chain."));
StringBuilder message = new StringBuilder("Cannot connect to any of the provided endpoints");
for (Throwable failure : failures) {
message.append("\n- ").append(failure);
}
resultHandler.handle(Future.failedFuture(new RedisConnectException(message.toString())));
return;
}

Expand All @@ -190,7 +196,8 @@ private static void iterate(final int idx, final Resolver checkEndpointFn, final
resultHandler.handle(Future.succeededFuture(new Pair<>(idx, res.result())));
} else {
// try again with next endpoint
iterate(idx + 1, checkEndpointFn, argument, resultHandler);
failures.add(res.cause());
iterate(idx + 1, failures, checkEndpointFn, argument, resultHandler);
}
});
}
Expand Down
Loading