nbd: Drop nbd_can_read()

There is no variant of aio_set_fd_handler() like qemu_set_fd_handler2(),
so we cannot give a can_read() callback function. Instead, unregister
the nbd_read() function whenever we cannot read and re-register it as
soon as we can read again.

All this is hidden behind the functions nbd_set_handlers() (which
registers all handlers for the AIO context and file descriptor belonging
to the given client), nbd_unset_handlers() (which unregisters them) and
nbd_update_can_read() (which checks whether NBD can read for the given
client and acts accordingly).

Signed-off-by: Max Reitz <mreitz@redhat.com>
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
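
As an illustration of the scheme described above, here is a small self-contained sketch. It is not part of the patch: client_t, MAX_REQUESTS, on_readable, set_handlers and update_can_read are simplified stand-ins for NBDClient, MAX_NBD_REQUESTS, nbd_read, nbd_set_handlers and nbd_update_can_read. Instead of handing the event loop a can_read() predicate, the read handler is registered while reading is allowed, unregistered once the request limit is reached, and re-registered when a request completes.

/* Illustrative sketch only; not QEMU code.  It models the approach taken by
 * this patch: the read handler itself is (un)registered whenever the "can we
 * read another request?" predicate changes, instead of the event loop polling
 * a can_read() callback. */
#include <stdbool.h>
#include <stdio.h>

#define MAX_REQUESTS 4          /* stand-in for MAX_NBD_REQUESTS */

typedef void io_handler(void *opaque);

typedef struct client_t {
    int nb_requests;            /* requests currently in flight */
    bool receiving;             /* stand-in for recv_coroutine != NULL */
    bool can_read;              /* cached result of the readability test */
    io_handler *read_handler;   /* what would be given to aio_set_fd_handler() */
} client_t;

static void on_readable(void *opaque)
{
    (void)opaque;               /* would parse the next request here */
}

/* Mirrors nbd_set_handlers(): register the read handler only when allowed. */
static void set_handlers(client_t *c)
{
    c->read_handler = c->can_read ? on_readable : NULL;
    printf("  read handler %sregistered\n", c->read_handler ? "" : "un");
}

/* Mirrors nbd_update_can_read(): recompute the predicate and re-register the
 * handlers only when its value actually changed. */
static void update_can_read(client_t *c)
{
    bool can_read = c->receiving || c->nb_requests < MAX_REQUESTS;

    if (can_read != c->can_read) {
        c->can_read = can_read;
        set_handlers(c);
    }
}

int main(void)
{
    client_t c = { .can_read = true, .read_handler = on_readable };

    /* Starting requests up to the limit unregisters the read handler. */
    for (int i = 0; i < MAX_REQUESTS; i++) {
        c.nb_requests++;
        printf("request %d started\n", c.nb_requests);
        update_can_read(&c);
    }

    /* Completing one request drops below the limit; the handler returns. */
    c.nb_requests--;
    printf("one request finished\n");
    update_can_read(&c);

    return 0;
}

Built with any C99 compiler, the sketch reports the handler being unregistered after the fourth request starts and registered again after one finishes, which is the behaviour nbd_request_get() and nbd_request_put() drive via nbd_update_can_read() in the patch below.
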
Author:    Max Reitz, 2014-06-20 21:57:32 +02:00
Committer: Stefan Hajnoczi
parent     a780dea045
commit     958c717df9

 nbd.c | 72 lines changed

--- a/nbd.c
+++ b/nbd.c
@@ -18,6 +18,7 @@
 #include "block/nbd.h"
 #include "block/block.h"
+#include "block/block_int.h"
 
 #include "block/coroutine.h"
@@ -107,6 +108,8 @@ struct NBDExport {
     uint32_t nbdflags;
     QTAILQ_HEAD(, NBDClient) clients;
     QTAILQ_ENTRY(NBDExport) next;
+
+    AioContext *ctx;
 };
 
 static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports);
@@ -123,6 +126,8 @@ struct NBDClient {
     CoMutex send_lock;
     Coroutine *send_coroutine;
 
+    bool can_read;
+
     QTAILQ_ENTRY(NBDClient) next;
     int nb_requests;
     bool closing;
@@ -130,6 +135,10 @@ struct NBDClient {
 
 /* That's all folks */
 
+static void nbd_set_handlers(NBDClient *client);
+static void nbd_unset_handlers(NBDClient *client);
+static void nbd_update_can_read(NBDClient *client);
+
 ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
 {
     size_t offset = 0;
@@ -862,7 +871,7 @@ void nbd_client_put(NBDClient *client)
          */
         assert(client->closing);
 
-        qemu_set_fd_handler2(client->sock, NULL, NULL, NULL, NULL);
+        nbd_unset_handlers(client);
         close(client->sock);
         client->sock = -1;
         if (client->exp) {
@@ -898,6 +907,7 @@ static NBDRequest *nbd_request_get(NBDClient *client)
 
     assert(client->nb_requests <= MAX_NBD_REQUESTS - 1);
     client->nb_requests++;
+    nbd_update_can_read(client);
 
     req = g_slice_new0(NBDRequest);
     nbd_client_get(client);
@@ -914,9 +924,8 @@ static void nbd_request_put(NBDRequest *req)
     }
     g_slice_free(NBDRequest, req);
 
-    if (client->nb_requests-- == MAX_NBD_REQUESTS) {
-        qemu_notify_event();
-    }
+    client->nb_requests--;
+    nbd_update_can_read(client);
 
     nbd_client_put(client);
 }
@@ -932,6 +941,7 @@ NBDExport *nbd_export_new(BlockDriverState *bs, off_t dev_offset,
     exp->nbdflags = nbdflags;
     exp->size = size == -1 ? bdrv_getlength(bs) : size;
     exp->close = close;
+    exp->ctx = bdrv_get_aio_context(bs);
     bdrv_ref(bs);
     return exp;
 }
@@ -1023,10 +1033,6 @@ void nbd_export_close_all(void)
     }
 }
 
-static int nbd_can_read(void *opaque);
-static void nbd_read(void *opaque);
-static void nbd_restart_write(void *opaque);
-
 static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
                                  int len)
 {
@@ -1035,9 +1041,8 @@ static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
     ssize_t rc, ret;
 
     qemu_co_mutex_lock(&client->send_lock);
-    qemu_set_fd_handler2(csock, nbd_can_read, nbd_read,
-                         nbd_restart_write, client);
     client->send_coroutine = qemu_coroutine_self();
+    nbd_set_handlers(client);
 
     if (!len) {
         rc = nbd_send_reply(csock, reply);
@@ -1054,7 +1059,7 @@ static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
     }
 
     client->send_coroutine = NULL;
-    qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client);
+    nbd_set_handlers(client);
     qemu_co_mutex_unlock(&client->send_lock);
     return rc;
 }
@@ -1067,6 +1072,8 @@ static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *request)
     ssize_t rc;
 
     client->recv_coroutine = qemu_coroutine_self();
+    nbd_update_can_read(client);
+
     rc = nbd_receive_request(csock, request);
     if (rc < 0) {
         if (rc != -EAGAIN) {
@@ -1108,6 +1115,8 @@ static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *request)
 
 out:
     client->recv_coroutine = NULL;
+    nbd_update_can_read(client);
+
     return rc;
 }
@@ -1259,13 +1268,6 @@ out:
     nbd_client_close(client);
 }
 
-static int nbd_can_read(void *opaque);
-{
-    NBDClient *client = opaque;
-
-    return client->recv_coroutine || client->nb_requests < MAX_NBD_REQUESTS;
-}
-
 static void nbd_read(void *opaque)
 {
     NBDClient *client = opaque;
@@ -1284,6 +1286,37 @@ static void nbd_restart_write(void *opaque)
     qemu_coroutine_enter(client->send_coroutine, NULL);
 }
 
+static void nbd_set_handlers(NBDClient *client)
+{
+    if (client->exp && client->exp->ctx) {
+        aio_set_fd_handler(client->exp->ctx, client->sock,
+                           client->can_read ? nbd_read : NULL,
+                           client->send_coroutine ? nbd_restart_write : NULL,
+                           client);
+    }
+}
+
+static void nbd_unset_handlers(NBDClient *client)
+{
+    if (client->exp && client->exp->ctx) {
+        aio_set_fd_handler(client->exp->ctx, client->sock, NULL, NULL, NULL);
+    }
+}
+
+static void nbd_update_can_read(NBDClient *client)
+{
+    bool can_read = client->recv_coroutine ||
+                    client->nb_requests < MAX_NBD_REQUESTS;
+
+    if (can_read != client->can_read) {
+        client->can_read = can_read;
+        nbd_set_handlers(client);
+
+        /* There is no need to invoke aio_notify(), since aio_set_fd_handler()
+         * in nbd_set_handlers() will have taken care of that */
+    }
+}
+
 NBDClient *nbd_client_new(NBDExport *exp, int csock,
                           void (*close)(NBDClient *))
 {
@@ -1292,13 +1325,14 @@ NBDClient *nbd_client_new(NBDExport *exp, int csock,
     client->refcount = 1;
     client->exp = exp;
     client->sock = csock;
+    client->can_read = true;
     if (nbd_send_negotiate(client)) {
        g_free(client);
        return NULL;
     }
     client->close = close;
     qemu_co_mutex_init(&client->send_lock);
-    qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client);
+    nbd_set_handlers(client);
 
     if (exp) {
         QTAILQ_INSERT_TAIL(&exp->clients, client, next);