Skip to content

Commit b5aa749

Browse files
maciejsszmigiero authored and legoater committed
thread-pool: Implement generic (non-AIO) pool support
Migration code wants to manage device data sending threads in one place. QEMU has an existing thread pool implementation, however it is limited to queuing AIO operations only and essentially has a 1:1 mapping between the current AioContext and the AIO ThreadPool in use. Implement generic (non-AIO) ThreadPool by essentially wrapping Glib's GThreadPool. This brings a few new operations on a pool: * thread_pool_wait() operation waits until all the submitted work requests have finished. * thread_pool_set_max_threads() explicitly sets the maximum thread count in the pool. * thread_pool_adjust_max_threads_to_work() adjusts the maximum thread count in the pool to equal the number of still waiting in queue or unfinished work. Reviewed-by: Fabiano Rosas <[email protected]> Reviewed-by: Peter Xu <[email protected]> Signed-off-by: Maciej S. Szmigiero <[email protected]> Link: https://lore.kernel.org/qemu-devel/b1efaebdbea7cb7068b8fb74148777012383e12b.1741124640.git.maciej.szmigiero@oracle.com Signed-off-by: Cédric Le Goater <[email protected]>
1 parent dc67dae commit b5aa749

File tree

2 files changed

+170
-0
lines changed

2 files changed

+170
-0
lines changed

include/block/thread-pool.h

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,56 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
3838
int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
3939
void thread_pool_update_params(ThreadPoolAio *pool, struct AioContext *ctx);
4040

41+
/* ------------------------------------------- */
/* Generic thread pool types and methods below */

/* Opaque handle; the definition lives in util/thread-pool.c. */
typedef struct ThreadPool ThreadPool;

/* Create a new thread pool. Never returns NULL. */
ThreadPool *thread_pool_new(void);

/*
 * Free the thread pool.
 *
 * Waits for all the previously submitted work to complete before performing
 * the actual freeing operation.
 */
void thread_pool_free(ThreadPool *pool);

/*
 * Submit a new work (task) for the pool.
 *
 * The task may not start executing immediately if all worker threads are
 * currently busy (see thread_pool_submit_immediate() for that guarantee).
 *
 * @opaque_destroy is an optional GDestroyNotify for the @opaque argument
 * to the work function at @func.
 */
void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
                        void *opaque, GDestroyNotify opaque_destroy);

/*
 * Submit a new work (task) for the pool, making sure it starts getting
 * processed immediately, launching a new thread for it if necessary.
 *
 * @opaque_destroy is an optional GDestroyNotify for the @opaque argument
 * to the work function at @func.
 */
void thread_pool_submit_immediate(ThreadPool *pool, ThreadPoolFunc *func,
                                  void *opaque, GDestroyNotify opaque_destroy);

/*
 * Wait for all previously submitted work to complete before returning.
 *
 * Can be used as a barrier between two sets of tasks executed on a thread
 * pool without destroying it, or in a performance-sensitive path where the
 * caller just wants to wait for all tasks to complete while deferring the
 * pool free operation for a later, less performance-sensitive time.
 */
void thread_pool_wait(ThreadPool *pool);

/*
 * Set the maximum number of threads in the pool.
 *
 * Returns false (and leaves the limit unchanged) if the underlying
 * implementation rejects the new value.
 */
bool thread_pool_set_max_threads(ThreadPool *pool, int max_threads);

/*
 * Adjust the maximum number of threads in the pool to give each task its
 * own thread (exactly one thread per task).
 */
bool thread_pool_adjust_max_threads_to_work(ThreadPool *pool);
4192

4293
#endif

util/thread-pool.c

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,3 +374,122 @@ void thread_pool_free_aio(ThreadPoolAio *pool)
374374
qemu_mutex_destroy(&pool->lock);
375375
g_free(pool);
376376
}
377+
378+
/* Generic (non-AIO) thread pool: a thin wrapper around GLib's GThreadPool. */
struct ThreadPool {
    GThreadPool *t;              /* underlying GLib pool doing the real work */
    size_t cur_work;             /* queued + in-flight tasks; cur_work_lock */
    QemuMutex cur_work_lock;     /* protects cur_work */
    QemuCond all_finished_cond;  /* signalled when cur_work drops to zero */
};

/* One queued work item; allocated in thread_pool_submit(), freed by the
 * worker in thread_pool_func() after the task has run. */
typedef struct {
    ThreadPoolFunc *func;           /* task entry point */
    void *opaque;                   /* argument passed to func */
    GDestroyNotify opaque_destroy;  /* optional destructor for opaque */
} ThreadPoolElement;
390+
391+
/*
 * Per-task trampoline executed by GThreadPool worker threads.
 *
 * Runs the queued task, releases the task's resources and then drops the
 * outstanding-work counter, waking up any thread_pool_wait() caller once
 * the counter reaches zero.
 */
