impl-node: resume peer targets when unlinking

Atomically change the node status from TRIGGERED to AWAKE. Only trigger
the peer nodes when the node was previously in the AWAKE state.

When we remove a node from the graph or when we destroy a link, we need
to manually resume the peers. We can do this now by atomically setting
the node to FINISHED and checking if it was previously != FINISHED.

This ensures that removing nodes/links never leaves some nodes (and also
the driver) untriggered and cause a xruns.

Fixes #4026
This commit is contained in:
Wim Taymans 2024-05-27 16:48:54 +02:00
parent 9f32b89e5a
commit ffb0eff708
4 changed files with 70 additions and 56 deletions

View File

@ -1801,13 +1801,6 @@ static inline int check_sample_rate(struct client *c, struct spa_io_position *po
return c->sample_rate == sample_rate;
}
static inline uint64_t get_time_ns(void)
{
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return SPA_TIMESPEC_TO_NSEC(&ts);
}
static inline uint32_t cycle_run(struct client *c)
{
uint64_t cmd;
@ -1826,9 +1819,6 @@ static inline uint32_t cycle_run(struct client *c)
}
break;
}
activation->status = PW_NODE_ACTIVATION_AWAKE;
activation->awake_time = get_time_ns();
if (SPA_UNLIKELY(cmd > 1)) {
pw_log_info("%p: missed %"PRIu64" wakeups", c, cmd - 1);
activation->xrun_count += cmd - 1;
@ -1837,6 +1827,13 @@ static inline uint32_t cycle_run(struct client *c)
activation->max_delay = SPA_MAX(activation->max_delay, 0u);
}
if (!SPA_ATOMIC_CAS(activation->status,
PW_NODE_ACTIVATION_TRIGGERED,
PW_NODE_ACTIVATION_AWAKE))
return 0;
activation->awake_time = get_time_ns(c->l->system);
if (SPA_UNLIKELY(c->first)) {
if (c->thread_init_callback)
c->thread_init_callback(c->thread_init_arg);
@ -1895,14 +1892,15 @@ static inline void signal_sync(struct client *c)
uint64_t cmd, nsec;
struct link *l;
struct pw_node_activation *activation = c->activation;
int old_status;
complete_process(c, c->buffer_frames);
nsec = get_time_ns();
activation->status = PW_NODE_ACTIVATION_FINISHED;
nsec = get_time_ns(c->l->system);
old_status = SPA_ATOMIC_XCHG(activation->status, PW_NODE_ACTIVATION_FINISHED);
activation->finish_time = nsec;
if (c->async)
if (c->async || old_status != PW_NODE_ACTIVATION_AWAKE)
return;
cmd = 1;
@ -6529,7 +6527,7 @@ jack_nframes_t jack_frames_since_cycle_start (const jack_client_t *client)
return_val_if_fail(c != NULL, 0);
get_frame_times(c, &times);
diff = get_time_ns() - times.nsec;
diff = get_time_ns(c->l->system) - times.nsec;
return (jack_nframes_t) floor(((double)times.sample_rate * diff) / SPA_NSEC_PER_SEC);
}
@ -6627,7 +6625,9 @@ jack_nframes_t jack_time_to_frames(const jack_client_t *client, jack_time_t usec
SPA_EXPORT
jack_time_t jack_get_time(void)
{
return get_time_ns()/SPA_NSEC_PER_USEC;
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return SPA_TIMESPEC_TO_NSEC(&ts);
}
SPA_EXPORT
@ -6828,7 +6828,7 @@ jack_nframes_t jack_get_current_transport_frame (const jack_client_t *client)
res = pos.frame;
if (state == JackTransportRolling) {
float usecs = get_time_ns()/1000 - pos.usecs;
float usecs = get_time_ns(c->l->system)/1000 - pos.usecs;
res += (jack_nframes_t)floor((((float) pos.frame_rate) / 1000000.0f) * usecs);
}
return res;

View File

@ -117,8 +117,10 @@ static void pw_node_peer_deactivate(struct pw_node_peer *peer)
spa_list_remove(&peer->target.link);
if (peer->target.active) {
if (!peer->output->async)
if (!peer->output->async) {
state->required--;
trigger_target(&peer->target, get_time_ns(peer->target.system));
}
peer->target.active = false;
}
}

View File

