modules: use acquire/release loop

Expose the acquire_loop/release_loop functions and use them in the
modules.

Make sure the nodes created from the module use the same data loop as
the module. We need to ensure this because otherwise, the nodes might
be scheduled on different data loops and the invoke or timer logic will
fail.
This commit is contained in:
Wim Taymans 2024-04-22 16:19:02 +02:00
parent c12cf748b6
commit c3d4abd7f1
11 changed files with 79 additions and 38 deletions

View file

@ -236,7 +236,7 @@ static const struct spa_dict_item module_props[] = {
struct impl {
struct pw_context *context;
struct pw_loop *main_loop;
struct pw_data_loop *data_loop;
struct pw_loop *data_loop;
struct pw_properties *props;
@ -540,7 +540,7 @@ static void resize_delay(struct stream *stream, uint32_t size)
for (i = 0; i < channels; ++i)
ringbuffer_init(&info.delay[i], SPA_PTROFF(info.buf, i*size, void), size);
pw_data_loop_invoke(stream->impl->data_loop, do_replace_delay, 0, NULL, 0, true, &info);
pw_loop_invoke(stream->impl->data_loop, do_replace_delay, 0, NULL, 0, true, &info);
free(info.buf);
}
@ -603,7 +603,7 @@ static int do_clear_delaybuf(struct spa_loop *loop, bool async, uint32_t seq,
static void clear_delaybuf(struct impl *impl)
{
pw_data_loop_invoke(impl->data_loop, do_clear_delaybuf, 0, NULL, 0, true, impl);
pw_loop_invoke(impl->data_loop, do_clear_delaybuf, 0, NULL, 0, true, impl);
}
static int do_add_stream(struct spa_loop *loop, bool async, uint32_t seq,
@ -635,7 +635,7 @@ static void remove_stream(struct stream *s, bool destroy)
{
pw_log_debug("destroy stream %d", s->id);
pw_data_loop_invoke(s->impl->data_loop, do_remove_stream, 0, NULL, 0, true, s);
pw_loop_invoke(s->impl->data_loop, do_remove_stream, 0, NULL, 0, true, s);
if (destroy && s->stream) {
spa_hook_remove(&s->stream_listener);
@ -861,7 +861,7 @@ static int create_stream(struct stream_info *info)
direction, PW_ID_ANY, flags, params, n_params)) < 0)
goto error;
pw_data_loop_invoke(impl->data_loop, do_add_stream, 0, NULL, 0, true, s);
pw_loop_invoke(impl->data_loop, do_add_stream, 0, NULL, 0, true, s);
update_delay(impl);
return 0;
@ -1385,6 +1385,8 @@ static void impl_destroy(struct impl *impl)
pw_core_disconnect(impl->core);
impl->core = NULL;
}
if (impl->data_loop)
pw_context_release_loop(impl->context, impl->data_loop);
pw_properties_free(impl->stream_props);
pw_properties_free(impl->combine_props);
@ -1434,8 +1436,8 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
return -errno;
pw_log_debug("module %p: new %s", impl, args);
impl->main_loop = pw_context_get_main_loop(context);
impl->data_loop = pw_context_get_data_loop(context);
impl->module = module;
impl->context = context;
spa_list_init(&impl->streams);
@ -1454,6 +1456,9 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
}
impl->props = props;
impl->main_loop = pw_context_get_main_loop(context);
impl->data_loop = pw_context_acquire_loop(context, &props->dict);
if ((str = pw_properties_get(props, "combine.mode")) == NULL)
str = "sink";
@ -1488,8 +1493,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
goto error;
}
impl->module = module;
impl->context = context;
pw_properties_set(props, PW_KEY_NODE_LOOP_NAME, impl->data_loop->name);
if (pw_properties_get(props, PW_KEY_NODE_GROUP) == NULL)
pw_properties_setf(props, PW_KEY_NODE_GROUP, "combine-%s-%u-%u",
@ -1523,6 +1527,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
if ((str = pw_properties_get(props, "stream.props")) != NULL)
pw_properties_update_string(impl->stream_props, str, strlen(str));
copy_props(props, impl->combine_props, PW_KEY_NODE_LOOP_NAME);
copy_props(props, impl->combine_props, PW_KEY_AUDIO_CHANNELS);
copy_props(props, impl->combine_props, SPA_KEY_AUDIO_POSITION);
copy_props(props, impl->combine_props, PW_KEY_NODE_NAME);
@ -1537,6 +1542,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
parse_audio_info(impl->combine_props, &impl->info);
copy_props(props, impl->stream_props, PW_KEY_NODE_LOOP_NAME);
copy_props(props, impl->stream_props, PW_KEY_NODE_GROUP);
copy_props(props, impl->stream_props, PW_KEY_NODE_VIRTUAL);
copy_props(props, impl->stream_props, PW_KEY_NODE_LINK_GROUP);

View file

@ -1284,6 +1284,9 @@ static void impl_destroy(struct impl *impl)
if (impl->ffado_timer)
pw_loop_destroy_source(impl->data_loop, impl->ffado_timer);
if (impl->data_loop)
pw_context_release_loop(impl->context, impl->data_loop);
pw_properties_free(impl->sink.props);
pw_properties_free(impl->source.props);
pw_properties_free(impl->props);
@ -1378,7 +1381,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
{
struct pw_context *context = pw_impl_module_get_context(module);
struct pw_properties *props = NULL;
struct pw_data_loop *data_loop;
struct impl *impl;
const char *str;
int res;
@ -1442,8 +1444,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
impl->module = module;
impl->context = context;
impl->main_loop = pw_context_get_main_loop(context);
data_loop = pw_context_get_data_loop(context);
impl->data_loop = pw_data_loop_get_loop(data_loop);
impl->data_loop = pw_context_acquire_loop(context, &props->dict);
impl->system = impl->main_loop->system;
impl->reset_work_id = SPA_ID_INVALID;
@ -1473,6 +1474,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
goto error;
}
pw_properties_set(props, PW_KEY_NODE_LOOP_NAME, impl->data_loop->name);
if (pw_properties_get(props, PW_KEY_NODE_VIRTUAL) == NULL)
pw_properties_set(props, PW_KEY_NODE_VIRTUAL, "true");
if (pw_properties_get(props, PW_KEY_NODE_GROUP) == NULL)
@ -1497,6 +1499,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
if ((str = pw_properties_get(props, "source.props")) != NULL)
pw_properties_update_string(impl->source.props, str, strlen(str));
copy_props(impl, props, PW_KEY_NODE_LOOP_NAME);
copy_props(impl, props, PW_KEY_NODE_LINK_GROUP);
copy_props(impl, props, PW_KEY_NODE_GROUP);
copy_props(impl, props, PW_KEY_NODE_VIRTUAL);

View file

@ -1124,6 +1124,9 @@ static void impl_destroy(struct impl *impl)
if (impl->timer)
pw_loop_destroy_source(impl->main_loop, impl->timer);
if (impl->data_loop)
pw_context_release_loop(impl->context, impl->data_loop);
pw_properties_free(impl->sink.props);
pw_properties_free(impl->source.props);
pw_properties_free(impl->props);
@ -1200,7 +1203,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
{
struct pw_context *context = pw_impl_module_get_context(module);
struct pw_properties *props = NULL;
struct pw_data_loop *data_loop;
struct impl *impl;
const char *str;
int res;
@ -1211,6 +1213,9 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
if (impl == NULL)
return -errno;
impl->module = module;
impl->context = context;
pw_log_debug("module %p: new %s", impl, args);
if (args == NULL)
@ -1223,8 +1228,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
goto error;
}
impl->props = props;
data_loop = pw_context_get_data_loop(context);
impl->data_loop = pw_data_loop_get_loop(data_loop);
impl->data_loop = pw_context_acquire_loop(context, &props->dict);
impl->quantum_limit = pw_properties_get_uint32(
pw_context_get_properties(context),
"default.clock.quantum-limit", 8192u);
@ -1237,8 +1241,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
goto error;
}
impl->module = module;
impl->context = context;
impl->main_loop = pw_context_get_main_loop(context);
impl->system = impl->main_loop->system;
@ -1264,6 +1266,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
impl->latency = pw_properties_get_uint32(impl->props, "netjack2.latency",
DEFAULT_NETWORK_LATENCY);
pw_properties_set(props, PW_KEY_NODE_LOOP_NAME, impl->data_loop->name);
if (pw_properties_get(props, PW_KEY_NODE_VIRTUAL) == NULL)
pw_properties_set(props, PW_KEY_NODE_VIRTUAL, "true");
if (pw_properties_get(props, PW_KEY_NODE_GROUP) == NULL)
@ -1284,6 +1287,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
if ((str = pw_properties_get(props, "source.props")) != NULL)
pw_properties_update_string(impl->source.props, str, strlen(str));
copy_props(impl, props, PW_KEY_NODE_LOOP_NAME);
copy_props(impl, props, PW_KEY_AUDIO_CHANNELS);
copy_props(impl, props, SPA_KEY_AUDIO_POSITION);
copy_props(impl, props, PW_KEY_NODE_ALWAYS_PROCESS);

