From 884850017678ccc0d249330a5322457c8cf66944 Mon Sep 17 00:00:00 2001 From: stav bentov Date: Thu, 23 Jan 2025 13:59:44 +0000 Subject: [PATCH 1/4] Optimize key expiration check with branch prediction hint Signed-off-by: stav bentov --- src/db.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/db.c b/src/db.c index f2a000030b..e562ab0ce3 100644 --- a/src/db.c +++ b/src/db.c @@ -114,9 +114,11 @@ robj *lookupKey(serverDb *db, robj *key, int flags) { int expire_flags = 0; if (flags & LOOKUP_WRITE && !is_ro_replica) expire_flags |= EXPIRE_FORCE_DELETE_EXPIRED; if (flags & LOOKUP_NOEXPIRE) expire_flags |= EXPIRE_AVOID_DELETE_EXPIRED; - if (expireIfNeededWithDictIndex(db, key, val, expire_flags, dict_index) != KEY_VALID) { - /* The key is no longer valid. */ - val = NULL; + if (unlikely(dbSize(db) > 0)) { // This condition check found to improve branch prediction + if (expireIfNeededWithDictIndex(db, key, val, expire_flags, dict_index) != KEY_VALID) { + /* The key is no longer valid. */ + val = NULL; + } } } From ecb633a83d94fb9c8a6494e5efe1bfc96353ab82 Mon Sep 17 00:00:00 2001 From: stav bentov Date: Sun, 26 Jan 2025 15:19:37 +0000 Subject: [PATCH 2/4] Added IP address validation in node address update Signed-off-by: stav bentov --- src/cluster_legacy.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 5e976d3060..f5e68675de 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -2456,6 +2456,10 @@ int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link, clusterMsg * if (node->tcp_port == tcp_port && node->cport == cport && node->tls_port == tls_port && strcmp(ip, node->ip) == 0) return 0; + /* We should not update the node address if we were not able to get a valid + * IP address. */ + if (ip[0] == '\0' || strcmp(ip, "?") == 0) return 0; + /* IP / port is different, update it. */ memcpy(node->ip, ip, sizeof(ip)); node->tcp_port = tcp_port; From 64ddd291b9c441be967cc0564c641deda4e825ed Mon Sep 17 00:00:00 2001 From: stav bentov Date: Mon, 27 Jan 2025 08:47:15 +0000 Subject: [PATCH 3/4] Refactor: extract loop logic into processSingleFileEvent() dedicated function Signed-off-by: stav bentov --- src/ae.c | 99 +++++++++++++++++++++++++++++--------------------------- 1 file changed, 52 insertions(+), 47 deletions(-) diff --git a/src/ae.c b/src/ae.c index 643ff17070..34dfeda958 100644 --- a/src/ae.c +++ b/src/ae.c @@ -393,6 +393,57 @@ int aePoll(aeEventLoop *eventLoop, struct timeval *tvp) { return ret; } +static void processSingleFileEvent( + aeEventLoop *eventLoop, aeFiredEvent *firedEvent) { + int mask = firedEvent->mask; + int fd = firedEvent->fd; + aeFileEvent *fe = &eventLoop->events[fd]; + int fired = 0; /* Number of events fired for current fd. */ + + /* Normally we execute the readable event first, and the writable + * event later. This is useful as sometimes we may be able + * to serve the reply of a query immediately after processing the + * query. + * + * However if AE_BARRIER is set in the mask, our application is + * asking us to do the reverse: never fire the writable event + * after the readable. In such a case, we invert the calls. + * This is useful when, for instance, we want to do things + * in the beforeSleep() hook, like fsyncing a file to disk, + * before replying to a client. */ + int invert = fe->mask & AE_BARRIER; + + /* Note the "fe->mask & mask & ..." code: maybe an already + * processed event removed an element that fired and we still + * didn't processed, so we check if the event is still valid. + * + * Fire the readable event if the call sequence is not + * inverted. */ + if (!invert && fe->mask & mask & AE_READABLE) { + fe->rfileProc(eventLoop, fd, fe->clientData, mask); + fired++; + fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ + } + + /* Fire the writable event. */ + if (fe->mask & mask & AE_WRITABLE) { + if (!fired || fe->wfileProc != fe->rfileProc) { + fe->wfileProc(eventLoop, fd, fe->clientData, mask); + fired++; + } + } + + /* If we have to invert the call, fire the readable event now + * after the writable one. */ + if (invert) { + fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ + if ((fe->mask & mask & AE_READABLE) && (!fired || fe->wfileProc != fe->rfileProc)) { + fe->rfileProc(eventLoop, fd, fe->clientData, mask); + fired++; + } + } +} + /* Process every pending file event, then every pending time event * (that may be registered by file event callbacks just processed). * Without special flags the function sleeps until some file event @@ -458,53 +509,7 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) { if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) eventLoop->aftersleep(eventLoop, numevents); for (j = 0; j < numevents; j++) { - int fd = eventLoop->fired[j].fd; - aeFileEvent *fe = &eventLoop->events[fd]; - int mask = eventLoop->fired[j].mask; - int fired = 0; /* Number of events fired for current fd. */ - - /* Normally we execute the readable event first, and the writable - * event later. This is useful as sometimes we may be able - * to serve the reply of a query immediately after processing the - * query. - * - * However if AE_BARRIER is set in the mask, our application is - * asking us to do the reverse: never fire the writable event - * after the readable. In such a case, we invert the calls. - * This is useful when, for instance, we want to do things - * in the beforeSleep() hook, like fsyncing a file to disk, - * before replying to a client. */ - int invert = fe->mask & AE_BARRIER; - - /* Note the "fe->mask & mask & ..." code: maybe an already - * processed event removed an element that fired and we still - * didn't processed, so we check if the event is still valid. - * - * Fire the readable event if the call sequence is not - * inverted. */ - if (!invert && fe->mask & mask & AE_READABLE) { - fe->rfileProc(eventLoop, fd, fe->clientData, mask); - fired++; - fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ - } - - /* Fire the writable event. */ - if (fe->mask & mask & AE_WRITABLE) { - if (!fired || fe->wfileProc != fe->rfileProc) { - fe->wfileProc(eventLoop, fd, fe->clientData, mask); - fired++; - } - } - - /* If we have to invert the call, fire the readable event now - * after the writable one. */ - if (invert) { - fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ - if ((fe->mask & mask & AE_READABLE) && (!fired || fe->wfileProc != fe->rfileProc)) { - fe->rfileProc(eventLoop, fd, fe->clientData, mask); - fired++; - } - } + processSingleFileEvent(eventLoop ,&eventLoop->fired[j]); processed++; } From 55b77ce8024c7c51bdc4a7183cb19ee81d6d0787 Mon Sep 17 00:00:00 2001 From: stav bentov Date: Mon, 27 Jan 2025 08:50:23 +0000 Subject: [PATCH 4/4] Refactor: Split cluster size computation into helper function Signed-off-by: stav bentov --- src/cluster_legacy.c | 47 +++++++++++++++++++++++--------------------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index f5e68675de..e48c0b29e3 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -5550,6 +5550,30 @@ void clusterCloseAllSlots(void) { memset(server.cluster->importing_slots_from, 0, sizeof(server.cluster->importing_slots_from)); } +static void clusterDetermineClusterSize(int *reachable_primaries) { + /* Compute the cluster size, that is the number of primary nodes + * serving at least a single slot. + * + * At the same time count the number of reachable primaries having + * at least one slot. */ + { + dictIterator *di; + dictEntry *de; + + server.cluster->size = 0; + di = dictGetSafeIterator(server.cluster->nodes); + while ((de = dictNext(di)) != NULL) { + clusterNode *node = dictGetVal(de); + + if (clusterNodeIsVotingPrimary(node)) { + server.cluster->size++; + if ((node->flags & (CLUSTER_NODE_FAIL | CLUSTER_NODE_PFAIL)) == 0) reachable_primaries++; + } + } + dictReleaseIterator(di); + } +} + /* ----------------------------------------------------------------------------- * Cluster state evaluation function * -------------------------------------------------------------------------- */ @@ -5613,28 +5637,7 @@ void clusterUpdateState(void) { } } } - - /* Compute the cluster size, that is the number of primary nodes - * serving at least a single slot. - * - * At the same time count the number of reachable primaries having - * at least one slot. */ - { - dictIterator *di; - dictEntry *de; - - server.cluster->size = 0; - di = dictGetSafeIterator(server.cluster->nodes); - while ((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); - - if (clusterNodeIsVotingPrimary(node)) { - server.cluster->size++; - if ((node->flags & (CLUSTER_NODE_FAIL | CLUSTER_NODE_PFAIL)) == 0) reachable_primaries++; - } - } - dictReleaseIterator(di); - } + clusterDetermineClusterSize(&reachable_primaries); /* If we are in a minority partition, change the cluster state * to FAIL. */