@ -198,6 +198,7 @@ do_node_remove(struct spa_loop *loop, bool async, uint32_t seq,
struct pw_impl_node *this = user_data;
struct pw_node_activation_state *dstate, *nstate;
struct pw_node_target *t;
int old_state;
pw_log_trace("%p: remove from driver %s %p %p %d",
this, this->rt.driver_target.name,
@ -207,6 +208,8 @@ do_node_remove(struct spa_loop *loop, bool async, uint32_t seq,
if (!this->rt.added)
return 0;
old_state = SPA_ATOMIC_XCHG(this->rt.target.activation->status, PW_NODE_ACTIVATION_FINISHED);
/* remove from driver */
spa_list_remove(&this->rt.target.link);
nstate = &this->rt.target.activation->state[0];
@ -218,8 +221,11 @@ do_node_remove(struct spa_loop *loop, bool async, uint32_t seq,
spa_list_for_each(t, &this->rt.target_list, link) {
dstate = &t->activation->state[0];
if (t->active) {
if (!this->async)
if (!this->async) {
dstate->required--;
if (old_state != PW_NODE_ACTIVATION_FINISHED)
trigger_target(t, get_time_ns(this->rt.target.system));
}
t->active = false;
}
pw_log_trace("%p: driver state:%p pending:%d/%d, node state:%p pending:%d/%d",
@ -1343,40 +1349,6 @@ static void check_states(struct pw_impl_node *driver, uint64_t nsec)
}
}
static inline uint64_t get_time_ns(struct spa_system *system)
{
struct timespec ts;
spa_system_clock_gettime(system, CLOCK_MONOTONIC, &ts);
return SPA_TIMESPEC_TO_NSEC(&ts);
}
static inline void wake_target(struct pw_node_target *t, uint64_t nsec)
{
struct pw_node_activation *a = t->activation;
if (SPA_ATOMIC_CAS(a->status,
PW_NODE_ACTIVATION_NOT_TRIGGERED,
PW_NODE_ACTIVATION_TRIGGERED)) {
a->signal_time = nsec;
if (SPA_UNLIKELY(spa_system_eventfd_write(t->system, t->fd, 1) < 0))
pw_log_warn("%p: write failed %m", t->node);
}
}
/* called from data-loop decrement the dependency counter of the target and when
* there are no more dependencies, trigger the node. */
static inline void trigger_target(struct pw_node_target *t, uint64_t nsec)
{
struct pw_node_activation *a = t->activation;
struct pw_node_activation_state *state = &a->state[0];
pw_log_trace_fp("%p: (%s-%u) state:%p pending:%d/%d", t->node,
t->name, t->id, state, state->pending, state->required);
if (pw_node_activation_state_dec(state))
wake_target(t, nsec);
}
/* called from data-loop when all the targets of a node need to be triggered */
static inline int trigger_targets(struct pw_node_target *t, int status, uint64_t nsec)
{
@ -1423,14 +1395,18 @@ static inline int process_node(void *data)
struct pw_impl_port *p;
struct pw_node_activation *a = this->rt.target.activation;
struct spa_system *data_system = this->rt.target.system;
int status;
int status, old_status;
uint64_t nsec;
if (!SPA_ATOMIC_CAS(a->status,
PW_NODE_ACTIVATION_TRIGGERED,
PW_NODE_ACTIVATION_AWAKE))
return 0;
nsec = get_time_ns(data_system);
pw_log_trace_fp("%p: %s process remote:%u exported:%u %"PRIu64" %"PRIu64,
this, this->name, this->remote, this->exported,
a->signal_time, nsec);
a->status = PW_NODE_ACTIVATION_AWAKE;
a->awake_time = nsec;
/* when transport sync is not supported, just clear the flag */
@ -1462,13 +1438,13 @@ static inline int process_node(void *data)
nsec = get_time_ns(data_system);
pw_log_trace_fp("%p: finished status:%d %"PRIu64, this, status, nsec);
a->status = PW_NODE_ACTIVATION_FINISHED;
old_status = SPA_ATOMIC_XCHG(a->status, PW_NODE_ACTIVATION_FINISHED);
a->finish_time = nsec;
/* we don't need to trigger targets when the node was driving the
* graph because that means we finished the graph. */
if (SPA_LIKELY(!this->driving)) {
if (!this->async)
if (!this->async && old_status == PW_NODE_ACTIVATION_AWAKE)
trigger_targets(&this->rt.target, status, nsec);
} else {
/* calculate CPU time when finished */
@ -1645,6 +1621,7 @@ struct pw_impl_node *pw_context_create_node(struct pw_context *context,
reset_position(this, &this->rt.target.activation->position);
this->rt.target.activation->sync_timeout = DEFAULT_SYNC_TIMEOUT;
this->rt.target.activation->sync_left = 0;
this->rt.target.activation->status = PW_NODE_ACTIVATION_FINISHED;
this->rt.rate_limit.interval = 2 * SPA_NSEC_PER_SEC;
this->rt.rate_limit.burst = 1;
@ -2009,6 +1986,7 @@ static int node_ready(void *data, int status)
" pending %d/%d cycle:%u", node->name, node->info.id,
state, a->position.clock.duration,
pending, state->required, a->position.clock.cycle);
SPA_ATOMIC_XCHG(a->status, PW_NODE_ACTIVATION_TRIGGERED);
process_node(node);
check_states(node, nsec);
pw_impl_node_rt_emit_incomplete(node);

View File

@ -611,6 +611,40 @@ struct pw_node_activation {
* to update wins */
};
static inline uint64_t get_time_ns(struct spa_system *system)
{
struct timespec ts;
spa_system_clock_gettime(system, CLOCK_MONOTONIC, &ts);
return SPA_TIMESPEC_TO_NSEC(&ts);
}
static inline void wake_target(struct pw_node_target *t, uint64_t nsec)
{
struct pw_node_activation *a = t->activation;
if (SPA_ATOMIC_CAS(a->status,
PW_NODE_ACTIVATION_NOT_TRIGGERED,
PW_NODE_ACTIVATION_TRIGGERED)) {
a->signal_time = nsec;
if (SPA_UNLIKELY(spa_system_eventfd_write(t->system, t->fd, 1) < 0))
pw_log_warn("%p: write failed %m", t->node);
}
}
/* called from data-loop decrement the dependency counter of the target and when
* there are no more dependencies, trigger the node. */
static inline void trigger_target(struct pw_node_target *t, uint64_t nsec)
{
struct pw_node_activation *a = t->activation;
struct pw_node_activation_state *state = &a->state[0];
pw_log_trace_fp("%p: (%s-%u) state:%p pending:%d/%d", t->node,
t->name, t->id, state, state->pending, state->required);
if (pw_node_activation_state_dec(state))
wake_target(t, nsec);
}
#define pw_impl_node_emit(o,m,v,...) spa_hook_list_call(&o->listener_list, struct pw_impl_node_events, m, v, ##__VA_ARGS__)
#define pw_impl_node_emit_destroy(n) pw_impl_node_emit(n, destroy, 0)
#define pw_impl_node_emit_free(n) pw_impl_node_emit(n, free, 0)