View file

@ -1154,6 +1154,9 @@ static void impl_destroy(struct impl *impl)
if (impl->core && impl->do_disconnect)
pw_core_disconnect(impl->core);
if (impl->data_loop)
pw_context_release_loop(impl->context, impl->data_loop);
pw_properties_free(impl->sink_props);
pw_properties_free(impl->source_props);
pw_properties_free(impl->props);
@ -1230,7 +1233,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
{
struct pw_context *context = pw_impl_module_get_context(module);
struct pw_properties *props = NULL;
struct pw_data_loop *data_loop;
struct impl *impl;
const char *str;
int res;
@ -1244,6 +1246,9 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
pw_log_debug("module %p: new %s", impl, args);
spa_list_init(&impl->follower_list);
impl->module = module;
impl->context = context;
if (args == NULL)
args = "";
@ -1254,8 +1259,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
goto error;
}
impl->props = props;
data_loop = pw_context_get_data_loop(context);
impl->data_loop = pw_data_loop_get_loop(data_loop);
impl->data_loop = pw_context_acquire_loop(context, &props->dict);
impl->quantum_limit = pw_properties_get_uint32(
pw_context_get_properties(context),
"default.clock.quantum-limit", 8192u);
@ -1268,8 +1272,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
goto error;
}
impl->module = module;
impl->context = context;
impl->main_loop = pw_context_get_main_loop(context);
impl->system = impl->main_loop->system;
@ -1313,6 +1315,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
impl->kbps = pw_properties_get_uint32(impl->props, "netjack2.kbps",
DEFAULT_KBPS);
pw_properties_set(props, PW_KEY_NODE_LOOP_NAME, impl->data_loop->name);
if (pw_properties_get(props, PW_KEY_NODE_VIRTUAL) == NULL)
pw_properties_set(props, PW_KEY_NODE_VIRTUAL, "true");
if (pw_properties_get(props, PW_KEY_NODE_NETWORK) == NULL)
@ -1337,6 +1340,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
if ((str = pw_properties_get(props, "source.props")) != NULL)
pw_properties_update_string(impl->source_props, str, strlen(str));
copy_props(impl, props, PW_KEY_NODE_LOOP_NAME);
copy_props(impl, props, PW_KEY_NODE_VIRTUAL);
copy_props(impl, props, PW_KEY_NODE_NETWORK);
copy_props(impl, props, PW_KEY_NODE_LINK_GROUP);

