Skip to content

Commit

Permalink
resolve conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: Lipeng Zhu <[email protected]>
  • Loading branch information
lipzhu committed Jan 9, 2025
2 parents 9f4815a + efc4fe4 commit 658b652
Show file tree
Hide file tree
Showing 9 changed files with 256 additions and 15 deletions.
13 changes: 12 additions & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,17 @@ else
LIBCRYPTO_LIBS=-lcrypto
endif

# only Linux has IO_URING support
ifeq ($(uname_S),Linux)
HAS_LIBURING := $(shell sh -c 'echo "$(NUMBER_SIGN_CHAR)include <liburing.h>" > foo.c; \
$(CC) -E foo.c > /dev/null 2>&1 && echo yes; \
rm foo.c')
ifeq ($(HAS_LIBURING),yes)
FINAL_CFLAGS+= -DHAVE_LIBURING
FINAL_LIBS+= -luring
endif
endif

BUILD_NO:=0
BUILD_YES:=1
BUILD_MODULE:=2
Expand Down Expand Up @@ -416,7 +427,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o rdma.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o rdma.o io_uring.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
Expand Down
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3188,6 +3188,7 @@ standardConfig static_configs[] = {
createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL),
createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL),
createBoolConfig("import-mode", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.import_mode, 0, NULL, NULL),
createBoolConfig("io-uring-enabled", NULL, IMMUTABLE_CONFIG, server.io_uring_enabled, 0, NULL, NULL),

/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
Expand Down
88 changes: 88 additions & 0 deletions src/io_uring.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#include "io_uring.h"

#ifdef HAVE_LIBURING
#include <liburing.h>
#include <string.h>
#include "zmalloc.h"

/* io_uring instance queue depth. */
#define IO_URING_DEPTH 256

static struct io_uring *_io_uring;
static size_t io_uring_write_queue_len = 0;

/* Initialize io_uring at server startup if io_uring enabled,
* setup io_uring submission and completion. */
int initIOUring(void) {
struct io_uring_params params;
_io_uring = zmalloc(sizeof(struct io_uring));
memset(&params, 0, sizeof(params));
/* On success, io_uring_queue_init_params(3) returns 0 and _io_uring will
* point to the shared memory containing the io_uring queues.
* On failure -errno is returned. */
if (io_uring_queue_init_params(IO_URING_DEPTH, _io_uring, &params) < 0) return IO_URING_ERR;
return IO_URING_OK;
}

/* Use io_uring to handle the client write request. */
int ioUringPrepWrite(void *data, int fd, const void *buf, size_t len) {
struct io_uring_sqe *sqe = io_uring_get_sqe(_io_uring);
if (sqe == NULL) return IO_URING_ERR;
io_uring_prep_send(sqe, fd, buf, len, MSG_DONTWAIT);
io_uring_sqe_set_data(sqe, data);
io_uring_write_queue_len++;
return IO_URING_OK;
}

/* Submit requests to the submission queue and wait for completion. */
int ioUringWaitWriteBarrier(io_uring_cqe_handler cqe_handler) {
if (io_uring_submit(_io_uring) < 0) return IO_URING_ERR;
while (io_uring_write_queue_len) {
struct io_uring_cqe *cqe;
int ret = io_uring_wait_cqe(_io_uring, &cqe);
if (ret == 0) {
if (cqe_handler) {
void *data = io_uring_cqe_get_data(cqe);
cqe_handler(data, cqe->res);
}
io_uring_cqe_seen(_io_uring, cqe);
io_uring_write_queue_len--;
} else {
return IO_URING_ERR;
}
}
return IO_URING_OK;
}

/* Free io_uring. */
void freeIOUring(void) {
io_uring_queue_exit(_io_uring);
zfree(_io_uring);
_io_uring = NULL;
}
#else
#ifndef UNUSED
#define UNUSED(V) ((void)V)
#endif

int initIOUring(void) {
return IO_URING_ERR;
}

int ioUringPrepWrite(void *data, int fd, const void *buf, size_t len) {
UNUSED(data);
UNUSED(fd);
UNUSED(buf);
UNUSED(len);
return IO_URING_ERR;
}

int ioUringWaitWriteBarrier(io_uring_cqe_handler cqe_handler) {
UNUSED(cqe_handler);
return IO_URING_ERR;
}

