Skip to content

Commit 9169ba5

Browse files
committed
Implement RESET in async API: Cancel subscriptions and monitor mode
Additionally, accept commands in monitor mode. (For example the RESET command, but also other commands.) Apart from being useful but itself, this change makes the async API's reply queue stay in sync (mapping each reply to the callback given when the command was sent) when bombed with random commands (fuzzing).
1 parent 5468136 commit 9169ba5

File tree

5 files changed

+188
-17
lines changed

5 files changed

+188
-17
lines changed

async.c

+70-13
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@
6161
#define PUBSUB_REPLY_PATTERN 16
6262
#define PUBSUB_REPLY_SHARDED 32
6363

64+
/* Special negative values for a callback's `pending_replies` fields. */
65+
#define PENDING_REPLY_UNSUBSCRIBE_ALL -1
66+
#define PENDING_REPLY_MONITOR -2
67+
#define PENDING_REPLY_RESET -3
68+
6469
/* Forward declarations of hiredis.c functions */
6570
int __redisAppendCommand(redisContext *c, const char *cmd, size_t len);
6671
void __redisSetError(redisContext *c, int type, const char *str);
@@ -172,6 +177,7 @@ static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
172177
ac->sub.patterns = patterns;
173178
ac->sub.shard_channels = shard_channels;
174179
ac->sub.pending_commands = 0;
180+
ac->monitor_cb = NULL;
175181

176182
return ac;
177183
oom:
@@ -420,6 +426,10 @@ static void __redisAsyncFree(redisAsyncContext *ac) {
420426
dictRelease(ac->sub.shard_channels);
421427
}
422428

429+
if (ac->monitor_cb != NULL) {
430+
callbackDecrRefCount(ac, ac->monitor_cb);
431+
}
432+
423433
/* Signal event lib to clean up */
424434
_EL_CLEANUP(ac);
425435

