vulkan: Sync renderer for blit filters

A spa_node has callback both on the main and data thread, which can
modify the internal state of vulkan_blit_state. Especially critical are
functions, which might drop buffers currently in use. To mitigate this
a read-write lock is used. The data thread shall try to aquire a read
lock before accessing the buffers, while the main thread has to aquire
exclusive access via a write lock before modifying the buffers.
This commit is contained in:
columbarius 2024-04-07 14:10:41 +02:00
parent 2010fa1349
commit 7206980023
2 changed files with 96 additions and 10 deletions

View file

@ -8,6 +8,8 @@
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <stdatomic.h>
#include <pthread.h>
#include <spa/support/plugin.h>
#include <spa/support/log.h>
@ -84,7 +86,9 @@ struct impl {
struct spa_hook_list hooks;
struct spa_callbacks callbacks;
bool started;
// Synchronization between main and data thread
atomic_bool started;
pthread_rwlock_t renderlock;
struct vulkan_blit_state state;
struct port port[2];
@ -92,6 +96,28 @@ struct impl {
#define CHECK_PORT(this,d,p) ((p) < 1)
static int lock_init(struct impl *this)
{
return pthread_rwlock_init(&this->renderlock, NULL);
}
static void lock_destroy(struct impl *this)
{
pthread_rwlock_destroy(&this->renderlock);
}
static int lock_renderer(struct impl *this)
{
spa_log_info(this->log, "Lock renderer");
return pthread_rwlock_wrlock(&this->renderlock);
}
static int unlock_renderer(struct impl *this)
{
spa_log_info(this->log, "Unlock renderer");
return pthread_rwlock_unlock(&this->renderlock);
}
static int impl_node_enum_params(void *object, int seq,
uint32_t id, uint32_t start, uint32_t num,
const struct spa_pod *filter)
@ -186,6 +212,7 @@ static int impl_node_send_command(void *object, const struct spa_command *comman
this->started = true;
spa_vulkan_blit_start(&this->state);
// The main thread needs to lock the renderer before changing its state
break;
case SPA_NODE_COMMAND_Suspend:
@ -193,8 +220,11 @@ static int impl_node_send_command(void *object, const struct spa_command *comman
if (!this->started)
return 0;
this->started = false;
lock_renderer(this);
spa_vulkan_blit_stop(&this->state);
this->started = false;
unlock_renderer(this);
// Locking the renderer from the renderer is no longer required
break;
default:
return -ENOTSUP;
@ -446,12 +476,12 @@ static int clear_buffers(struct impl *this, struct port *port)
{
if (port->n_buffers > 0) {
spa_log_debug(this->log, "%p: clear buffers", this);
spa_vulkan_blit_stop(&this->state);
lock_renderer(this);
spa_vulkan_blit_use_buffers(&this->state, &this->state.streams[port->stream_id], 0, &port->current_format, 0, NULL);
unlock_renderer(this);
port->n_buffers = 0;
spa_list_init(&port->empty);
spa_list_init(&port->ready);
this->started = false;
}
return 0;
}
@ -511,7 +541,6 @@ static int port_set_format(struct impl *this, struct port *port,
if (format == NULL) {
port->have_format = false;
clear_buffers(this, port);
spa_vulkan_blit_unprepare(&this->state);
} else {
struct spa_video_info info = { 0 };
@ -604,6 +633,7 @@ impl_node_port_use_buffers(void *object,
if (n_buffers > MAX_BUFFERS)
return -ENOSPC;
lock_renderer(this);
for (i = 0; i < n_buffers; i++) {
struct buffer *b;
@ -618,6 +648,7 @@ impl_node_port_use_buffers(void *object,
}
spa_vulkan_blit_use_buffers(&this->state, &this->state.streams[port->stream_id], flags, &port->current_format, n_buffers, buffers);
port->n_buffers = n_buffers;
unlock_renderer(this);
return 0;
}
@ -670,6 +701,7 @@ static int impl_node_process(void *object)
struct buffer *b;
spa_return_val_if_fail(this != NULL, -EINVAL);
spa_return_val_if_fail(this->started, -EINVAL);
inport = &this->port[SPA_DIRECTION_INPUT];
if ((inio = inport->io) == NULL)
@ -699,6 +731,11 @@ static int impl_node_process(void *object)
spa_log_debug(this->log, "%p: out of buffers", this);
return -EPIPE;
}
if (pthread_rwlock_tryrdlock(&this->renderlock) < 0) {
return -EBUSY;
}
b = &inport->buffers[inio->buffer_id];
this->state.streams[inport->stream_id].pending_buffer_id = b->id;
inio->status = SPA_STATUS_NEED_DATA;
@ -719,6 +756,8 @@ static int impl_node_process(void *object)
outio->buffer_id = b->id;
outio->status = SPA_STATUS_HAVE_DATA;
pthread_rwlock_unlock(&this->renderlock);
return SPA_STATUS_NEED_DATA | SPA_STATUS_HAVE_DATA;
}
@ -765,7 +804,9 @@ static int impl_clear(struct spa_handle *handle)
this = (struct impl *) handle;
spa_vulkan_blit_unprepare(&this->state);
spa_vulkan_blit_deinit(&this->state);
lock_destroy(this);
return 0;
}
@ -816,6 +857,8 @@ impl_init(const struct spa_handle_factory *factory,
this->info.params = this->params;
this->info.n_params = N_NODE_PARAMS;
lock_init(this);
port = &this->port[SPA_DIRECTION_INPUT];
port->stream_id = SPA_DIRECTION_INPUT;
port->direction = SPA_DIRECTION_INPUT;

View file

@ -8,6 +8,8 @@
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <stdatomic.h>
#include <pthread.h>
#include <spa/support/plugin.h>
#include <spa/support/log.h>
@ -84,7 +86,9 @@ struct impl {
struct spa_hook_list hooks;
struct spa_callbacks callbacks;
bool started;
// Synchronization between main and data thread
atomic_bool started;
pthread_rwlock_t renderlock;
struct vulkan_blit_state state;
struct port port[2];
@ -92,6 +96,28 @@ struct impl {
#define CHECK_PORT(this,d,p) ((p) < 1)
static int lock_init(struct impl *this)
{
return pthread_rwlock_init(&this->renderlock, NULL);
}
static void lock_destroy(struct impl *this)
{
pthread_rwlock_destroy(&this->renderlock);
}
static int lock_renderer(struct impl *this)
{
spa_log_info(this->log, "Lock renderer");
return pthread_rwlock_wrlock(&this->renderlock);
}
static int unlock_renderer(struct impl *this)
{
spa_log_info(this->log, "Unlock renderer");
return pthread_rwlock_unlock(&this->renderlock);
}
static int impl_node_enum_params(void *object, int seq,
uint32_t id, uint32_t start, uint32_t num,
const struct spa_pod *filter)
@ -186,6 +212,7 @@ static int impl_node_send_command(void *object, const struct spa_command *comman
this->started = true;
spa_vulkan_blit_start(&this->state);
// The main thread needs to lock the renderer before changing its state
break;
case SPA_NODE_COMMAND_Suspend:
@ -193,8 +220,11 @@ static int impl_node_send_command(void *object, const struct spa_command *comman
if (!this->started)
return 0;
this->started = false;
lock_renderer(this);
spa_vulkan_blit_stop(&this->state);
this->started = false;
unlock_renderer(this);
// Locking the renderer from the renderer is no longer required
break;
default:
return -ENOTSUP;
@ -491,12 +521,12 @@ static int clear_buffers(struct impl *this, struct port *port)
{
if (port->n_buffers > 0) {
spa_log_debug(this->log, "%p: clear buffers", this);
spa_vulkan_blit_stop(&this->state);
lock_renderer(this);
spa_vulkan_blit_use_buffers(&this->state, &this->state.streams[port->stream_id], 0, &port->current_format, 0, NULL);
unlock_renderer(this);
port->n_buffers = 0;
spa_list_init(&port->empty);
spa_list_init(&port->ready);
this->started = false;
}
return 0;
}
@ -601,7 +631,6 @@ static int port_set_format(struct impl *this, struct port *port,
if (format == NULL) {
port->have_format = false;
clear_buffers(this, port);
spa_vulkan_blit_unprepare(&this->state);
} else {
struct spa_video_info info = { 0 };
@ -701,6 +730,7 @@ impl_node_port_use_buffers(void *object,
if (n_buffers > MAX_BUFFERS)
return -ENOSPC;
lock_renderer(this);
for (i = 0; i < n_buffers; i++) {
struct buffer *b;
@ -715,6 +745,7 @@ impl_node_port_use_buffers(void *object,
}
spa_vulkan_blit_use_buffers(&this->state, &this->state.streams[port->stream_id], flags, &port->current_format, n_buffers, buffers);
port->n_buffers = n_buffers;
unlock_renderer(this);
return 0;
}
@ -767,6 +798,7 @@ static int impl_node_process(void *object)
struct buffer *b;
spa_return_val_if_fail(this != NULL, -EINVAL);
spa_return_val_if_fail(this->started, -EINVAL);
inport = &this->port[SPA_DIRECTION_INPUT];
if ((inio = inport->io) == NULL)
@ -796,6 +828,11 @@ static int impl_node_process(void *object)
spa_log_debug(this->log, "%p: out of buffers", this);
return -EPIPE;
}
if (pthread_rwlock_tryrdlock(&this->renderlock) < 0) {
return -EBUSY;
}
b = &inport->buffers[inio->buffer_id];
this->state.streams[inport->stream_id].pending_buffer_id = b->id;
inio->status = SPA_STATUS_NEED_DATA;
@ -821,6 +858,8 @@ static int impl_node_process(void *object)
outio->buffer_id = b->id;
outio->status = SPA_STATUS_HAVE_DATA;
pthread_rwlock_unlock(&this->renderlock);
return SPA_STATUS_NEED_DATA | SPA_STATUS_HAVE_DATA;
}
@ -867,7 +906,9 @@ static int impl_clear(struct spa_handle *handle)
this = (struct impl *) handle;
spa_vulkan_blit_unprepare(&this->state);
spa_vulkan_blit_deinit(&this->state);
lock_destroy(this);
return 0;
}
@ -918,6 +959,8 @@ impl_init(const struct spa_handle_factory *factory,
this->info.params = this->params;
this->info.n_params = N_NODE_PARAMS;
lock_init(this);
port = &this->port[SPA_DIRECTION_INPUT];
port->stream_id = SPA_DIRECTION_INPUT;
port->direction = SPA_DIRECTION_INPUT;