diff --git a/migration/multifd.c b/migration/multifd.c index 3db18dc79e..adfe8c9a0a 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -62,6 +62,11 @@ struct { * Make it easy for now. */ uintptr_t packet_num; + /* + * Synchronization point past which no more channels will be + * created. + */ + QemuSemaphore channels_created; /* send channels ready */ QemuSemaphore channels_ready; /* @@ -622,10 +627,6 @@ static void multifd_send_terminate_threads(void) /* * Finally recycle all the threads. - * - * TODO: p->running is still buggy, e.g. we can reach here without the - * corresponding multifd_new_send_channel_async() get invoked yet, - * then a new thread can even be created after this function returns. */ for (i = 0; i < migrate_multifd_channels(); i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; @@ -670,6 +671,7 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp) static void multifd_send_cleanup_state(void) { + qemu_sem_destroy(&multifd_send_state->channels_created); qemu_sem_destroy(&multifd_send_state->channels_ready); g_free(multifd_send_state->params); multifd_send_state->params = NULL; @@ -954,18 +956,26 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) if (migrate_channel_requires_tls_upgrade(ioc)) { ret = multifd_tls_channel_connect(p, ioc, &local_err); + if (ret) { + return; + } } else { ret = multifd_channel_connect(p, ioc, &local_err); } +out: + /* + * Here we're not interested whether creation succeeded, only that + * it happened at all. + */ + qemu_sem_post(&multifd_send_state->channels_created); + if (ret) { return; } -out: trace_multifd_new_send_channel_async_error(p->id, local_err); multifd_send_set_error(local_err); - multifd_send_kick_main(p); if (!p->c) { /* * If no channel has been created, drop the initial @@ -998,6 +1008,7 @@ bool multifd_send_setup(void) multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); multifd_send_state->pages = multifd_pages_init(page_count); + qemu_sem_init(&multifd_send_state->channels_created, 0); qemu_sem_init(&multifd_send_state->channels_ready, 0); qatomic_set(&multifd_send_state->exiting, 0); multifd_send_state->ops = multifd_ops[migrate_multifd_compression()]; @@ -1023,6 +1034,15 @@ bool multifd_send_setup(void) multifd_new_send_channel_create(p); } + /* + * Wait until channel creation has started for all channels. The + * creation can still fail, but no more channels will be created + * past this point. + */ + for (i = 0; i < thread_count; i++) { + qemu_sem_wait(&multifd_send_state->channels_created); + } + for (i = 0; i < thread_count; i++) { MultiFDSendParams *p = &multifd_send_state->params[i];