Skip to content

Commit

Permalink
Only (re-)send MEET packet once every handshake timeout period (#1441)
Browse files Browse the repository at this point in the history
Add `meet_sent` field in `clusterNode` indicating the last time we sent
a MEET packet. Use this field to only (re-)send a MEET packet once every
handshake timeout period when detecting a node without an inbound link.

When receiving multiple MEET packets on the same link while the node is
in handshake state, instead of dropping the packet, we now simply
prevent the creation of a new node. This way we still process the MEET
packet's gossip and reply with a PONG as any other packets.

Improve some logging messages to include `human_nodename`. Add
`nodeExceedsHandshakeTimeout()` function.

This is a follow-up to this previous PR:
#1307
And a partial fix to the crash described in:
#1436

---------

Signed-off-by: Pierre Turin <[email protected]>
  • Loading branch information
pieturin authored Dec 30, 2024
1 parent e470735 commit e4179f1
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 68 deletions.
139 changes: 75 additions & 64 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ void freeClusterLink(clusterLink *link);
int verifyClusterNodeId(const char *name, int length);
sds clusterEncodeOpenSlotsAuxField(int rdbflags);
int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s);
static int nodeExceedsHandshakeTimeout(clusterNode *node, mstime_t now);

/* Only primaries that own slots have voting rights.
* Returns 1 if the node has voting rights, otherwise returns 0. */
Expand Down Expand Up @@ -1346,9 +1347,10 @@ clusterLink *createClusterLink(clusterNode *node) {
* with this link will have the 'link' field set to NULL. */
void freeClusterLink(clusterLink *link) {
serverAssert(link != NULL);
serverLog(LL_DEBUG, "Freeing cluster link for node: %.40s:%s",
serverLog(LL_DEBUG, "Freeing cluster link for node: %.40s:%s (%s)",
link->node ? link->node->name : "<unknown>",
link->inbound ? "inbound" : "outbound");
link->inbound ? "inbound" : "outbound",
link->node ? link->node->human_nodename : "<unknown>");

if (link->conn) {
connClose(link->conn);
Expand Down Expand Up @@ -1502,6 +1504,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
node->last_in_ping_gossip = 0;
node->ping_sent = node->pong_received = 0;
node->data_received = 0;
node->meet_sent = 0;
node->fail_time = 0;
node->link = NULL;
node->inbound_link = NULL;
Expand Down Expand Up @@ -1723,7 +1726,7 @@ void clusterAddNode(clusterNode *node) {
*/
void clusterDelNode(clusterNode *delnode) {
serverAssert(delnode != NULL);
serverLog(LL_DEBUG, "Deleting node %.40s from cluster view", delnode->name);
serverLog(LL_DEBUG, "Deleting node %.40s (%s) from cluster view", delnode->name, delnode->human_nodename);

int j;
dictIterator *di;
Expand Down Expand Up @@ -3143,27 +3146,6 @@ int clusterProcessPacket(clusterLink *link) {
return 1;
}

if (type == CLUSTERMSG_TYPE_MEET && link->node && nodeInHandshake(link->node)) {
/* If the link is bound to a node and the node is in the handshake state, and we receive
* a MEET packet, it may be that the sender sent multiple MEET packets so in here we are
* dropping the MEET to avoid the assert in setClusterNodeToInboundClusterLink. The assert
* will happen if the other sends a MEET packet because it detects that there is no inbound
* link, this node creates a new node in HANDSHAKE state (with a random node name), and
* respond with a PONG. The other node receives the PONG and removes the CLUSTER_NODE_MEET
* flag. This node is supposed to open an outbound connection to the other node in the next
* cron cycle, but before this happens, the other node re-sends a MEET on the same link
* because it still detects no inbound connection. We improved the re-send logic of MEET in
* #1441, now we will only re-send MEET packet once every handshake timeout period.
*
* Note that in getNodeFromLinkAndMsg, the node in the handshake state has a random name
* and not truly "known", so we don't know the sender. Dropping the MEET packet can prevent
* us from creating a random node, avoid incorrect link binding, and avoid duplicate MEET
* packet eliminate the handshake state. */
serverLog(LL_NOTICE, "Dropping MEET packet from node %.40s because the node is already in handshake state",
link->node->name);
return 1;
}

uint16_t flags = ntohs(hdr->flags);
uint64_t sender_claimed_current_epoch = 0, sender_claimed_config_epoch = 0;
clusterNode *sender = getNodeFromLinkAndMsg(link, hdr);
Expand Down Expand Up @@ -3261,42 +3243,59 @@ int clusterProcessPacket(clusterLink *link) {

if (type == CLUSTERMSG_TYPE_MEET) {
if (!sender) {
/* Add this node if it is new for us and the msg type is MEET.
* In this stage we don't try to add the node with the right
* flags, replicaof pointer, and so forth, as this details will be
* resolved when we'll receive PONGs from the node. The exception
* to this is the flag that indicates extensions are supported, as
* we want to send extensions right away in the return PONG in order
* to reduce the amount of time needed to stabilize the shard ID. */
clusterNode *node;

node = createClusterNode(NULL, CLUSTER_NODE_HANDSHAKE);
serverAssert(nodeIp2String(node->ip, link, hdr->myip) == C_OK);
getClientPortFromClusterMsg(hdr, &node->tls_port, &node->tcp_port);
node->cport = ntohs(hdr->cport);
if (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA) {
node->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED;
if (!link->node) {
/* Add this node if it is new for us and the msg type is MEET.
* In this stage we don't try to add the node with the right
* flags, replicaof pointer, and so forth, as this details will be
* resolved when we'll receive PONGs from the node. The exception
* to this is the flag that indicates extensions are supported, as
* we want to send extensions right away in the return PONG in order
* to reduce the amount of time needed to stabilize the shard ID. */
clusterNode *node = createClusterNode(NULL, CLUSTER_NODE_HANDSHAKE);
if (nodeIp2String(node->ip, link, hdr->myip) != C_OK) {
/* We cannot get the IP info from the link, it probably means the connection is closed. */
serverLog(LL_NOTICE, "Closing link even though we received a MEET packet on it, "
"because the connection has an error");
freeClusterLink(link);
freeClusterNode(node);
return 0;
}
getClientPortFromClusterMsg(hdr, &node->tls_port, &node->tcp_port);
node->cport = ntohs(hdr->cport);
if (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA) {
node->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED;
}
setClusterNodeToInboundClusterLink(node, link);
clusterAddNode(node);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
} else {
/* A second MEET packet was received on an existing link during the handshake process.
* This happens when the other node detects no inbound link, and re-sends a MEET packet
* before this node can respond with a PING. This MEET is a no-op.
*
* Note: Nodes in HANDSHAKE state are not fully "known" (random names), so the sender
* remains unidentified at this point. The MEET packet might be re-sent if the inbound
* connection is still unestablished by the next cron cycle.
*/
debugServerAssert(link->inbound && nodeInHandshake(link->node));
}
setClusterNodeToInboundClusterLink(node, link);
clusterAddNode(node);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);

/* If this is a MEET packet from an unknown node, we still process
* the gossip section here since we have to trust the sender because
* of the message type. */
clusterProcessGossipSection(hdr, link);
} else if (sender->link && now - sender->ctime > server.cluster_node_timeout) {
} else if (sender->link && nodeExceedsHandshakeTimeout(sender, now)) {
/* The MEET packet is from a known node, after the handshake timeout, so the sender thinks that I do not
* know it.
* Freeing my outbound link to that node, to force a reconnect and sending a PING.
* Free my outbound link to that node, triggering a reconnect and a PING over the new link.
* Once that node receives our PING, it should recognize the new connection as an inbound link from me.
* We should only free the outbound link if the node is known for more time than the handshake timeout,
* since during this time, the other side might still be trying to complete the handshake. */

/* We should always receive a MEET packet on an inbound link. */
serverAssert(link != sender->link);
serverLog(LL_NOTICE, "Freeing outbound link to node %.40s after receiving a MEET packet from this known node",
sender->name);
serverLog(LL_NOTICE, "Freeing outbound link to node %.40s (%s) after receiving a MEET packet from this known node",
sender->name, sender->human_nodename);
freeClusterLink(sender->link);
}
}
Expand Down Expand Up @@ -4062,7 +4061,12 @@ void clusterSendPing(clusterLink *link, int type) {
clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(type, estlen);
clusterMsg *hdr = getMessageFromSendBlock(msgblock);

if (!link->inbound && type == CLUSTERMSG_TYPE_PING) link->node->ping_sent = mstime();
if (!link->inbound) {
if (type == CLUSTERMSG_TYPE_PING)
link->node->ping_sent = mstime();
else if (type == CLUSTERMSG_TYPE_MEET)
link->node->meet_sent = mstime();
}

/* Populate the gossip fields */
int maxiterations = wanted * 3;
Expand Down Expand Up @@ -4981,10 +4985,22 @@ void clusterHandleManualFailover(void) {
* CLUSTER cron job
* -------------------------------------------------------------------------- */

static mstime_t getHandshakeTimeout(void) {
/* The handshake timeout is the time after which a handshake node that was
* not turned into a normal node is removed from the nodes. Usually it is
* just the cluster_node_timeout value, but when cluster_node_timeout is
* too small we use the value of 1 second. */
return max(server.cluster_node_timeout, 1000);
}

static int nodeExceedsHandshakeTimeout(clusterNode *node, mstime_t now) {
return now - node->ctime > getHandshakeTimeout() ? 1 : 0;
}

/* Check if the node is disconnected and re-establish the connection.
* Also update a few stats while we are here, that can be used to make
* better decisions in other part of the code. */
static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_timeout, mstime_t now) {
static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t now) {
/* Not interested in reconnecting the link with myself or nodes
* for which we have no address. */
if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_NOADDR)) return 1;
Expand All @@ -4993,19 +5009,22 @@ static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_

/* A Node in HANDSHAKE state has a limited lifespan equal to the
* configured node timeout. */
if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
serverLog(LL_WARNING, "Clusterbus handshake timeout %s:%d after %lldms", node->ip,
node->cport, handshake_timeout);
if (nodeInHandshake(node) && nodeExceedsHandshakeTimeout(node, now)) {
serverLog(LL_WARNING, "Clusterbus handshake timeout %s:%d", node->ip, node->cport);
clusterDelNode(node);
return 1;
}
if (node->link != NULL && node->inbound_link == NULL && nodeInNormalState(node) &&
now - node->inbound_link_freed_time > handshake_timeout) {
if (nodeInNormalState(node) && node->link != NULL && node->inbound_link == NULL &&
now - node->inbound_link_freed_time > getHandshakeTimeout() &&
now - node->meet_sent > getHandshakeTimeout()) {
/* Node has an outbound link, but no inbound link for more than the handshake timeout.
* This probably means this node does not know us yet, whereas we know it.
* So we send it a MEET packet to do a handshake with it and correct the inconsistent cluster view. */
* So we send it a MEET packet to do a handshake with it and correct the inconsistent cluster view.
* We make sure to not re-send a MEET packet more than once every handshake timeout period, so as to
* leave the other node time to complete the handshake. */
node->flags |= CLUSTER_NODE_MEET;
serverLog(LL_NOTICE, "Sending MEET packet to node %.40s because there is no inbound link for it", node->name);
serverLog(LL_NOTICE, "Sending MEET packet to node %.40s (%s) because there is no inbound link for it",
node->name, node->human_nodename);
clusterSendPing(node->link, CLUSTERMSG_TYPE_MEET);
}

Expand Down Expand Up @@ -5066,19 +5085,11 @@ void clusterCron(void) {
mstime_t min_pong = 0, now = mstime();
clusterNode *min_pong_node = NULL;
static unsigned long long iteration = 0;
mstime_t handshake_timeout;

iteration++; /* Number of times this function was called so far. */

clusterUpdateMyselfHostname();

/* The handshake timeout is the time after which a handshake node that was
* not turned into a normal node is removed from the nodes. Usually it is
* just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use
* the value of 1 second. */
handshake_timeout = server.cluster_node_timeout;
if (handshake_timeout < 1000) handshake_timeout = 1000;

/* Clear so clusterNodeCronHandleReconnect can count the number of nodes in PFAIL. */
server.cluster->stats_pfail_nodes = 0;
/* Run through some of the operations we want to do on each cluster node. */
Expand All @@ -5091,7 +5102,7 @@ void clusterCron(void) {
/* The protocol is that function(s) below return non-zero if the node was
* terminated.
*/
if (clusterNodeCronHandleReconnect(node, handshake_timeout, now)) continue;
if (clusterNodeCronHandleReconnect(node, now)) continue;
}
dictReleaseIterator(di);

Expand Down
1 change: 1 addition & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ struct _clusterNode {
mstime_t ping_sent; /* Unix time we sent latest ping */
mstime_t pong_received; /* Unix time we received the pong */
mstime_t data_received; /* Unix time we received any data */
mstime_t meet_sent; /* Unix time we sent latest meet packet */
mstime_t fail_time; /* Unix time when FAIL flag was set */
mstime_t repl_offset_time; /* Unix time we received offset for this node */
mstime_t orphaned_time; /* Starting time of orphaned primary condition */
Expand Down
7 changes: 3 additions & 4 deletions tests/unit/cluster/cluster-reliable-meet.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ tags {tls:skip external:skip cluster} {
[CI 0 cluster_stats_messages_meet_received] >= 4 &&
[CI 1 cluster_stats_messages_meet_sent] == [CI 0 cluster_stats_messages_meet_received]
} else {
fail "1 cluster_state:[CI 1 cluster_state], 0 cluster_state: [CI 0 cluster_state]"
fail "Unexpected cluster state: node 1 cluster_state:[CI 1 cluster_state], node 0 cluster_state: [CI 0 cluster_state]"
}
}
} ;# stop servers
Expand Down Expand Up @@ -178,14 +178,13 @@ start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-node-timeout

# Wait for Node 0's handshake to timeout
wait_for_condition 50 100 {
[cluster_get_first_node_in_handshake 1] eq {}
[cluster_get_first_node_in_handshake 0] eq {}
} else {
fail "Node 0 never exited handshake state"
}

# At this point Node 0 knows Node 1 & 2 through the gossip, but they don't know Node 0.
# At this point Node 0 knows Node 2 through the gossip, but Node 1 & 2 don't know Node 0.
wait_for_condition 50 100 {
[cluster_get_node_by_id 0 $node1_id] != {} &&
[cluster_get_node_by_id 0 $node2_id] != {} &&
[cluster_get_node_by_id 1 $node0_id] eq {} &&
[cluster_get_node_by_id 2 $node0_id] eq {}
Expand Down

0 comments on commit e4179f1

Please sign in to comment.