View file

@ -731,6 +731,8 @@ static void impl_destroy(struct impl *impl)
if (impl->fd >= 0)
close(impl->fd);
pw_context_release_loop(impl->context, impl->data_loop);
pw_properties_free(impl->stream_props);
pw_properties_free(impl->props);
@ -857,7 +859,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
struct pw_properties *props = NULL;
struct impl *impl;
const char *str, *media_class = NULL;
struct pw_data_loop *data_loop;
int res;
PW_LOG_TOPIC_INIT(mod_topic);
@ -891,8 +892,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
impl->module = module;
impl->context = context;
impl->main_loop = pw_context_get_main_loop(context);
data_loop = pw_context_get_data_loop(context);
impl->data_loop = pw_data_loop_get_loop(data_loop);
impl->data_loop = pw_context_acquire_loop(context, &props->dict);
if ((str = pw_properties_get(props, "tunnel.mode")) == NULL)
str = "playback";
@ -923,6 +923,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
if ((str = pw_properties_get(props, "tunnel.may-pause")) != NULL)
impl->may_pause = spa_atob(str);
pw_properties_set(props, PW_KEY_NODE_LOOP_NAME, impl->data_loop->name);
if (pw_properties_get(props, PW_KEY_NODE_VIRTUAL) == NULL)
pw_properties_set(props, PW_KEY_NODE_VIRTUAL, "true");
if (pw_properties_get(props, PW_KEY_MEDIA_CLASS) == NULL)
@ -931,6 +932,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
if ((str = pw_properties_get(props, "stream.props")) != NULL)
pw_properties_update_string(impl->stream_props, str, strlen(str));
copy_props(impl, props, PW_KEY_NODE_LOOP_NAME);
copy_props(impl, props, PW_KEY_AUDIO_FORMAT);
copy_props(impl, props, PW_KEY_AUDIO_RATE);
copy_props(impl, props, PW_KEY_AUDIO_CHANNELS);

