From 7206980023c0ba30c78559b49b9ef63261676545 Mon Sep 17 00:00:00 2001 From: columbarius Date: Sun, 7 Apr 2024 14:10:41 +0200 Subject: [PATCH] vulkan: Sync renderer for blit filters A spa_node has callback both on the main and data thread, which can modify the internal state of vulkan_blit_state. Especially critical are functions, which might drop buffers currently in use. To mitigate this a read-write lock is used. The data thread shall try to aquire a read lock before accessing the buffers, while the main thread has to aquire exclusive access via a write lock before modifying the buffers. --- spa/plugins/vulkan/vulkan-blit-dsp-filter.c | 53 +++++++++++++++++++-- spa/plugins/vulkan/vulkan-blit-filter.c | 53 +++++++++++++++++++-- 2 files changed, 96 insertions(+), 10 deletions(-) diff --git a/spa/plugins/vulkan/vulkan-blit-dsp-filter.c b/spa/plugins/vulkan/vulkan-blit-dsp-filter.c index 67e879556..afe073532 100644 --- a/spa/plugins/vulkan/vulkan-blit-dsp-filter.c +++ b/spa/plugins/vulkan/vulkan-blit-dsp-filter.c @@ -8,6 +8,8 @@ #include #include #include +#include +#include #include #include @@ -84,7 +86,9 @@ struct impl { struct spa_hook_list hooks; struct spa_callbacks callbacks; - bool started; + // Synchronization between main and data thread + atomic_bool started; + pthread_rwlock_t renderlock; struct vulkan_blit_state state; struct port port[2]; @@ -92,6 +96,28 @@ struct impl { #define CHECK_PORT(this,d,p) ((p) < 1) +static int lock_init(struct impl *this) +{ + return pthread_rwlock_init(&this->renderlock, NULL); +} + +static void lock_destroy(struct impl *this) +{ + pthread_rwlock_destroy(&this->renderlock); +} + +static int lock_renderer(struct impl *this) +{ + spa_log_info(this->log, "Lock renderer"); + return pthread_rwlock_wrlock(&this->renderlock); +} + +static int unlock_renderer(struct impl *this) +{ + spa_log_info(this->log, "Unlock renderer"); + return pthread_rwlock_unlock(&this->renderlock); +} + static int impl_node_enum_params(void *object, int seq, uint32_t id, uint32_t start, uint32_t num, const struct spa_pod *filter) @@ -186,6 +212,7 @@ static int impl_node_send_command(void *object, const struct spa_command *comman this->started = true; spa_vulkan_blit_start(&this->state); + // The main thread needs to lock the renderer before changing its state break; case SPA_NODE_COMMAND_Suspend: @@ -193,8 +220,11 @@ static int impl_node_send_command(void *object, const struct spa_command *comman if (!this->started) return 0; - this->started = false; + lock_renderer(this); spa_vulkan_blit_stop(&this->state); + this->started = false; + unlock_renderer(this); + // Locking the renderer from the renderer is no longer required break; default: return -ENOTSUP; @@ -446,12 +476,12 @@ static int clear_buffers(struct impl *this, struct port *port) { if (port->n_buffers > 0) { spa_log_debug(this->log, "%p: clear buffers", this); - spa_vulkan_blit_stop(&this->state); + lock_renderer(this); spa_vulkan_blit_use_buffers(&this->state, &this->state.streams[port->stream_id], 0, &port->current_format, 0, NULL); + unlock_renderer(this); port->n_buffers = 0; spa_list_init(&port->empty); spa_list_init(&port->ready); - this->started = false; } return 0; } @@ -511,7 +541,6 @@ static int port_set_format(struct impl *this, struct port *port, if (format == NULL) { port->have_format = false; clear_buffers(this, port); - spa_vulkan_blit_unprepare(&this->state); } else { struct spa_video_info info = { 0 }; @@ -604,6 +633,7 @@ impl_node_port_use_buffers(void *object, if (n_buffers > MAX_BUFFERS) return -ENOSPC; + lock_renderer(this); for (i = 0; i < n_buffers; i++) { struct buffer *b; @@ -618,6 +648,7 @@ impl_node_port_use_buffers(void *object, } spa_vulkan_blit_use_buffers(&this->state, &this->state.streams[port->stream_id], flags, &port->current_format, n_buffers, buffers); port->n_buffers = n_buffers; + unlock_renderer(this); return 0; } @@ -670,6 +701,7 @@ static int impl_node_process(void *object) struct buffer *b; spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(this->started, -EINVAL); inport = &this->port[SPA_DIRECTION_INPUT]; if ((inio = inport->io) == NULL) @@ -699,6 +731,11 @@ static int impl_node_process(void *object) spa_log_debug(this->log, "%p: out of buffers", this); return -EPIPE; } + + if (pthread_rwlock_tryrdlock(&this->renderlock) < 0) { + return -EBUSY; + } + b = &inport->buffers[inio->buffer_id]; this->state.streams[inport->stream_id].pending_buffer_id = b->id; inio->status = SPA_STATUS_NEED_DATA; @@ -719,6 +756,8 @@ static int impl_node_process(void *object) outio->buffer_id = b->id; outio->status = SPA_STATUS_HAVE_DATA; + pthread_rwlock_unlock(&this->renderlock); + return SPA_STATUS_NEED_DATA | SPA_STATUS_HAVE_DATA; } @@ -765,7 +804,9 @@ static int impl_clear(struct spa_handle *handle) this = (struct impl *) handle; + spa_vulkan_blit_unprepare(&this->state); spa_vulkan_blit_deinit(&this->state); + lock_destroy(this); return 0; } @@ -816,6 +857,8 @@ impl_init(const struct spa_handle_factory *factory, this->info.params = this->params; this->info.n_params = N_NODE_PARAMS; + lock_init(this); + port = &this->port[SPA_DIRECTION_INPUT]; port->stream_id = SPA_DIRECTION_INPUT; port->direction = SPA_DIRECTION_INPUT; diff --git a/spa/plugins/vulkan/vulkan-blit-filter.c b/spa/plugins/vulkan/vulkan-blit-filter.c index f0c603068..7b8bfa395 100644 --- a/spa/plugins/vulkan/vulkan-blit-filter.c +++ b/spa/plugins/vulkan/vulkan-blit-filter.c @@ -8,6 +8,8 @@ #include #include #include +#include +#include #include #include @@ -84,7 +86,9 @@ struct impl { struct spa_hook_list hooks; struct spa_callbacks callbacks; - bool started; + // Synchronization between main and data thread + atomic_bool started; + pthread_rwlock_t renderlock; struct vulkan_blit_state state; struct port port[2]; @@ -92,6 +96,28 @@ struct impl { #define CHECK_PORT(this,d,p) ((p) < 1) +static int lock_init(struct impl *this) +{ + return pthread_rwlock_init(&this->renderlock, NULL); +} + +static void lock_destroy(struct impl *this) +{ + pthread_rwlock_destroy(&this->renderlock); +} + +static int lock_renderer(struct impl *this) +{ + spa_log_info(this->log, "Lock renderer"); + return pthread_rwlock_wrlock(&this->renderlock); +} + +static int unlock_renderer(struct impl *this) +{ + spa_log_info(this->log, "Unlock renderer"); + return pthread_rwlock_unlock(&this->renderlock); +} + static int impl_node_enum_params(void *object, int seq, uint32_t id, uint32_t start, uint32_t num, const struct spa_pod *filter) @@ -186,6 +212,7 @@ static int impl_node_send_command(void *object, const struct spa_command *comman this->started = true; spa_vulkan_blit_start(&this->state); + // The main thread needs to lock the renderer before changing its state break; case SPA_NODE_COMMAND_Suspend: @@ -193,8 +220,11 @@ static int impl_node_send_command(void *object, const struct spa_command *comman if (!this->started) return 0; - this->started = false; + lock_renderer(this); spa_vulkan_blit_stop(&this->state); + this->started = false; + unlock_renderer(this); + // Locking the renderer from the renderer is no longer required break; default: return -ENOTSUP; @@ -491,12 +521,12 @@ static int clear_buffers(struct impl *this, struct port *port) { if (port->n_buffers > 0) { spa_log_debug(this->log, "%p: clear buffers", this); - spa_vulkan_blit_stop(&this->state); + lock_renderer(this); spa_vulkan_blit_use_buffers(&this->state, &this->state.streams[port->stream_id], 0, &port->current_format, 0, NULL); + unlock_renderer(this); port->n_buffers = 0; spa_list_init(&port->empty); spa_list_init(&port->ready); - this->started = false; } return 0; } @@ -601,7 +631,6 @@ static int port_set_format(struct impl *this, struct port *port, if (format == NULL) { port->have_format = false; clear_buffers(this, port); - spa_vulkan_blit_unprepare(&this->state); } else { struct spa_video_info info = { 0 }; @@ -701,6 +730,7 @@ impl_node_port_use_buffers(void *object, if (n_buffers > MAX_BUFFERS) return -ENOSPC; + lock_renderer(this); for (i = 0; i < n_buffers; i++) { struct buffer *b; @@ -715,6 +745,7 @@ impl_node_port_use_buffers(void *object, } spa_vulkan_blit_use_buffers(&this->state, &this->state.streams[port->stream_id], flags, &port->current_format, n_buffers, buffers); port->n_buffers = n_buffers; + unlock_renderer(this); return 0; } @@ -767,6 +798,7 @@ static int impl_node_process(void *object) struct buffer *b; spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(this->started, -EINVAL); inport = &this->port[SPA_DIRECTION_INPUT]; if ((inio = inport->io) == NULL) @@ -796,6 +828,11 @@ static int impl_node_process(void *object) spa_log_debug(this->log, "%p: out of buffers", this); return -EPIPE; } + + if (pthread_rwlock_tryrdlock(&this->renderlock) < 0) { + return -EBUSY; + } + b = &inport->buffers[inio->buffer_id]; this->state.streams[inport->stream_id].pending_buffer_id = b->id; inio->status = SPA_STATUS_NEED_DATA; @@ -821,6 +858,8 @@ static int impl_node_process(void *object) outio->buffer_id = b->id; outio->status = SPA_STATUS_HAVE_DATA; + pthread_rwlock_unlock(&this->renderlock); + return SPA_STATUS_NEED_DATA | SPA_STATUS_HAVE_DATA; } @@ -867,7 +906,9 @@ static int impl_clear(struct spa_handle *handle) this = (struct impl *) handle; + spa_vulkan_blit_unprepare(&this->state); spa_vulkan_blit_deinit(&this->state); + lock_destroy(this); return 0; } @@ -918,6 +959,8 @@ impl_init(const struct spa_handle_factory *factory, this->info.params = this->params; this->info.n_params = N_NODE_PARAMS; + lock_init(this); + port = &this->port[SPA_DIRECTION_INPUT]; port->stream_id = SPA_DIRECTION_INPUT; port->direction = SPA_DIRECTION_INPUT;