remove from driver in driver data-loop

This commit is contained in:
Wim Taymans 2024-04-30 15:26:53 +02:00
parent afd9960dc3
commit bd14e212c7
3 changed files with 97 additions and 88 deletions

View file

@ -98,6 +98,7 @@ static void pw_node_peer_activate(struct pw_node_peer *peer)
if (peer->active_count++ == 0) {
spa_list_append(&peer->output->rt.target_list, &peer->target.link);
if (!peer->target.active && peer->output->rt.driver_target.node != NULL) {
if (!peer->output->async)
state->required++;
@ -113,7 +114,6 @@ static void pw_node_peer_deactivate(struct pw_node_peer *peer)
struct pw_node_activation_state *state;
state = &peer->target.activation->state[0];
if (--peer->active_count == 0) {
spa_list_remove(&peer->target.link);
if (peer->target.active) {
@ -126,7 +126,6 @@ static void pw_node_peer_deactivate(struct pw_node_peer *peer)
peer->target.name, state, state->pending, state->required);
}
static void info_changed(struct pw_impl_link *link)
{
struct pw_resource *resource;

View file

@ -73,26 +73,78 @@ struct resource_data {
/** \endcond */
/* Called from the node data loop when a node needs to be scheduled by
* the given driver. 3 things needs to happen:
* the given driver.
*
* - the node adds the source to the data loop
* - the node needs to trigger the driver when it completes. This means
* the driver is added to the target list.
*/
static int
do_node_prepare(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
struct pw_impl_node *this = user_data;
struct pw_impl_node *driver = this->driver_node;
uint64_t dummy;
int res;
pw_log_trace("%p: prepare driver %p", this, driver);
/* clear the eventfd in case it was written to while the node was stopped */
res = spa_system_eventfd_read(this->rt.target.system, this->source.fd, &dummy);
if (SPA_UNLIKELY(res != -EAGAIN && res != 0))
pw_log_warn("%p: read failed %m", this);
/* remote nodes have their source added in client-node instead */
if (!this->remote)
spa_loop_add_source(loop, &this->source);
if (!this->exported) {
/* trigger the driver when we complete */
copy_target(&this->rt.driver_target, &driver->rt.target);
spa_list_append(&this->rt.target_list, &this->rt.driver_target.link);
}
return 0;
}
/* called from the node data loop and undoes the changes done in do_node_prepare. */
static int
do_node_unprepare(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
struct pw_impl_node *this = user_data;
pw_log_trace("%p: unprepare driver %s", this, this->rt.driver_target.name);
if (!this->remote)
spa_loop_remove_source(loop, &this->source);
if (!this->exported) {
spa_list_remove(&this->rt.driver_target.link);
spa_zero(this->rt.driver_target);
}
return 0;
}
/* Called from the driver data loop when a node needs to be scheduled by
* the driver. 2 things needs to happen:
*
* - the node is added to the driver target list and the required state
* is incremented. This makes sure the node is woken up when the driver
* starts a new cycle.
* - the node needs to trigger the driver when it completes. This means
* the driver is added to the target list.
* - the node targets (including the driver we added above) have their
* required state incremented.
*
* This code is called from the data-loop to ensure synchronization
* - the node targets (including the driver we added in do_node_prepare above)
* have their required state incremented. We can safely read the target
* list because we always sync on updates.
*/
static void add_node(struct pw_impl_node *this, struct pw_impl_node *driver)
static int
do_node_add(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
struct pw_impl_node *this = user_data;
struct pw_impl_node *driver = this->driver_node;
struct pw_node_activation_state *dstate, *nstate;
struct pw_node_target *t;
if (this->exported)
return;
pw_log_trace("%p: add to driver %p %p %p", this, driver,
driver->rt.target.activation, this->rt.target.activation);
@ -103,11 +155,6 @@ static void add_node(struct pw_impl_node *this, struct pw_impl_node *driver)
nstate->required++;
this->rt.target.active = true;
}
/* trigger the driver when we complete */
copy_target(&this->rt.driver_target, &driver->rt.target);
spa_list_append(&this->rt.target_list, &this->rt.driver_target.link);
/* now increment the required states of all this node targets, including
* the driver we added above */
spa_list_for_each(t, &this->rt.target_list, link) {
@ -121,23 +168,24 @@ static void add_node(struct pw_impl_node *this, struct pw_impl_node *driver)
this, dstate, dstate->pending, dstate->required,
nstate, nstate->pending, nstate->required);
}
return 0;
}
/* called from the data loop and undoes the changes done in add_node. */
static void remove_node(struct pw_impl_node *this)
/* called from the driver data loop and undoes the changes done in do_node_add. */
static int
do_node_remove(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
struct pw_impl_node *this = user_data;
struct pw_node_activation_state *dstate, *nstate;
struct pw_node_target *t;
if (this->exported)
return;
pw_log_trace("%p: remove from driver %s %p %p",
this, this->rt.driver_target.name,
this->rt.driver_target.activation, this->rt.target.activation);
/* remove from driver */
spa_list_remove(&this->rt.target.link);
nstate = &this->rt.target.activation->state[0];
if (this->rt.target.active) {
nstate->required--;
@ -155,48 +203,29 @@ static void remove_node(struct pw_impl_node *this)
this, dstate, dstate->pending, dstate->required,
nstate, nstate->pending, nstate->required);
}
spa_list_remove(&this->rt.driver_target.link);
spa_zero(this->rt.driver_target);
}
static int
do_node_add(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
struct pw_impl_node *this = user_data;
struct pw_impl_node *driver = this->driver_node;
if (!this->rt.added) {
uint64_t dummy;
int res;
/* clear the eventfd in case it was written to while the node was stopped */
res = spa_system_eventfd_read(this->rt.target.system, this->source.fd, &dummy);
if (SPA_UNLIKELY(res != -EAGAIN && res != 0))
pw_log_warn("%p: read failed %m", this);
this->rt.added = true;
/* remote nodes have their source added in client-node instead */
if (!this->remote)
spa_loop_add_source(loop, &this->source);
add_node(this, driver);
}
return 0;
}
static int
do_node_remove(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
static void add_node_to_graph(struct pw_impl_node *node)
{
struct pw_impl_node *this = user_data;
if (this->rt.added) {
if (!this->remote)
spa_loop_remove_source(loop, &this->source);
remove_node(this);
this->rt.added = false;
struct pw_impl_node *driver = node->driver_node;
if (!node->rt.added) {
pw_loop_invoke(node->data_loop, do_node_prepare, 1, NULL, 0, true, node);
if (!node->exported)
pw_loop_invoke(driver->data_loop, do_node_add, 1, NULL, 0, true, node);
node->rt.added = true;
}
}
static void remove_node_from_graph(struct pw_impl_node *node)
{
struct pw_impl_node *driver = node->driver_node;
if (node->rt.added) {
if (!node->exported)
pw_loop_invoke(driver->data_loop, do_node_remove, 1, NULL, 0, true, node);
pw_loop_invoke(node->data_loop, do_node_unprepare, 1, NULL, 0, true, node);
node->rt.added = false;
}
return 0;
}
static void node_deactivate(struct pw_impl_node *this)
@ -207,7 +236,7 @@ static void node_deactivate(struct pw_impl_node *this)
pw_log_debug("%p: deactivate", this);
/* make sure the node doesn't get woken up while not active */
pw_loop_invoke(this->data_loop, do_node_remove, 1, NULL, 0, true, this);
remove_node_from_graph(this);
spa_list_for_each(port, &this->input_ports, link) {
spa_list_for_each(link, &port->links, input_link)
@ -376,7 +405,7 @@ static void node_update_state(struct pw_impl_node *node, enum pw_node_state stat
node->driving, node->driver, node->rt.added);
if (res >= 0) {
pw_loop_invoke(node->data_loop, do_node_add, 1, NULL, 0, true, node);
add_node_to_graph(node);
}
if (node->driving && node->driver) {
res = spa_node_send_command(node->node,
@ -384,7 +413,7 @@ static void node_update_state(struct pw_impl_node *node, enum pw_node_state stat
if (res < 0) {
state = PW_NODE_STATE_ERROR;
error = spa_aprintf("Start error: %s", spa_strerror(res));
pw_loop_invoke(node->data_loop, do_node_remove, 1, NULL, 0, true, node);
remove_node_from_graph(node);
}
}
break;
@ -392,7 +421,7 @@ static void node_update_state(struct pw_impl_node *node, enum pw_node_state stat
case PW_NODE_STATE_SUSPENDED:
case PW_NODE_STATE_ERROR:
if (state != PW_NODE_STATE_IDLE || node->pause_on_idle)
pw_loop_invoke(node->data_loop, do_node_remove, 1, NULL, 0, true, node);
remove_node_from_graph(node);
break;
default:
break;
@ -873,24 +902,6 @@ int pw_impl_node_initialized(struct pw_impl_node *this)
return 0;
}
static int
do_move_nodes(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct impl *impl = user_data;
struct pw_impl_node *old = *(struct pw_impl_node **)data;
struct pw_impl_node *node = &impl->this;
struct pw_impl_node *driver = node->driver_node;
pw_log_trace("%p: driver:%p->%p", node, old, driver);
if (node->rt.added) {
remove_node(node);
add_node(node, driver);
}
return 0;
}
static void remove_segment_owner(struct pw_impl_node *driver, uint32_t node_id)
{
struct pw_node_activation *a = driver->rt.target.activation;
@ -938,9 +949,8 @@ int pw_impl_node_set_driver(struct pw_impl_node *node, struct pw_impl_node *driv
node->driver_node = driver;
node->moved = true;
pw_loop_invoke(node->data_loop,
do_move_nodes, SPA_ID_INVALID, &old, sizeof(struct pw_impl_node *),
true, impl);
remove_node_from_graph(node);
add_node_to_graph(node);
pw_impl_node_emit_driver_changed(node, old, driver);
@ -2624,7 +2634,7 @@ int pw_impl_node_set_active(struct pw_impl_node *node, bool active)
pw_context_recalc_graph(node->context,
active ? "node activate" : "node deactivate");
else if (!active && node->exported)
pw_loop_invoke(node->data_loop, do_node_remove, 1, NULL, 0, true, node);
remove_node_from_graph(node);
}
return 0;
}

View file

@ -739,7 +739,7 @@ struct pw_impl_node {
struct spa_ratelimit rate_limit;
bool added; /**< the node was add to graph */
bool added; /**< the node was added to driver */
} rt;
struct spa_fraction target_rate;
uint64_t target_quantum;