diff --git a/async.c b/async.c index 5fb3fa61df..6930185e64 100644 --- a/async.c +++ b/async.c @@ -214,6 +214,7 @@ aio_ctx_finalize(GSource *source) thread_pool_free(ctx->thread_pool); aio_set_event_notifier(ctx, &ctx->notifier, NULL); event_notifier_cleanup(&ctx->notifier); + rfifolock_destroy(&ctx->lock); qemu_mutex_destroy(&ctx->bh_lock); g_array_free(ctx->pollfds, TRUE); timerlistgroup_deinit(&ctx->tlg); @@ -250,6 +251,12 @@ static void aio_timerlist_notify(void *opaque) aio_notify(opaque); } +static void aio_rfifolock_cb(void *opaque) +{ + /* Kick owner thread in case they are blocked in aio_poll() */ + aio_notify(opaque); +} + AioContext *aio_context_new(void) { AioContext *ctx; @@ -257,6 +264,7 @@ AioContext *aio_context_new(void) ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD)); ctx->thread_pool = NULL; qemu_mutex_init(&ctx->bh_lock); + rfifolock_init(&ctx->lock, aio_rfifolock_cb, ctx); event_notifier_init(&ctx->notifier, false); aio_set_event_notifier(ctx, &ctx->notifier, (EventNotifierHandler *) @@ -275,3 +283,13 @@ void aio_context_unref(AioContext *ctx) { g_source_unref(&ctx->source); } + +void aio_context_acquire(AioContext *ctx) +{ + rfifolock_lock(&ctx->lock); +} + +void aio_context_release(AioContext *ctx) +{ + rfifolock_unlock(&ctx->lock); +} diff --git a/include/block/aio.h b/include/block/aio.h index 2efdf416cf..a92511bd3b 100644 --- a/include/block/aio.h +++ b/include/block/aio.h @@ -19,6 +19,7 @@ #include "qemu/queue.h" #include "qemu/event_notifier.h" #include "qemu/thread.h" +#include "qemu/rfifolock.h" #include "qemu/timer.h" typedef struct BlockDriverAIOCB BlockDriverAIOCB; @@ -47,6 +48,9 @@ typedef void IOHandler(void *opaque); struct AioContext { GSource source; + /* Protects all fields from multi-threaded access */ + RFifoLock lock; + /* The list of registered AIO handlers */ QLIST_HEAD(, AioHandler) aio_handlers; @@ -104,6 +108,20 @@ void aio_context_ref(AioContext *ctx); */ void aio_context_unref(AioContext *ctx); +/* Take ownership of the AioContext. If the AioContext will be shared between + * threads, a thread must have ownership when calling aio_poll(). + * + * Note that multiple threads calling aio_poll() means timers, BHs, and + * callbacks may be invoked from a different thread than they were registered + * from. Therefore, code must use AioContext acquire/release or use + * fine-grained synchronization to protect shared state if other threads will + * be accessing it simultaneously. + */ +void aio_context_acquire(AioContext *ctx); + +/* Relinquish ownership of the AioContext. */ +void aio_context_release(AioContext *ctx); + /** * aio_bh_new: Allocate a new bottom half structure. * diff --git a/tests/test-aio.c b/tests/test-aio.c index 592721ed3f..56f4288ca8 100644 --- a/tests/test-aio.c +++ b/tests/test-aio.c @@ -112,6 +112,64 @@ static void test_notify(void) g_assert(!aio_poll(ctx, false)); } +typedef struct { + QemuMutex start_lock; + bool thread_acquired; +} AcquireTestData; + +static void *test_acquire_thread(void *opaque) +{ + AcquireTestData *data = opaque; + + /* Wait for other thread to let us start */ + qemu_mutex_lock(&data->start_lock); + qemu_mutex_unlock(&data->start_lock); + + aio_context_acquire(ctx); + aio_context_release(ctx); + + data->thread_acquired = true; /* success, we got here */ + + return NULL; +} + +static void dummy_notifier_read(EventNotifier *unused) +{ + g_assert(false); /* should never be invoked */ +} + +static void test_acquire(void) +{ + QemuThread thread; + EventNotifier notifier; + AcquireTestData data; + + /* Dummy event notifier ensures aio_poll() will block */ + event_notifier_init(¬ifier, false); + aio_set_event_notifier(ctx, ¬ifier, dummy_notifier_read); + g_assert(!aio_poll(ctx, false)); /* consume aio_notify() */ + + qemu_mutex_init(&data.start_lock); + qemu_mutex_lock(&data.start_lock); + data.thread_acquired = false; + + qemu_thread_create(&thread, "test_acquire_thread", + test_acquire_thread, + &data, QEMU_THREAD_JOINABLE); + + /* Block in aio_poll(), let other thread kick us and acquire context */ + aio_context_acquire(ctx); + qemu_mutex_unlock(&data.start_lock); /* let the thread run */ + g_assert(!aio_poll(ctx, true)); + aio_context_release(ctx); + + qemu_thread_join(&thread); + aio_set_event_notifier(ctx, ¬ifier, NULL); + event_notifier_cleanup(¬ifier); + + g_assert(data.thread_acquired); +} + static void test_bh_schedule(void) { BHTestData data = { .n = 0 }; @@ -775,6 +833,7 @@ int main(int argc, char **argv) g_test_init(&argc, &argv, NULL); g_test_add_func("/aio/notify", test_notify); + g_test_add_func("/aio/acquire", test_acquire); g_test_add_func("/aio/bh/schedule", test_bh_schedule); g_test_add_func("/aio/bh/schedule10", test_bh_schedule10); g_test_add_func("/aio/bh/cancel", test_bh_cancel);