Skip to content

Commit

Permalink
Replace dict with new hashtable: sorted set datatype (#1427)
Browse files Browse the repository at this point in the history
This PR replaces dict with hashtable in the ZSET datatype. Instead of
mapping each key to its score as dict did, the hashtable maps each key to
its node in the skiplist, which contains the score. This takes advantage of
hashtable performance improvements and saves 15 bytes per sorted set
element: 24 bytes of overhead before, 9 bytes after.

Closes #1096

---------

Signed-off-by: Rain Valentine <[email protected]>
Signed-off-by: Viktor Söderqvist <[email protected]>
Co-authored-by: Viktor Söderqvist <[email protected]>
  • Loading branch information
SoftlyRaining and zuiderkwast authored Jan 8, 2025
1 parent 8af35a1 commit ab627d6
Show file tree
Hide file tree
Showing 13 changed files with 420 additions and 484 deletions.
21 changes: 10 additions & 11 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -1890,30 +1890,29 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
}
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
dictIterator *di = dictGetIterator(zs->dict);
dictEntry *de;

while ((de = dictNext(di)) != NULL) {
sds ele = dictGetKey(de);
double *score = dictGetVal(de);

hashtableIterator iter;
hashtableInitIterator(&iter, zs->ht);
void *next;
while (hashtableNext(&iter, &next)) {
zskiplistNode *node = next;
if (count == 0) {
int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ? AOF_REWRITE_ITEMS_PER_CMD : items;

if (!rioWriteBulkCount(r, '*', 2 + cmd_items * 2) || !rioWriteBulkString(r, "ZADD", 4) ||
!rioWriteBulkObject(r, key)) {
dictReleaseIterator(di);
hashtableResetIterator(&iter);
return 0;
}
}
if (!rioWriteBulkDouble(r, *score) || !rioWriteBulkString(r, ele, sdslen(ele))) {
dictReleaseIterator(di);
sds ele = node->ele;
if (!rioWriteBulkDouble(r, node->score) || !rioWriteBulkString(r, ele, sdslen(ele))) {
hashtableResetIterator(&iter);
return 0;
}
if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
items--;
}
dictReleaseIterator(di);
hashtableResetIterator(&iter);
} else {
serverPanic("Unknown sorted zset encoding");
}
Expand Down
44 changes: 33 additions & 11 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -1004,13 +1004,6 @@ void dictScanCallback(void *privdata, const dictEntry *de) {
if (!data->only_keys) {
val = dictGetVal(de);
}
} else if (o->type == OBJ_ZSET) {
key = sdsdup(keysds);
if (!data->only_keys) {
char buf[MAX_LONG_DOUBLE_CHARS];
int len = ld2string(buf, sizeof(buf), *(double *)dictGetVal(de), LD_STR_AUTO);
val = sdsnewlen(buf, len);
}
} else {
serverPanic("Type not handled in dict SCAN callback.");
}
Expand All @@ -1021,13 +1014,26 @@ void dictScanCallback(void *privdata, const dictEntry *de) {

void hashtableScanCallback(void *privdata, void *entry) {
scanData *data = (scanData *)privdata;
sds val = NULL;
sds key = NULL;

robj *o = data->o;
list *keys = data->keys;
data->sampled++;

/* currently only implemented for SET scan */
serverAssert(o && o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HASHTABLE);
sds key = (sds)entry; /* Specific for OBJ_SET */
/* This callback is only used for scanning elements within a key (hash
* fields, set elements, etc.) so o must be set here. */
serverAssert(o != NULL);

/* get key */
if (o->type == OBJ_SET) {
key = (sds)entry;
} else if (o->type == OBJ_ZSET) {
zskiplistNode *node = (zskiplistNode *)entry;
key = node->ele;
} else {
serverPanic("Type not handled in hashset SCAN callback.");
}

/* Filter element if it does not match the pattern. */
if (data->pattern) {
Expand All @@ -1036,7 +1042,23 @@ void hashtableScanCallback(void *privdata, void *entry) {
}
}

if (o->type == OBJ_SET) {
/* no value, key used by reference */
} else if (o->type == OBJ_ZSET) {
/* zset data is copied */
zskiplistNode *node = (zskiplistNode *)entry;
key = sdsdup(node->ele);
if (!data->only_keys) {
char buf[MAX_LONG_DOUBLE_CHARS];
int len = ld2string(buf, sizeof(buf), node->score, LD_STR_AUTO);
val = sdsnewlen(buf, len);
}
} else {
serverPanic("Type not handled in hashset SCAN callback.");
}

listAddNodeTail(keys, key);
if (val) listAddNodeTail(keys, val);
}

/* Try to parse a SCAN cursor stored at object 'o':
Expand Down Expand Up @@ -1184,7 +1206,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {
shallow_copied_list_items = 1;
} else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
dict_table = zs->dict;
hashtable_table = zs->ht;
/* scanning ZSET allocates temporary strings even though it's a hashtable */
shallow_copied_list_items = 0;
}
Expand Down
26 changes: 13 additions & 13 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -206,20 +206,20 @@ void xorObjectDigest(serverDb *db, robj *keyobj, unsigned char *digest, robj *o)
}
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
dictIterator *di = dictGetIterator(zs->dict);
dictEntry *de;
hashtableIterator iter;
hashtableInitIterator(&iter, zs->ht);

