impl-node: avoid bitfield races

Move the bits that are used in the realtime thread away from the bits
from the main thread to avoid bitfield races. Move some fields in rt
structs to make it explicit that they are only to be modified from the
realtime threads.
This commit is contained in:
Wim Taymans 2024-04-25 16:18:03 +02:00
parent 6fdf09e83e
commit 9844aed31b
3 changed files with 32 additions and 26 deletions

View file

@ -167,7 +167,7 @@ do_node_add(struct spa_loop *loop, bool async, uint32_t seq,
struct pw_impl_node *this = user_data;
struct pw_impl_node *driver = this->driver_node;
if (!this->added) {
if (!this->rt.added) {
uint64_t dummy;
int res;
@ -176,7 +176,7 @@ do_node_add(struct spa_loop *loop, bool async, uint32_t seq,
if (SPA_UNLIKELY(res != -EAGAIN && res != 0))
pw_log_warn("%p: read failed %m", this);
this->added = true;
this->rt.added = true;
/* remote nodes have their source added in client-node instead */
if (!this->remote)
spa_loop_add_source(loop, &this->source);
@ -190,11 +190,11 @@ do_node_remove(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
struct pw_impl_node *this = user_data;
if (this->added) {
if (this->rt.added) {
if (!this->remote)
spa_loop_remove_source(loop, &this->source);
remove_node(this);
this->added = false;
this->rt.added = false;
}
return 0;
}
@ -273,7 +273,7 @@ static int start_node(struct pw_impl_node *this)
return 0;
pw_log_debug("%p: start node driving:%d driver:%d added:%d", this,
this->driving, this->driver, this->added);
this->driving, this->driver, this->rt.added);
if (!(this->driving && this->driver)) {
impl->pending_play = true;
@ -373,7 +373,7 @@ static void node_update_state(struct pw_impl_node *node, enum pw_node_state stat
switch (state) {
case PW_NODE_STATE_RUNNING:
pw_log_debug("%p: start node driving:%d driver:%d added:%d", node,
node->driving, node->driver, node->added);
node->driving, node->driver, node->rt.added);
if (res >= 0) {
pw_loop_invoke(node->data_loop, do_node_add, 1, NULL, 0, true, node);
@ -447,7 +447,7 @@ static int suspend_node(struct pw_impl_node *this)
node_deactivate(this);
pw_log_debug("%p: suspend node driving:%d driver:%d added:%d", this,
this->driving, this->driver, this->added);
this->driving, this->driver, this->rt.added);
res = spa_node_send_command(this->node,
&SPA_NODE_COMMAND_INIT(SPA_NODE_COMMAND_Suspend));
@ -884,7 +884,7 @@ do_move_nodes(struct spa_loop *loop,
pw_log_trace("%p: driver:%p->%p", node, old, driver);
if (node->added) {
if (node->rt.added) {
remove_node(node);
add_node(node, driver);
}
@ -1336,7 +1336,7 @@ static inline int process_node(void *data)
if (SPA_UNLIKELY(!this->transport_sync))
a->pending_sync = false;
if (SPA_LIKELY(this->added)) {
if (SPA_LIKELY(this->rt.added)) {
/* process input mixers */
spa_list_for_each(p, &this->rt.input_mix, rt.node_link)
spa_node_process_fast(p->mix);
@ -1882,9 +1882,9 @@ static int node_ready(void *data, int status)
uint64_t nsec;
pw_log_trace_fp("%p: ready driver:%d exported:%d %p status:%d added:%d", node,
node->driver, node->exported, driver, status, node->added);
node->driver, node->exported, driver, status, node->rt.added);
if (SPA_UNLIKELY(!node->added)) {
if (SPA_UNLIKELY(!node->rt.added)) {
/* This can happen when we are stopping a node and removed it from the
* graph but we still have not completed the Pause/Suspend command on
* the node. In that case, the node might still emit ready events,

View file

@ -32,7 +32,9 @@ struct impl {
struct spa_hook_list mix_hooks;
struct spa_hook mix_listener;
struct spa_list mix_list;
struct {
struct spa_list mix_list;
} rt;
struct spa_list param_list;
struct spa_list pending_list;
@ -215,9 +217,9 @@ do_add_mix(struct spa_loop *loop,
struct pw_impl_port *this = mix->p;
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
pw_log_trace("%p: add mix %p", this, mix);
if (!mix->active) {
spa_list_append(&impl->mix_list, &mix->rt_link);
mix->active = true;
if (!mix->rt.active) {
spa_list_append(&impl->rt.mix_list, &mix->rt.link);
mix->rt.active = true;
}
return 0;
}
@ -229,9 +231,9 @@ do_remove_mix(struct spa_loop *loop,
struct pw_impl_port_mix *mix = user_data;
struct pw_impl_port *this = mix->p;
pw_log_trace("%p: remove mix %p", this, mix);
if (mix->active) {
spa_list_remove(&mix->rt_link);
mix->active = false;
if (mix->rt.active) {
spa_list_remove(&mix->rt.link);
mix->rt.active = false;
}
return 0;
}
@ -280,7 +282,7 @@ static int tee_process(void *object)
uint32_t cycle = (this->node->rt.position->clock.cycle + 1) & 1;
pw_log_trace_fp("%p: tee input status:%d id:%d cycle:%d", this, io->status, io->buffer_id, cycle);
spa_list_for_each(mix, &impl->mix_list, rt_link) {
spa_list_for_each(mix, &impl->rt.mix_list, rt.link) {
pw_log_trace_fp("%p: port %d %p->%p id:%d", this,
mix->port.port_id, io, mix->io[cycle], mix->io[cycle]->buffer_id);
*mix->io[cycle] = *io;
@ -320,7 +322,7 @@ static int schedule_mix_input(void *object)
if (SPA_UNLIKELY(PW_IMPL_PORT_IS_CONTROL(this)))
return SPA_STATUS_HAVE_DATA | SPA_STATUS_NEED_DATA;
spa_list_for_each(mix, &impl->mix_list, rt_link) {
spa_list_for_each(mix, &impl->rt.mix_list, rt.link) {
pw_log_trace_fp("%p: mix input %d %p->%p status:%d id:%d cycle:%d", this,
mix->port.port_id, mix->io[cycle], io,
mix->io[cycle]->status, mix->io[cycle]->buffer_id, cycle);
@ -336,7 +338,7 @@ static int schedule_mix_reuse_buffer(void *object, uint32_t port_id, uint32_t bu
struct impl *impl = object;
struct pw_impl_port_mix *mix;
spa_list_for_each(mix, &impl->mix_list, rt_link) {
spa_list_for_each(mix, &impl->rt.mix_list, rt.link) {
pw_log_trace_fp("%p: reuse buffer %d %d", impl, port_id, buffer_id);
/* FIXME send reuse buffer to peer */
break;
@ -731,7 +733,7 @@ struct pw_impl_port *pw_context_create_port(
spa_list_init(&impl->param_list);
spa_list_init(&impl->pending_list);
impl->cache_params = true;
spa_list_init(&impl->mix_list);
spa_list_init(&impl->rt.mix_list);
this = &impl->this;

View file

@ -678,7 +678,6 @@ struct pw_impl_node {
unsigned int transport_sync:1; /**< supports transport sync */
unsigned int target_pending:1; /**< a quantum/rate update is pending */
unsigned int moved:1; /**< the node was moved drivers */
unsigned int added:1; /**< the node was add to graph */
unsigned int pause_on_idle:1; /**< Pause processing when IDLE */
unsigned int suspend_on_idle:1;
unsigned int need_resume:1;
@ -739,6 +738,8 @@ struct pw_impl_node {
struct spa_list driver_link; /* our link in driver */
struct spa_ratelimit rate_limit;
bool added; /**< the node was add to graph */
} rt;
struct spa_fraction target_rate;
uint64_t target_quantum;
@ -751,7 +752,6 @@ struct pw_impl_node {
struct pw_impl_port_mix {
struct spa_list link;
struct spa_list rt_link;
struct pw_impl_port *p;
struct {
enum spa_direction direction;
@ -761,8 +761,12 @@ struct pw_impl_port_mix {
void *io_data;
uint32_t id;
uint32_t peer_id;
unsigned int have_buffers:1;
unsigned int active:1;
bool have_buffers;
struct {
bool active;
struct spa_list link;
} rt;
};
struct pw_impl_port_implementation {