From db77f6d37dbd7de4e8e53a74ef7ba6de4404a683 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 8 Dec 2021 20:29:15 +0100 Subject: [PATCH] stream: add TRIGGER stream flag The trigger flag adds an extra dependency on the node so that it does not automatically get scheduled. A manual scheduling is required with, for example pw_stream_trigger_process(). This can be used to create an artificial dependency between a sink stream and a source stream, like when using loopback or filter-chain. Normally those streams are not linked in the graph but they have an internal dependency. Without any such dependency, the source part of the chain will be scheduled first and then the sink part and we get a cycle of delay (with possible quantum change etc). With this patch, the sink part will be scheduled first and its process function will trigger the 'downstream' source stream explicitly. The sink and source stream will stay in sync and will use the same quantum. This reduces the latency and glitches because of quantum changes. Fixes #1873 --- src/modules/module-filter-chain.c | 5 ++++- src/modules/module-loopback.c | 5 ++++- src/pipewire/impl-node.c | 4 ++++ src/pipewire/keys.h | 3 +++ src/pipewire/stream.c | 7 ++++++- src/pipewire/stream.h | 5 +++++ 6 files changed, 26 insertions(+), 3 deletions(-) diff --git a/src/modules/module-filter-chain.c b/src/modules/module-filter-chain.c index 5f2194e4d..566b1fde2 100644 --- a/src/modules/module-filter-chain.c +++ b/src/modules/module-filter-chain.c @@ -319,6 +319,8 @@ done: pw_stream_queue_buffer(impl->capture, in); if (out != NULL) pw_stream_queue_buffer(impl->playback, out); + + pw_stream_trigger_process(impl->playback); } static float get_default(struct impl *impl, struct descriptor *desc, uint32_t p) @@ -699,7 +701,8 @@ static int setup_streams(struct impl *impl) PW_ID_ANY, PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS | - PW_STREAM_FLAG_RT_PROCESS, + PW_STREAM_FLAG_RT_PROCESS | + PW_STREAM_FLAG_TRIGGER, params, n_params); free(b.data); diff --git a/src/modules/module-loopback.c b/src/modules/module-loopback.c index 05a2e1e65..4622244c7 100644 --- a/src/modules/module-loopback.c +++ b/src/modules/module-loopback.c @@ -166,6 +166,8 @@ static void capture_process(void *d) pw_stream_queue_buffer(impl->capture, in); if (out != NULL) pw_stream_queue_buffer(impl->playback, out); + + pw_stream_trigger_process(impl->playback); } static void param_latency_changed(struct impl *impl, const struct spa_pod *param, @@ -294,7 +296,8 @@ static int setup_streams(struct impl *impl) PW_ID_ANY, PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS | - PW_STREAM_FLAG_RT_PROCESS, + PW_STREAM_FLAG_RT_PROCESS | + PW_STREAM_FLAG_TRIGGER, params, n_params)) < 0) return res; diff --git a/src/pipewire/impl-node.c b/src/pipewire/impl-node.c index 7f94fed2e..748a67752 100644 --- a/src/pipewire/impl-node.c +++ b/src/pipewire/impl-node.c @@ -876,6 +876,10 @@ static void check_properties(struct pw_impl_node *node) recalc_reason = "driver changed"; } + /* not scheduled automatically so we add an additional required trigger */ + if (pw_properties_get_bool(node->properties, PW_KEY_NODE_TRIGGER, false)) + node->rt.activation->state[0].required++; + /* group defines what nodes are scheduled together */ if ((str = pw_properties_get(node->properties, PW_KEY_NODE_GROUP)) == NULL) str = ""; diff --git a/src/pipewire/keys.h b/src/pipewire/keys.h index 0a00fdcfb..2be4bd741 100644 --- a/src/pipewire/keys.h +++ b/src/pipewire/keys.h @@ -183,6 +183,9 @@ extern "C" { #define PW_KEY_NODE_LINK_GROUP "node.link-group" /**< the node is internally linked to * nodes with the same link-group */ #define PW_KEY_NODE_NETWORK "node.network" /**< the node is on a network */ +#define PW_KEY_NODE_TRIGGER "node.trigger" /**< the node is not scheduled automatically + * based on the dependencies in the graph + * but it will be triggered explicitly. */ /** Port keys */ #define PW_KEY_PORT_ID "port.id" /**< port id */ diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 4c322d57f..3c0fdc312 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -168,6 +168,7 @@ struct stream { unsigned int process_rt:1; unsigned int driving:1; unsigned int using_trigger:1; + unsigned int trigger:1; }; static int get_param_index(uint32_t id) @@ -1766,6 +1767,10 @@ pw_stream_connect(struct pw_stream *stream, pw_properties_set(stream->properties, PW_KEY_NODE_EXCLUSIVE, "true"); if (flags & PW_STREAM_FLAG_DONT_RECONNECT) pw_properties_set(stream->properties, PW_KEY_NODE_DONT_RECONNECT, "true"); + if (flags & PW_STREAM_FLAG_TRIGGER) { + pw_properties_set(stream->properties, PW_KEY_NODE_TRIGGER, "true"); + impl->trigger = true; + } if ((str = pw_properties_get(stream->properties, "mem.warn-mlock")) != NULL) impl->warn_mlock = pw_properties_parse_bool(str); @@ -2215,7 +2220,7 @@ int pw_stream_trigger_process(struct pw_stream *stream) /* flag to check for old or new behaviour */ impl->using_trigger = true; - if (!impl->driving) { + if (!impl->driving && !impl->trigger) { res = trigger_request_process(impl); } else { if (impl->direction == SPA_DIRECTION_OUTPUT && diff --git a/src/pipewire/stream.h b/src/pipewire/stream.h index 5404ee90a..55755fca0 100644 --- a/src/pipewire/stream.h +++ b/src/pipewire/stream.h @@ -271,6 +271,11 @@ enum pw_stream_flags { PW_STREAM_FLAG_ALLOC_BUFFERS = (1 << 8), /**< the application will allocate buffer * memory. In the add_buffer event, the * data of the buffer should be set */ + PW_STREAM_FLAG_TRIGGER = (1 << 9), /**< the output stream will not be scheduled + * automatically but _trigger_process() + * needs to be called. This can be used + * when the output of the stream depends + * on input from other streams. */ }; /** Create a new unconneced \ref pw_stream