From ffb0eff7083d43ae55e34d87b124d90a483a191b Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 27 May 2024 16:48:54 +0200 Subject: [PATCH] impl-node: resume peer targets when unlinking Atomically change the node status from TRIGGERED to AWAKE. Only trigger the peer nodes when the node was previously in the AWAKE state. When we remove a node from the graph or when we destroy a link, we need to manually resume the peers. We can do this now by atomically setting the node to FINISHED and checking if it was previously != FINISHED. This ensures that removing nodes/links never leaves some nodes (and also the driver) untriggered and cause a xruns. Fixes #4026 --- pipewire-jack/src/pipewire-jack.c | 32 +++++++++--------- src/pipewire/impl-link.c | 4 ++- src/pipewire/impl-node.c | 56 ++++++++++--------------------- src/pipewire/private.h | 34 +++++++++++++++++++ 4 files changed, 70 insertions(+), 56 deletions(-) diff --git a/pipewire-jack/src/pipewire-jack.c b/pipewire-jack/src/pipewire-jack.c index 5a17d0006..8525dfeb6 100644 --- a/pipewire-jack/src/pipewire-jack.c +++ b/pipewire-jack/src/pipewire-jack.c @@ -1801,13 +1801,6 @@ static inline int check_sample_rate(struct client *c, struct spa_io_position *po return c->sample_rate == sample_rate; } -static inline uint64_t get_time_ns(void) -{ - struct timespec ts; - clock_gettime(CLOCK_MONOTONIC, &ts); - return SPA_TIMESPEC_TO_NSEC(&ts); -} - static inline uint32_t cycle_run(struct client *c) { uint64_t cmd; @@ -1826,9 +1819,6 @@ static inline uint32_t cycle_run(struct client *c) } break; } - activation->status = PW_NODE_ACTIVATION_AWAKE; - activation->awake_time = get_time_ns(); - if (SPA_UNLIKELY(cmd > 1)) { pw_log_info("%p: missed %"PRIu64" wakeups", c, cmd - 1); activation->xrun_count += cmd - 1; @@ -1837,6 +1827,13 @@ static inline uint32_t cycle_run(struct client *c) activation->max_delay = SPA_MAX(activation->max_delay, 0u); } + if (!SPA_ATOMIC_CAS(activation->status, + PW_NODE_ACTIVATION_TRIGGERED, + PW_NODE_ACTIVATION_AWAKE)) + return 0; + + activation->awake_time = get_time_ns(c->l->system); + if (SPA_UNLIKELY(c->first)) { if (c->thread_init_callback) c->thread_init_callback(c->thread_init_arg); @@ -1895,14 +1892,15 @@ static inline void signal_sync(struct client *c) uint64_t cmd, nsec; struct link *l; struct pw_node_activation *activation = c->activation; + int old_status; complete_process(c, c->buffer_frames); - nsec = get_time_ns(); - activation->status = PW_NODE_ACTIVATION_FINISHED; + nsec = get_time_ns(c->l->system); + old_status = SPA_ATOMIC_XCHG(activation->status, PW_NODE_ACTIVATION_FINISHED); activation->finish_time = nsec; - if (c->async) + if (c->async || old_status != PW_NODE_ACTIVATION_AWAKE) return; cmd = 1; @@ -6529,7 +6527,7 @@ jack_nframes_t jack_frames_since_cycle_start (const jack_client_t *client) return_val_if_fail(c != NULL, 0); get_frame_times(c, ×); - diff = get_time_ns() - times.nsec; + diff = get_time_ns(c->l->system) - times.nsec; return (jack_nframes_t) floor(((double)times.sample_rate * diff) / SPA_NSEC_PER_SEC); } @@ -6627,7 +6625,9 @@ jack_nframes_t jack_time_to_frames(const jack_client_t *client, jack_time_t usec SPA_EXPORT jack_time_t jack_get_time(void) { - return get_time_ns()/SPA_NSEC_PER_USEC; + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return SPA_TIMESPEC_TO_NSEC(&ts); } SPA_EXPORT @@ -6828,7 +6828,7 @@ jack_nframes_t jack_get_current_transport_frame (const jack_client_t *client) res = pos.frame; if (state == JackTransportRolling) { - float usecs = get_time_ns()/1000 - pos.usecs; + float usecs = get_time_ns(c->l->system)/1000 - pos.usecs; res += (jack_nframes_t)floor((((float) pos.frame_rate) / 1000000.0f) * usecs); } return res; diff --git a/src/pipewire/impl-link.c b/src/pipewire/impl-link.c index 60117fea4..940d25558 100644 --- a/src/pipewire/impl-link.c +++ b/src/pipewire/impl-link.c @@ -117,8 +117,10 @@ static void pw_node_peer_deactivate(struct pw_node_peer *peer) spa_list_remove(&peer->target.link); if (peer->target.active) { - if (!peer->output->async) + if (!peer->output->async) { state->required--; + trigger_target(&peer->target, get_time_ns(peer->target.system)); + } peer->target.active = false; } } diff --git a/src/pipewire/impl-node.c b/src/pipewire/impl-node.c index fe341a945..0194a23d5 100644 --- a/src/pipewire/impl-node.c +++ b/src/pipewire/impl-node.c @@ -198,6 +198,7 @@ do_node_remove(struct spa_loop *loop, bool async, uint32_t seq, struct pw_impl_node *this = user_data; struct pw_node_activation_state *dstate, *nstate; struct pw_node_target *t; + int old_state; pw_log_trace("%p: remove from driver %s %p %p %d", this, this->rt.driver_target.name, @@ -207,6 +208,8 @@ do_node_remove(struct spa_loop *loop, bool async, uint32_t seq, if (!this->rt.added) return 0; + old_state = SPA_ATOMIC_XCHG(this->rt.target.activation->status, PW_NODE_ACTIVATION_FINISHED); + /* remove from driver */ spa_list_remove(&this->rt.target.link); nstate = &this->rt.target.activation->state[0]; @@ -218,8 +221,11 @@ do_node_remove(struct spa_loop *loop, bool async, uint32_t seq, spa_list_for_each(t, &this->rt.target_list, link) { dstate = &t->activation->state[0]; if (t->active) { - if (!this->async) + if (!this->async) { dstate->required--; + if (old_state != PW_NODE_ACTIVATION_FINISHED) + trigger_target(t, get_time_ns(this->rt.target.system)); + } t->active = false; } pw_log_trace("%p: driver state:%p pending:%d/%d, node state:%p pending:%d/%d", @@ -1343,40 +1349,6 @@ static void check_states(struct pw_impl_node *driver, uint64_t nsec) } } -static inline uint64_t get_time_ns(struct spa_system *system) -{ - struct timespec ts; - spa_system_clock_gettime(system, CLOCK_MONOTONIC, &ts); - return SPA_TIMESPEC_TO_NSEC(&ts); -} - -static inline void wake_target(struct pw_node_target *t, uint64_t nsec) -{ - struct pw_node_activation *a = t->activation; - - if (SPA_ATOMIC_CAS(a->status, - PW_NODE_ACTIVATION_NOT_TRIGGERED, - PW_NODE_ACTIVATION_TRIGGERED)) { - a->signal_time = nsec; - if (SPA_UNLIKELY(spa_system_eventfd_write(t->system, t->fd, 1) < 0)) - pw_log_warn("%p: write failed %m", t->node); - } -} - -/* called from data-loop decrement the dependency counter of the target and when - * there are no more dependencies, trigger the node. */ -static inline void trigger_target(struct pw_node_target *t, uint64_t nsec) -{ - struct pw_node_activation *a = t->activation; - struct pw_node_activation_state *state = &a->state[0]; - - pw_log_trace_fp("%p: (%s-%u) state:%p pending:%d/%d", t->node, - t->name, t->id, state, state->pending, state->required); - - if (pw_node_activation_state_dec(state)) - wake_target(t, nsec); -} - /* called from data-loop when all the targets of a node need to be triggered */ static inline int trigger_targets(struct pw_node_target *t, int status, uint64_t nsec) { @@ -1423,14 +1395,18 @@ static inline int process_node(void *data) struct pw_impl_port *p; struct pw_node_activation *a = this->rt.target.activation; struct spa_system *data_system = this->rt.target.system; - int status; + int status, old_status; uint64_t nsec; + if (!SPA_ATOMIC_CAS(a->status, + PW_NODE_ACTIVATION_TRIGGERED, + PW_NODE_ACTIVATION_AWAKE)) + return 0; + nsec = get_time_ns(data_system); pw_log_trace_fp("%p: %s process remote:%u exported:%u %"PRIu64" %"PRIu64, this, this->name, this->remote, this->exported, a->signal_time, nsec); - a->status = PW_NODE_ACTIVATION_AWAKE; a->awake_time = nsec; /* when transport sync is not supported, just clear the flag */ @@ -1462,13 +1438,13 @@ static inline int process_node(void *data) nsec = get_time_ns(data_system); pw_log_trace_fp("%p: finished status:%d %"PRIu64, this, status, nsec); - a->status = PW_NODE_ACTIVATION_FINISHED; + old_status = SPA_ATOMIC_XCHG(a->status, PW_NODE_ACTIVATION_FINISHED); a->finish_time = nsec; /* we don't need to trigger targets when the node was driving the * graph because that means we finished the graph. */ if (SPA_LIKELY(!this->driving)) { - if (!this->async) + if (!this->async && old_status == PW_NODE_ACTIVATION_AWAKE) trigger_targets(&this->rt.target, status, nsec); } else { /* calculate CPU time when finished */ @@ -1645,6 +1621,7 @@ struct pw_impl_node *pw_context_create_node(struct pw_context *context, reset_position(this, &this->rt.target.activation->position); this->rt.target.activation->sync_timeout = DEFAULT_SYNC_TIMEOUT; this->rt.target.activation->sync_left = 0; + this->rt.target.activation->status = PW_NODE_ACTIVATION_FINISHED; this->rt.rate_limit.interval = 2 * SPA_NSEC_PER_SEC; this->rt.rate_limit.burst = 1; @@ -2009,6 +1986,7 @@ static int node_ready(void *data, int status) " pending %d/%d cycle:%u", node->name, node->info.id, state, a->position.clock.duration, pending, state->required, a->position.clock.cycle); + SPA_ATOMIC_XCHG(a->status, PW_NODE_ACTIVATION_TRIGGERED); process_node(node); check_states(node, nsec); pw_impl_node_rt_emit_incomplete(node); diff --git a/src/pipewire/private.h b/src/pipewire/private.h index fecdf2717..48eadb54d 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -611,6 +611,40 @@ struct pw_node_activation { * to update wins */ }; +static inline uint64_t get_time_ns(struct spa_system *system) +{ + struct timespec ts; + spa_system_clock_gettime(system, CLOCK_MONOTONIC, &ts); + return SPA_TIMESPEC_TO_NSEC(&ts); +} + +static inline void wake_target(struct pw_node_target *t, uint64_t nsec) +{ + struct pw_node_activation *a = t->activation; + + if (SPA_ATOMIC_CAS(a->status, + PW_NODE_ACTIVATION_NOT_TRIGGERED, + PW_NODE_ACTIVATION_TRIGGERED)) { + a->signal_time = nsec; + if (SPA_UNLIKELY(spa_system_eventfd_write(t->system, t->fd, 1) < 0)) + pw_log_warn("%p: write failed %m", t->node); + } +} + +/* called from data-loop decrement the dependency counter of the target and when + * there are no more dependencies, trigger the node. */ +static inline void trigger_target(struct pw_node_target *t, uint64_t nsec) +{ + struct pw_node_activation *a = t->activation; + struct pw_node_activation_state *state = &a->state[0]; + + pw_log_trace_fp("%p: (%s-%u) state:%p pending:%d/%d", t->node, + t->name, t->id, state, state->pending, state->required); + + if (pw_node_activation_state_dec(state)) + wake_target(t, nsec); +} + #define pw_impl_node_emit(o,m,v,...) spa_hook_list_call(&o->listener_list, struct pw_impl_node_events, m, v, ##__VA_ARGS__) #define pw_impl_node_emit_destroy(n) pw_impl_node_emit(n, destroy, 0) #define pw_impl_node_emit_free(n) pw_impl_node_emit(n, free, 0)