Rework the iSCSI PDU transmit code to avoid lock contention and coalesce

PDUs before sending.

Sponsored by:	The FreeBSD Foundation
This commit is contained in:
Edward Tomasz Napierala 2014-04-04 15:49:37 +00:00
parent 8fb9488054
commit c485ab142f
Notes: svn2git 2020-12-20 02:59:44 +00:00
svn path=/head/; revision=264122
2 changed files with 170 additions and 38 deletions

View file

@ -63,6 +63,10 @@ static int debug = 1;
TUNABLE_INT("kern.icl.debug", &debug);
SYSCTL_INT(_kern_icl, OID_AUTO, debug, CTLFLAG_RWTUN,
&debug, 1, "Enable debug messages");
static int coalesce = 1;
TUNABLE_INT("kern.icl.coalesce", &coalesce);
SYSCTL_INT(_kern_icl, OID_AUTO, coalesce, CTLFLAG_RWTUN,
&coalesce, 1, "Try to coalesce PDUs before sending");
static int partial_receive_len = 1 * 1024; /* XXX: More? */
TUNABLE_INT("kern.icl.partial_receive_len", &partial_receive_len);
SYSCTL_INT(_kern_icl, OID_AUTO, partial_receive_len, CTLFLAG_RWTUN,
@ -772,18 +776,14 @@ icl_soupcall_receive(struct socket *so, void *arg, int waitflag)
}
static int
icl_pdu_send(struct icl_pdu *request)
icl_pdu_finalize(struct icl_pdu *request)
{
size_t padding, pdu_len;
uint32_t digest, zero = 0;
int error, ok;
struct socket *so;
int ok;
struct icl_conn *ic;
ic = request->ip_conn;
so = request->ip_conn->ic_socket;
ICL_CONN_LOCK_ASSERT(ic);
icl_pdu_set_data_segment_length(request, request->ip_data_len);
@ -816,7 +816,7 @@ icl_pdu_send(struct icl_pdu *request)
ok = m_append(request->ip_data_mbuf, sizeof(digest),
(void *)&digest);
if (ok != 1) {
ICL_WARN("failed to append header digest");
ICL_WARN("failed to append data digest");
return (1);
}
}
@ -827,64 +827,114 @@ icl_pdu_send(struct icl_pdu *request)
request->ip_bhs_mbuf->m_pkthdr.len = pdu_len;
error = sosend(so, NULL, NULL, request->ip_bhs_mbuf,
NULL, MSG_DONTWAIT, curthread);
request->ip_bhs_mbuf = NULL; /* Sosend consumes the mbuf. */
if (error != 0) {
ICL_DEBUG("sosend error %d", error);
return (error);
}
return (0);
}
static void
icl_conn_send_pdus(struct icl_conn *ic)
icl_conn_send_pdus(struct icl_conn *ic, void *fts)
{
struct icl_pdu *request;
STAILQ_HEAD(, icl_pdu) *queue = fts; /* XXX */
struct icl_pdu *request, *request2;
struct socket *so;
size_t available, size;
int error;
size_t available, size, size2;
int coalesced, error;
ICL_CONN_LOCK_ASSERT(ic);
ICL_CONN_LOCK_ASSERT_NOT(ic);
so = ic->ic_socket;
SOCKBUF_LOCK(&so->so_snd);
/*
* Check how much space do we have for transmit. We can't just
* call sosend() and retry when we get EWOULDBLOCK or EMSGSIZE,
* as it always frees the mbuf chain passed to it, even in case
* of error.
*/
available = sbspace(&so->so_snd);
/*
* Notify the socket layer that it doesn't need to call
* send socket upcall for the time being.
*/
so->so_snd.sb_lowat = so->so_snd.sb_hiwat;
SOCKBUF_UNLOCK(&so->so_snd);
while (!STAILQ_EMPTY(&ic->ic_to_send)) {
while (!STAILQ_EMPTY(queue)) {
if (ic->ic_disconnecting)
return;
request = STAILQ_FIRST(&ic->ic_to_send);
request = STAILQ_FIRST(queue);
size = icl_pdu_size(request);
if (available < size) {
/*
* Set the low watermark on the socket,
* to avoid waking up until there is enough
* space.
*/
SOCKBUF_LOCK(&so->so_snd);
so->so_snd.sb_lowat = size;
SOCKBUF_UNLOCK(&so->so_snd);
#if 1
ICL_DEBUG("no space to send; "
"have %zd, need %zd",
available, size);
#endif
/*
* Set the low watermark on the socket,
* to avoid unneccessary wakeups until there
* is enough space for the PDU to fit.
*/
SOCKBUF_LOCK(&so->so_snd);
so->so_snd.sb_lowat = size;
SOCKBUF_UNLOCK(&so->so_snd);
return;
}
available -= size;
STAILQ_REMOVE_HEAD(&ic->ic_to_send, ip_next);
error = icl_pdu_send(request);
STAILQ_REMOVE_HEAD(queue, ip_next);
error = icl_pdu_finalize(request);
if (error != 0) {
ICL_DEBUG("failed to send PDU; "
ICL_DEBUG("failed to finalize PDU; "
"dropping connection");
icl_conn_fail(ic);
icl_pdu_free(request);
return;
}
}
if (coalesce) {
coalesced = 1;
for (;;) {
request2 = STAILQ_FIRST(queue);
if (request2 == NULL)
break;
size2 = icl_pdu_size(request2);
if (available < size + size2)
break;
STAILQ_REMOVE_HEAD(queue, ip_next);
error = icl_pdu_finalize(request2);
if (error != 0) {
ICL_DEBUG("failed to finalize PDU; "
"dropping connection");
icl_conn_fail(ic);
icl_pdu_free(request);
icl_pdu_free(request2);
return;
}
m_cat(request->ip_bhs_mbuf, request2->ip_bhs_mbuf);
request2->ip_bhs_mbuf = NULL;
request->ip_bhs_mbuf->m_pkthdr.len += size2;
size += size2;
STAILQ_REMOVE_AFTER(queue, request, ip_next);
icl_pdu_free(request2);
coalesced++;
}
#if 0
if (coalesced > 1) {
ICL_DEBUG("coalesced %d PDUs into %zd bytes",
coalesced, size);
}
#endif
}
available -= size;
error = sosend(so, NULL, NULL, request->ip_bhs_mbuf,
NULL, MSG_DONTWAIT, curthread);
request->ip_bhs_mbuf = NULL; /* Sosend consumes the mbuf. */
if (error != 0) {
ICL_DEBUG("failed to send PDU, error %d; "
"dropping connection", error);
icl_conn_fail(ic);
icl_pdu_free(request);
return;
}
icl_pdu_free(request);
}
}
@ -893,9 +943,12 @@ static void
icl_send_thread(void *arg)
{
struct icl_conn *ic;
STAILQ_HEAD(, icl_pdu) queue;
ic = arg;
STAILQ_INIT(&queue);
ICL_CONN_LOCK(ic);
ic->ic_send_running = true;
@ -904,10 +957,54 @@ icl_send_thread(void *arg)
//ICL_DEBUG("terminating");
break;
}
icl_conn_send_pdus(ic);
for (;;) {
/*
* If the local queue is empty, populate it from
* the main one. This way the icl_conn_send_pdus()
* can go through all the queued PDUs without holding
* any locks.
*/
if (STAILQ_EMPTY(&queue))
STAILQ_SWAP(&ic->ic_to_send, &queue, icl_pdu);
ic->ic_check_send_space = false;
ICL_CONN_UNLOCK(ic);
icl_conn_send_pdus(ic, &queue);
ICL_CONN_LOCK(ic);
/*
* The icl_soupcall_send() was called since the last
* call to sbspace(); go around;
*/
if (ic->ic_check_send_space)
continue;
/*
* Local queue is empty, but we still have PDUs
* in the main one; go around.
*/
if (STAILQ_EMPTY(&queue) &&
!STAILQ_EMPTY(&ic->ic_to_send))
continue;
/*
* There might be some stuff in the local queue,
* which didn't get sent due to not having enough send
* space. Wait for socket upcall.
*/
break;
}
cv_wait(&ic->ic_send_cv, ic->ic_lock);
}
/*
* We're exiting; move PDUs back to the main queue, so they can
* get freed properly. At this point ordering doesn't matter.
*/
STAILQ_CONCAT(&ic->ic_to_send, &queue);
ic->ic_send_running = false;
ICL_CONN_UNLOCK(ic);
kthread_exit();
@ -919,12 +1016,19 @@ icl_soupcall_send(struct socket *so, void *arg, int waitflag)
struct icl_conn *ic;
ic = arg;
ICL_CONN_LOCK(ic);
ic->ic_check_send_space = true;
ICL_CONN_UNLOCK(ic);
cv_signal(&ic->ic_send_cv);
return (SU_OK);
}
int
icl_pdu_append_data(struct icl_pdu *request, const void *addr, size_t len, int flags)
icl_pdu_append_data(struct icl_pdu *request, const void *addr, size_t len,
int flags)
{
struct mbuf *mb, *newmb;
size_t copylen, off = 0;
@ -977,6 +1081,17 @@ icl_pdu_queue(struct icl_pdu *ip)
icl_pdu_free(ip);
return;
}
if (!STAILQ_EMPTY(&ic->ic_to_send)) {
STAILQ_INSERT_TAIL(&ic->ic_to_send, ip, ip_next);
/*
* If the queue is not empty, someone else had already
* signaled the send thread; no need to do that again,
* just return.
*/
return;
}
STAILQ_INSERT_TAIL(&ic->ic_to_send, ip, ip_next);
cv_signal(&ic->ic_send_cv);
}
@ -1188,6 +1303,20 @@ icl_conn_close(struct icl_conn *ic)
return;
}
/*
* Deregister socket upcalls.
*/
ICL_CONN_UNLOCK(ic);
SOCKBUF_LOCK(&ic->ic_socket->so_snd);
if (ic->ic_socket->so_snd.sb_upcall != NULL)
soupcall_clear(ic->ic_socket, SO_SND);
SOCKBUF_UNLOCK(&ic->ic_socket->so_snd);
SOCKBUF_LOCK(&ic->ic_socket->so_rcv);
if (ic->ic_socket->so_rcv.sb_upcall != NULL)
soupcall_clear(ic->ic_socket, SO_RCV);
SOCKBUF_UNLOCK(&ic->ic_socket->so_rcv);
ICL_CONN_LOCK(ic);
ic->ic_disconnecting = true;
/*
@ -1205,7 +1334,9 @@ icl_conn_close(struct icl_conn *ic)
}
//ICL_DEBUG("send/receive threads terminated");
ICL_CONN_UNLOCK(ic);
soclose(ic->ic_socket);
ICL_CONN_LOCK(ic);
ic->ic_socket = NULL;
if (ic->ic_receive_pdu != NULL) {

View file

@ -80,6 +80,7 @@ struct icl_conn {
volatile u_int ic_outstanding_pdus;
#endif
STAILQ_HEAD(, icl_pdu) ic_to_send;
bool ic_check_send_space;
size_t ic_receive_len;
int ic_receive_state;
struct icl_pdu *ic_receive_pdu;