View file

@ -1218,6 +1218,9 @@ static void impl_destroy(struct impl *impl)
if (impl->client)
avahi_client_free(impl->client);
if (impl->data_loop)
pw_context_release_loop(impl->context, impl->data_loop);
pw_properties_free(impl->stream_props);
pw_properties_free(impl->props);
@ -1691,7 +1694,9 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
impl->module = module;
impl->context = context;
impl->loop = pw_context_get_main_loop(context);
impl->data_loop = pw_data_loop_get_loop(pw_context_get_data_loop(context));
impl->data_loop = pw_context_acquire_loop(context, &props->dict);
pw_properties_set(props, PW_KEY_NODE_LOOP_NAME, impl->data_loop->name);
if (pw_properties_get(props, "sess.media") == NULL)
pw_properties_set(props, "sess.media", "midi");
@ -1699,6 +1704,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
if ((str = pw_properties_get(props, "stream.props")) != NULL)
pw_properties_update_string(stream_props, str, strlen(str));
copy_props(impl, props, PW_KEY_NODE_LOOP_NAME);
copy_props(impl, props, PW_KEY_AUDIO_FORMAT);
copy_props(impl, props, PW_KEY_AUDIO_RATE);
copy_props(impl, props, PW_KEY_AUDIO_CHANNELS);

View file

@ -461,6 +461,9 @@ static void impl_destroy(struct impl *impl)
if (impl->timer)
pw_loop_destroy_source(impl->loop, impl->timer);
if (impl->data_loop)
pw_context_release_loop(impl->context, impl->data_loop);
pw_properties_free(impl->stream_props);
pw_properties_free(impl->props);
@ -537,11 +540,12 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
impl->module = module;
impl->context = context;
impl->loop = pw_context_get_main_loop(context);
impl->data_loop = pw_data_loop_get_loop(pw_context_get_data_loop(context));
impl->data_loop = pw_context_acquire_loop(context, &props->dict);
if ((sess_name = pw_properties_get(props, "sess.name")) == NULL)
sess_name = pw_get_host_name();
pw_properties_set(props, PW_KEY_NODE_LOOP_NAME, impl->data_loop->name);
if (pw_properties_get(props, PW_KEY_NODE_NAME) == NULL)
pw_properties_setf(props, PW_KEY_NODE_NAME, "rtp_session.%s", sess_name);
if (pw_properties_get(props, PW_KEY_NODE_DESCRIPTION) == NULL)
@ -553,6 +557,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
if ((str = pw_properties_get(props, "stream.props")) != NULL)
pw_properties_update_string(stream_props, str, strlen(str));
copy_props(impl, props, PW_KEY_NODE_LOOP_NAME);
copy_props(impl, props, PW_KEY_AUDIO_FORMAT);
copy_props(impl, props, PW_KEY_AUDIO_RATE);
copy_props(impl, props, PW_KEY_AUDIO_CHANNELS);

View file

