migration/colo: wrap incoming checkpoint process into new helper

Split checkpoint incoming process into a helper.

Signed-off-by: zhanghailiang <zhang.zhanghailiang@huawei.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
Reviewed-by: Juan Quintela <quintela@redhat.com>
Signed-off-by: Juan Quintela <quintela@redhat.com>
This commit is contained in:
zhanghailiang 2020-02-24 14:54:08 +08:00 committed by Juan Quintela
parent 0306dae5ac
commit 6ad8ad38d0

View file

@ -664,13 +664,138 @@ void migrate_start_colo_process(MigrationState *s)
qemu_mutex_lock_iothread();
}
static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request,
Error **errp)
static void colo_incoming_process_checkpoint(MigrationIncomingState *mis,
QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp)
{
uint64_t total_size;
uint64_t value;
Error *local_err = NULL;
int ret;
qemu_mutex_lock_iothread();
vm_stop_force_state(RUN_STATE_COLO);
trace_colo_vm_state_change("run", "stop");
qemu_mutex_unlock_iothread();
/* FIXME: This is unnecessary for periodic checkpoint mode */
colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
&local_err);
if (local_err) {
error_propagate(errp, local_err);
return;
}
colo_receive_check_message(mis->from_src_file,
COLO_MESSAGE_VMSTATE_SEND, &local_err);
if (local_err) {
error_propagate(errp, local_err);
return;
}
qemu_mutex_lock_iothread();
cpu_synchronize_all_pre_loadvm();
ret = qemu_loadvm_state_main(mis->from_src_file, mis);
qemu_mutex_unlock_iothread();
if (ret < 0) {
error_setg(errp, "Load VM's live state (ram) error");
return;
}
value = colo_receive_message_value(mis->from_src_file,
COLO_MESSAGE_VMSTATE_SIZE, &local_err);
if (local_err) {
error_propagate(errp, local_err);
return;
}
/*
* Read VM device state data into channel buffer,
* It's better to re-use the memory allocated.
* Here we need to handle the channel buffer directly.
*/
if (value > bioc->capacity) {
bioc->capacity = value;
bioc->data = g_realloc(bioc->data, bioc->capacity);
}
total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
if (total_size != value) {
error_setg(errp, "Got %" PRIu64 " VMState data, less than expected"
" %" PRIu64, total_size, value);
return;
}
bioc->usage = total_size;
qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
&local_err);
if (local_err) {
error_propagate(errp, local_err);
return;
}
qemu_mutex_lock_iothread();
vmstate_loading = true;
ret = qemu_load_device_state(fb);
if (ret < 0) {
error_setg(errp, "COLO: load device state failed");
qemu_mutex_unlock_iothread();
return;
}
#ifdef CONFIG_REPLICATION
replication_get_error_all(&local_err);
if (local_err) {
error_propagate(errp, local_err);
qemu_mutex_unlock_iothread();
return;
}
/* discard colo disk buffer */
replication_do_checkpoint_all(&local_err);
if (local_err) {
error_propagate(errp, local_err);
qemu_mutex_unlock_iothread();
return;
}
#else
abort();
#endif
/* Notify all filters of all NIC to do checkpoint */
colo_notify_filters_event(COLO_EVENT_CHECKPOINT, &local_err);
if (local_err) {
error_propagate(errp, local_err);
qemu_mutex_unlock_iothread();
return;
}
vmstate_loading = false;
vm_start();
trace_colo_vm_state_change("stop", "run");
qemu_mutex_unlock_iothread();
if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
failover_set_state(FAILOVER_STATUS_RELAUNCH,
FAILOVER_STATUS_NONE);
failover_request_active(NULL);
return;
}
colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
&local_err);
if (local_err) {
error_propagate(errp, local_err);
}
}
static void colo_wait_handle_message(MigrationIncomingState *mis,
QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp)
{
COLOMessage msg;
Error *local_err = NULL;
msg = colo_receive_message(f, &local_err);
msg = colo_receive_message(mis->from_src_file, &local_err);
if (local_err) {
error_propagate(errp, local_err);
return;
@ -678,10 +803,9 @@ static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request,
switch (msg) {
case COLO_MESSAGE_CHECKPOINT_REQUEST:
*checkpoint_request = 1;
colo_incoming_process_checkpoint(mis, fb, bioc, errp);
break;
default:
*checkpoint_request = 0;
error_setg(errp, "Got unknown COLO message: %d", msg);
break;
}
@ -692,10 +816,7 @@ void *colo_process_incoming_thread(void *opaque)
MigrationIncomingState *mis = opaque;
QEMUFile *fb = NULL;
QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
uint64_t total_size;
uint64_t value;
Error *local_err = NULL;
int ret;
rcu_register_thread();
qemu_sem_init(&mis->colo_incoming_sem, 0);
@ -749,134 +870,19 @@ void *colo_process_incoming_thread(void *opaque)
}
while (mis->state == MIGRATION_STATUS_COLO) {
int request = 0;
colo_wait_handle_message(mis->from_src_file, &request, &local_err);
colo_wait_handle_message(mis, fb, bioc, &local_err);
if (local_err) {
goto out;
error_report_err(local_err);
break;
}
assert(request);
if (failover_get_state() != FAILOVER_STATUS_NONE) {
error_report("failover request");
goto out;
}
qemu_mutex_lock_iothread();
vm_stop_force_state(RUN_STATE_COLO);
trace_colo_vm_state_change("run", "stop");
qemu_mutex_unlock_iothread();
/* FIXME: This is unnecessary for periodic checkpoint mode */
colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
&local_err);
if (local_err) {
goto out;
}
colo_receive_check_message(mis->from_src_file,
COLO_MESSAGE_VMSTATE_SEND, &local_err);
if (local_err) {
goto out;
}
qemu_mutex_lock_iothread();
cpu_synchronize_all_pre_loadvm();
ret = qemu_loadvm_state_main(mis->from_src_file, mis);
qemu_mutex_unlock_iothread();
if (ret < 0) {
error_report("Load VM's live state (ram) error");
goto out;
}
value = colo_receive_message_value(mis->from_src_file,
COLO_MESSAGE_VMSTATE_SIZE, &local_err);
if (local_err) {
goto out;
}
/*
* Read VM device state data into channel buffer,
* It's better to re-use the memory allocated.
* Here we need to handle the channel buffer directly.
*/
if (value > bioc->capacity) {
bioc->capacity = value;
bioc->data = g_realloc(bioc->data, bioc->capacity);
}
total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
if (total_size != value) {
error_report("Got %" PRIu64 " VMState data, less than expected"
" %" PRIu64, total_size, value);
goto out;
}
bioc->usage = total_size;
qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
&local_err);
if (local_err) {
goto out;
}
qemu_mutex_lock_iothread();
vmstate_loading = true;
ret = qemu_load_device_state(fb);
if (ret < 0) {
error_report("COLO: load device state failed");
qemu_mutex_unlock_iothread();
goto out;
}
#ifdef CONFIG_REPLICATION
replication_get_error_all(&local_err);
if (local_err) {
qemu_mutex_unlock_iothread();
goto out;
}
/* discard colo disk buffer */
replication_do_checkpoint_all(&local_err);
if (local_err) {
qemu_mutex_unlock_iothread();
goto out;
}
#else
abort();
#endif
/* Notify all filters of all NIC to do checkpoint */
colo_notify_filters_event(COLO_EVENT_CHECKPOINT, &local_err);
if (local_err) {
qemu_mutex_unlock_iothread();
goto out;
}
vmstate_loading = false;
vm_start();
trace_colo_vm_state_change("stop", "run");
qemu_mutex_unlock_iothread();
if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
failover_set_state(FAILOVER_STATUS_RELAUNCH,
FAILOVER_STATUS_NONE);
failover_request_active(NULL);
goto out;
}
colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
&local_err);
if (local_err) {
goto out;
break;
}
}
out:
vmstate_loading = false;
/* Throw the unreported error message after exited from loop */
if (local_err) {
error_report_err(local_err);
}
/*
* There are only two reasons we can get here, some error happened