Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace dict with new hashtable: sorted set datatype #1427

Merged
merged 6 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -1889,30 +1889,29 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
}
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
dictIterator *di = dictGetIterator(zs->dict);
dictEntry *de;

while ((de = dictNext(di)) != NULL) {
sds ele = dictGetKey(de);
double *score = dictGetVal(de);

hashtableIterator iter;
hashtableInitIterator(&iter, zs->ht);
void *next;
while (hashtableNext(&iter, &next)) {
zskiplistNode *node = next;
if (count == 0) {
int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ? AOF_REWRITE_ITEMS_PER_CMD : items;

if (!rioWriteBulkCount(r, '*', 2 + cmd_items * 2) || !rioWriteBulkString(r, "ZADD", 4) ||
!rioWriteBulkObject(r, key)) {
dictReleaseIterator(di);
hashtableResetIterator(&iter);
return 0;
}
}
if (!rioWriteBulkDouble(r, *score) || !rioWriteBulkString(r, ele, sdslen(ele))) {
dictReleaseIterator(di);
sds ele = node->ele;
if (!rioWriteBulkDouble(r, node->score) || !rioWriteBulkString(r, ele, sdslen(ele))) {
hashtableResetIterator(&iter);
return 0;
}
if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
items--;
}
dictReleaseIterator(di);
hashtableResetIterator(&iter);
} else {
serverPanic("Unknown sorted zset encoding");
}
Expand Down
44 changes: 33 additions & 11 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -1004,13 +1004,6 @@ void dictScanCallback(void *privdata, const dictEntry *de) {
if (!data->only_keys) {
val = dictGetVal(de);
}
} else if (o->type == OBJ_ZSET) {
key = sdsdup(keysds);
if (!data->only_keys) {
char buf[MAX_LONG_DOUBLE_CHARS];
int len = ld2string(buf, sizeof(buf), *(double *)dictGetVal(de), LD_STR_AUTO);
val = sdsnewlen(buf, len);
}
} else {
serverPanic("Type not handled in dict SCAN callback.");
}
Expand All @@ -1021,13 +1014,26 @@ void dictScanCallback(void *privdata, const dictEntry *de) {

void hashtableScanCallback(void *privdata, void *entry) {
scanData *data = (scanData *)privdata;
sds val = NULL;
sds key = NULL;

robj *o = data->o;
list *keys = data->keys;
data->sampled++;

/* currently only implemented for SET scan */
serverAssert(o && o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HASHTABLE);
sds key = (sds)entry; /* Specific for OBJ_SET */
/* This callback is only used for scanning elements within a key (hash
* fields, set elements, etc.) so o must be set here. */
serverAssert(o != NULL);

/* get key */
if (o->type == OBJ_SET) {
key = (sds)entry;
} else if (o->type == OBJ_ZSET) {
zskiplistNode *node = (zskiplistNode *)entry;
key = node->ele;
} else {
serverPanic("Type not handled in hashset SCAN callback.");
}

/* Filter element if it does not match the pattern. */
if (data->pattern) {
Expand All @@ -1036,7 +1042,23 @@ void hashtableScanCallback(void *privdata, void *entry) {
}
}

if (o->type == OBJ_SET) {
/* no value, key used by reference */
} else if (o->type == OBJ_ZSET) {
/* zset data is copied */
zskiplistNode *node = (zskiplistNode *)entry;
key = sdsdup(node->ele);
if (!data->only_keys) {
char buf[MAX_LONG_DOUBLE_CHARS];
int len = ld2string(buf, sizeof(buf), node->score, LD_STR_AUTO);
val = sdsnewlen(buf, len);
}
} else {
serverPanic("Type not handled in hashset SCAN callback.");
}

listAddNodeTail(keys, key);
if (val) listAddNodeTail(keys, val);
}

/* Try to parse a SCAN cursor stored at object 'o':
Expand Down Expand Up @@ -1184,7 +1206,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {
shallow_copied_list_items = 1;
} else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
dict_table = zs->dict;
hashtable_table = zs->ht;
/* scanning a ZSET allocates temporary strings even though it's backed by a hashtable */
shallow_copied_list_items = 0;
}
Expand Down
26 changes: 13 additions & 13 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -206,20 +206,20 @@ void xorObjectDigest(serverDb *db, robj *keyobj, unsigned char *digest, robj *o)
}
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
dictIterator *di = dictGetIterator(zs->dict);
dictEntry *de;
hashtableIterator iter;
hashtableInitIterator(&iter, zs->ht);

