loop: create a per-thread queue

Keep a thread local queue. This makes it possible for multiple threads
to write to the ringbuffer.

There is a lock to protect the list of queues. It can only be contended
when new queues are created in the threads but this can be done at
thread startup.

Fixes #3983
This commit is contained in:
Wim Taymans 2024-04-29 15:17:45 +02:00
parent c76424da36
commit de0db48f17

View file

@ -9,6 +9,7 @@
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h> #include <stdio.h>
#include <pthread.h> #include <pthread.h>
#include <threads.h>
#include <spa/support/loop.h> #include <spa/support/loop.h>
#include <spa/support/system.h> #include <spa/support/system.h>
@ -70,7 +71,8 @@ struct impl {
struct spa_source *wakeup; struct spa_source *wakeup;
struct queue *queue; tss_t queue_tss_id;
pthread_mutex_t queue_lock;
unsigned int polling:1; unsigned int polling:1;
}; };
@ -215,8 +217,10 @@ static void queue_flush_items(struct queue *queue)
static void flush_all_queues(struct impl *impl) static void flush_all_queues(struct impl *impl)
{ {
struct queue *queue; struct queue *queue;
pthread_mutex_lock(&impl->queue_lock);
spa_list_for_each(queue, &impl->queue_list, link) spa_list_for_each(queue, &impl->queue_list, link)
queue_flush_items(queue); queue_flush_items(queue);
pthread_mutex_unlock(&impl->queue_lock);
} }
static int static int
@ -352,18 +356,15 @@ static void loop_queue_destroy(void *data)
{ {
struct queue *queue = data; struct queue *queue = data;
struct impl *impl = queue->impl; struct impl *impl = queue->impl;
pthread_mutex_lock(&impl->queue_lock);
spa_list_remove(&queue->link); spa_list_remove(&queue->link);
pthread_mutex_unlock(&impl->queue_lock);
spa_system_close(impl->system, queue->ack_fd); spa_system_close(impl->system, queue->ack_fd);
free(queue); free(queue);
} }
static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq,
const void *data, size_t size, bool block, void *user_data)
{
struct impl *impl = object;
return loop_queue_invoke(impl->queue, func, seq, data, size, block, user_data);
}
static struct queue *loop_create_queue(void *object, uint32_t flags) static struct queue *loop_create_queue(void *object, uint32_t flags)
{ {
struct impl *impl = object; struct impl *impl = object;
@ -390,7 +391,11 @@ static struct queue *loop_create_queue(void *object, uint32_t flags)
} }
queue->ack_fd = res; queue->ack_fd = res;
pthread_mutex_lock(&impl->queue_lock);
spa_list_append(&impl->queue_list, &queue->link); spa_list_append(&impl->queue_list, &queue->link);
pthread_mutex_unlock(&impl->queue_lock);
spa_log_info(impl->log, "%p created queue %p", impl, queue);
return queue; return queue;
@ -400,6 +405,21 @@ error:
return NULL; return NULL;
} }
static int loop_invoke(void *object, spa_invoke_func_t func, uint32_t seq,
const void *data, size_t size, bool block, void *user_data)
{
struct impl *impl = object;
struct queue *local_queue = tss_get(impl->queue_tss_id);
if (local_queue == NULL) {
local_queue = loop_create_queue(impl, 0);
if (local_queue == NULL)
return -errno;
tss_set(impl->queue_tss_id, local_queue);
}
return loop_queue_invoke(local_queue, func, seq, data, size, block, user_data);
}
static int loop_get_fd(void *object) static int loop_get_fd(void *object)
{ {
struct impl *impl = object; struct impl *impl = object;
@ -1043,6 +1063,8 @@ static int impl_clear(struct spa_handle *handle)
loop_queue_destroy(queue); loop_queue_destroy(queue);
spa_system_close(impl->system, impl->poll_fd); spa_system_close(impl->system, impl->poll_fd);
pthread_mutex_destroy(&impl->queue_lock);
tss_delete(impl->queue_tss_id);
return 0; return 0;
} }
@ -1054,6 +1076,15 @@ impl_get_size(const struct spa_handle_factory *factory,
return sizeof(struct impl); return sizeof(struct impl);
} }
#define CHECK(expression,label) \
do { \
if ((errno = (expression)) != 0) { \
res = -errno; \
spa_log_error(impl->log, #expression ": %s", strerror(errno)); \
goto label; \
} \
} while(false);
static int static int
impl_init(const struct spa_handle_factory *factory, impl_init(const struct spa_handle_factory *factory,
struct spa_handle *handle, struct spa_handle *handle,
@ -1063,6 +1094,7 @@ impl_init(const struct spa_handle_factory *factory,
{ {
struct impl *impl; struct impl *impl;
const char *str; const char *str;
pthread_mutexattr_t attr;
int res; int res;
spa_return_val_if_fail(factory != NULL, -EINVAL); spa_return_val_if_fail(factory != NULL, -EINVAL);
@ -1094,6 +1126,10 @@ impl_init(const struct spa_handle_factory *factory,
impl->retry_timeout = atoi(str); impl->retry_timeout = atoi(str);
} }
CHECK(pthread_mutexattr_init(&attr), error_exit);
CHECK(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE), error_exit);
CHECK(pthread_mutex_init(&impl->queue_lock, &attr), error_exit);
impl->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log); impl->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log);
spa_log_topic_init(impl->log, &log_topic); spa_log_topic_init(impl->log, &log_topic);
impl->system = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_System); impl->system = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_System);
@ -1101,13 +1137,12 @@ impl_init(const struct spa_handle_factory *factory,
if (impl->system == NULL) { if (impl->system == NULL) {
spa_log_error(impl->log, "%p: a System is needed", impl); spa_log_error(impl->log, "%p: a System is needed", impl);
res = -EINVAL; res = -EINVAL;
goto error_exit; goto error_exit_free_mutex;
} }
if ((res = spa_system_pollfd_create(impl->system, SPA_FD_CLOEXEC)) < 0) { if ((res = spa_system_pollfd_create(impl->system, SPA_FD_CLOEXEC)) < 0) {
spa_log_error(impl->log, "%p: can't create pollfd: %s", spa_log_error(impl->log, "%p: can't create pollfd: %s",
impl, spa_strerror(res)); impl, spa_strerror(res));
goto error_exit; goto error_exit_free_mutex;
} }
impl->poll_fd = res; impl->poll_fd = res;
@ -1123,12 +1158,12 @@ impl_init(const struct spa_handle_factory *factory,
goto error_exit_free_poll; goto error_exit_free_poll;
} }
impl->queue = loop_create_queue(impl, 0); if (tss_create(&impl->queue_tss_id, (tss_dtor_t)loop_queue_destroy) != 0) {
if (impl->queue == NULL) {
res = -errno; res = -errno;
spa_log_error(impl->log, "%p: can't create queue: %m", impl); spa_log_error(impl->log, "%p: can't create tss: %m", impl);
goto error_exit_free_wakeup; goto error_exit_free_wakeup;
} }
spa_log_debug(impl->log, "%p: initialized", impl); spa_log_debug(impl->log, "%p: initialized", impl);
return 0; return 0;
@ -1137,6 +1172,8 @@ error_exit_free_wakeup:
loop_destroy_source(impl, impl->wakeup); loop_destroy_source(impl, impl->wakeup);
error_exit_free_poll: error_exit_free_poll:
spa_system_close(impl->system, impl->poll_fd); spa_system_close(impl->system, impl->poll_fd);
error_exit_free_mutex:
pthread_mutex_destroy(&impl->queue_lock);
error_exit: error_exit:
return res; return res;
} }