Skip to content

Commit 0fdb731

Browse files
esposem authored and kevmw committed
thread-pool: use ThreadPool from the running thread
Use qemu_get_current_aio_context() where possible, since we always submit work to the current thread anyways. We want to also be sure that the thread submitting the work is the same as the one processing the pool, to avoid adding synchronization to the pool list. Signed-off-by: Emanuele Giuseppe Esposito <[email protected]> Message-Id: <[email protected]> Reviewed-by: Kevin Wolf <[email protected]> Reviewed-by: Stefan Hajnoczi <[email protected]> Signed-off-by: Kevin Wolf <[email protected]>
1 parent a75e4e4 commit 0fdb731

File tree

5 files changed

+21
-18
lines changed

5 files changed

+21
-18
lines changed

block/file-posix.c

Lines changed: 10 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -2040,11 +2040,10 @@ static int handle_aiocb_truncate(void *opaque)
20402040
return result;
20412041
}
20422042

2043-
static int coroutine_fn raw_thread_pool_submit(BlockDriverState *bs,
2044-
ThreadPoolFunc func, void *arg)
2043+
static int coroutine_fn raw_thread_pool_submit(ThreadPoolFunc func, void *arg)
20452044
{
20462045
/* @bs can be NULL, bdrv_get_aio_context() returns the main context then */
2047-
ThreadPool *pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
2046+
ThreadPool *pool = aio_get_thread_pool(qemu_get_current_aio_context());
20482047
return thread_pool_submit_co(pool, func, arg);
20492048
}
20502049

@@ -2112,7 +2111,7 @@ static int coroutine_fn raw_co_prw(BlockDriverState *bs, uint64_t offset,
21122111
};
21132112

21142113
assert(qiov->size == bytes);
2115-
return raw_thread_pool_submit(bs, handle_aiocb_rw, &acb);
2114+
return raw_thread_pool_submit(handle_aiocb_rw, &acb);
21162115
}
21172116

21182117
static int coroutine_fn raw_co_preadv(BlockDriverState *bs, int64_t offset,
@@ -2181,7 +2180,7 @@ static int coroutine_fn raw_co_flush_to_disk(BlockDriverState *bs)
21812180
return luring_co_submit(bs, s->fd, 0, NULL, QEMU_AIO_FLUSH);
21822181
}
21832182
#endif
2184-
return raw_thread_pool_submit(bs, handle_aiocb_flush, &acb);
2183+
return raw_thread_pool_submit(handle_aiocb_flush, &acb);
21852184
}
21862185

21872186
static void raw_aio_attach_aio_context(BlockDriverState *bs,
@@ -2243,7 +2242,7 @@ raw_regular_truncate(BlockDriverState *bs, int fd, int64_t offset,
22432242
},
22442243
};
22452244

2246-
return raw_thread_pool_submit(bs, handle_aiocb_truncate, &acb);
2245+
return raw_thread_pool_submit(handle_aiocb_truncate, &acb);
22472246
}
22482247

22492248
static int coroutine_fn raw_co_truncate(BlockDriverState *bs, int64_t offset,
@@ -2992,7 +2991,7 @@ raw_do_pdiscard(BlockDriverState *bs, int64_t offset, int64_t bytes,
29922991
acb.aio_type |= QEMU_AIO_BLKDEV;
29932992
}
29942993

2995-
ret = raw_thread_pool_submit(bs, handle_aiocb_discard, &acb);
2994+
ret = raw_thread_pool_submit(handle_aiocb_discard, &acb);
29962995
raw_account_discard(s, bytes, ret);
29972996
return ret;
29982997
}
@@ -3067,7 +3066,7 @@ raw_do_pwrite_zeroes(BlockDriverState *bs, int64_t offset, int64_t bytes,
30673066
handler = handle_aiocb_write_zeroes;
30683067
}
30693068

3070-
return raw_thread_pool_submit(bs, handler, &acb);
3069+
return raw_thread_pool_submit(handler, &acb);
30713070
}
30723071

30733072
static int coroutine_fn raw_co_pwrite_zeroes(
@@ -3305,7 +3304,7 @@ raw_co_copy_range_to(BlockDriverState *bs,
33053304
},
33063305
};
33073306

3308-
return raw_thread_pool_submit(bs, handle_aiocb_copy_range, &acb);
3307+
return raw_thread_pool_submit(handle_aiocb_copy_range, &acb);
33093308
}
33103309

