Skip to content

Commit

Permalink
convert ZSET from dict -> hashtable (squashed)
Browse files Browse the repository at this point in the history
Signed-off-by: Rain Valentine <[email protected]>
  • Loading branch information
SoftlyRaining committed Dec 17, 2024
1 parent ba25b58 commit 6a8ee44
Show file tree
Hide file tree
Showing 12 changed files with 270 additions and 278 deletions.
20 changes: 9 additions & 11 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -1888,30 +1888,28 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
}
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
dictIterator *di = dictGetIterator(zs->dict);
dictEntry *de;

while ((de = dictNext(di)) != NULL) {
sds ele = dictGetKey(de);
double *score = dictGetVal(de);

hashtableIterator iter;
hashtableInitIterator(&iter, zs->ht);
zskiplistNode *node;
while (hashtableNext(&iter, (void **)&node)) {
if (count == 0) {
int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ? AOF_REWRITE_ITEMS_PER_CMD : items;

if (!rioWriteBulkCount(r, '*', 2 + cmd_items * 2) || !rioWriteBulkString(r, "ZADD", 4) ||
!rioWriteBulkObject(r, key)) {
dictReleaseIterator(di);
hashtableResetIterator(&iter);
return 0;
}
}
if (!rioWriteBulkDouble(r, *score) || !rioWriteBulkString(r, ele, sdslen(ele))) {
dictReleaseIterator(di);
sds ele = node->ele;
if (!rioWriteBulkDouble(r, node->score) || !rioWriteBulkString(r, ele, sdslen(ele))) {
hashtableResetIterator(&iter);
return 0;
}
if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
items--;
}
dictReleaseIterator(di);
hashtableResetIterator(&iter);
} else {
serverPanic("Unknown sorted zset encoding");
}
Expand Down
44 changes: 33 additions & 11 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -1003,13 +1003,6 @@ void dictScanCallback(void *privdata, const dictEntry *de) {
if (!data->only_keys) {
val = dictGetVal(de);
}
} else if (o->type == OBJ_ZSET) {
key = sdsdup(keysds);
if (!data->only_keys) {
char buf[MAX_LONG_DOUBLE_CHARS];
int len = ld2string(buf, sizeof(buf), *(double *)dictGetVal(de), LD_STR_AUTO);
val = sdsnewlen(buf, len);
}
} else {
serverPanic("Type not handled in dict SCAN callback.");
}
Expand All @@ -1020,13 +1013,26 @@ void dictScanCallback(void *privdata, const dictEntry *de) {

void hashtableScanCallback(void *privdata, void *entry) {
scanData *data = (scanData *)privdata;
sds val = NULL;
sds key = NULL;

robj *o = data->o;
list *keys = data->keys;
data->sampled++;

/* currently only implemented for SET scan */
serverAssert(o && o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HASHTABLE);
sds key = (sds)entry; /* Specific for OBJ_SET */
/* This callback is only used for scanning elements within a key (hash
* fields, set elements, etc.) so o must be set here. */
serverAssert(o != NULL);

/* get key */
if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HASHTABLE) {
key = (sds)entry;
} else if (o->type == OBJ_ZSET) {
zskiplistNode *node = (zskiplistNode *)entry;
key = node->ele;
} else {
serverPanic("Type not handled in hashset SCAN callback.");
}

/* Filter element if it does not match the pattern. */
if (data->pattern) {
Expand All @@ -1035,7 +1041,23 @@ void hashtableScanCallback(void *privdata, void *entry) {
}
}

if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HASHTABLE) {
/* no value, key used by reference */
} else if (o->type == OBJ_ZSET) {
/* zset data is copied */
zskiplistNode *node = (zskiplistNode *)entry;
key = sdsdup(node->ele);
if (!data->only_keys) {
char buf[MAX_LONG_DOUBLE_CHARS];
int len = ld2string(buf, sizeof(buf), node->score, LD_STR_AUTO);
val = sdsnewlen(buf, len);
}
} else {
serverPanic("Type not handled in hashset SCAN callback.");
}

listAddNodeTail(keys, key);
if (val) listAddNodeTail(keys, val);
}

/* Try to parse a SCAN cursor stored at object 'o':
Expand Down Expand Up @@ -1183,7 +1205,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {
shallow_copied_list_items = 1;
} else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
dict_table = zs->dict;
hashtable_table = zs->ht;
/* scanning ZSET allocates temporary strings even though it's a dict */
shallow_copied_list_items = 0;
}
Expand Down
17 changes: 8 additions & 9 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -205,20 +205,19 @@ void xorObjectDigest(serverDb *db, robj *keyobj, unsigned char *digest, robj *o)
}
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
dictIterator *di = dictGetIterator(zs->dict);
dictEntry *de;
hashtableIterator iter;
hashtableInitIterator(&iter, zs->ht);

