Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of io_uring in wrk without batching #524

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
CFLAGS += -std=c99 -Wall -O2 -D_REENTRANT
LIBS := -lm -lssl -lcrypto -lpthread
LIBS := -lm -lssl -lcrypto -lpthread -luring

TARGET := $(shell uname -s | tr '[A-Z]' '[a-z]' 2>/dev/null || echo unknown)

Expand Down
220 changes: 213 additions & 7 deletions src/wrk.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,27 @@
#include "wrk.h"
#include "script.h"
#include "main.h"
#include <liburing.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/uio.h>
#include <stdio.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <sys/time.h>
#include <pthread.h>

#define QUEUE_DEPTH 4096
#define BACKLOG 4096

static struct config {
uint64_t connections;
uint64_t duration;
uint64_t threads;
uint64_t timeout;
uint64_t pipeline;
bool use_io;
bool delay;
bool dynamic;
bool latency;
Expand Down Expand Up @@ -41,6 +55,12 @@ static void handler(int sig) {
stop = 1;
}

void *thread_io_uring_nb(void *data);
void prep_connect(struct io_uring_sqe *sqe, struct connection *conn);
void prep_send(struct io_uring_sqe *sqe, struct connection *conn);
void prep_read(struct io_uring_sqe *sqe, struct connection *conn);
void initialize_connection(struct connection *conn, struct io_uring *ring);

static void usage() {
printf("Usage: wrk <options> <url> \n"
" Options: \n"
Expand All @@ -53,6 +73,7 @@ static void usage() {
" --latency Print latency statistics \n"
" --timeout <T> Socket/request timeout \n"
" -v, --version Print version details \n"
" -i, --io_uring Enable io_uring \n"
" \n"
" Numeric arguments may include a SI unit (1k, 1M, 1G)\n"
" Time arguments may include a time unit (2s, 2m, 2h)\n");
Expand Down Expand Up @@ -105,7 +126,7 @@ int main(int argc, char **argv) {
thread *t = &threads[i];
t->loop = aeCreateEventLoop(10 + cfg.connections * 3);
t->connections = cfg.connections / cfg.threads;

t->port = atoi(port);
t->L = script_create(cfg.script, url, headers);
script_init(L, t, argc - optind, &argv[optind]);

Expand All @@ -118,9 +139,13 @@ int main(int argc, char **argv) {
parser_settings.on_header_value = header_value;
parser_settings.on_body = response_body;
}
}
}

if (!t->loop || pthread_create(&t->thread, NULL, &thread_main, t)) {
if(cfg.use_io) {
printf("Using io_uring!\n");
pthread_create(&t->thread, NULL, &thread_io_uring_nb, t);

} else if (!t->loop || pthread_create(&t->thread, NULL, &thread_main, t)) {
char *msg = strerror(errno);
fprintf(stderr, "unable to create thread %"PRIu64": %s\n", i, msg);
exit(2);
Expand Down Expand Up @@ -199,6 +224,172 @@ int main(int argc, char **argv) {
return 0;
}

void *thread_io_uring_nb(void *arg) {

thread *thread = arg;
struct io_uring ring;
if (io_uring_queue_init(QUEUE_DEPTH, &ring, 0) < 0) {
perror("Failed to initialize io_uring");
return NULL;
}

char *request = NULL;
size_t length = 0;

if (!cfg.dynamic) {
script_request(thread->L, &request, &length);
}

thread->cs = zcalloc(thread->connections * sizeof(connection));
connection *c = thread->cs;

for (uint64_t i = 0; i < thread->connections; i++, c++) {
c->thread = thread;
c->ssl = cfg.ctx ? SSL_new(cfg.ctx) : NULL;
c->request = request;
c->length = length;
c->delayed = cfg.delay;

int sockfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (sockfd == -1) {
perror("Failed to create socket");
continue;
}
c->fd = sockfd;
c->state = CONNECT;
c->addr.sin_family = AF_INET;
c->addr.sin_port = htons(c->thread->port);
inet_pton(AF_INET, cfg.host, &(c->addr.sin_addr));
c->parser.data = c;
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
prep_connect(sqe, c);

}
io_uring_submit(&ring);

clock_t start_time = clock();
thread->start = time_us();
uint64_t last_record_time = time_us();

while(1) {
for(int k = 0 ; k < cfg.connections; k++) {
struct io_uring_cqe *cqe;
io_uring_wait_cqe(&ring, &cqe);
struct connection *conn = io_uring_cqe_get_data(cqe);

switch(conn->state) {
case CONNECT:
io_uring_cqe_seen(&ring, cqe);
http_parser_init(&conn->parser, HTTP_RESPONSE);
conn->state = SEND;
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
prep_send(sqe, conn);
break;
case SEND:
io_uring_cqe_seen(&ring, cqe);
sqe = io_uring_get_sqe(&ring);
prep_read(sqe, conn);
break;
case READ:
io_uring_cqe_seen(&ring, cqe);
io_uring_cqe_get_data(cqe);
size_t x = http_parser_execute(&conn->parser, &parser_settings, conn->buf, cqe->res);
if (cqe->res < 0 || x < 0)
{
fprintf(stderr, "Connection failed with error: %s\n", strerror(-cqe->res));
thread->errors.connect++;
}
else if (cqe->res == 0)
{
close(conn->fd);
initialize_connection(conn, &ring);
io_uring_submit(&ring);
int y = io_uring_wait_cqe(&ring, &cqe);
if (y < 0)
{
printf("Error waiting for CQE after re-establishing connection\n");
}
}
else if (cqe->res == RECVBUF)
{

sqe = io_uring_get_sqe(&ring);
prep_read(sqe, conn);
}
else
{
conn->state = SEND;
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
prep_send(sqe, conn);
}
conn->thread->bytes += cqe->res;

break;
}

io_uring_submit(&ring);
}

uint64_t current_time = time_us();

if (current_time - last_record_time >= RECORD_INTERVAL_MS * 1000) {
if (thread->requests > 0)
{
uint64_t elapsed_ms = (current_time - thread->start) / 1000;
uint64_t requests = (thread->requests / (double) elapsed_ms) * 1000;
stats_record(statistics.requests, requests);
thread->start = time_us();
thread->requests = 0;
}
last_record_time = current_time;
}
clock_t current_time_2 = clock();
double time_passed_in_seconds = ((double)(current_time_2 - start_time)) / CLOCKS_PER_SEC;

if (time_passed_in_seconds >= cfg.duration)
{
break;
}
}

thread->start = time_us();
io_uring_queue_exit(&ring);
return NULL;
}

void prep_connect(struct io_uring_sqe *sqe, struct connection *conn) {
io_uring_prep_connect(sqe, conn->fd, (struct sockaddr *)&conn->addr, sizeof(conn->addr));
io_uring_sqe_set_data(sqe, conn);
}

void prep_send(struct io_uring_sqe *sqe, struct connection *conn) {
if(!conn->written) {
conn->start = time_us();
conn->pending = cfg.pipeline;
}
conn->pending = cfg.pipeline;
const char *msg = "GET / HTTP/1.1\r\nHost: 127.0.0.1\r\n\r\n";
io_uring_prep_send(sqe, conn->fd, msg, strlen(msg), MSG_DONTWAIT);
io_uring_sqe_set_data(sqe, conn);
}

void prep_read(struct io_uring_sqe *sqe, struct connection *conn) {
conn->state = READ;
memset(conn->buf, 0, sizeof(conn->buf));
io_uring_prep_read(sqe, conn->fd, conn->buf, RECVBUF, 0);
io_uring_sqe_set_data(sqe, conn);
}

void initialize_connection(struct connection *conn, struct io_uring *ring) {
conn->fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
conn->state = CONNECT;
conn->addr.sin_family = AF_INET;
conn->addr.sin_port = htons(conn->thread->port);
inet_pton(AF_INET, cfg.host, &(conn->addr.sin_addr));
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
prep_connect(sqe, conn);
}

void *thread_main(void *arg) {
thread *thread = arg;

Expand Down Expand Up @@ -326,15 +517,23 @@ static int response_complete(http_parser *parser) {
thread *thread = c->thread;
uint64_t now = time_us();
int status = parser->status_code;

thread->complete++;
thread->requests++;
if(cfg.use_io) {
thread->complete++;
thread->requests++;
if (--c->pending == 0) {
if (!stats_record(statistics.latency, now - c->start)) {
thread->errors.timeout++;
}
}
return 0;
}

if (status > 399) {
thread->errors.status++;
}

if (c->headers.buffer) {
printf("Here!!\n");
*c->headers.cursor++ = '\0';
script_response(thread->L, status, &c->headers, &c->body);
c->state = FIELD;
Expand All @@ -348,6 +547,9 @@ static int response_complete(http_parser *parser) {
aeCreateFileEvent(thread->loop, c->fd, AE_WRITABLE, socket_writeable, c);
}

thread->complete++;
thread->requests++;

if (!http_should_keep_alive(parser)) {
reconnect_socket(thread, c);
goto done;
Expand Down Expand Up @@ -476,6 +678,7 @@ static struct option longopts[] = {
{ "timeout", required_argument, NULL, 'T' },
{ "help", no_argument, NULL, 'h' },
{ "version", no_argument, NULL, 'v' },
{ "io_uring", no_argument, NULL, 'i' },
{ NULL, 0, NULL, 0 }
};

Expand All @@ -489,7 +692,7 @@ static int parse_args(struct config *cfg, char **url, struct http_parser_url *pa
cfg->duration = 10;
cfg->timeout = SOCKET_TIMEOUT_MS;

while ((c = getopt_long(argc, argv, "t:c:d:s:H:T:Lrv?", longopts, NULL)) != -1) {
while ((c = getopt_long(argc, argv, "t:c:d:s:H:T:Lrvi?", longopts, NULL)) != -1) {
switch (c) {
case 't':
if (scan_metric(optarg, &cfg->threads)) return -1;
Expand Down Expand Up @@ -517,6 +720,9 @@ static int parse_args(struct config *cfg, char **url, struct http_parser_url *pa
printf("wrk %s [%s] ", VERSION, aeGetApiName());
printf("Copyright (C) 2012 Will Glozer\n");
break;
case 'i':
cfg->use_io = true;
break;
case 'h':
case '?':
case ':':
Expand Down
17 changes: 15 additions & 2 deletions src/wrk.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include <sys/types.h>
#include <netdb.h>
#include <sys/socket.h>

#include <liburing.h>
#include <openssl/ssl.h>
#include <openssl/err.h>
#include <lua.h>
Expand All @@ -33,6 +33,7 @@ typedef struct {
uint64_t requests;
uint64_t bytes;
uint64_t start;
int port;
lua_State *L;
errors errors;
struct connection *cs;
Expand All @@ -44,11 +45,22 @@ typedef struct {
char *cursor;
} buffer;

typedef struct {
int start_conn_idx;
int end_conn_idx;
struct connection *connection;
int port;
char host;
uint64_t requests;
} thread_data_t;



typedef struct connection {
thread *thread;
http_parser parser;
enum {
FIELD, VALUE
FIELD, VALUE, CONNECT, READ, SEND
} state;
int fd;
SSL *ssl;
Expand All @@ -61,6 +73,7 @@ typedef struct connection {
buffer headers;
buffer body;
char buf[RECVBUF];
struct sockaddr_in addr;
} connection;

#endif /* WRK_H */