diff --git a/block/nbd.c b/block/nbd.c
index 7bb881fef4..9daf003bea 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -38,6 +38,7 @@
 
 #include "qapi/qapi-visit-sockets.h"
 #include "qapi/qmp/qstring.h"
+#include "qapi/clone-visitor.h"
 
 #include "block/qdict.h"
 #include "block/nbd.h"
@@ -62,6 +63,47 @@ typedef enum NBDClientState {
     NBD_CLIENT_QUIT
 } NBDClientState;
 
+typedef enum NBDConnectThreadState {
+    /* No thread, no pending results */
+    CONNECT_THREAD_NONE,
+
+    /* Thread is running, no results for now */
+    CONNECT_THREAD_RUNNING,
+
+    /*
+     * Thread is running, but requestor exited. Thread should close
+     * the new socket and free the connect state on exit.
+     */
+    CONNECT_THREAD_RUNNING_DETACHED,
+
+    /* Thread finished, results are stored in the state */
+    CONNECT_THREAD_FAIL,
+    CONNECT_THREAD_SUCCESS
+} NBDConnectThreadState;
+
+typedef struct NBDConnectThread {
+    /* Initialization constants */
+    SocketAddress *saddr; /* address to connect to */
+    /*
+     * Bottom half to schedule on completion. Scheduled only if bh_ctx is not
+     * NULL
+     */
+    QEMUBHFunc *bh_func;
+    void *bh_opaque;
+
+    /*
+     * Result of the last attempt. Valid in FAIL and SUCCESS states.
+     * If you steal the error, don't forget to set the pointer to NULL.
+     */
+    QIOChannelSocket *sioc;
+    Error *err;
+
+    /* state and bh_ctx are protected by mutex */
+    QemuMutex mutex;
+    NBDConnectThreadState state; /* current state of the thread */
+    AioContext *bh_ctx; /* where to schedule bh (NULL means don't schedule) */
+} NBDConnectThread;
+
 typedef struct BDRVNBDState {
     QIOChannelSocket *sioc; /* The master data channel */
     QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
@@ -91,10 +133,17 @@ typedef struct BDRVNBDState {
     QCryptoTLSCreds *tlscreds;
     const char *hostname;
     char *x_dirty_bitmap;
+
+    bool wait_connect;
+    NBDConnectThread *connect_thread;
 } BDRVNBDState;
 
 static QIOChannelSocket *nbd_establish_connection(SocketAddress *saddr,
                                                   Error **errp);
+static QIOChannelSocket *nbd_co_establish_connection(BlockDriverState *bs,
+                                                      Error **errp);
+static void nbd_co_establish_connection_cancel(BlockDriverState *bs,
+                                               bool detach);
 static int nbd_client_handshake(BlockDriverState *bs, QIOChannelSocket *sioc,
                                 Error **errp);
 
@@ -191,6 +240,8 @@ static void coroutine_fn nbd_client_co_drain_begin(BlockDriverState *bs)
     if (s->connection_co_sleep_ns_state) {
         qemu_co_sleep_wake(s->connection_co_sleep_ns_state);
     }
+
+    nbd_co_establish_connection_cancel(bs, false);
 }
 
 static void coroutine_fn nbd_client_co_drain_end(BlockDriverState *bs)
@@ -223,6 +274,7 @@ static void nbd_teardown_connection(BlockDriverState *bs)
         if (s->connection_co_sleep_ns_state) {
             qemu_co_sleep_wake(s->connection_co_sleep_ns_state);
         }
+        nbd_co_establish_connection_cancel(bs, true);
     }
     if (qemu_in_coroutine()) {
         s->teardown_co = qemu_coroutine_self();
@@ -246,6 +298,216 @@ static bool nbd_client_connecting_wait(BDRVNBDState *s)
     return s->state == NBD_CLIENT_CONNECTING_WAIT;
 }
 
+static void connect_bh(void *opaque)
+{
+    BDRVNBDState *state = opaque;
+
+    assert(state->wait_connect);
+    state->wait_connect = false;
+    aio_co_wake(state->connection_co);
+}
+
+static void nbd_init_connect_thread(BDRVNBDState *s)
+{
+    s->connect_thread = g_new(NBDConnectThread, 1);
+
+    *s->connect_thread = (NBDConnectThread) {
+        .saddr = QAPI_CLONE(SocketAddress, s->saddr),
+        .state = CONNECT_THREAD_NONE,
+        .bh_func = connect_bh,
+        .bh_opaque = s,
+    };
+
+    qemu_mutex_init(&s->connect_thread->mutex);
+}
+
+static void nbd_free_connect_thread(NBDConnectThread *thr)
+{
+    if (thr->sioc) {
+        qio_channel_close(QIO_CHANNEL(thr->sioc), NULL);
+    }
+    error_free(thr->err);
+    qapi_free_SocketAddress(thr->saddr);
+    g_free(thr);
+}
+
+static void *connect_thread_func(void *opaque)
+{
+    NBDConnectThread *thr = opaque;
+    int ret;
+    bool do_free = false;
+
+    thr->sioc = qio_channel_socket_new();
+
+    error_free(thr->err);
+    thr->err = NULL;
+    ret = qio_channel_socket_connect_sync(thr->sioc, thr->saddr, &thr->err);
+    if (ret < 0) {
+        object_unref(OBJECT(thr->sioc));
+        thr->sioc = NULL;
+    }
+
+    qemu_mutex_lock(&thr->mutex);
+
+    switch (thr->state) {
+    case CONNECT_THREAD_RUNNING:
+        thr->state = ret < 0 ? CONNECT_THREAD_FAIL : CONNECT_THREAD_SUCCESS;
+        if (thr->bh_ctx) {
+            aio_bh_schedule_oneshot(thr->bh_ctx, thr->bh_func, thr->bh_opaque);
+
+            /* play safe, don't reuse bh_ctx on further connection attempts */
+            thr->bh_ctx = NULL;
+        }
+        break;
+    case CONNECT_THREAD_RUNNING_DETACHED:
+        do_free = true;
+        break;
+    default:
+        abort();
+    }
+
+    qemu_mutex_unlock(&thr->mutex);
+
+    if (do_free) {
+        nbd_free_connect_thread(thr);
+    }
+
+    return NULL;
+}
+
+static QIOChannelSocket *coroutine_fn
+nbd_co_establish_connection(BlockDriverState *bs, Error **errp)
+{
+    QemuThread thread;
+    BDRVNBDState *s = bs->opaque;
+    QIOChannelSocket *res;
+    NBDConnectThread *thr = s->connect_thread;
+
+    qemu_mutex_lock(&thr->mutex);
+
+    switch (thr->state) {
+    case CONNECT_THREAD_FAIL:
+    case CONNECT_THREAD_NONE:
+        error_free(thr->err);
+        thr->err = NULL;
+        thr->state = CONNECT_THREAD_RUNNING;
+        qemu_thread_create(&thread, "nbd-connect",
+                           connect_thread_func, thr, QEMU_THREAD_DETACHED);
+        break;
+    case CONNECT_THREAD_SUCCESS:
+        /* Previous attempt finally succeeded in background */
+        thr->state = CONNECT_THREAD_NONE;
+        res = thr->sioc;
+        thr->sioc = NULL;
+        qemu_mutex_unlock(&thr->mutex);
+        return res;
+    case CONNECT_THREAD_RUNNING:
+        /* Already running, will wait */
+        break;
+    default:
+        abort();
+    }
+
+    thr->bh_ctx = qemu_get_current_aio_context();
+
+    qemu_mutex_unlock(&thr->mutex);
+
+
+    /*
+     * We are going to wait for the connect thread to finish, but
+     * nbd_client_co_drain_begin() can interrupt us.
+     *
+     * Note that the wait_connect variable is not visible to the connect
+     * thread. It doesn't need mutex protection: it is used only in the
+     * home AioContext of bs.
+     */
+    s->wait_connect = true;
+    qemu_coroutine_yield();
+
+    qemu_mutex_lock(&thr->mutex);
+
+    switch (thr->state) {
+    case CONNECT_THREAD_SUCCESS:
+    case CONNECT_THREAD_FAIL:
+        thr->state = CONNECT_THREAD_NONE;
+        error_propagate(errp, thr->err);
+        thr->err = NULL;
+        res = thr->sioc;
+        thr->sioc = NULL;
+        break;
+    case CONNECT_THREAD_RUNNING:
+    case CONNECT_THREAD_RUNNING_DETACHED:
+        /*
+         * A drained section is about to begin, so report this attempt as
+         * failed. The connect thread keeps running in the background, and
+         * its result may be reused by the next connection attempt.
+         */
+        res = NULL;
+        error_setg(errp, "Connection attempt cancelled by other operation");
+        break;
+
+    case CONNECT_THREAD_NONE:
+        /*
+         * Impossible: we have seen this thread running, so it must still
+         * be running or have produced a result by now.
+         */
+        abort();
+
+    default:
+        abort();
+    }
+
+    qemu_mutex_unlock(&thr->mutex);
+
+    return res;
+}
+
+/*
+ * nbd_co_establish_connection_cancel
+ * Cancel nbd_co_establish_connection() asynchronously: it will finish soon,
+ * allowing a drained section to begin.
+ *
+ * If detach is true, also clean up the state (or, if the thread is running,
+ * move it to the CONNECT_THREAD_RUNNING_DETACHED state). s->connect_thread
+ * becomes NULL if detach is true.
+ */
+static void nbd_co_establish_connection_cancel(BlockDriverState *bs,
+                                               bool detach)
+{
+    BDRVNBDState *s = bs->opaque;
+    NBDConnectThread *thr = s->connect_thread;
+    bool wake = false;
+    bool do_free = false;
+
+    qemu_mutex_lock(&thr->mutex);
+
+    if (thr->state == CONNECT_THREAD_RUNNING) {
+        /* We can cancel only in running state, when bh is not yet scheduled */
+        thr->bh_ctx = NULL;
+        if (s->wait_connect) {
+            s->wait_connect = false;
+            wake = true;
+        }
+        if (detach) {
+            thr->state = CONNECT_THREAD_RUNNING_DETACHED;
+            s->connect_thread = NULL;
+        }
+    } else if (detach) {
+        do_free = true;
+    }
+
+    qemu_mutex_unlock(&thr->mutex);
+
+    if (do_free) {
+        nbd_free_connect_thread(thr);
+        s->connect_thread = NULL;
+    }
+
+    if (wake) {
+        aio_co_wake(s->connection_co);
+    }
+}
+
 static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
 {
     int ret;
@@ -289,7 +551,7 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
         s->ioc = NULL;
     }
-    sioc = nbd_establish_connection(s->saddr, &local_err);
+    sioc = nbd_co_establish_connection(s->bs, &local_err);
     if (!sioc) {
         ret = -ECONNREFUSED;
         goto out;
     }
@@ -1946,6 +2208,8 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
     /* successfully connected */
     s->state = NBD_CLIENT_CONNECTED;
 
+    nbd_init_connect_thread(s);
+
     s->connection_co = qemu_coroutine_create(nbd_connection_entry, s);
     bdrv_inc_in_flight(bs);
     aio_co_schedule(bdrv_get_aio_context(bs), s->connection_co);
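
For readers who want to see the ownership-handoff idea from the patch in isolation, the sketch below is not part of the patch: it uses plain pthreads instead of QEMU's coroutine, bottom-half and AioContext machinery, and every name in it (Worker, worker_func, W_RUNNING_DETACHED, detach_demo.c) is made up for illustration. It mirrors the pattern of connect_thread_func() and nbd_co_establish_connection_cancel(bs, true): requester and worker share a small state structure under a mutex, and whichever side loses the race frees it.

/* Illustrative analogue only; build with: cc -pthread detach_demo.c */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

typedef enum { W_RUNNING, W_RUNNING_DETACHED, W_SUCCESS } WorkerState;

typedef struct Worker {
    pthread_mutex_t mutex;  /* protects state and result */
    WorkerState state;
    int result;             /* valid only in W_SUCCESS */
} Worker;

static void *worker_func(void *opaque)
{
    Worker *w = opaque;
    bool do_free = false;

    sleep(1);               /* stands in for the blocking connect() */

    pthread_mutex_lock(&w->mutex);
    switch (w->state) {
    case W_RUNNING:
        /* The requester is still interested: publish the result */
        w->state = W_SUCCESS;
        w->result = 42;
        break;
    case W_RUNNING_DETACHED:
        /* The requester has gone away: clean up after ourselves */
        do_free = true;
        break;
    default:
        abort();
    }
    pthread_mutex_unlock(&w->mutex);

    if (do_free) {
        pthread_mutex_destroy(&w->mutex);
        free(w);
    }
    return NULL;
}

int main(void)
{
    Worker *w = calloc(1, sizeof(*w));
    pthread_t tid;

    pthread_mutex_init(&w->mutex, NULL);
    w->state = W_RUNNING;
    pthread_create(&tid, NULL, worker_func, w);
    pthread_detach(tid);

    /*
     * Pretend something interrupts the wait (like a drained section) before
     * the worker is done: stop waiting and hand ownership of the state to it.
     */
    pthread_mutex_lock(&w->mutex);
    if (w->state == W_SUCCESS) {
        printf("result: %d\n", w->result);
        pthread_mutex_unlock(&w->mutex);
        pthread_mutex_destroy(&w->mutex);
        free(w);
    } else {
        w->state = W_RUNNING_DETACHED;
        pthread_mutex_unlock(&w->mutex);
    }

    sleep(2);               /* give the detached worker time to finish */
    return 0;
}

The point mirrored from the patch is that cancellation never blocks on the worker: the requester only flips the state to the detached variant and leaves, just as nbd_co_establish_connection_cancel() does with CONNECT_THREAD_RUNNING_DETACHED.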