qemu-nbd: use common main loop

Using a single main loop for sockets will help yielding from the socket
coroutine back to the main loop, and later reentering it.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
This commit is contained in:
Paolo Bonzini 2011-09-12 17:28:11 +02:00
parent cbcfa0418f
commit a61c67828d

View file

@ -35,12 +35,15 @@
#define SOCKET_PATH "/var/lock/qemu-nbd-%s" #define SOCKET_PATH "/var/lock/qemu-nbd-%s"
static int sigterm_wfd;
static NBDExport *exp; static NBDExport *exp;
static int verbose; static int verbose;
static char *device; static char *device;
static char *srcpath; static char *srcpath;
static char *sockpath; static char *sockpath;
static bool sigterm_reported;
static bool nbd_started;
static int shared = 1;
static int nb_fds;
static void usage(const char *name) static void usage(const char *name)
{ {
@ -169,10 +172,8 @@ static int find_partition(BlockDriverState *bs, int partition,
static void termsig_handler(int signum) static void termsig_handler(int signum)
{ {
static int sigterm_reported; sigterm_reported = true;
if (!sigterm_reported) { qemu_notify_event();
sigterm_reported = (write(sigterm_wfd, "", 1) == 1);
}
} }
static void *show_parts(void *arg) static void *show_parts(void *arg)
@ -243,6 +244,36 @@ out:
return (void *) EXIT_FAILURE; return (void *) EXIT_FAILURE;
} }
static int nbd_can_accept(void *opaque)
{
return nb_fds < shared;
}
static void nbd_read(void *opaque)
{
int fd = (uintptr_t) opaque;
if (nbd_trip(exp, fd) != 0) {
qemu_set_fd_handler2(fd, NULL, NULL, NULL, NULL);
close(fd);
nb_fds--;
}
}
static void nbd_accept(void *opaque)
{
int server_fd = (uintptr_t) opaque;
struct sockaddr_in addr;
socklen_t addr_len = sizeof(addr);
int fd = accept(server_fd, (struct sockaddr *)&addr, &addr_len);
nbd_started = true;
if (fd != -1 && nbd_negotiate(exp, fd) != -1) {
qemu_set_fd_handler2(fd, NULL, nbd_read, NULL, (void *) (intptr_t) fd);
nb_fds++;
}
}
int main(int argc, char **argv) int main(int argc, char **argv)
{ {
BlockDriverState *bs; BlockDriverState *bs;
@ -251,8 +282,6 @@ int main(int argc, char **argv)
bool disconnect = false; bool disconnect = false;
const char *bindto = "0.0.0.0"; const char *bindto = "0.0.0.0";
int port = NBD_DEFAULT_PORT; int port = NBD_DEFAULT_PORT;
struct sockaddr_in addr;
socklen_t addr_len = sizeof(addr);
off_t fd_size; off_t fd_size;
const char *sopt = "hVb:o:p:rsnP:c:dvk:e:t"; const char *sopt = "hVb:o:p:rsnP:c:dvk:e:t";
struct option lopt[] = { struct option lopt[] = {
@ -280,13 +309,7 @@ int main(int argc, char **argv)
int flags = BDRV_O_RDWR; int flags = BDRV_O_RDWR;
int partition = -1; int partition = -1;
int ret; int ret;
int shared = 1;
fd_set fds;
int *sharing_fds;
int fd; int fd;
int i;
int nb_fds = 0;
int max_fd;
int persistent = 0; int persistent = 0;
pthread_t client_thread; pthread_t client_thread;
@ -294,12 +317,6 @@ int main(int argc, char **argv)
* handler ensures that "qemu-nbd -v -c" exits with a nice status code. * handler ensures that "qemu-nbd -v -c" exits with a nice status code.
*/ */
struct sigaction sa_sigterm; struct sigaction sa_sigterm;
int sigterm_fd[2];
if (qemu_pipe(sigterm_fd) == -1) {
err(EXIT_FAILURE, "Error setting up communication pipe");
}
sigterm_wfd = sigterm_fd[1];
memset(&sa_sigterm, 0, sizeof(sa_sigterm)); memset(&sa_sigterm, 0, sizeof(sa_sigterm));
sa_sigterm.sa_handler = termsig_handler; sa_sigterm.sa_handler = termsig_handler;
sigaction(SIGTERM, &sa_sigterm, NULL); sigaction(SIGTERM, &sa_sigterm, NULL);
@ -490,16 +507,16 @@ int main(int argc, char **argv)
} }
exp = nbd_export_new(bs, dev_offset, fd_size, nbdflags); exp = nbd_export_new(bs, dev_offset, fd_size, nbdflags);
sharing_fds = g_malloc((shared + 1) * sizeof(int));
if (sockpath) { if (sockpath) {
sharing_fds[0] = unix_socket_incoming(sockpath); fd = unix_socket_incoming(sockpath);
} else { } else {
sharing_fds[0] = tcp_socket_incoming(bindto, port); fd = tcp_socket_incoming(bindto, port);
} }
if (sharing_fds[0] == -1) if (fd == -1) {
return 1; return 1;
}
if (device) { if (device) {
int ret; int ret;
@ -514,54 +531,15 @@ int main(int argc, char **argv)
memset(&client_thread, 0, sizeof(client_thread)); memset(&client_thread, 0, sizeof(client_thread));
} }
max_fd = sharing_fds[0]; qemu_init_main_loop();
nb_fds++; qemu_set_fd_handler2(fd, nbd_can_accept, nbd_accept, NULL,
(void *)(uintptr_t)fd);
do { do {
FD_ZERO(&fds); main_loop_wait(false);
FD_SET(sigterm_fd[0], &fds); } while (!sigterm_reported && (persistent || !nbd_started || nb_fds > 0));
for (i = 0; i < nb_fds; i++)
FD_SET(sharing_fds[i], &fds);
do {
ret = select(max_fd + 1, &fds, NULL, NULL, NULL);
} while (ret == -1 && errno == EINTR);
if (ret == -1 || FD_ISSET(sigterm_fd[0], &fds)) {
break;
}
if (FD_ISSET(sharing_fds[0], &fds))
ret--;
for (i = 1; i < nb_fds && ret; i++) {
if (FD_ISSET(sharing_fds[i], &fds)) {
if (nbd_trip(exp, sharing_fds[i]) != 0) {
close(sharing_fds[i]);
nb_fds--;
sharing_fds[i] = sharing_fds[nb_fds];
i--;
}
ret--;
}
}
/* new connection ? */
if (FD_ISSET(sharing_fds[0], &fds)) {
if (nb_fds < shared + 1) {
sharing_fds[nb_fds] = accept(sharing_fds[0],
(struct sockaddr *)&addr,
&addr_len);
if (sharing_fds[nb_fds] != -1 &&
nbd_negotiate(exp, sharing_fds[nb_fds]) != -1) {
if (sharing_fds[nb_fds] > max_fd)
max_fd = sharing_fds[nb_fds];
nb_fds++;
}
}
}
} while (persistent || nb_fds > 1);
close(sharing_fds[0]);
nbd_export_close(exp); nbd_export_close(exp);
g_free(sharing_fds);
if (sockpath) { if (sockpath) {
unlink(sockpath); unlink(sockpath);
} }