Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancements and Refactor for Maintainability, and Performance #1625

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 52 additions & 47 deletions src/ae.c
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,57 @@ int aePoll(aeEventLoop *eventLoop, struct timeval *tvp) {
return ret;
}

static void processSingleFileEvent(
aeEventLoop *eventLoop, aeFiredEvent *firedEvent) {
int mask = firedEvent->mask;
int fd = firedEvent->fd;
aeFileEvent *fe = &eventLoop->events[fd];
int fired = 0; /* Number of events fired for current fd. */

/* Normally we execute the readable event first, and the writable
* event later. This is useful as sometimes we may be able
* to serve the reply of a query immediately after processing the
* query.
*
* However if AE_BARRIER is set in the mask, our application is
* asking us to do the reverse: never fire the writable event
* after the readable. In such a case, we invert the calls.
* This is useful when, for instance, we want to do things
* in the beforeSleep() hook, like fsyncing a file to disk,
* before replying to a client. */
int invert = fe->mask & AE_BARRIER;

/* Note the "fe->mask & mask & ..." code: maybe an already
* processed event removed an element that fired and we still
* didn't processed, so we check if the event is still valid.
*
* Fire the readable event if the call sequence is not
* inverted. */
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop, fd, fe->clientData, mask);
fired++;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}

/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop, fd, fe->clientData, mask);
fired++;
}
}

/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert) {
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
if ((fe->mask & mask & AE_READABLE) && (!fired || fe->wfileProc != fe->rfileProc)) {
fe->rfileProc(eventLoop, fd, fe->clientData, mask);
fired++;
}
}
}

/* Process every pending file event, then every pending time event
* (that may be registered by file event callbacks just processed).
* Without special flags the function sleeps until some file event
Expand Down Expand Up @@ -458,53 +509,7 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) eventLoop->aftersleep(eventLoop, numevents);

for (j = 0; j < numevents; j++) {
int fd = eventLoop->fired[j].fd;
aeFileEvent *fe = &eventLoop->events[fd];
int mask = eventLoop->fired[j].mask;
int fired = 0; /* Number of events fired for current fd. */

/* Normally we execute the readable event first, and the writable
* event later. This is useful as sometimes we may be able
* to serve the reply of a query immediately after processing the
* query.
*
* However if AE_BARRIER is set in the mask, our application is
* asking us to do the reverse: never fire the writable event
* after the readable. In such a case, we invert the calls.
* This is useful when, for instance, we want to do things
* in the beforeSleep() hook, like fsyncing a file to disk,
* before replying to a client. */
int invert = fe->mask & AE_BARRIER;

/* Note the "fe->mask & mask & ..." code: maybe an already
* processed event removed an element that fired and we still
* didn't processed, so we check if the event is still valid.
*
* Fire the readable event if the call sequence is not
* inverted. */
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop, fd, fe->clientData, mask);
fired++;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}

/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop, fd, fe->clientData, mask);
fired++;
}
}

/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert) {
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
if ((fe->mask & mask & AE_READABLE) && (!fired || fe->wfileProc != fe->rfileProc)) {
fe->rfileProc(eventLoop, fd, fe->clientData, mask);
fired++;
}
}
processSingleFileEvent(eventLoop ,&eventLoop->fired[j]);

processed++;
}
Expand Down
51 changes: 29 additions & 22 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -2456,6 +2456,10 @@ int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link, clusterMsg *
if (node->tcp_port == tcp_port && node->cport == cport && node->tls_port == tls_port && strcmp(ip, node->ip) == 0)
return 0;

/* We should not update the node address if we were not able to get a valid
* IP address. */
if (ip[0] == '\0' || strcmp(ip, "?") == 0) return 0;

/* IP / port is different, update it. */
memcpy(node->ip, ip, sizeof(ip));
node->tcp_port = tcp_port;
Expand Down Expand Up @@ -5546,6 +5550,30 @@ void clusterCloseAllSlots(void) {
memset(server.cluster->importing_slots_from, 0, sizeof(server.cluster->importing_slots_from));
}

static void clusterDetermineClusterSize(int *reachable_primaries) {
/* Compute the cluster size, that is the number of primary nodes
* serving at least a single slot.
*
* At the same time count the number of reachable primaries having
* at least one slot. */
{
dictIterator *di;
dictEntry *de;

server.cluster->size = 0;
di = dictGetSafeIterator(server.cluster->nodes);
while ((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);

if (clusterNodeIsVotingPrimary(node)) {
server.cluster->size++;
if ((node->flags & (CLUSTER_NODE_FAIL | CLUSTER_NODE_PFAIL)) == 0) reachable_primaries++;
}
}
dictReleaseIterator(di);
}
}

/* -----------------------------------------------------------------------------
* Cluster state evaluation function
* -------------------------------------------------------------------------- */
Expand Down Expand Up @@ -5609,28 +5637,7 @@ void clusterUpdateState(void) {
}
}
}

/* Compute the cluster size, that is the number of primary nodes
* serving at least a single slot.
*
* At the same time count the number of reachable primaries having
* at least one slot. */
{
dictIterator *di;
dictEntry *de;

server.cluster->size = 0;
di = dictGetSafeIterator(server.cluster->nodes);
while ((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);

if (clusterNodeIsVotingPrimary(node)) {
server.cluster->size++;
if ((node->flags & (CLUSTER_NODE_FAIL | CLUSTER_NODE_PFAIL)) == 0) reachable_primaries++;
}
}
dictReleaseIterator(di);
}
clusterDetermineClusterSize(&reachable_primaries);

/* If we are in a minority partition, change the cluster state
* to FAIL. */
Expand Down
8 changes: 5 additions & 3 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,11 @@ robj *lookupKey(serverDb *db, robj *key, int flags) {
int expire_flags = 0;
if (flags & LOOKUP_WRITE && !is_ro_replica) expire_flags |= EXPIRE_FORCE_DELETE_EXPIRED;
if (flags & LOOKUP_NOEXPIRE) expire_flags |= EXPIRE_AVOID_DELETE_EXPIRED;
if (expireIfNeededWithDictIndex(db, key, val, expire_flags, dict_index) != KEY_VALID) {
/* The key is no longer valid. */
val = NULL;
if (unlikely(dbSize(db) > 0)) { // This condition check found to improve branch prediction
if (expireIfNeededWithDictIndex(db, key, val, expire_flags, dict_index) != KEY_VALID) {
/* The key is no longer valid. */
val = NULL;
}
}
}

Expand Down
Loading