Skip to content

Commit

Permalink
util/event-loop-base: Introduce options to set the thread pool size
Browse files Browse the repository at this point in the history
The thread pool regulates itself: when idle, it kills threads until
empty, when in demand, it creates new threads until full. This behaviour
doesn't play well with latency sensitive workloads where the price of
creating a new thread is too high. For example, when paired with qemu's
'-mlock', or using safety features like SafeStack, creating a new thread
has been measured take multiple milliseconds.

In order to mitigate this let's introduce a new 'EventLoopBase'
property to set the thread pool size. The threads will be created during
the pool's initialization or upon updating the property's value, remain
available during its lifetime regardless of demand, and destroyed upon
freeing it. A properly characterized workload will then be able to
configure the pool to avoid any latency spikes.

Signed-off-by: Nicolas Saenz Julienne <[email protected]>
Reviewed-by: Stefan Hajnoczi <[email protected]>
Acked-by: Markus Armbruster <[email protected]>
Message-id: [email protected]
Signed-off-by: Stefan Hajnoczi <[email protected]>
  • Loading branch information
vianpl authored and stefanhaRH committed May 9, 2022
1 parent 70ac26b commit 71ad471
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 5 deletions.
23 changes: 23 additions & 0 deletions event-loop-base.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,30 @@
#include "qemu/osdep.h"
#include "qom/object_interfaces.h"
#include "qapi/error.h"
#include "block/thread-pool.h"
#include "sysemu/event-loop-base.h"

typedef struct {
const char *name;
ptrdiff_t offset; /* field's byte offset in EventLoopBase struct */
} EventLoopBaseParamInfo;

static void event_loop_base_instance_init(Object *obj)
{
EventLoopBase *base = EVENT_LOOP_BASE(obj);

base->thread_pool_max = THREAD_POOL_MAX_THREADS_DEFAULT;
}

static EventLoopBaseParamInfo aio_max_batch_info = {
"aio-max-batch", offsetof(EventLoopBase, aio_max_batch),
};
static EventLoopBaseParamInfo thread_pool_min_info = {
"thread-pool-min", offsetof(EventLoopBase, thread_pool_min),
};
static EventLoopBaseParamInfo thread_pool_max_info = {
"thread-pool-max", offsetof(EventLoopBase, thread_pool_max),
};

static void event_loop_base_get_param(Object *obj, Visitor *v,
const char *name, void *opaque, Error **errp)
Expand Down Expand Up @@ -95,12 +109,21 @@ static void event_loop_base_class_init(ObjectClass *klass, void *class_data)
event_loop_base_get_param,
event_loop_base_set_param,
NULL, &aio_max_batch_info);
object_class_property_add(klass, "thread-pool-min", "int",
event_loop_base_get_param,
event_loop_base_set_param,
NULL, &thread_pool_min_info);
object_class_property_add(klass, "thread-pool-max", "int",
event_loop_base_get_param,
event_loop_base_set_param,
NULL, &thread_pool_max_info);
}

static const TypeInfo event_loop_base_info = {
.name = TYPE_EVENT_LOOP_BASE,
.parent = TYPE_OBJECT,
.instance_size = sizeof(EventLoopBase),
.instance_init = event_loop_base_instance_init,
.class_size = sizeof(EventLoopBaseClass),
.class_init = event_loop_base_class_init,
.abstract = true,
Expand Down
10 changes: 10 additions & 0 deletions include/block/aio.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ struct AioContext {
QSLIST_HEAD(, Coroutine) scheduled_coroutines;
QEMUBH *co_schedule_bh;

int thread_pool_min;
int thread_pool_max;
/* Thread pool for performing work and receiving completion callbacks.
* Has its own locking.
*/
Expand Down Expand Up @@ -769,4 +771,12 @@ void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch,
Error **errp);

/**
* aio_context_set_thread_pool_params:
* @ctx: the aio context
* @min: min number of threads to have readily available in the thread pool
* @min: max number of threads the thread pool can contain
*/
void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min,
int64_t max, Error **errp);
#endif
3 changes: 3 additions & 0 deletions include/block/thread-pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

#include "block/block.h"

#define THREAD_POOL_MAX_THREADS_DEFAULT 64

typedef int ThreadPoolFunc(void *opaque);

typedef struct ThreadPool ThreadPool;
Expand All @@ -33,5 +35,6 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
int coroutine_fn thread_pool_submit_co(ThreadPool *pool,
ThreadPoolFunc *func, void *arg);
void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg);
void thread_pool_update_params(ThreadPool *pool, struct AioContext *ctx);

#endif
4 changes: 4 additions & 0 deletions include/sysemu/event-loop-base.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,9 @@ struct EventLoopBase {

/* AioContext AIO engine parameters */
int64_t aio_max_batch;

/* AioContext thread pool parameters */
int64_t thread_pool_min;
int64_t thread_pool_max;
};
#endif
3 changes: 3 additions & 0 deletions iothread.c
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ static void iothread_set_aio_context_params(EventLoopBase *base, Error **errp)
aio_context_set_aio_params(iothread->ctx,
iothread->parent_obj.aio_max_batch,
errp);

aio_context_set_thread_pool_params(iothread->ctx, base->thread_pool_min,
base->thread_pool_max, errp);
}


