siftr: flush pkt_nodes to the log file in batch

Reviewed by: rscheff, tuexen
Differential Revision: https://reviews.freebsd.org/D41175
This commit is contained in:
Cheng Cui 2023-07-25 09:53:55 -04:00
parent ba90a31d08
commit fafb03ab42
No known key found for this signature in database
GPG key ID: F9BE886D1486EF98

View file

@ -370,12 +370,12 @@ siftr_new_hash_node(struct flow_info info, int dir,
}
}
static void
siftr_process_pkt(struct pkt_node * pkt_node)
static int
siftr_process_pkt(struct pkt_node * pkt_node, char *buf)
{
struct flow_hash_node *hash_node;
struct listhead *counter_list;
struct ale *log_buf;
int ret_sz;
if (pkt_node->flowid == 0) {
panic("%s: flowid not available", __func__);
@ -385,7 +385,7 @@ siftr_process_pkt(struct pkt_node * pkt_node)
hash_node = siftr_find_flow(counter_list, pkt_node->flowid);
if (hash_node == NULL) {
return;
return 0;
} else if (siftr_pkts_per_log > 1) {
/*
* Taking the remainder of the counter divided
@ -401,16 +401,11 @@ siftr_process_pkt(struct pkt_node * pkt_node)
* we wrote a log message for this connection, return.
*/
if (hash_node->counter > 0)
return;
return 0;
}
log_buf = alq_getn(siftr_alq, MAX_LOG_MSG_LEN, ALQ_WAITOK);
if (log_buf == NULL)
return; /* Should only happen if the ALQ is shutting down. */
/* Construct a log message. */
log_buf->ae_bytesused = snprintf(log_buf->ae_data, MAX_LOG_MSG_LEN,
ret_sz = snprintf(buf, MAX_LOG_MSG_LEN,
"%c,%jd.%06ld,%s,%hu,%s,%hu,%u,%u,%u,%u,%u,%u,%u,%u,%u,%u,%u,%u,%u,"
"%u,%u,%u,%u,%u,%u,%u,%u\n",
direction[pkt_node->direction],
@ -442,7 +437,7 @@ siftr_process_pkt(struct pkt_node * pkt_node)
pkt_node->flowid,
pkt_node->flowtype);
alq_post_flags(siftr_alq, log_buf, 0);
return ret_sz;
}
static void
@ -452,6 +447,9 @@ siftr_pkt_manager_thread(void *arg)
STAILQ_HEAD_INITIALIZER(tmp_pkt_queue);
struct pkt_node *pkt_node, *pkt_node_temp;
uint8_t draining;
struct ale *log_buf;
int ret_sz, cnt;
char *bufp;
draining = 2;
@ -487,12 +485,51 @@ siftr_pkt_manager_thread(void *arg)
*/
mtx_unlock(&siftr_pkt_mgr_mtx);
/* Flush all pkt_nodes to the log file. */
STAILQ_FOREACH_SAFE(pkt_node, &tmp_pkt_queue, nodes,
pkt_node_temp) {
siftr_process_pkt(pkt_node);
STAILQ_REMOVE_HEAD(&tmp_pkt_queue, nodes);
free(pkt_node, M_SIFTR_PKTNODE);
try_again:
pkt_node = STAILQ_FIRST(&tmp_pkt_queue);
if (pkt_node != NULL) {
if (STAILQ_NEXT(pkt_node, nodes) != NULL) {
cnt = 3;
} else {
cnt = 1;
}
log_buf = alq_getn(siftr_alq, MAX_LOG_MSG_LEN * cnt,
ALQ_WAITOK);
if (log_buf != NULL) {
log_buf->ae_bytesused = 0;
bufp = log_buf->ae_data;
} else {
/*
* Should only happen if the ALQ is shutting
* down.
*/
bufp = NULL;
}
/* Flush all pkt_nodes to the log file. */
STAILQ_FOREACH_SAFE(pkt_node, &tmp_pkt_queue, nodes,
pkt_node_temp) {
if (log_buf != NULL) {
ret_sz = siftr_process_pkt(pkt_node,
bufp);
bufp += ret_sz;
log_buf->ae_bytesused += ret_sz;
cnt--;
}
STAILQ_REMOVE_HEAD(&tmp_pkt_queue, nodes);
free(pkt_node, M_SIFTR_PKTNODE);
if (cnt <= 0 && !STAILQ_EMPTY(&tmp_pkt_queue)) {
alq_post_flags(siftr_alq, log_buf, 0);
goto try_again;
}
}
if (log_buf != NULL) {
alq_post_flags(siftr_alq, log_buf, 0);
}
}
KASSERT(STAILQ_EMPTY(&tmp_pkt_queue),