static void thread_pool_func(gpointer data, gpointer user_data)
{
    ThreadPool *pool = user_data;
    ThreadPoolElement *req = data;

    req->func(req->opaque);

    if (req->opaque_destroy) {
        req->opaque_destroy(req->opaque);
    }

    g_free(req);

    qemu_mutex_lock(&pool->cur_work_lock);

    assert(pool->cur_work > 0);
    pool->cur_work--;
    if (pool->cur_work == 0) {
        /* Last outstanding task: release thread_pool_wait() callers. */
        qemu_cond_signal(&pool->all_finished_cond);
    }

    qemu_mutex_unlock(&pool->cur_work_lock);
}
411+
412+
ThreadPool *thread_pool_new(void)
413+
{
414+
ThreadPool *pool = g_new(ThreadPool, 1);
415+
416+
pool->cur_work = 0;
417+
qemu_mutex_init(&pool->cur_work_lock);
418+
qemu_cond_init(&pool->all_finished_cond);
419+
420+
pool->t = g_thread_pool_new(thread_pool_func, pool, 0, TRUE, NULL);
421+
/*
422+
* g_thread_pool_new() can only return errors if initial thread(s)
423+
* creation fails but we ask for 0 initial threads above.
424+
*/
425+
assert(pool->t);
426+
427+
return pool;
428+
}
429+
430+
/*
 * Destroy the pool, waiting for all previously submitted work to finish
 * first. The teardown order matters: workers must be drained before the
 * lock and condition variable they use are destroyed.
 */
void thread_pool_free(ThreadPool *pool)
{
    /*
     * With _wait = TRUE this effectively waits for all
     * previously submitted work to complete first.
     */
    g_thread_pool_free(pool->t, FALSE, TRUE);

    qemu_cond_destroy(&pool->all_finished_cond);
    qemu_mutex_destroy(&pool->cur_work_lock);

    g_free(pool);
}
443+
444+
/*
 * Queue a new task on @pool.
 *
 * @opaque_destroy, if non-NULL, is invoked on @opaque by the worker thread
 * after @func returns. The outstanding-work counter is bumped before the
 * push so thread_pool_wait() can never miss this task.
 */
void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
                        void *opaque, GDestroyNotify opaque_destroy)
{
    ThreadPoolElement *req = g_new(ThreadPoolElement, 1);

    req->func = func;
    req->opaque = opaque;
    req->opaque_destroy = opaque_destroy;

    qemu_mutex_lock(&pool->cur_work_lock);
    pool->cur_work++;
    qemu_mutex_unlock(&pool->cur_work_lock);

    /*
     * g_thread_pool_push() only reports an error when spawning an extra
     * worker thread fails; the request is queued regardless (the existing
     * threads will pick it up), so the return value is safely ignored.
     */
    g_thread_pool_push(pool->t, req, NULL);
}
464+
465+
/*
 * Queue a task and guarantee it starts processing right away.
 *
 * Submitting first and then raising the thread limit ensures a worker is
 * available for every outstanding task, including this one.
 */
void thread_pool_submit_immediate(ThreadPool *pool, ThreadPoolFunc *func,
                                  void *opaque, GDestroyNotify opaque_destroy)
{
    thread_pool_submit(pool, func, opaque, opaque_destroy);
    thread_pool_adjust_max_threads_to_work(pool);
}
471+
472+
/*
 * Block until every previously submitted task has finished.
 *
 * Acts as a barrier between batches of work without destroying the pool.
 */
void thread_pool_wait(ThreadPool *pool)
{
    qemu_mutex_lock(&pool->cur_work_lock);

    /* Standard condvar idiom: re-check the predicate after each wakeup. */
    while (pool->cur_work > 0) {
        qemu_cond_wait(&pool->all_finished_cond, &pool->cur_work_lock);
    }

    qemu_mutex_unlock(&pool->cur_work_lock);
}
481+
482+
/*
 * Set the maximum worker-thread count of @pool.
 *
 * @max_threads must be strictly positive. Returns false if the underlying
 * GLib pool rejected the new limit.
 */
bool thread_pool_set_max_threads(ThreadPool *pool,
                                 int max_threads)
{
    assert(max_threads > 0);
    return g_thread_pool_set_max_threads(pool->t, max_threads, NULL);
}
489+
490+
/*
 * Adjust the pool's thread limit so each outstanding task can get its own
 * worker thread.
 *
 * Note: cur_work can legitimately be zero here (all submitted work already
 * finished), but thread_pool_set_max_threads() asserts a strictly positive
 * limit — clamp to at least 1 instead of tripping that assertion.
 */
bool thread_pool_adjust_max_threads_to_work(ThreadPool *pool)
{
    QEMU_LOCK_GUARD(&pool->cur_work_lock);

    return thread_pool_set_max_threads(pool,
                                       pool->cur_work > 0 ?
                                       pool->cur_work : 1);
}

0 commit comments

Comments
 (0)