while ((de = dictNext(di)) != NULL) {
sds sdsele = dictGetKey(de);
double *score = dictGetVal(de);
const int len = fpconv_dtoa(*score, buf);
void *next;
while (hashtableNext(&iter, &next)) {
zskiplistNode *node = next;
const int len = fpconv_dtoa(node->score, buf);
buf[len] = '\0';
memset(eledigest, 0, 20);
mixDigest(eledigest, sdsele, sdslen(sdsele));
mixDigest(eledigest, node->ele, sdslen(node->ele));
mixDigest(eledigest, buf, strlen(buf));
xorDigest(digest, eledigest, 20);
}
dictReleaseIterator(di);
hashtableResetIterator(&iter);
} else {
serverPanic("Unknown sorted set encoding");
}
Expand Down Expand Up @@ -284,13 +284,11 @@ void xorObjectDigest(serverDb *db, robj *keyobj, unsigned char *digest, robj *o)
* a different digest. */
void computeDatasetDigest(unsigned char *final) {
unsigned char digest[20];
robj *o;
int j;
uint32_t aux;

memset(final, 0, 20); /* Start with a clean result */

for (j = 0; j < server.dbnum; j++) {
for (int j = 0; j < server.dbnum; j++) {
serverDb *db = server.db + j;
if (kvstoreSize(db->keys) == 0) continue;
kvstoreIterator *kvs_it = kvstoreIteratorInit(db->keys);
Expand All @@ -300,7 +298,9 @@ void computeDatasetDigest(unsigned char *final) {
mixDigest(final, &aux, sizeof(aux));

/* Iterate this DB writing every entry */
while (kvstoreIteratorNext(kvs_it, (void **)&o)) {
void *next;
while (kvstoreIteratorNext(kvs_it, &next)) {
robj *o = next;
sds key;
robj *keyobj;

Expand Down Expand Up @@ -929,7 +929,7 @@ void debugCommand(client *c) {
switch (o->encoding) {
case OBJ_ENCODING_SKIPLIST: {
zset *zs = o->ptr;
d = zs->dict;
ht = zs->ht;
} break;
case OBJ_ENCODING_HT: d = o->ptr; break;
case OBJ_ENCODING_HASHTABLE: ht = o->ptr; break;
Expand Down
117 changes: 49 additions & 68 deletions src/defrag.c
Original file line number Diff line number Diff line change
Expand Up @@ -297,55 +297,45 @@ static void zslUpdateNode(zskiplist *zsl, zskiplistNode *oldnode, zskiplistNode
}
}

/* Defrag helper for sorted set.
* Update the robj pointer, defrag the skiplist struct and return the new score
* reference. We may not access oldele pointer (not even the pointer stored in
* the skiplist), as it was already freed. Newele may be null, in which case we
* only need to defrag the skiplist, but not update the obj pointer.
* When return value is non-NULL, it is the score reference that must be updated
* in the dict record. */
static double *zslDefrag(zskiplist *zsl, double score, sds oldele, sds newele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x, *newx;
int i;
sds ele = newele ? newele : oldele;

/* find the skiplist node referring to the object that was moved,
* and all pointers that need to be updated if we'll end up moving the skiplist node. */
x = zsl->header;
for (i = zsl->level - 1; i >= 0; i--) {
while (x->level[i].forward && x->level[i].forward->ele != oldele && /* make sure not to access the
->obj pointer if it matches
oldele */
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele, ele) < 0)))
x = x->level[i].forward;
/* Hashtable scan callback for sorted set. It defragments a single skiplist
* node, updates skiplist pointers, and updates the hashtable pointer to the
* node. */
static void activeDefragZsetNode(void *privdata, void *entry_ref) {
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
zskiplist *zsl = privdata;
zskiplistNode **node_ref = (zskiplistNode **)entry_ref;
zskiplistNode *node = *node_ref;

/* defragment node internals */
sds newsds = activeDefragSds(node->ele);
if (newsds) node->ele = newsds;

const double score = node->score;
const sds ele = node->ele;

/* find skiplist pointers that need to be updated if we end up moving the
* skiplist node. */
zskiplistNode *update[ZSKIPLIST_MAXLEVEL];
zskiplistNode *x = zsl->header;
for (int i = zsl->level - 1; i >= 0; i--) {
/* stop when we've reached the end of this level or the next node comes
* after our target in sorted order */
zskiplistNode *next = x->level[i].forward;
while (next &&
(next->score < score ||
(next->score == score && sdscmp(next->ele, ele) < 0))) {
x = next;
next = x->level[i].forward;
}
update[i] = x;
}

/* update the robj pointer inside the skip list record. */
x = x->level[0].forward;
serverAssert(x && score == x->score && x->ele == oldele);
if (newele) x->ele = newele;
/* should have arrived at intended node */
serverAssert(x->level[0].forward == node);

/* try to defrag the skiplist record itself */
newx = activeDefragAlloc(x);
if (newx) {
zslUpdateNode(zsl, x, newx, update);
return &newx->score;
}
return NULL;
}

/* Defrag helper for sorted set.
* Defrag a single dict entry key name, and corresponding skiplist struct */
static void activeDefragZsetEntry(zset *zs, dictEntry *de) {
sds newsds;
double *newscore;
sds sdsele = dictGetKey(de);
if ((newsds = activeDefragSds(sdsele))) dictSetKey(zs->dict, de, newsds);
newscore = zslDefrag(zs->zsl, *(double *)dictGetVal(de), sdsele, newsds);
if (newscore) {
dictSetVal(zs->dict, de, newscore);
zskiplistNode *newnode = activeDefragAlloc(node);
if (newnode) {
zslUpdateNode(zsl, node, newnode, update);
*node_ref = newnode; /* update hashtable pointer */
}
}

Expand Down Expand Up @@ -472,24 +462,15 @@ static long scanLaterList(robj *ob, unsigned long *cursor, monotime endtime) {
return bookmark_failed ? 1 : 0;
}

typedef struct {
zset *zs;
} scanLaterZsetData;

static void scanLaterZsetCallback(void *privdata, const dictEntry *_de) {
dictEntry *de = (dictEntry *)_de;
scanLaterZsetData *data = privdata;
activeDefragZsetEntry(data->zs, de);
static void scanLaterZsetCallback(void *privdata, void *element_ref) {
activeDefragZsetNode(privdata, element_ref);
server.stat_active_defrag_scanned++;
}

static void scanLaterZset(robj *ob, unsigned long *cursor) {
if (ob->type != OBJ_ZSET || ob->encoding != OBJ_ENCODING_SKIPLIST) return;
zset *zs = (zset *)ob->ptr;
dict *d = zs->dict;
scanLaterZsetData data = {zs};
dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc};
*cursor = dictScanDefrag(d, *cursor, scanLaterZsetCallback, &defragfns, &data);
*cursor = hashtableScanDefrag(zs->ht, *cursor, scanLaterZsetCallback, zs->zsl, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
}

/* Used as hashtable scan callback when all we need is to defrag the hashtable
Expand Down Expand Up @@ -533,27 +514,27 @@ static void defragQuicklist(robj *ob) {
}

static void defragZsetSkiplist(robj *ob) {
serverAssert(ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST);
zset *zs = (zset *)ob->ptr;

zset *newzs;
zskiplist *newzsl;
dict *newdict;
dictEntry *de;
struct zskiplistNode *newheader;
serverAssert(ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST);
if ((newzs = activeDefragAlloc(zs))) ob->ptr = zs = newzs;
if ((newzsl = activeDefragAlloc(zs->zsl))) zs->zsl = newzsl;
if ((newheader = activeDefragAlloc(zs->zsl->header))) zs->zsl->header = newheader;
if (dictSize(zs->dict) > server.active_defrag_max_scan_fields)

hashtable *newtable;
if ((newtable = hashtableDefragTables(zs->ht, activeDefragAlloc))) zs->ht = newtable;

if (hashtableSize(zs->ht) > server.active_defrag_max_scan_fields)
defragLater(ob);
else {
dictIterator *di = dictGetIterator(zs->dict);
while ((de = dictNext(di)) != NULL) {
activeDefragZsetEntry(zs, de);
}
dictReleaseIterator(di);
unsigned long cursor = 0;
do {
cursor = hashtableScanDefrag(zs->ht, cursor, activeDefragZsetNode, zs->zsl, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
} while (cursor != 0);
}
/* defrag the dict struct and tables */
if ((newdict = dictDefragTables(zs->dict))) zs->dict = newdict;
}

static void defragHash(robj *ob) {
Expand Down
6 changes: 3 additions & 3 deletions src/evict.c
Original file line number Diff line number Diff line change
Expand Up @@ -642,9 +642,9 @@ int performEvictions(void) {
kvs = db->expires;
}
int slot = kvstoreGetFairRandomHashtableIndex(kvs);
int found = kvstoreHashtableRandomEntry(kvs, slot, (void **)&valkey);
if (found) {
bestkey = objectGetKey(valkey);
void *entry;
if (kvstoreHashtableRandomEntry(kvs, slot, &entry)) {
bestkey = objectGetKey((robj *)entry);
bestdbid = j;
break;
}
Expand Down
2 changes: 1 addition & 1 deletion src/geo.c
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,7 @@ void georadiusGeneric(client *c, int srcKeyIndex, int flags) {
if (maxelelen < elelen) maxelelen = elelen;
totelelen += elelen;
znode = zslInsert(zs->zsl, score, gp->member);
serverAssert(dictAdd(zs->dict, gp->member, &znode->score) == DICT_OK);
serverAssert(hashtableAdd(zs->ht, znode));
gp->member = NULL;
}

Expand Down
Loading
Loading