@@ -584,7 +594,9 @@ static int handlePubsubReply(redisAsyncContext *ac, redisReply *reply,
584594
/* If we've unsubscribed to the last channel, the command is done. */
585595
/* Check if this was the last channel unsubscribed. */
586596
assert(reply->element[2]->type == REDIS_REPLY_INTEGER);
587-
if (cb->pending_replies == -1 && reply->element[2]->integer == 0) {
597+
if (cb->pending_replies == PENDING_REPLY_UNSUBSCRIBE_ALL &&
598+
reply->element[2]->integer == 0)
599+
{
588600
cb->pending_replies = 0;
589601
}
590602

@@ -621,12 +633,28 @@ static int handlePubsubReply(redisAsyncContext *ac, redisReply *reply,
621633
return REDIS_ERR;
622634
}
623635

636+
/* Handle the effects of the RESET command. */
637+
static void handleReset(redisAsyncContext *ac) {
638+
/* Cancel monitoring mode */
639+
ac->c.flags &= ~REDIS_MONITORING;
640+
if (ac->monitor_cb != NULL) {
641+
callbackDecrRefCount(ac, ac->monitor_cb);
642+
ac->monitor_cb = NULL;
643+
}
644+
645+
/* Cancel subscriptions (finalizers are called if any) */
646+
ac->c.flags &= ~REDIS_SUBSCRIBED;
647+
if (ac->sub.channels) dictEmpty(ac->sub.channels);
648+
if (ac->sub.patterns) dictEmpty(ac->sub.patterns);
649+
if (ac->sub.shard_channels) dictEmpty(ac->sub.shard_channels);
650+
}
651+
624652
void redisProcessCallbacks(redisAsyncContext *ac) {
625653
redisContext *c = &(ac->c);
626-
void *reply = NULL;
654+
redisReply *reply = NULL;
627655
int status;
628656

629-
while((status = redisGetReply(c,&reply)) == REDIS_OK) {
657+
while((status = redisGetReply(c, (void**)&reply)) == REDIS_OK) {
630658
if (reply == NULL) {
631659
/* When the connection is being disconnected and there are
632660
* no more replies, this is the cue to really disconnect. */
@@ -659,6 +687,15 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
659687
continue;
660688
}
661689

690+
/* Send monitored command to monitor callback */
691+
if ((c->flags & REDIS_MONITORING) &&
692+
reply->type == REDIS_REPLY_STATUS && reply->len > 0 &&
693+
reply->str[0] >= '0' && reply->str[0] <= '9')
694+
{
695+
__redisRunCallback(ac, ac->monitor_cb, reply);
696+
continue;
697+
}
698+
662699
/* Get callback from queue which was added when the command was sent. */
663700
redisCallback *cb = NULL;
664701
if (pubsub_reply_flags & PUBSUB_REPLY_MESSAGE) {
@@ -694,15 +731,33 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
694731
handlePubsubReply(ac, reply, pubsub_reply_flags, cb);
695732
} else {
696733
/* Regular reply. This includes ERR reply for subscribe commands. */
734+
735+
/* Handle special effects of the command's reply, if any. */
736+
if (cb->pending_replies == PENDING_REPLY_RESET &&
737+
reply->type == REDIS_REPLY_STATUS &&
738+
strncmp(reply->str, "RESET", reply->len) == 0)
739+
{
740+
handleReset(ac);
741+
} else if (cb->pending_replies == PENDING_REPLY_MONITOR &&
742+
reply->type == REDIS_REPLY_STATUS &&
743+
strncmp(reply->str, "OK", reply->len) == 0)
744+
{
745+
/* Set monitor flag and callback, freeing any old callback. */
746+
c->flags |= REDIS_MONITORING;
747+
if (ac->monitor_cb != NULL) {
748+
callbackDecrRefCount(ac, ac->monitor_cb);
749+
}
750+
ac->monitor_cb = cb;
751+
callbackIncrRefCount(ac, cb);
752+
}
753+
754+
/* Invoke callback */
697755
__redisRunCallback(ac, cb, reply);
698756
cb->pending_replies = 0;
699757
}
700758

701759
if (cb != NULL) {
702-
/* If in monitor mode, repush the callback */
703-
if ((c->flags & REDIS_MONITORING) && !(c->flags & REDIS_FREEING)) {
704-
__redisPushCallback(&ac->replies, cb);
705-
} else if (cb->pending_replies != 0) {
760+
if (cb->pending_replies != 0) {
706761
/* The command needs more repies. Put it first in queue. */
707762
__redisUnshiftCallback(&ac->replies, cb);
708763
} else {
@@ -939,15 +994,17 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn,
939994
cb->pending_replies++;
940995
}
941996
if (cb->pending_replies == 0) {
942-
/* No channels specified. This is unsubscribe 'all' or an error. */
943-
cb->pending_replies = -1;
997+
/* No channels specified means unsubscribe all. (This can happens
998+
* for SUBSCRIBE, but it is an error and then the value of pending
999+
* replies doesn't matter.) */
1000+
cb->pending_replies = PENDING_REPLY_UNSUBSCRIBE_ALL;
9441001
}
9451002
c->flags |= REDIS_SUBSCRIBED;
9461003
ac->sub.pending_commands++;
947-
} else if (strncasecmp(cstr,"monitor\r\n",9) == 0) {
948-
/* Set monitor flag */
949-
c->flags |= REDIS_MONITORING;
950-
cb->pending_replies = -1;
1004+
} else if (strncasecmp(cstr, "monitor", clen) == 0) {
1005+
cb->pending_replies = PENDING_REPLY_MONITOR;
1006+
} else if (strncasecmp(cstr, "reset", clen) == 0) {
1007+
cb->pending_replies = PENDING_REPLY_RESET;
9511008
}
9521009

9531010
if (__redisPushCallback(&ac->replies, cb) != REDIS_OK)

async.h

+5-2
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ typedef struct redisCallback {
5050
void *privdata;
5151
unsigned int refcount; /* Reference counter used when callback is used
5252
* for multiple pubsub channels. */
53-
int pending_replies; /* Number of replies expected; -1 means
54-
* unsubscribe all. */
53+
int pending_replies; /* Number of replies expected; negative values
54+
* are special. */
5555
} redisCallback;
5656

5757
/* List of callbacks for either regular replies or pub/sub */
@@ -117,6 +117,9 @@ typedef struct redisAsyncContext {
117117

118118
/* Any configured RESP3 PUSH handler */
119119
redisAsyncPushFn *push_cb;
120+
121+
/* Monitor, only when MONITOR has been called. */
122+
redisCallback *monitor_cb;
120123
} redisAsyncContext;
121124

122125
/* Functions that proxy to hiredis */

dict.c

+5
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,11 @@ static int _dictClear(dict *ht) {
246246
return DICT_OK; /* never fails */
247247
}
248248

249+
/* Delete all the keys and values */
250+
static void dictEmpty(dict *ht) {
251+
_dictClear(ht);
252+
}
253+
249254
/* Clear & Release the hash table */
250255
static void dictRelease(dict *ht) {
251256
_dictClear(ht);

dict.h

+1
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ static int dictExpand(dict *ht, unsigned long size);
117117
static int dictAdd(dict *ht, void *key, void *val);
118118
static int dictReplace(dict *ht, void *key, void *val);
119119
static int dictDelete(dict *ht, const void *key);
120+
static void dictEmpty(dict *ht);
120121
static void dictRelease(dict *ht);
121122
static dictEntry * dictFind(dict *ht, const void *key);
122123
static void dictInitIterator(dictIterator *iter, dict *ht);

test.c

+107-2
Original file line numberDiff line numberDiff line change
@@ -2023,6 +2023,108 @@ static void test_monitor(struct config config) {
20232023
/* Verify test checkpoints */
20242024
assert(state.checkpoint == 3);
20252025
}
2026+
2027+
/* Reset callback for test_reset() */
2028+
static void reset_reset_cb(redisAsyncContext *ac, void *r, void *privdata) {
2029+
redisReply *reply = r;
2030+
TestState *state = privdata;
2031+
assert(reply != NULL && reply->elements == 2);
2032+
char *str = reply->element[0]->str;
2033+
size_t len = reply->element[0]->len;
2034+
assert(strncmp(str, "RESET", len) == 0);
2035+
state->checkpoint++;
2036+
/* Check that when the RESET callback is called, the context has already
2037+
* been reset. Monitor and pubsub have been cancelled. */
2038+
assert(!(ac->c.flags & REDIS_SUBSCRIBED));
2039+
assert(!(ac->c.flags & REDIS_MONITORING));
2040+
event_base_loopbreak(base);
2041+
}
2042+
2043+
/* Ping callback for test_reset() */
2044+
static void reset_ping_cb(redisAsyncContext *ac, void *r, void *privdata) {
2045+
redisReply *reply = r;
2046+
TestState *state = privdata;
2047+
assert(reply != NULL && reply->elements == 2);
2048+
char *str = reply->element[0]->str;
2049+
size_t len = reply->element[0]->len;
2050+
assert(strncmp(str, "pong", len) == 0);
2051+
state->checkpoint++;
2052+
redisAsyncCommandWithFinalizer(ac, reset_reset_cb, finalizer_cb, &state,
2053+
"reset");
2054+
}
2055+
2056+
/* Subscribe callback for test_reset() */
2057+
static void reset_subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {
2058+
redisReply *reply = r;
2059+
TestState *state = privdata;
2060+
assert(reply != NULL &&
2061+
reply->type == REDIS_REPLY_ARRAY &&
2062+
reply->elements > 0);
2063+
char *str = reply->element[0]->str;
2064+
size_t len = reply->element[0]->len;
2065+
assert(strncmp(str, "subscribe", len) == 0);
2066+
state->checkpoint++;
2067+
redisAsyncCommandWithFinalizer(ac, reset_subscribe_cb,
2068+
finalizer_cb, &state, "ping");
2069+
}
2070+
2071+
/* Monitor callback for test_reset(). */
2072+
static void reset_monitor_cb(redisAsyncContext *ac, void *r, void *privdata) {
2073+
redisReply *reply = r;
2074+
TestState *state = privdata;
2075+
assert(reply != NULL && reply->type == REDIS_REPLY_STATUS);
2076+
state->checkpoint++;
2077+
if (strncmp(reply->str, "OK", reply->len) == 0) {
2078+
/* Reply to the MONITOR command */
2079+
redisAsyncCommandWithFinalizer(ac, reset_subscribe_cb, finalizer_cb, &state,
2080+
"subscribe %s", "ch");
2081+
} else {
2082+
/* A monitored command starts with a numeric timestamp, e.g.
2083+
* +1689801837.986559 [0 127.0.0.1:44308] "ping" */
2084+
assert(reply->str[0] >= '0' && reply->str[0] <= '9');
2085+
}
2086+
}
2087+
2088+
/* Check that RESET cancels all subscriptions and monitoring (Redis >= 6.2) */
2089+
static void test_reset(struct config config) {
2090+
test("RESET cancels monitoring and pubsub: ");
2091+
/* Setup event dispatcher with a testcase timeout */
2092+
base = event_base_new();
2093+
struct event *timeout = evtimer_new(base, timeout_cb, NULL);
2094+
assert(timeout != NULL);
2095+
2096+
evtimer_assign(timeout,base,timeout_cb,NULL);
2097+
struct timeval timeout_tv = {.tv_sec = 10};
2098+
evtimer_add(timeout, &timeout_tv);
2099+
2100+
/* Connect */
2101+
redisOptions options = get_redis_tcp_options(config);
2102+
redisAsyncContext *ac = redisAsyncConnectWithOptions(&options);
2103+
assert(ac != NULL && ac->err == 0);
2104+
redisLibeventAttach(ac,base);
2105+
2106+
/* Not expecting any push messages in this test */
2107+
redisAsyncSetPushCallback(ac,unexpected_push_cb);
2108+
2109+
/* Start monitor */
2110+
TestState state = {.options = &options};
2111+
redisAsyncCommandWithFinalizer(ac, reset_monitor_cb, finalizer_cb, &state,
2112+
"monitor");
2113+
2114+
/* Start event dispatching loop */
2115+
test_cond(event_base_dispatch(base) == 0);
2116+
event_free(timeout);
2117+
event_base_free(base);
2118+
2119+
/* Verify test checkpoint sum.
2120+
*
2121+
* Replies for monitor, subscribe, ping and reset = 4
2122+
* Monitored subscribe and ping = 2
2123+
* Finalizer for monitor, subscribe, ping, reset = 4
2124+
* Sum: 4 + 2 + 4 = 10 */
2125+
assert(state.checkpoint == 10);
2126+
}
2127+
20262128
#endif /* HIREDIS_TEST_ASYNC */
20272129

20282130
/* tests for async api using polling adapter, requires no extra libraries*/
@@ -2394,9 +2496,9 @@ int main(int argc, char **argv) {
23942496
printf("\nTesting asynchronous API against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port);
23952497
cfg.type = CONN_TCP;
23962498

2397-
int major;
2499+
int major, minor;
23982500
redisContext *c = do_connect(cfg);
2399-
get_redis_version(c, &major, NULL);
2501+
get_redis_version(c, &major, &minor);
24002502
disconnect(c, 0);
24012503

24022504
test_pubsub_handling(cfg);
@@ -2406,6 +2508,9 @@ int main(int argc, char **argv) {
24062508
test_pubsub_handling_resp3(cfg);
24072509
test_command_timeout_during_pubsub(cfg);
24082510
}
2511+
if (major > 6 || (major == 6 && minor >= 2)) {
2512+
test_reset(cfg);
2513+
}
24092514
#endif /* HIREDIS_TEST_ASYNC */
24102515

24112516
cfg.type = CONN_TCP;

0 commit comments

Comments
 (0)