Expand Down
10 changes: 9 additions & 1 deletion qapi/qom.json
Original file line number Diff line number Diff line change
Expand Up @@ -508,10 +508,18 @@
# 0 means that the engine will use its default.
# (default: 0)
#
# @thread-pool-min: minimum number of threads reserved in the thread pool
# (default:0)
#
# @thread-pool-max: maximum number of threads the thread pool can contain
# (default:64)
#
# Since: 7.1
##
{ 'struct': 'EventLoopBaseProperties',
'data': { '*aio-max-batch': 'int' } }
'data': { '*aio-max-batch': 'int',
'*thread-pool-min': 'int',
'*thread-pool-max': 'int' } }

##
# @IothreadProperties:
Expand Down
1 change: 1 addition & 0 deletions util/aio-posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include "qemu/osdep.h"
#include "block/block.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"
#include "qemu/rcu.h"
#include "qemu/rcu_queue.h"
Expand Down
20 changes: 20 additions & 0 deletions util/async.c
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,9 @@ AioContext *aio_context_new(Error **errp)

ctx->aio_max_batch = 0;

ctx->thread_pool_min = 0;
ctx->thread_pool_max = THREAD_POOL_MAX_THREADS_DEFAULT;

return ctx;
fail:
g_source_destroy(&ctx->source);
Expand Down Expand Up @@ -696,3 +699,20 @@ void qemu_set_current_aio_context(AioContext *ctx)
assert(!get_my_aiocontext());
set_my_aiocontext(ctx);
}

void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min,
int64_t max, Error **errp)
{

if (min > max || !max || min > INT_MAX || max > INT_MAX) {
error_setg(errp, "bad thread-pool-min/thread-pool-max values");
return;
}

ctx->thread_pool_min = min;
ctx->thread_pool_max = max;

if (ctx->thread_pool) {
thread_pool_update_params(ctx->thread_pool, ctx);
}
}
9 changes: 9 additions & 0 deletions util/main-loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "sysemu/replay.h"
#include "qemu/main-loop.h"
#include "block/aio.h"
#include "block/thread-pool.h"
#include "qemu/error-report.h"
#include "qemu/queue.h"
#include "qemu/compiler.h"
Expand Down Expand Up @@ -187,12 +188,20 @@ int qemu_init_main_loop(Error **errp)

static void main_loop_update_params(EventLoopBase *base, Error **errp)
{
ERRP_GUARD();

if (!qemu_aio_context) {
error_setg(errp, "qemu aio context not ready");
return;
}

aio_context_set_aio_params(qemu_aio_context, base->aio_max_batch, errp);
if (*errp) {
return;
}

aio_context_set_thread_pool_params(qemu_aio_context, base->thread_pool_min,
base->thread_pool_max, errp);
}

MainLoop *mloop;
Expand Down
55 changes: 51 additions & 4 deletions util/thread-pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ struct ThreadPool {
QemuMutex lock;
QemuCond worker_stopped;
QemuSemaphore sem;
int max_threads;
QEMUBH *new_thread_bh;

/* The following variables are only accessed from one AioContext. */
Expand All @@ -71,8 +70,27 @@ struct ThreadPool {
int new_threads; /* backlog of threads we need to create */
int pending_threads; /* threads created but not running yet */
bool stopping;
int min_threads;
int max_threads;
};

static inline bool back_to_sleep(ThreadPool *pool, int ret)
{
/*
* The semaphore timed out, we should exit the loop except when:
* - There is work to do, we raced with the signal.
* - The max threads threshold just changed, we raced with the signal.
* - The thread pool forces a minimum number of readily available threads.
*/
if (ret == -1 && (!QTAILQ_EMPTY(&pool->request_list) ||
pool->cur_threads > pool->max_threads ||
pool->cur_threads <= pool->min_threads)) {
return true;
}

return false;
}

static void *worker_thread(void *opaque)
{
ThreadPool *pool = opaque;
Expand All @@ -91,8 +109,9 @@ static void *worker_thread(void *opaque)
ret = qemu_sem_timedwait(&pool->sem, 10000);
qemu_mutex_lock(&pool->lock);
pool->idle_threads--;
} while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
if (ret == -1 || pool->stopping) {
} while (back_to_sleep(pool, ret));
if (ret == -1 || pool->stopping ||
pool->cur_threads > pool->max_threads) {
break;
}

Expand Down Expand Up @@ -294,6 +313,33 @@ void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
thread_pool_submit_aio(pool, func, arg, NULL, NULL);
}

void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
{
qemu_mutex_lock(&pool->lock);

pool->min_threads = ctx->thread_pool_min;
pool->max_threads = ctx->thread_pool_max;

/*
* We either have to:
* - Increase the number available of threads until over the min_threads
* threshold.
* - Decrease the number of available threads until under the max_threads
* threshold.
* - Do nothing. The current number of threads fall in between the min and
* max thresholds. We'll let the pool manage itself.
*/
for (int i = pool->cur_threads; i < pool->min_threads; i++) {
spawn_thread(pool);
}

for (int i = pool->cur_threads; i > pool->max_threads; i--) {
qemu_sem_post(&pool->sem);
}

qemu_mutex_unlock(&pool->lock);
}

static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
{
if (!ctx) {
Expand All @@ -306,11 +352,12 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
qemu_mutex_init(&pool->lock);
qemu_cond_init(&pool->worker_stopped);
qemu_sem_init(&pool->sem, 0);
pool->max_threads = 64;
pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);

QLIST_INIT(&pool->head);
QTAILQ_INIT(&pool->request_list);

thread_pool_update_params(pool, ctx);
}

ThreadPool *thread_pool_new(AioContext *ctx)
Expand Down

0 comments on commit 71ad471

Please sign in to comment.