33113310
BlockDriver bdrv_file = {
@@ -3635,7 +3634,7 @@ hdev_co_ioctl(BlockDriverState *bs, unsigned long int req, void *buf)
36353634
struct sg_io_hdr *io_hdr = buf;
36363635
if (io_hdr->cmdp[0] == PERSISTENT_RESERVE_OUT ||
36373636
io_hdr->cmdp[0] == PERSISTENT_RESERVE_IN) {
3638-
return pr_manager_execute(s->pr_mgr, bdrv_get_aio_context(bs),
3637+
return pr_manager_execute(s->pr_mgr, qemu_get_current_aio_context(),
36393638
s->fd, io_hdr);
36403639
}
36413640
}
@@ -3651,7 +3650,7 @@ hdev_co_ioctl(BlockDriverState *bs, unsigned long int req, void *buf)
36513650
},
36523651
};
36533652

3654-
return raw_thread_pool_submit(bs, handle_aiocb_ioctl, &acb);
3653+
return raw_thread_pool_submit(handle_aiocb_ioctl, &acb);
36553654
}
36563655
#endif /* linux */
36573656

block/file-win32.c

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -168,7 +168,7 @@ static BlockAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile,
168168
acb->aio_offset = offset;
169169

170170
trace_file_paio_submit(acb, opaque, offset, count, type);
171-
pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
171+
pool = aio_get_thread_pool(qemu_get_current_aio_context());
172172
return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
173173
}
174174

block/qcow2-threads.c

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -43,7 +43,7 @@ qcow2_co_process(BlockDriverState *bs, ThreadPoolFunc *func, void *arg)
4343
{
4444
int ret;
4545
BDRVQcow2State *s = bs->opaque;
46-
ThreadPool *pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
46+
ThreadPool *pool = aio_get_thread_pool(qemu_get_current_aio_context());
4747

4848
qemu_co_mutex_lock(&s->lock);
4949
while (s->nb_threads >= QCOW2_MAX_THREADS) {

include/block/thread-pool.h

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -29,12 +29,17 @@ typedef struct ThreadPool ThreadPool;
2929
ThreadPool *thread_pool_new(struct AioContext *ctx);
3030
void thread_pool_free(ThreadPool *pool);
3131

32+
/*
33+
* thread_pool_submit* API: submit I/O requests in the thread's
34+
* current AioContext.
35+
*/
3236
BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
3337
ThreadPoolFunc *func, void *arg,
3438
BlockCompletionFunc *cb, void *opaque);
3539
int coroutine_fn thread_pool_submit_co(ThreadPool *pool,
3640
ThreadPoolFunc *func, void *arg);
3741
void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg);
42+
3843
void thread_pool_update_params(ThreadPool *pool, struct AioContext *ctx);
3944

4045
#endif

util/thread-pool.c

Lines changed: 4 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -48,7 +48,7 @@ struct ThreadPoolElement {
4848
/* Access to this list is protected by lock. */
4949
QTAILQ_ENTRY(ThreadPoolElement) reqs;
5050

51-
/* Access to this list is protected by the global mutex. */
51+
/* This list is only written by the thread pool's mother thread. */
5252
QLIST_ENTRY(ThreadPoolElement) all;
5353
};
5454

@@ -175,7 +175,6 @@ static void thread_pool_completion_bh(void *opaque)
175175
ThreadPool *pool = opaque;
176176
ThreadPoolElement *elem, *next;
177177

178-
aio_context_acquire(pool->ctx);
179178
restart:
180179
QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
181180
if (elem->state != THREAD_DONE) {
@@ -195,9 +194,7 @@ static void thread_pool_completion_bh(void *opaque)
195194
*/
196195
qemu_bh_schedule(pool->completion_bh);
197196

198-
aio_context_release(pool->ctx);
199197
elem->common.cb(elem->common.opaque, elem->ret);
200-
aio_context_acquire(pool->ctx);
201198

202199
/* We can safely cancel the completion_bh here regardless of someone
203200
* else having scheduled it meanwhile because we reenter the
@@ -211,7 +208,6 @@ static void thread_pool_completion_bh(void *opaque)
211208
qemu_aio_unref(elem);
212209
}
213210
}
214-
aio_context_release(pool->ctx);
215211
}
216212

217213
static void thread_pool_cancel(BlockAIOCB *acb)
@@ -251,6 +247,9 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
251247
{
252248
ThreadPoolElement *req;
253249

250+
/* Assert that the thread submitting work is the same running the pool */
251+
assert(pool->ctx == qemu_get_current_aio_context());
252+
254253
req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
255254
req->func = func;
256255
req->arg = arg;

0 commit comments

Comments (0)