void freeIOUring(void) {
}

#endif
15 changes: 15 additions & 0 deletions src/io_uring.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#ifndef IO_URING_H
#define IO_URING_H
#include <stddef.h>

#define IO_URING_OK 0
#define IO_URING_ERR -1

typedef void (*io_uring_cqe_handler)(void *, int);

int initIOUring(void);
int ioUringPrepWrite(void *data, int fd, const void *buf, size_t len);
int ioUringWaitWriteBarrier(io_uring_cqe_handler cqe_handler);
void freeIOUring(void);

#endif /* IO_URING_H */
136 changes: 123 additions & 13 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "fmtargs.h"
#include "io_threads.h"
#include "module.h"
#include "io_uring.h"
#include <strings.h>
#include <sys/socket.h>
#include <sys/uio.h>
Expand Down Expand Up @@ -2398,6 +2399,83 @@ int processIOThreadsWriteDone(void) {
return processed;
}

/* If client is suitable to use io_uring to handle the write request. */
static inline int _canWriteUsingIOUring(client *c) {
if (server.io_uring_enabled && server.io_threads_num == 1) {
/* Currently, we only use io_uring to handle the static buffer write requests.
* If io-threads or tls is enabled, skip the io_uring. */
return connIsTLS(c->conn) == 0 && getClientType(c) != CLIENT_TYPE_REPLICA && listLength(c->reply) == 0 &&
c->bufpos > 0;
}
return 0;
}

/* Check the completed io_uring event and update the state. */
static int _checkPendingIOUringWriteState(client *c) {
/* Note that where synchronous system calls will return -1 on
* failure and set errno to the actual error value,
* io_uring never uses errno. Instead it returns the negated
* errno directly in the CQE res field. */
if (c->nwritten <= 0) {
if (c->nwritten != -EAGAIN) {
c->conn->last_errno = -(c->nwritten);
/* Don't overwrite the state of a connection that is not already
* connected, not to mess with handler callbacks. */
if (c->nwritten != -EINTR && c->conn->state == CONN_STATE_CONNECTED) c->conn->state = CONN_STATE_ERROR;
}
if (connGetState(c->conn) != CONN_STATE_CONNECTED) {
serverLog(LL_VERBOSE, "Error writing to client: %s", connGetLastError(c->conn));
freeClientAsync(c);
}
return IO_URING_ERR;
}

c->sentlen += c->nwritten;
/* If the buffer was sent, set bufpos to zero to continue with
* the remainder of the reply. */
if ((int)c->sentlen == c->bufpos) {
c->bufpos = 0;
c->sentlen = 0;
}
server.stat_net_output_bytes += c->nwritten;
c->net_output_bytes += c->nwritten;

/* For clients representing masters we don't count sending data
* as an interaction, since we always send REPLCONF ACK commands
* that take some time to just fill the socket output buffer.
* We just rely on data / pings received for timeout detection. */
if (!c->flag.primary) c->last_interaction = server.unixtime;

return IO_URING_OK;
}

static void _postIOUringWrite(void) {
listIter li;
listNode *ln;
listRewind(server.clients_pending_write, &li);
while ((ln = listNext(&li))) {
client *c = listNodeValue(ln);
listUnlinkNode(server.clients_pending_write, ln);

if (_checkPendingIOUringWriteState(c) == IO_URING_ERR) continue;
if (!clientHasPendingReplies(c)) {
c->sentlen = 0;
/* Close connection after entire reply has been sent. */
if (c->flag.close_after_reply) {
freeClientAsync(c);
continue;
}
}
/* Update client's memory usage after writing.*/
updateClientMemUsageAndBucket(c);
}
}

void setClientLastWritten(void *data, int res) {
client *c = data;
c->nwritten = res;
}

/* This function is called just before entering the event loop, in the hope
* we can just write the replies to the client output buffer without any
* need to use a syscall in order to install the writable event handler,
Expand All @@ -2417,34 +2495,66 @@ int handleClientsWithPendingWrites(void) {
while ((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flag.pending_write = 0;
listUnlinkNode(server.clients_pending_write, ln);

/* If a client is protected, don't do anything,
* that may trigger write error or recreate handler. */
if (c->flag.protected) continue;
if (c->flag.protected) {
listUnlinkNode(server.clients_pending_write, ln);
continue;
}

