impl-node: update rt flags from rt threads

Update the added and prepared flags from the rt thread.

We also need to check if the node was prepared before we can schedule
it.
This commit is contained in:
Wim Taymans 2024-05-08 17:14:32 +02:00
parent 8b23a8a89e
commit b2844201c2
2 changed files with 46 additions and 26 deletions

View file

@ -88,7 +88,10 @@ do_node_prepare(struct spa_loop *loop, bool async, uint32_t seq,
uint64_t dummy;
int res;
pw_log_trace("%p: prepare driver %p", this, driver);
pw_log_trace("%p: prepare driver %p %d", this, driver, this->rt.prepared);
if (this->rt.prepared)
return 0;
/* clear the eventfd in case it was written to while the node was stopped */
res = spa_system_eventfd_read(this->rt.target.system, this->source.fd, &dummy);
@ -96,14 +99,18 @@ do_node_prepare(struct spa_loop *loop, bool async, uint32_t seq,
pw_log_warn("%p: read failed %m", this);
/* remote nodes have their source added in client-node instead */
if (!this->remote)
if (!this->remote) {
pw_log_trace("%p %p %p", this, loop, this->source.loop);
spa_loop_add_source(loop, &this->source);
}
if (!this->exported) {
/* trigger the driver when we complete */
copy_target(&this->rt.driver_target, &driver->rt.target);
spa_list_append(&this->rt.target_list, &this->rt.driver_target.link);
}
this->rt.prepared = true;
return 0;
}
@ -114,15 +121,22 @@ do_node_unprepare(struct spa_loop *loop, bool async, uint32_t seq,
{
struct pw_impl_node *this = user_data;
pw_log_trace("%p: unprepare driver %s", this, this->rt.driver_target.name);
pw_log_trace("%p: unprepare driver %s %d", this, this->rt.driver_target.name,
this->rt.prepared);
if (!this->remote)
if (!this->rt.prepared)
return 0;
if (!this->remote) {
pw_log_trace("%p %p %p", this, loop, this->source.loop);
spa_loop_remove_source(loop, &this->source);
}
if (!this->exported) {
spa_list_remove(&this->rt.driver_target.link);
spa_zero(this->rt.driver_target);
}
this->rt.prepared = false;
return 0;
}
@ -145,8 +159,12 @@ do_node_add(struct spa_loop *loop, bool async, uint32_t seq,
struct pw_node_activation_state *dstate, *nstate;
struct pw_node_target *t;
pw_log_trace("%p: add to driver %p %p %p", this, driver,
driver->rt.target.activation, this->rt.target.activation);
pw_log_trace("%p: add to driver %p %p %p %d", this, driver,
driver->rt.target.activation, this->rt.target.activation,
this->rt.added);
if (this->rt.added)
return 0;
/* let the driver trigger us as part of the processing cycle */
spa_list_append(&driver->rt.target_list, &this->rt.target.link);
@ -168,6 +186,7 @@ do_node_add(struct spa_loop *loop, bool async, uint32_t seq,
this, dstate, dstate->pending, dstate->required,
nstate, nstate->pending, nstate->required);
}
this->rt.added = true;
return 0;
}
@ -180,9 +199,13 @@ do_node_remove(struct spa_loop *loop, bool async, uint32_t seq,
struct pw_node_activation_state *dstate, *nstate;
struct pw_node_target *t;
pw_log_trace("%p: remove from driver %s %p %p",
pw_log_trace("%p: remove from driver %s %p %p %d",
this, this->rt.driver_target.name,
this->rt.driver_target.activation, this->rt.target.activation);
this->rt.driver_target.activation, this->rt.target.activation,
this->rt.added);
if (!this->rt.added)
return 0;
/* remove from driver */
spa_list_remove(&this->rt.target.link);
@ -203,6 +226,7 @@ do_node_remove(struct spa_loop *loop, bool async, uint32_t seq,
this, dstate, dstate->pending, dstate->required,
nstate, nstate->pending, nstate->required);
}
this->rt.added = false;
return 0;
}
@ -221,8 +245,7 @@ static void add_node_to_graph(struct pw_impl_node *node)
{
struct pw_impl_node *driver = node->driver_node;
if (node->rt.added)
return;
pw_log_debug("%p: driver:%p", node, driver);
if (node->data_loop == driver->data_loop) {
pw_loop_invoke(node->data_loop, do_node_prepare_add, 1, NULL, 0, true, node);
@ -231,7 +254,6 @@ static void add_node_to_graph(struct pw_impl_node *node)
if (!node->exported)
pw_loop_invoke(driver->data_loop, do_node_add, 1, NULL, 0, true, node);
}
node->rt.added = true;
}
static int
@ -249,8 +271,7 @@ static void remove_node_from_graph(struct pw_impl_node *node)
{
struct pw_impl_node *driver = node->driver_node;
if (!node->rt.added)
return;
pw_log_debug("%p: driver:%p", node, driver);
if (node->data_loop == driver->data_loop) {
pw_loop_invoke(node->data_loop, do_node_remove_unprepare, 1, NULL, 0, true, node);
@ -259,7 +280,6 @@ static void remove_node_from_graph(struct pw_impl_node *node)
pw_loop_invoke(driver->data_loop, do_node_remove, 1, NULL, 0, true, node);
pw_loop_invoke(node->data_loop, do_node_unprepare, 1, NULL, 0, true, node);
}
node->rt.added = false;
}
static int
@ -275,8 +295,7 @@ static void move_node_to_graph(struct pw_impl_node *node)
{
struct pw_impl_node *driver = node->driver_node;
if (!node->rt.added)
return;
pw_log_debug("%p: driver:%p", node, driver);
if (node->data_loop == driver->data_loop) {
pw_loop_invoke(node->data_loop, do_node_move, 1, NULL, 0, true, node);
@ -359,8 +378,8 @@ static int start_node(struct pw_impl_node *this)
if (impl->pending_state >= PW_NODE_STATE_RUNNING)
return 0;
pw_log_debug("%p: start node driving:%d driver:%d added:%d", this,
this->driving, this->driver, this->rt.added);
pw_log_debug("%p: start node driving:%d driver:%d prepared:%d", this,
this->driving, this->driver, this->rt.prepared);
if (!(this->driving && this->driver)) {
impl->pending_play = true;
@ -459,8 +478,8 @@ static void node_update_state(struct pw_impl_node *node, enum pw_node_state stat
switch (state) {
case PW_NODE_STATE_RUNNING:
pw_log_debug("%p: start node driving:%d driver:%d added:%d", node,
node->driving, node->driver, node->rt.added);
pw_log_debug("%p: start node driving:%d driver:%d prepared:%d", node,
node->driving, node->driver, node->rt.prepared);
if (res >= 0) {
add_node_to_graph(node);
@ -533,8 +552,8 @@ static int suspend_node(struct pw_impl_node *this)
node_deactivate(this);
pw_log_debug("%p: suspend node driving:%d driver:%d added:%d", this,
this->driving, this->driver, this->rt.added);
pw_log_debug("%p: suspend node driving:%d driver:%d prepared:%d", this,
this->driving, this->driver, this->rt.prepared);
res = spa_node_send_command(this->node,
&SPA_NODE_COMMAND_INIT(SPA_NODE_COMMAND_Suspend));
@ -1403,7 +1422,7 @@ static inline int process_node(void *data)
if (SPA_UNLIKELY(!this->transport_sync))
a->pending_sync = false;
if (SPA_LIKELY(this->rt.added)) {
if (SPA_LIKELY(this->rt.prepared)) {
/* process input mixers */
spa_list_for_each(p, &this->rt.input_mix, rt.node_link)
spa_node_process_fast(p->mix);
@ -1948,10 +1967,10 @@ static int node_ready(void *data, int status)
struct pw_impl_port *p;
uint64_t nsec;
pw_log_trace_fp("%p: ready driver:%d exported:%d %p status:%d added:%d", node,
node->driver, node->exported, driver, status, node->rt.added);
pw_log_trace_fp("%p: ready driver:%d exported:%d %p status:%d prepared:%d", node,
node->driver, node->exported, driver, status, node->rt.prepared);
if (SPA_UNLIKELY(!node->rt.added)) {
if (SPA_UNLIKELY(!node->rt.prepared)) {
/* This can happen when we are stopping a node and removed it from the
* graph but we still have not completed the Pause/Suspend command on
* the node. In that case, the node might still emit ready events,

View file

@ -739,6 +739,7 @@ struct pw_impl_node {
struct spa_ratelimit rate_limit;
bool prepared; /**< the node was added to loop */
bool added; /**< the node was added to driver */
} rt;
struct spa_fraction target_rate;