Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trigger manual failover on SIGTERM / shutdown to cluster primary #1091

Open
wants to merge 16 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

#define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */
#define CLUSTER_SLOTS (1 << CLUSTER_SLOT_MASK_BITS) /* Total number of slots in cluster mode, which is 16384. */
#define CLUSTER_SLOT_MASK ((unsigned long long)(CLUSTER_SLOTS - 1)) /* Bit mask for slot id stored in LSB. */
#define CLUSTER_OK 0 /* Everything looks ok */
#define CLUSTER_FAIL 1 /* The cluster can't work */
#define CLUSTER_NAMELEN 40 /* sha1 hex length */
Expand Down
54 changes: 37 additions & 17 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -1233,25 +1233,30 @@ void clusterInitLast(void) {

/* Called when a cluster node receives SHUTDOWN. */
void clusterHandleServerShutdown(void) {
if (server.auto_failover_on_shutdown) {
if (nodeIsPrimary(myself) && server.auto_failover_on_shutdown) {
/* Find the first best replica, that is, the replica with the largest offset. */
client *best_replica = NULL;
listIter replicas_iter;
listNode *replicas_list_node;
listRewind(server.replicas, &replicas_iter);
while ((replicas_list_node = listNext(&replicas_iter)) != NULL) {
client *replica = listNodeValue(replicas_list_node);
/* This is done only when the replica offset is caught up, to avoid data loss */
if (replica->repl_state == REPLICA_STATE_ONLINE && replica->repl_ack_off == server.primary_repl_offset) {
/* This is done only when the replica offset is caught up, to avoid data loss.
* And 0x800ff is 8.0.255, we only support new versions for this feature. */
if (replica->repl_data->repl_state == REPLICA_STATE_ONLINE &&
// replica->repl_data->replica_version > 0x800ff &&
Comment on lines +1244 to +1247
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot to uncomment the check?

In the comment, maybe we shall not say "new" because in a few years, this will not be new anymore.

Why not check >= 0x80100 instead > 0x800ff? It's the same but maybe easier to read?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i take this from

             * And 0x702ff is 7.2.255, we only support new versions in this case. */
            if (r->repl_data->repl_state == REPLICA_STATE_ONLINE && r->repl_data->replica_version > 0x702ff) {
                num_eligible_replicas++;
            }

Forgot to uncomment the check?

just a easy way that i can test in local.

replica->name && sdslen(replica->name->ptr) == CLUSTER_NAMELEN &&
replica->repl_data->repl_ack_off == server.primary_repl_offset) {
best_replica = replica;
break;
}
}

if (best_replica) {
/* Send a CLUSTER FAILOVER FORCE to the best replica. */
const char *buf = "*3\r\n$7\r\nCLUSTER\r\n$8\r\nFAILOVER\r\n$5\r\nFORCE\r\n";
if (connWrite(best_replica->conn, buf, strlen(buf)) == (int)strlen(buf)) {
char buf[128];
size_t buflen = snprintf(buf, sizeof(buf), "*5\r\n$7\r\nCLUSTER\r\n$8\r\nFAILOVER\r\n$5\r\nFORCE\r\n$9\r\nreplicaid\r\n$%d\r\n%s\r\n", CLUSTER_NAMELEN, (char *)best_replica->name->ptr);
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
if (connWrite(best_replica->conn, buf, buflen) == (int)strlen(buf)) {
serverLog(LL_NOTICE, "Sending CLUSTER FAILOVER FORCE to replica %s succeeded.",
replicationGetReplicaName(best_replica));
} else {
Expand Down Expand Up @@ -4821,8 +4826,9 @@ void clusterHandleReplicaFailover(void) {
if (server.cluster->mf_end) {
server.cluster->failover_auth_time = now;
server.cluster->failover_auth_rank = 0;
server.cluster->failover_auth_count++;
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
/* Reset auth_age since it is outdated now and we can bypass the auth_timeout
* check in the next state and start the election ASAP. */
auth_age = 0;
}
serverLog(LL_NOTICE,
"Start of election delayed for %lld milliseconds "
Expand Down Expand Up @@ -7026,32 +7032,46 @@ int clusterCommandSpecial(client *c) {
} else {
addReplyLongLong(c, clusterNodeFailureReportsCount(n));
}
} else if (!strcasecmp(c->argv[1]->ptr, "failover") && (c->argc == 2 || c->argc == 3)) {
/* CLUSTER FAILOVER [FORCE|TAKEOVER] */
} else if (!strcasecmp(c->argv[1]->ptr, "failover") && (c->argc >= 2)) {
/* CLUSTER FAILOVER [FORCE|TAKEOVER] [replicaid <node id>] */
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
int force = 0, takeover = 0;
robj *replicaid = NULL;

if (c->argc == 3) {
if (!strcasecmp(c->argv[2]->ptr, "force")) {
for (int j = 2; j < c->argc; j++) {
int moreargs = (c->argc - 1) - j;
if (!strcasecmp(c->argv[j]->ptr, "force")) {
force = 1;
} else if (!strcasecmp(c->argv[2]->ptr, "takeover")) {
} else if (!strcasecmp(c->argv[j]->ptr, "takeover")) {
takeover = 1;
force = 1; /* Takeover also implies force. */
} else if (!strcasecmp(c->argv[j]->ptr, "replicaid") && moreargs) {
j++;
replicaid = c->argv[j];
} else {
addReplyErrorObject(c, shared.syntaxerr);
return 1;
}
}

/* Check if it should be executed by myself. */
if (replicaid != NULL) {
clusterNode *n = clusterLookupNode(replicaid->ptr, sdslen(replicaid->ptr));
if (n != myself) {
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
/* Ignore this command, including the sanity check and the process. */
addReply(c, shared.ok);
return 1;
}
}

/* Check preconditions. */
if (clusterNodeIsPrimary(myself)) {
addReplyError(c, "You should send CLUSTER FAILOVER to a replica");
if (replicaid == NULL) addReplyError(c, "You should send CLUSTER FAILOVER to a replica");
return 1;
} else if (myself->replicaof == NULL) {
addReplyError(c, "I'm a replica but my master is unknown to me");
if (replicaid == NULL) addReplyError(c, "I'm a replica but my master is unknown to me");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add this? If the primary is unknown, the failover can't succeed, so I think we need to return an error even if REPLICAID is sent.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering if there would be some races, that a replica return an error to the priamry. Or maybe we should always return OK if replicaid is passed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replicas don't send the replies to primaries. Only problem is the confic to panic on repöocation errors. But i can't see any races. Can you?

We can return ok if you want. Maybe we should check that c is the primary fake client and forbid this option for normal clients?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we will write it to the backlog, i am worry that after some down and up, the psync will get the command and return an error. Though I haven't verified it specifically.

return 1;
} else if (!force && (nodeFailed(myself->replicaof) || myself->replicaof->link == NULL)) {
addReplyError(c, "Master is down or failed, "
"please use CLUSTER FAILOVER FORCE");
if (replicaid == NULL) addReplyError(c, "Master is down or failed, please use CLUSTER FAILOVER FORCE");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add this? I think it should be an error even with REPLICAID.

return 1;
}
resetManualFailover();
Expand All @@ -7075,7 +7095,7 @@ int clusterCommandSpecial(client *c) {
} else {
serverLog(LL_NOTICE, "Forced failover user request accepted (user request from '%s').", client);
}
server.cluster->mf_can_start = 1;
manualFailoverCanStart();
/* We can start a manual failover as soon as possible, setting a flag
* here so that we don't need to waiting for the cron to kick in. */
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_MANUALFAILOVER);
Expand Down
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3187,6 +3187,7 @@ standardConfig static_configs[] = {
createBoolConfig("enable-debug-assert", NULL, IMMUTABLE_CONFIG | HIDDEN_CONFIG, server.enable_debug_assert, 0, NULL, NULL),
createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL),
createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL),
createBoolConfig("import-mode", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.import_mode, 0, NULL, NULL),
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
createBoolConfig("auto-failover-on-shutdown", NULL, MODIFIABLE_CONFIG, server.auto_failover_on_shutdown, 0, NULL, NULL),

/* String Configs */
Expand Down
26 changes: 26 additions & 0 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -3594,6 +3594,14 @@ void syncWithPrimary(connection *conn) {
err = sendCommand(conn, "REPLCONF", "version", VALKEY_VERSION, NULL);
if (err) goto write_error;

/* Inform the primary of our (replica) node name. */
if (server.cluster_enabled) {
char *argv[] = {"CLIENT", "SETNAME", server.cluster->myself->name};
size_t lens[] = {6, 7, CLUSTER_NAMELEN};
err = sendCommandArgv(conn, 3, argv, lens);
if (err) goto write_error;
}

server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY;
return;
}
Expand Down Expand Up @@ -3684,6 +3692,24 @@ void syncWithPrimary(connection *conn) {
}
sdsfree(err);
err = NULL;
if (server.cluster_enabled) {
server.repl_state = REPL_STATE_RECEIVE_SETNAME_REPLY;
return;
} else {
server.repl_state = REPL_STATE_SEND_PSYNC;
}
}

/* Receive CLIENT SETNAME reply. */
if (server.repl_state == REPL_STATE_RECEIVE_SETNAME_REPLY) {
err = receiveSynchronousResponse(conn);
if (err == NULL) goto no_response_error;
/* Ignore the error if any. 8.1 introduced this logic and we don't care if it failed. */
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
if (err[0] == '-') {
serverLog(LL_NOTICE, "(Non critical) Primary does not understand CLIENT SETNAME: %s", err);
}
sdsfree(err);
err = NULL;
server.repl_state = REPL_STATE_SEND_PSYNC;
}

Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ typedef enum {
REPL_STATE_RECEIVE_IP_REPLY, /* Wait for REPLCONF reply */
REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */
REPL_STATE_RECEIVE_VERSION_REPLY, /* Wait for REPLCONF reply */
REPL_STATE_RECEIVE_SETNAME_REPLY, /* Wait for CLIENT SETNAME reply */
REPL_STATE_SEND_PSYNC, /* Send PSYNC */
REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */
/* --- End of handshake states --- */
Expand Down
2 changes: 1 addition & 1 deletion tests/support/util.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ proc check_replica_acked_ofs {primary replica_ip replica_port} {
proc wait_replica_acked_ofs {primary replica replica_ip replica_port} {
$primary config set repl-ping-replica-period 3600
$replica config set hz 500
wait_for_condition 100 100 {
wait_for_condition 1000 50 {
[check_replica_acked_ofs $primary $replica_ip $replica_port] eq 1
} else {
puts "INFO REPLICATION: [$primary info replication]"
Expand Down
Loading