/* Don't write to clients that are going to be closed anyway. */
if (c->flag.close_asap) continue;
if (c->flag.close_asap) {
listUnlinkNode(server.clients_pending_write, ln);
continue;
}

if (!clientHasPendingReplies(c)) continue;
if (!clientHasPendingReplies(c)) {
listUnlinkNode(server.clients_pending_write, ln);
continue;
}

/* If we can send the client to the I/O thread, let it handle the write. */
if (trySendWriteToIOThreads(c) == C_OK) continue;
if (server.io_threads_num > 1) {
listUnlinkNode(server.clients_pending_write, ln);
if (trySendWriteToIOThreads(c) == C_OK) {
continue;
}
}

/* We can't write to the client while IO operation is in progress. */
if (c->io_write_state != CLIENT_IDLE || c->io_read_state != CLIENT_IDLE) continue;
if (c->io_write_state != CLIENT_IDLE || c->io_read_state != CLIENT_IDLE) {
if (server.io_threads_num == 1) {
listUnlinkNode(server.clients_pending_write, ln);
}
continue;
}

processed++;
if (_canWriteUsingIOUring(c)) {
if (ioUringPrepWrite(c, c->conn->fd, c->buf + c->sentlen, c->bufpos - c->sentlen) == IO_URING_ERR) {
listUnlinkNode(server.clients_pending_write, ln);
continue;
}
} else {
if (server.io_threads_num == 1) {
listUnlinkNode(server.clients_pending_write, ln);
}
/* Try to write buffers to the client socket. */
if (writeToClient(c) == C_ERR) continue;

/* Try to write buffers to the client socket. */
if (writeToClient(c) == C_ERR) continue;

/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {
installClientWriteHandler(c);
/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {
installClientWriteHandler(c);
}
}
}

if (server.io_uring_enabled && server.io_threads_num == 1 && listLength(server.clients_pending_write) > 0) {
ioUringWaitWriteBarrier(setClientLastWritten);
_postIOUringWrite();
}
return processed;
}

Expand Down
8 changes: 8 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "threads_mngr.h"
#include "fmtargs.h"
#include "io_threads.h"
#include "io_uring.h"
#include "sds.h"
#include "module.h"

Expand Down Expand Up @@ -2984,6 +2985,12 @@ void initListeners(void) {
void InitServerLast(void) {
bioInit();
initIOThreads();
if (server.io_uring_enabled) {
if (initIOUring() == IO_URING_ERR) {
serverLog(LL_WARNING, "Failed to initialize io_uring.");
exit(1);
}
}
set_jemalloc_bg_thread(server.jemalloc_bg_thread);
server.initial_memory_usage = zmalloc_used_memory();
}
Expand Down Expand Up @@ -7129,6 +7136,7 @@ __attribute__((weak)) int main(int argc, char **argv) {

aeMain(server.el);
aeDeleteEventLoop(server.el);
if (server.io_uring_enabled) freeIOUring();
return 0;
}
/* The End */
3 changes: 2 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2127,7 +2127,8 @@ struct valkeyServer {
sds availability_zone; /* When run in a cloud environment we can configure the availability zone it is running in */
/* Local environment */
char *locale_collate;
char *debug_context; /* A free-form string that has no impact on server except being included in a crash report. */
char *debug_context; /* A free-form string that has no impact on server except being included in a crash report. */
int io_uring_enabled; /* If io_uring is enabled */
};

#define MAX_KEYS_BUFFER 256
Expand Down
1 change: 1 addition & 0 deletions tests/unit/introspection.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ start_server {tags {"introspection"}} {
rdma-rx-size
rdma-bind
rdma-port
io-uring-enabled
}

if {!$::tls} {
Expand Down
6 changes: 6 additions & 0 deletions valkey.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2483,3 +2483,9 @@ jemalloc-bg-thread yes
# we may also use this when making decisions for replication.
#
# availability-zone "zone-name"

# If Valkey is compiled with io_uring support and liburing is installed in the
# system, then io_uring can be enabled with this config. The io_uring kernel
# interface was adopted in Linux kernel version 5.1.
#
# io-uring-enabled no

0 comments on commit 658b652

Please sign in to comment.