@ -44,6 +44,8 @@ struct impl {
struct spa_audio_info info;
struct spa_audio_info stream_info;
struct pw_context *context;
struct pw_stream *stream;
struct spa_hook stream_listener;
struct pw_stream_events stream_events;
@ -311,8 +313,6 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core,
enum pw_stream_flags flags;
float latency_msec;
int res;
struct pw_data_loop *data_loop;
struct pw_context *context;
impl = calloc(1, sizeof(*impl));
if (impl == NULL) {
@ -322,10 +322,9 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core,
impl->first = true;
spa_hook_list_init(&impl->listener_list);
impl->stream_events = stream_events;
context = pw_core_get_context(core);
impl->main_loop = pw_context_get_main_loop(context);
data_loop = pw_context_get_data_loop(context);
impl->data_loop = pw_data_loop_get_loop(data_loop);
impl->context = pw_core_get_context(core);
impl->main_loop = pw_context_get_main_loop(impl->context);
impl->data_loop = pw_context_acquire_loop(impl->context, &props->dict);
impl->timer = pw_loop_add_timer(impl->data_loop, on_flush_timeout, impl);
if (impl->timer == NULL) {
res = -errno;
@ -614,6 +613,9 @@ void rtp_stream_destroy(struct rtp_stream *s)
if (impl->timer)
pw_loop_destroy_source(impl->data_loop, impl->timer);
if (impl->data_loop)
pw_context_release_loop(impl->context, impl->data_loop);
spa_hook_list_clean(&impl->listener_list);
free(impl);
}

View file

@ -370,6 +370,9 @@ static void impl_destroy(struct impl *impl)
if (impl->timer)
pw_loop_destroy_source(impl->loop, impl->timer);
if (impl->data_loop)
pw_context_release_loop(impl->context, impl->data_loop);
pw_properties_free(impl->stream_props);
pw_properties_free(impl->props);
@ -444,11 +447,12 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
impl->module = module;
impl->context = context;
impl->loop = pw_context_get_main_loop(context);
impl->data_loop = pw_data_loop_get_loop(pw_context_get_data_loop(context));
impl->data_loop = pw_context_acquire_loop(context, &props->dict);
if ((sess_name = pw_properties_get(props, "sess.name")) == NULL)
sess_name = pw_get_host_name();
pw_properties_set(props, PW_KEY_NODE_LOOP_NAME, impl->data_loop->name);
if (pw_properties_get(props, PW_KEY_NODE_NAME) == NULL)
pw_properties_setf(props, PW_KEY_NODE_NAME, "vban_session.%s", sess_name);
if (pw_properties_get(props, PW_KEY_NODE_DESCRIPTION) == NULL)
@ -460,6 +464,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
if ((str = pw_properties_get(props, "stream.props")) != NULL)
pw_properties_update_string(stream_props, str, strlen(str));
copy_props(impl, props, PW_KEY_NODE_LOOP_NAME);
copy_props(impl, props, PW_KEY_AUDIO_FORMAT);
copy_props(impl, props, PW_KEY_AUDIO_RATE);
copy_props(impl, props, PW_KEY_AUDIO_CHANNELS);

View file

@ -130,10 +130,17 @@ const struct spa_support *pw_context_get_support(struct pw_context *context, uin
struct pw_loop *pw_context_get_main_loop(struct pw_context *context);
/** Get the context data loop. This loop runs on the realtime thread. This
* acquires a loop from the generic data.rt class.
* acquires a loop from the generic data.rt class. Use pw_context_acquire_loop() instead.
* Since 0.3.56 */
struct pw_data_loop *pw_context_get_data_loop(struct pw_context *context);
/** Get a data-loop.
* Since 1.1.0 */
struct pw_loop *pw_context_acquire_loop(struct pw_context *context, const struct spa_dict *props);
/** Release a data-loop.
* Since 1.1.0 */
void pw_context_release_loop(struct pw_context *context, struct pw_loop *loop);
/** Get the work queue from the context: Since 0.3.26 */
struct pw_work_queue *pw_context_get_work_queue(struct pw_context *context);

View file

@ -1183,9 +1183,6 @@ void pw_proxy_remove(struct pw_proxy *proxy);
int pw_context_recalc_graph(struct pw_context *context, const char *reason);
struct pw_loop *pw_context_acquire_loop(struct pw_context *context, const struct spa_dict *props);
void pw_context_release_loop(struct pw_context *context, struct pw_loop *loop);
void pw_impl_port_update_info(struct pw_impl_port *port, const struct spa_port_info *info);
int pw_impl_port_register(struct pw_impl_port *port,