loop: flush items in the order they were added

Add a count to each invoke item that is updated with an increasing
loop atomic counter. Flush items from the queues based on their count
so that items are flushed in the order they were added even if they
were added to different queues.
This commit is contained in:
Wim Taymans 2024-05-08 12:21:54 +02:00
parent fac0d47c23
commit 8b23a8a89e

View file

@ -16,6 +16,8 @@
#include <spa/support/log.h>
#include <spa/support/plugin.h>
#include <spa/utils/list.h>
#include <spa/utils/cleanup.h>
#include <spa/utils/atomic.h>
#include <spa/utils/names.h>
#include <spa/utils/ratelimit.h>
#include <spa/utils/result.h>
@ -41,6 +43,7 @@ struct invoke_item {
size_t item_size;
spa_invoke_func_t func;
uint32_t seq;
uint32_t count;
void *data;
size_t size;
bool block;
@ -73,6 +76,8 @@ struct impl {
tss_t queue_tss_id;
pthread_mutex_t queue_lock;
uint32_t count;
uint32_t flush_count;
unsigned int polling:1;
};
@ -87,8 +92,6 @@ struct queue {
struct spa_ringbuffer buffer;
uint8_t *buffer_data;
uint8_t buffer_mem[DATAS_SIZE + MAX_ALIGN];
uint32_t flush_count;
};
struct source_impl {
@ -169,41 +172,57 @@ static int loop_remove_source(void *object, struct spa_source *source)
return res;
}
static void queue_flush_items(struct queue *queue)
static inline int32_t item_compare(struct invoke_item *a, struct invoke_item *b)
{
struct impl *impl = queue->impl;
uint32_t index, flush_count;
int32_t avail;
return (int32_t)(a->count - b->count);
}
static void flush_all_queues(struct impl *impl)
{
uint32_t flush_count;
int res;
flush_count = ++queue->flush_count;
avail = spa_ringbuffer_get_read_index(&queue->buffer, &index);
while (avail > 0) {
struct invoke_item *item;
bool block;
pthread_mutex_lock(&impl->queue_lock);
flush_count = ++impl->flush_count;
while (true) {
struct queue *cqueue, *queue;
struct invoke_item *citem, *item = NULL;
uint32_t cindex, index;
spa_invoke_func_t func;
bool block;
item = SPA_PTROFF(queue->buffer_data, index & (DATAS_SIZE - 1), struct invoke_item);
block = item->block;
func = item->func;
spa_list_for_each(cqueue, &impl->queue_list, link) {
if (spa_ringbuffer_get_read_index(&cqueue->buffer, &cindex) <
(int32_t)sizeof(struct invoke_item))
continue;
citem = SPA_PTROFF(cqueue->buffer_data, cindex & (DATAS_SIZE - 1), struct invoke_item);
if (item == NULL || item_compare(citem, item) < 0) {
item = citem;
queue = cqueue;
index = cindex;
}
}
if (item == NULL)
break;
spa_log_trace_fp(impl->log, "%p: flush item %p", queue, item);
/* first we remove the function from the item so that recursive
* calls don't call the callback again. We can't update the
* read index before we call the function because then the item
* might get overwritten. */
item->func = NULL;
func = spa_steal_ptr(item->func);
if (func)
item->res = func(&impl->loop, true, item->seq, item->data,
item->size, item->user_data);
/* if this function did a recursive invoke, it now flushed the
* ringbuffer and we can exit */
if (flush_count != queue->flush_count)
if (flush_count != impl->flush_count)
break;
index += item->item_size;
avail -= item->item_size;
block = item->block;
spa_ringbuffer_read_update(&queue->buffer, index);
if (block) {
@ -212,14 +231,6 @@ static void queue_flush_items(struct queue *queue)
queue, queue->ack_fd, spa_strerror(res));
}
}
}
static void flush_all_queues(struct impl *impl)
{
struct queue *queue;
pthread_mutex_lock(&impl->queue_lock);
spa_list_for_each(queue, &impl->queue_list, link)
queue_flush_items(queue);
pthread_mutex_unlock(&impl->queue_lock);
}
@ -261,6 +272,7 @@ retry:
item = SPA_PTROFF(queue->buffer_data, offset, struct invoke_item);
item->func = func;
item->seq = seq;
item->count = SPA_ATOMIC_INC(impl->count);
item->size = size;
item->block = in_thread ? false : block;
item->user_data = user_data;