while ((de = dictNext(di)) != NULL) {
sds sdsele = dictGetKey(de);
double *score = dictGetVal(de);
const int len = fpconv_dtoa(*score, buf);
void *next;
while (hashtableNext(&iter, &next)) {
zskiplistNode *node = next;
const int len = fpconv_dtoa(node->score, buf);
buf[len] = '\0';
memset(eledigest, 0, 20);
mixDigest(eledigest, sdsele, sdslen(sdsele));
mixDigest(eledigest, node->ele, sdslen(node->ele));
mixDigest(eledigest, buf, strlen(buf));
xorDigest(digest, eledigest, 20);
}
dictReleaseIterator(di);
hashtableResetIterator(&iter);
} else {
serverPanic("Unknown sorted set encoding");
}
Expand Down Expand Up @@ -284,13 +284,11 @@ void xorObjectDigest(serverDb *db, robj *keyobj, unsigned char *digest, robj *o)
* a different digest. */
void computeDatasetDigest(unsigned char *final) {
unsigned char digest[20];
robj *o;
int j;
uint32_t aux;

memset(final, 0, 20); /* Start with a clean result */

for (j = 0; j < server.dbnum; j++) {
for (int j = 0; j < server.dbnum; j++) {
serverDb *db = server.db + j;
if (kvstoreSize(db->keys) == 0) continue;
kvstoreIterator *kvs_it = kvstoreIteratorInit(db->keys);
Expand All @@ -300,7 +298,9 @@ void computeDatasetDigest(unsigned char *final) {
mixDigest(final, &aux, sizeof(aux));

/* Iterate this DB writing every entry */
while (kvstoreIteratorNext(kvs_it, (void **)&o)) {
void *next;
while (kvstoreIteratorNext(kvs_it, &next)) {
robj *o = next;
sds key;
robj *keyobj;

Expand Down Expand Up @@ -929,7 +929,7 @@ void debugCommand(client *c) {
switch (o->encoding) {
case OBJ_ENCODING_SKIPLIST: {
zset *zs = o->ptr;
d = zs->dict;
ht = zs->ht;
} break;
case OBJ_ENCODING_HT: d = o->ptr; break;
case OBJ_ENCODING_HASHTABLE: ht = o->ptr; break;
Expand Down
117 changes: 49 additions & 68 deletions src/defrag.c
Original file line number Diff line number Diff line change
Expand Up @@ -297,55 +297,45 @@ static void zslUpdateNode(zskiplist *zsl, zskiplistNode *oldnode, zskiplistNode
}
}

/* Defrag helper for sorted set.
* Update the robj pointer, defrag the skiplist struct and return the new score
* reference. We may not access oldele pointer (not even the pointer stored in
* the skiplist), as it was already freed. Newele may be null, in which case we
* only need to defrag the skiplist, but not update the obj pointer.
* When return value is non-NULL, it is the score reference that must be updated
* in the dict record. */
static double *zslDefrag(zskiplist *zsl, double score, sds oldele, sds newele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x, *newx;
int i;
sds ele = newele ? newele : oldele;

/* find the skiplist node referring to the object that was moved,
* and all pointers that need to be updated if we'll end up moving the skiplist node. */
x = zsl->header;
for (i = zsl->level - 1; i >= 0; i--) {
while (x->level[i].forward && x->level[i].forward->ele != oldele && /* make sure not to access the
->obj pointer if it matches
oldele */
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele, ele) < 0)))
x = x->level[i].forward;
/* Hashtable scan callback for sorted set. It defragments a single skiplist
* node, updates skiplist pointers, and updates the hashtable pointer to the
* node. */
static void activeDefragZsetNode(void *privdata, void *entry_ref) {
zskiplist *zsl = privdata;
zskiplistNode **node_ref = (zskiplistNode **)entry_ref;
zskiplistNode *node = *node_ref;

/* defragment node internals */
sds newsds = activeDefragSds(node->ele);
if (newsds) node->ele = newsds;

const double score = node->score;
const sds ele = node->ele;

/* find skiplist pointers that need to be updated if we end up moving the
* skiplist node. */
zskiplistNode *update[ZSKIPLIST_MAXLEVEL];
zskiplistNode *x = zsl->header;
for (int i = zsl->level - 1; i >= 0; i--) {
/* stop when we've reached the end of this level or the next node comes
* after our target in sorted order */
zskiplistNode *next = x->level[i].forward;
while (next &&
(next->score < score ||
(next->score == score && sdscmp(next->ele, ele) < 0))) {
x = next;
next = x->level[i].forward;
}
update[i] = x;
}

/* update the robj pointer inside the skip list record. */
x = x->level[0].forward;
serverAssert(x && score == x->score && x->ele == oldele);
if (newele) x->ele = newele;
/* should have arrived at intended node */
serverAssert(x->level[0].forward == node);

/* try to defrag the skiplist record itself */
newx = activeDefragAlloc(x);
if (newx) {
zslUpdateNode(zsl, x, newx, update);
return &newx->score;
}
return NULL;
}

/* Defrag helper for sorted set.
* Defrag a single dict entry key name, and corresponding skiplist struct */
static void activeDefragZsetEntry(zset *zs, dictEntry *de) {
sds newsds;
double *newscore;
sds sdsele = dictGetKey(de);
if ((newsds = activeDefragSds(sdsele))) dictSetKey(zs->dict, de, newsds);
newscore = zslDefrag(zs->zsl, *(double *)dictGetVal(de), sdsele, newsds);
if (newscore) {
dictSetVal(zs->dict, de, newscore);
zskiplistNode *newnode = activeDefragAlloc(node);
if (newnode) {
zslUpdateNode(zsl, node, newnode, update);
*node_ref = newnode; /* update hashtable pointer */
}
}

Expand Down Expand Up @@ -472,24 +462,15 @@ static long scanLaterList(robj *ob, unsigned long *cursor, monotime endtime) {
return bookmark_failed ? 1 : 0;
}

typedef struct {
zset *zs;
} scanLaterZsetData;

static void scanLaterZsetCallback(void *privdata, const dictEntry *_de) {
dictEntry *de = (dictEntry *)_de;
scanLaterZsetData *data = privdata;
activeDefragZsetEntry(data->zs, de);
static void scanLaterZsetCallback(void *privdata, void *element_ref) {
activeDefragZsetNode(privdata, element_ref);
server.stat_active_defrag_scanned++;
}

static void scanLaterZset(robj *ob, unsigned long *cursor) {
if (ob->type != OBJ_ZSET || ob->encoding != OBJ_ENCODING_SKIPLIST) return;
zset *zs = (zset *)ob->ptr;
dict *d = zs->dict;
scanLaterZsetData data = {zs};
dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc};
*cursor = dictScanDefrag(d, *cursor, scanLaterZsetCallback, &defragfns, &data);
*cursor = hashtableScanDefrag(zs->ht, *cursor, scanLaterZsetCallback, zs->zsl, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
}

/* Used as hashtable scan callback when all we need is to defrag the hashtable
Expand Down Expand Up @@ -533,27 +514,27 @@ static void defragQuicklist(robj *ob) {
}

static void defragZsetSkiplist(robj *ob) {
serverAssert(ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST);
zset *zs = (zset *)ob->ptr;

zset *newzs;
zskiplist *newzsl;
dict *newdict;
dictEntry *de;
struct zskiplistNode *newheader;
serverAssert(ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST);
if ((newzs = activeDefragAlloc(zs))) ob->ptr = zs = newzs;
if ((newzsl = activeDefragAlloc(zs->zsl))) zs->zsl = newzsl;
if ((newheader = activeDefragAlloc(zs->zsl->header))) zs->zsl->header = newheader;
if (dictSize(zs->dict) > server.active_defrag_max_scan_fields)

hashtable *newtable;
if ((newtable = hashtableDefragTables(zs->ht, activeDefragAlloc))) zs->ht = newtable;

if (hashtableSize(zs->ht) > server.active_defrag_max_scan_fields)
defragLater(ob);
else {
dictIterator *di = dictGetIterator(zs->dict);
while ((de = dictNext(di)) != NULL) {
activeDefragZsetEntry(zs, de);
}
dictReleaseIterator(di);
unsigned long cursor = 0;
do {
cursor = hashtableScanDefrag(zs->ht, cursor, activeDefragZsetNode, zs->zsl, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
} while (cursor != 0);
}
/* defrag the dict struct and tables */
if ((newdict = dictDefragTables(zs->dict))) zs->dict = newdict;
}

static void defragHash(robj *ob) {
Expand Down
6 changes: 3 additions & 3 deletions src/evict.c
Original file line number Diff line number Diff line change
Expand Up @@ -642,9 +642,9 @@ int performEvictions(void) {
kvs = db->expires;
}
int slot = kvstoreGetFairRandomHashtableIndex(kvs);
int found = kvstoreHashtableRandomEntry(kvs, slot, (void **)&valkey);
if (found) {
bestkey = objectGetKey(valkey);
void *entry;
if (kvstoreHashtableRandomEntry(kvs, slot, &entry)) {
bestkey = objectGetKey((robj *)entry);
bestdbid = j;
break;
}
Expand Down
2 changes: 1 addition & 1 deletion src/geo.c
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,7 @@ void georadiusGeneric(client *c, int srcKeyIndex, int flags) {
if (maxelelen < elelen) maxelelen = elelen;
totelelen += elelen;
znode = zslInsert(zs->zsl, score, gp->member);
serverAssert(dictAdd(zs->dict, gp->member, &znode->score) == DICT_OK);
serverAssert(hashtableAdd(zs->ht, znode));
gp->member = NULL;
}

Expand Down
Loading

0 comments on commit ab627d6

Please sign in to comment.