while ((de = dictNext(di)) != NULL) {
sds sdsele = dictGetKey(de);
double *score = dictGetVal(de);
const int len = fpconv_dtoa(*score, buf);
zskiplistNode *node;
while (hashtableNext(&iter, (void **)&node)) {
const int len = fpconv_dtoa(node->score, buf);
buf[len] = '\0';
memset(eledigest, 0, 20);
mixDigest(eledigest, sdsele, sdslen(sdsele));
mixDigest(eledigest, node->ele, sdslen(node->ele));
mixDigest(eledigest, buf, strlen(buf));
xorDigest(digest, eledigest, 20);
}
dictReleaseIterator(di);
hashtableResetIterator(&iter);
} else {
serverPanic("Unknown sorted set encoding");
}
Expand Down Expand Up @@ -928,7 +927,7 @@ void debugCommand(client *c) {
switch (o->encoding) {
case OBJ_ENCODING_SKIPLIST: {
zset *zs = o->ptr;
d = zs->dict;
ht = zs->ht;
} break;
case OBJ_ENCODING_HT: d = o->ptr; break;
case OBJ_ENCODING_HASHTABLE: ht = o->ptr; break;
Expand Down
109 changes: 46 additions & 63 deletions src/defrag.c
Original file line number Diff line number Diff line change
Expand Up @@ -297,54 +297,46 @@ static void zslUpdateNode(zskiplist *zsl, zskiplistNode *oldnode, zskiplistNode
}

/* Defrag helper for sorted set.
* Update the robj pointer, defrag the skiplist struct and return the new score
* reference. We may not access oldele pointer (not even the pointer stored in
* the skiplist), as it was already freed. Newele may be null, in which case we
* only need to defrag the skiplist, but not update the obj pointer.
* When return value is non-NULL, it is the score reference that must be updated
* in the dict record. */
static double *zslDefrag(zskiplist *zsl, double score, sds oldele, sds newele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x, *newx;
int i;
sds ele = newele ? newele : oldele;

/* find the skiplist node referring to the object that was moved,
* and all pointers that need to be updated if we'll end up moving the skiplist node. */
x = zsl->header;
for (i = zsl->level - 1; i >= 0; i--) {
while (x->level[i].forward && x->level[i].forward->ele != oldele && /* make sure not to access the
->obj pointer if it matches
oldele */
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele, ele) < 0)))
* Defragment a single skiplist node, update skiplist pointers, and update the
* hashtable pointer to the node */
static void activeDefragZsetNode(void *privdata, void *entry_ref) {
zskiplist *zsl = privdata;
zskiplistNode **node_ref = (zskiplistNode **)entry_ref;

/* defragment node internals */
sds newsds = activeDefragSds((*node_ref)->ele);
if (newsds) (*node_ref)->ele = newsds;

const double score = (*node_ref)->score;
const sds ele = (*node_ref)->ele;

/* find skiplist pointers that need to be updated if we end up moving the
* skiplist node. */
zskiplistNode *update[ZSKIPLIST_MAXLEVEL];
zskiplistNode *x = zsl->header;
for (int i = zsl->level - 1; i >= 0; i--) {
while (1) {
/* stop when we've reached the end of this level or the next node
* comes after our target in sorted order */
zskiplistNode *next = x->level[i].forward;
if (!next) break;
if (next->score > score) break;
if (next->score == score && sdscmp(next->ele, ele) >= 0) {
break;
}
x = x->level[i].forward;
}
update[i] = x;
}

/* update the robj pointer inside the skip list record. */
x = x->level[0].forward;
serverAssert(x && score == x->score && x->ele == oldele);
if (newele) x->ele = newele;
/* should have arrived at intended node */
serverAssert(x == *node_ref);

/* try to defrag the skiplist record itself */
newx = activeDefragAlloc(x);
zskiplistNode *newx = activeDefragAlloc(x);
if (newx) {
zslUpdateNode(zsl, x, newx, update);
return &newx->score;
}
return NULL;
}

/* Defrag helper for sorted set.
* Defrag a single dict entry key name, and corresponding skiplist struct */
static void activeDefragZsetEntry(zset *zs, dictEntry *de) {
sds newsds;
double *newscore;
sds sdsele = dictGetKey(de);
if ((newsds = activeDefragSds(sdsele))) dictSetKey(zs->dict, de, newsds);
newscore = zslDefrag(zs->zsl, *(double *)dictGetVal(de), sdsele, newsds);
if (newscore) {
dictSetVal(zs->dict, de, newscore);
*node_ref = newx; /* update hashtable pointer */
}
}

Expand Down Expand Up @@ -471,24 +463,15 @@ static long scanLaterList(robj *ob, unsigned long *cursor, monotime endtime) {
return bookmark_failed ? 1 : 0;
}

typedef struct {
zset *zs;
} scanLaterZsetData;

static void scanLaterZsetCallback(void *privdata, const dictEntry *_de) {
dictEntry *de = (dictEntry *)_de;
scanLaterZsetData *data = privdata;
activeDefragZsetEntry(data->zs, de);
static void scanLaterZsetCallback(void *privdata, void *element_ref) {
activeDefragZsetNode(privdata, element_ref);
server.stat_active_defrag_scanned++;
}

static void scanLaterZset(robj *ob, unsigned long *cursor) {
if (ob->type != OBJ_ZSET || ob->encoding != OBJ_ENCODING_SKIPLIST) return;
zset *zs = (zset *)ob->ptr;
dict *d = zs->dict;
scanLaterZsetData data = {zs};
dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc};
*cursor = dictScanDefrag(d, *cursor, scanLaterZsetCallback, &defragfns, &data);
*cursor = hashtableScanDefrag(zs->ht, *cursor, scanLaterZsetCallback, zs->zsl, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
}

/* Used as hashtable scan callback when all we need is to defrag the hashtable
Expand Down Expand Up @@ -532,27 +515,27 @@ static void defragQuicklist(robj *ob) {
}

static void defragZsetSkiplist(robj *ob) {
serverAssert(ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST);
zset *zs = (zset *)ob->ptr;

zset *newzs;
zskiplist *newzsl;
dict *newdict;
dictEntry *de;
struct zskiplistNode *newheader;
serverAssert(ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST);
if ((newzs = activeDefragAlloc(zs))) ob->ptr = zs = newzs;
if ((newzsl = activeDefragAlloc(zs->zsl))) zs->zsl = newzsl;
if ((newheader = activeDefragAlloc(zs->zsl->header))) zs->zsl->header = newheader;
if (dictSize(zs->dict) > server.active_defrag_max_scan_fields)

hashtable *newtable;
if ((newtable = hashtableDefragTables(zs->ht, activeDefragAlloc))) zs->ht = newtable;

if (hashtableSize(zs->ht) > server.active_defrag_max_scan_fields)
defragLater(ob);
else {
dictIterator *di = dictGetIterator(zs->dict);
while ((de = dictNext(di)) != NULL) {
activeDefragZsetEntry(zs, de);
}
dictReleaseIterator(di);
unsigned long cursor = 0;
do {
cursor = hashtableScanDefrag(zs->ht, cursor, activeDefragZsetNode, zs->zsl, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
} while (cursor != 0);
}
/* defrag the dict struct and tables */
if ((newdict = dictDefragTables(zs->dict))) zs->dict = newdict;
}

static void defragHash(robj *ob) {
Expand Down
2 changes: 1 addition & 1 deletion src/geo.c
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,7 @@ void georadiusGeneric(client *c, int srcKeyIndex, int flags) {
if (maxelelen < elelen) maxelelen = elelen;
totelelen += elelen;
znode = zslInsert(zs->zsl, score, gp->member);
serverAssert(dictAdd(zs->dict, gp->member, &znode->score) == DICT_OK);
serverAssert(hashtableAdd(zs->ht, znode));
gp->member = NULL;
}

Expand Down
26 changes: 18 additions & 8 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -11023,12 +11023,10 @@ static void moduleScanKeyDictCallback(void *privdata, const dictEntry *de) {
robj *o = data->key->value;
robj *field = createStringObject(key, sdslen(key));
robj *value = NULL;

if (o->type == OBJ_HASH) {
sds val = dictGetVal(de);
value = createStringObject(val, sdslen(val));
} else if (o->type == OBJ_ZSET) {
double *val = (double *)dictGetVal(de);
value = createStringObjectFromLongDouble(*val, 0);
} else {
serverPanic("unexpected object type");
}
Expand All @@ -11041,12 +11039,24 @@ static void moduleScanKeyDictCallback(void *privdata, const dictEntry *de) {
static void moduleScanKeyHashtableCallback(void *privdata, void *entry) {
ScanKeyCBData *data = privdata;
robj *o = data->key->value;
serverAssert(o->type == OBJ_SET);
sds key = entry;
robj *field = createStringObject(key, sdslen(key));
robj *value = NULL;
sds key = NULL;

if (o->type == OBJ_SET) {
key = entry;
/* no value */
} else if (o->type == OBJ_ZSET) {
zskiplistNode *node = (zskiplistNode *)entry;
key = node->ele;
value = createStringObjectFromLongDouble(node->score, 0);
} else {
serverPanic("unexpected object type");
}
robj * field = createStringObject(key, sdslen(key));

data->fn(data->key, field, NULL, data->user_data);
data->fn(data->key, field, value, data->user_data);
decrRefCount(field);
if (value) decrRefCount(value);
}

/* Scan api that allows a module to scan the elements in a hash, set or sorted set key
Expand Down Expand Up @@ -11110,7 +11120,7 @@ int VM_ScanKey(ValkeyModuleKey *key, ValkeyModuleScanCursor *cursor, ValkeyModul
} else if (o->type == OBJ_HASH) {
if (o->encoding == OBJ_ENCODING_HT) d = o->ptr;
} else if (o->type == OBJ_ZSET) {
if (o->encoding == OBJ_ENCODING_SKIPLIST) d = ((zset *)o->ptr)->dict;
if (o->encoding == OBJ_ENCODING_SKIPLIST) ht = ((zset *)o->ptr)->ht;
} else {
errno = EINVAL;
return 0;
Expand Down
Loading

0 comments on commit 6a8ee44

Please sign in to comment.