diff --git a/block.c b/block.c index 8440712ca0..9c94f7f28a 100644 --- a/block.c +++ b/block.c @@ -911,10 +911,11 @@ static bool bdrv_child_cb_drained_poll(BdrvChild *child) return bdrv_drain_poll(bs, false, NULL, false); } -static void bdrv_child_cb_drained_end(BdrvChild *child) +static void bdrv_child_cb_drained_end(BdrvChild *child, + int *drained_end_counter) { BlockDriverState *bs = child->opaque; - bdrv_drained_end(bs); + bdrv_drained_end_no_poll(bs, drained_end_counter); } static void bdrv_child_cb_attach(BdrvChild *child) @@ -5923,9 +5924,11 @@ static void bdrv_attach_aio_context(BlockDriverState *bs, void bdrv_set_aio_context_ignore(BlockDriverState *bs, AioContext *new_context, GSList **ignore) { + AioContext *old_context = bdrv_get_aio_context(bs); + AioContext *current_context = qemu_get_current_aio_context(); BdrvChild *child; - if (bdrv_get_aio_context(bs) == new_context) { + if (old_context == new_context) { return; } @@ -5949,13 +5952,31 @@ void bdrv_set_aio_context_ignore(BlockDriverState *bs, bdrv_detach_aio_context(bs); - /* This function executes in the old AioContext so acquire the new one in - * case it runs in a different thread. - */ - aio_context_acquire(new_context); + /* Acquire the new context, if necessary */ + if (current_context != new_context) { + aio_context_acquire(new_context); + } + bdrv_attach_aio_context(bs, new_context); + + /* + * If this function was recursively called from + * bdrv_set_aio_context_ignore(), there may be nodes in the + * subtree that have not yet been moved to the new AioContext. + * Release the old one so bdrv_drained_end() can poll them. + */ + if (current_context != old_context) { + aio_context_release(old_context); + } + bdrv_drained_end(bs); - aio_context_release(new_context); + + if (current_context != old_context) { + aio_context_acquire(old_context); + } + if (current_context != new_context) { + aio_context_release(new_context); + } } static bool bdrv_parent_can_set_aio_context(BdrvChild *c, AioContext *ctx, diff --git a/block/block-backend.c b/block/block-backend.c index a8d160fd5d..0056b526b8 100644 --- a/block/block-backend.c +++ b/block/block-backend.c @@ -121,7 +121,7 @@ static void blk_root_inherit_options(int *child_flags, QDict *child_options, } static void blk_root_drained_begin(BdrvChild *child); static bool blk_root_drained_poll(BdrvChild *child); -static void blk_root_drained_end(BdrvChild *child); +static void blk_root_drained_end(BdrvChild *child, int *drained_end_counter); static void blk_root_change_media(BdrvChild *child, bool load); static void blk_root_resize(BdrvChild *child); @@ -1249,7 +1249,7 @@ int blk_pread_unthrottled(BlockBackend *blk, int64_t offset, uint8_t *buf, blk_root_drained_begin(blk->root); ret = blk_pread(blk, offset, buf, count); - blk_root_drained_end(blk->root); + blk_root_drained_end(blk->root, NULL); return ret; } @@ -2236,7 +2236,7 @@ static bool blk_root_drained_poll(BdrvChild *child) return !!blk->in_flight; } -static void blk_root_drained_end(BdrvChild *child) +static void blk_root_drained_end(BdrvChild *child, int *drained_end_counter) { BlockBackend *blk = child->opaque; assert(blk->quiesce_counter); diff --git a/block/io.c b/block/io.c index b0b33174d3..8f23cab82e 100644 --- a/block/io.c +++ b/block/io.c @@ -55,17 +55,26 @@ static void bdrv_parent_drained_begin(BlockDriverState *bs, BdrvChild *ignore, } } -void bdrv_parent_drained_end_single(BdrvChild *c) +static void bdrv_parent_drained_end_single_no_poll(BdrvChild *c, + int *drained_end_counter) { assert(c->parent_quiesce_counter > 0); c->parent_quiesce_counter--; if (c->role->drained_end) { - c->role->drained_end(c); + c->role->drained_end(c, drained_end_counter); } } +void bdrv_parent_drained_end_single(BdrvChild *c) +{ + int drained_end_counter = 0; + bdrv_parent_drained_end_single_no_poll(c, &drained_end_counter); + BDRV_POLL_WHILE(c->bs, atomic_read(&drained_end_counter) > 0); +} + static void bdrv_parent_drained_end(BlockDriverState *bs, BdrvChild *ignore, - bool ignore_bds_parents) + bool ignore_bds_parents, + int *drained_end_counter) { BdrvChild *c, *next; @@ -73,7 +82,7 @@ static void bdrv_parent_drained_end(BlockDriverState *bs, BdrvChild *ignore, if (c == ignore || (ignore_bds_parents && c->role->parent_is_bds)) { continue; } - bdrv_parent_drained_end_single(c); + bdrv_parent_drained_end_single_no_poll(c, drained_end_counter); } } @@ -212,13 +221,11 @@ static void coroutine_fn bdrv_drain_invoke_entry(void *opaque) atomic_mb_set(&data->done, true); bdrv_dec_in_flight(bs); - if (data->drained_end_counter) { + if (!data->begin) { atomic_dec(data->drained_end_counter); } - if (data->begin || data->drained_end_counter) { - g_free(data); - } + g_free(data); } /* Recursively call BlockDriver.bdrv_co_drain_begin/end callbacks */ @@ -240,7 +247,7 @@ static void bdrv_drain_invoke(BlockDriverState *bs, bool begin, .drained_end_counter = drained_end_counter, }; - if (!begin && drained_end_counter) { + if (!begin) { atomic_inc(drained_end_counter); } @@ -249,15 +256,6 @@ static void bdrv_drain_invoke(BlockDriverState *bs, bool begin, bdrv_inc_in_flight(bs); data->co = qemu_coroutine_create(bdrv_drain_invoke_entry, data); aio_co_schedule(bdrv_get_aio_context(bs), data->co); - - /* - * TODO: Drop this and make callers pass @drained_end_counter and poll - * themselves - */ - if (!begin && !drained_end_counter) { - BDRV_POLL_WHILE(bs, !data->done); - g_free(data); - } } /* Returns true if BDRV_POLL_WHILE() should go into a blocking aio_poll() */ @@ -320,9 +318,11 @@ static void bdrv_co_drain_bh_cb(void *opaque) } bdrv_dec_in_flight(bs); if (data->begin) { + assert(!data->drained_end_counter); bdrv_do_drained_begin(bs, data->recursive, data->parent, data->ignore_bds_parents, data->poll); } else { + assert(!data->poll); bdrv_do_drained_end(bs, data->recursive, data->parent, data->ignore_bds_parents, data->drained_end_counter); @@ -438,6 +438,20 @@ void bdrv_subtree_drained_begin(BlockDriverState *bs) bdrv_do_drained_begin(bs, true, NULL, false, true); } +/** + * This function does not poll, nor must any of its recursively called + * functions. The *drained_end_counter pointee will be incremented + * once for every background operation scheduled, and decremented once + * the operation settles. Therefore, the pointer must remain valid + * until the pointee reaches 0. That implies that whoever sets up the + * pointee has to poll until it is 0. + * + * We use atomic operations to access *drained_end_counter, because + * (1) when called from bdrv_set_aio_context_ignore(), the subgraph of + * @bs may contain nodes in different AioContexts, + * (2) bdrv_drain_all_end() uses the same counter for all nodes, + * regardless of which AioContext they are in. + */ static void bdrv_do_drained_end(BlockDriverState *bs, bool recursive, BdrvChild *parent, bool ignore_bds_parents, int *drained_end_counter) @@ -445,6 +459,8 @@ static void bdrv_do_drained_end(BlockDriverState *bs, bool recursive, BdrvChild *child, *next; int old_quiesce_counter; + assert(drained_end_counter != NULL); + if (qemu_in_coroutine()) { bdrv_co_yield_to_drain(bs, false, recursive, parent, ignore_bds_parents, false, drained_end_counter); @@ -454,7 +470,8 @@ static void bdrv_do_drained_end(BlockDriverState *bs, bool recursive, /* Re-enable things in child-to-parent order */ bdrv_drain_invoke(bs, false, drained_end_counter); - bdrv_parent_drained_end(bs, parent, ignore_bds_parents); + bdrv_parent_drained_end(bs, parent, ignore_bds_parents, + drained_end_counter); old_quiesce_counter = atomic_fetch_dec(&bs->quiesce_counter); if (old_quiesce_counter == 1) { @@ -473,12 +490,21 @@ static void bdrv_do_drained_end(BlockDriverState *bs, bool recursive, void bdrv_drained_end(BlockDriverState *bs) { - bdrv_do_drained_end(bs, false, NULL, false, NULL); + int drained_end_counter = 0; + bdrv_do_drained_end(bs, false, NULL, false, &drained_end_counter); + BDRV_POLL_WHILE(bs, atomic_read(&drained_end_counter) > 0); +} + +void bdrv_drained_end_no_poll(BlockDriverState *bs, int *drained_end_counter) +{ + bdrv_do_drained_end(bs, false, NULL, false, drained_end_counter); } void bdrv_subtree_drained_end(BlockDriverState *bs) { - bdrv_do_drained_end(bs, true, NULL, false, NULL); + int drained_end_counter = 0; + bdrv_do_drained_end(bs, true, NULL, false, &drained_end_counter); + BDRV_POLL_WHILE(bs, atomic_read(&drained_end_counter) > 0); } void bdrv_apply_subtree_drain(BdrvChild *child, BlockDriverState *new_parent) @@ -492,11 +518,15 @@ void bdrv_apply_subtree_drain(BdrvChild *child, BlockDriverState *new_parent) void bdrv_unapply_subtree_drain(BdrvChild *child, BlockDriverState *old_parent) { + int drained_end_counter = 0; int i; for (i = 0; i < old_parent->recursive_quiesce_counter; i++) { - bdrv_do_drained_end(child->bs, true, child, false, NULL); + bdrv_do_drained_end(child->bs, true, child, false, + &drained_end_counter); } + + BDRV_POLL_WHILE(child->bs, atomic_read(&drained_end_counter) > 0); } /* @@ -596,15 +626,19 @@ void bdrv_drain_all_begin(void) void bdrv_drain_all_end(void) { BlockDriverState *bs = NULL; + int drained_end_counter = 0; while ((bs = bdrv_next_all_states(bs))) { AioContext *aio_context = bdrv_get_aio_context(bs); aio_context_acquire(aio_context); - bdrv_do_drained_end(bs, false, NULL, true, NULL); + bdrv_do_drained_end(bs, false, NULL, true, &drained_end_counter); aio_context_release(aio_context); } + assert(qemu_get_current_aio_context() == qemu_get_aio_context()); + AIO_WAIT_WHILE(NULL, atomic_read(&drained_end_counter) > 0); + assert(bdrv_drain_all_count > 0); bdrv_drain_all_count--; } diff --git a/blockjob.c b/blockjob.c index 458ae76f51..20b7f557da 100644 --- a/blockjob.c +++ b/blockjob.c @@ -135,7 +135,7 @@ static bool child_job_drained_poll(BdrvChild *c) } } -static void child_job_drained_end(BdrvChild *c) +static void child_job_drained_end(BdrvChild *c, int *drained_end_counter) { BlockJob *job = c->opaque; job_resume(&job->job); diff --git a/include/block/block.h b/include/block/block.h index a81645e8a3..60f00479e0 100644 --- a/include/block/block.h +++ b/include/block/block.h @@ -612,6 +612,9 @@ void bdrv_parent_drained_begin_single(BdrvChild *c, bool poll); * bdrv_parent_drained_end_single: * * End a quiesced section for the parent of @c. + * + * This polls @bs's AioContext until all scheduled sub-drained_ends + * have settled, which may result in graph changes. */ void bdrv_parent_drained_end_single(BdrvChild *c); @@ -661,9 +664,31 @@ void bdrv_subtree_drained_begin(BlockDriverState *bs); * bdrv_drained_end: * * End a quiescent section started by bdrv_drained_begin(). + * + * This polls @bs's AioContext until all scheduled sub-drained_ends + * have settled. On one hand, that may result in graph changes. On + * the other, this requires that all involved nodes (@bs and all of + * its parents) are in the same AioContext, and that the caller has + * acquired it. + * If there are any nodes that are in different contexts from @bs, + * these contexts must not be acquired. */ void bdrv_drained_end(BlockDriverState *bs); +/** + * bdrv_drained_end_no_poll: + * + * Same as bdrv_drained_end(), but do not poll for the subgraph to + * actually become unquiesced. Therefore, no graph changes will occur + * with this function. + * + * *drained_end_counter is incremented for every background operation + * that is scheduled, and will be decremented for every operation once + * it settles. The caller must poll until it reaches 0. The counter + * should be accessed using atomic operations only. + */ +void bdrv_drained_end_no_poll(BlockDriverState *bs, int *drained_end_counter); + /** * End a quiescent section started by bdrv_subtree_drained_begin(). */ diff --git a/include/block/block_int.h b/include/block/block_int.h index f5b044fcdb..3aa1e832a8 100644 --- a/include/block/block_int.h +++ b/include/block/block_int.h @@ -664,11 +664,15 @@ struct BdrvChildRole { * These functions must not change the graph (and therefore also must not * call aio_poll(), which could change the graph indirectly). * + * If drained_end() schedules background operations, it must atomically + * increment *drained_end_counter for each such operation and atomically + * decrement it once the operation has settled. + * * Note that this can be nested. If drained_begin() was called twice, new * I/O is allowed only after drained_end() was called twice, too. */ void (*drained_begin)(BdrvChild *child); - void (*drained_end)(BdrvChild *child); + void (*drained_end)(BdrvChild *child, int *drained_end_counter); /* * Returns whether the parent has pending requests for the child. This