protocol-simple: add per stream properties and formats

Add capture.props and playback.props to configure the created streams
with arbitrary properties.

Improve format parsing, make it possible to have different formats peer
stream.

Improve some of the property handling.

This can now also be used to upload a stream to a snapcast server, add
an example of this to the docs.
This commit is contained in:
Wim Taymans 2024-05-14 17:30:05 +02:00
parent 37224ac84c
commit 78fbe6f750
2 changed files with 219 additions and 95 deletions

View file

@ -958,7 +958,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
if (impl->info.rate != 0 &&
pw_properties_get(props, PW_KEY_NODE_RATE) == NULL)
pw_properties_setf(props, PW_KEY_NODE_RATE,
"1/%u", impl->info.rate),
"1/%u", impl->info.rate);
copy_props(impl, props, PW_KEY_NODE_RATE);

View file

@ -40,20 +40,23 @@
* Each client that connects will create a capture and/or playback stream,
* depending on the configuration options.
*
* You can also use it to feed audio data to other clients such as the snapcast
* server.
*
* ## Module Name
*
* `libpipewire-module-protocol-simple`
*
* ## Module Options
*
* - `capture`: boolean if capture is enabled. This will create a capture stream
* for each connected client.
* - `playback`: boolean if playback is enabled. This will create a playback
* stream for each connected client.
* - `capture.node`: an optional node serial or name to use for capture.
* - `playback.node`: an optional node serial or name to use for playback.
* - `capture`: boolean if capture is enabled. This will create a capture stream or
* sink for each connected client.
* - `playback`: boolean if playback is enabled. This will create a playback or
* source stream for each connected client.
* - `server.address = []`: an array of server addresses to listen on as
* tcp:(<ip>:)<port>.
* - `capture.props`: optional properties for the capture stream
* - `playback.props`: optional properties for the playback stream
*
* ## General options
*
@ -68,6 +71,7 @@
* - \ref PW_KEY_NODE_RATE
* - \ref PW_KEY_STREAM_CAPTURE_SINK
* - \ref PW_KEY_NODE_NAME
* - \ref PW_KEY_TARGET_OBJECT
*
* By default the server will work with stereo 16 bits samples at 44.1KHz.
*
@ -83,15 +87,6 @@
* # Provide playback stream, client can send data to PipeWire for playback
* playback = true
* #
* # The node name or id to use for capture.
* #capture.node = null
* #
* # To make the capture stream capture the monitor ports
* #stream.capture.sink = false
* #
* # The node name or id to use for playback.
* #playback.node = null
* #
* #audio.rate = 44100
* #audio.format = S16
* #audio.channels = 2
@ -102,10 +97,65 @@
* server.address = [
* "tcp:4711"
* ]
* capture.props = {
* # The node name or id to use for capture.
* #target.object = null
* #
* # To make the capture stream capture the monitor ports
* #stream.capture.sink = false
* #
* # Make this a sink instead of a capture stream
* #media.class = Audio/Sink
* }
* playback.props = {
* # The node name or id to use for playback.
* #target.object = null
* #
* # Make this a source instead of a playback stream
* #media.class = Audio/Source
* }
* }
* }
* ]
*\endcode
*
* ## Example configuration for a snapcast server
*
*\code{.unparsed}
* context.modules = [
* { name = libpipewire-module-protocol-simple
* args = {
* # Provide sink
* capture = true
* audio.rate = 48000
* audio.format = S16
* audio.channels = 2
* audio.position = [ FL FR ]
*
* # The addresses this server listens on for new
* # client connections
* server.address = [
* "tcp:4711"
* ]
* capture.props = {
* # Make this a sink instead of a capture stream
* media.class = Audio/Sink
* }
* }
* }
* ]
*
* On the snapcast server, add the following to the `snapserver.conf` file:
*
*\code{.unparsed}
* [stream]
* sampleformat = 48000:16:2
* source = tcp://127.0.0.1:4711?name=PipeWireSnapcast&mode=client
*\endcode
*
* Snapcast will try to connect to the protocol-simple server and fetch the
* samples from it. Snapcast tries to reconnect when the connection is somehow
* broken.
*/
#define NAME "protocol-simple"
@ -120,22 +170,20 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME);
#define DEFAULT_RATE 44100
#define DEFAULT_CHANNELS 2
#define DEFAULT_POSITION "[ FL FR ]"
#define DEFAULT_LATENCY "1024/44100"
#define MAX_CLIENTS 10
#define MODULE_USAGE "( capture=<bool> ) " \
"( playback=<bool> ) " \
"( remote.name=<remote> ) " \
"( node.latency=<num/denom, default:"DEFAULT_LATENCY"> ) " \
"( node.rate=<1/rate, default:1/"SPA_STRINGIFY(DEFAULT_RATE)"> ) " \
"( capture.node=<source-target> ( stream.capture.sink=true )) " \
"( playback.node=<sink-target> ) " \
"( audio.rate=<sample-rate, default:"SPA_STRINGIFY(DEFAULT_RATE)"> ) " \
"( audio.format=<format, default:"DEFAULT_FORMAT"> ) " \
"( audio.channels=<channels, default: "SPA_STRINGIFY(DEFAULT_CHANNELS)"> ) " \
"( audio.position=<position, default:"DEFAULT_POSITION"> ) " \
"( server.address=<[ tcp:(<ip>:)<port>(,...) ], default:"DEFAULT_SERVER"> )" \
"( server.address=<[ tcp:(<ip>:)<port>(,...) ], default:"DEFAULT_SERVER"> ) " \
"( capture.props={ ... } ) " \
"( playback.props={ ... } )" \
static const struct spa_dict_item module_props[] = {
{ PW_KEY_MODULE_AUTHOR, "Wim Taymans <wim.taymans@gmail.com>" },
@ -154,11 +202,16 @@ struct impl {
struct pw_work_queue *work_queue;
struct pw_properties *capture_props;
struct pw_properties *playback_props;
bool capture;
bool playback;
struct spa_audio_info_raw info;
uint32_t frame_size;
struct spa_audio_info_raw capture_info;
struct spa_audio_info_raw playback_info;
uint32_t capture_frame_size;
uint32_t playback_frame_size;
};
struct client {
@ -337,7 +390,7 @@ static void playback_process(void *data)
size = d->maxsize;
if (buf->requested)
size = SPA_MIN(size, buf->requested * impl->frame_size);
size = SPA_MIN(size, buf->requested * impl->playback_frame_size);
offset = 0;
while (size > 0) {
@ -364,7 +417,7 @@ static void playback_process(void *data)
}
d->chunk->offset = 0;
d->chunk->size = offset;
d->chunk->stride = impl->frame_size;
d->chunk->stride = impl->playback_frame_size;
pw_stream_queue_buffer(client->playback, buf);
}
@ -423,25 +476,12 @@ static int create_streams(struct impl *impl, struct client *client)
uint32_t n_params;
const struct spa_pod *params[1];
uint8_t buffer[1024];
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
struct spa_pod_builder b;
struct pw_properties *props;
const char *latency;
int res;
if ((latency = pw_properties_get(impl->props, PW_KEY_NODE_LATENCY)) == NULL)
latency = DEFAULT_LATENCY;
if (impl->capture) {
props = pw_properties_new(
PW_KEY_NODE_LATENCY, latency,
PW_KEY_NODE_RATE, pw_properties_get(impl->props, PW_KEY_NODE_RATE),
PW_KEY_TARGET_OBJECT, pw_properties_get(impl->props, "capture.node"),
PW_KEY_STREAM_CAPTURE_SINK, pw_properties_get(impl->props,
PW_KEY_STREAM_CAPTURE_SINK),
PW_KEY_NODE_NETWORK, "true",
PW_KEY_NODE_NAME, pw_properties_get(impl->props, PW_KEY_NODE_NAME),
NULL);
if (props == NULL)
if ((props = pw_properties_copy(impl->capture_props)) == NULL)
return -errno;
pw_properties_setf(props,
@ -456,13 +496,7 @@ static int create_streams(struct impl *impl, struct client *client)
&capture_stream_events, client);
}
if (impl->playback) {
props = pw_properties_new(
PW_KEY_NODE_LATENCY, latency,
PW_KEY_NODE_RATE, pw_properties_get(impl->props, PW_KEY_NODE_RATE),
PW_KEY_TARGET_OBJECT, pw_properties_get(impl->props, "playback.node"),
PW_KEY_NODE_NETWORK, "true",
PW_KEY_NODE_NAME, pw_properties_get(impl->props, PW_KEY_NODE_NAME),
NULL);
props = pw_properties_copy(impl->playback_props);
if (props == NULL)
return -errno;
@ -479,11 +513,13 @@ static int create_streams(struct impl *impl, struct client *client)
&playback_stream_events, client);
}
n_params = 0;
params[n_params++] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat,
&impl->info);
if (impl->capture) {
n_params = 0;
spa_pod_builder_init(&b, buffer, sizeof(buffer));
params[n_params++] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat,
&impl->capture_info);
if ((res = pw_stream_connect(client->capture,
PW_DIRECTION_INPUT,
PW_ID_ANY,
@ -494,6 +530,11 @@ static int create_streams(struct impl *impl, struct client *client)
return res;
}
if (impl->playback) {
n_params = 0;
spa_pod_builder_init(&b, buffer, sizeof(buffer));
params[n_params++] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat,
&impl->playback_info);
if ((res = pw_stream_connect(client->playback,
PW_DIRECTION_OUTPUT,
PW_ID_ANY,
@ -731,6 +772,8 @@ static void impl_free(struct impl *impl)
spa_hook_remove(&impl->module_listener);
spa_list_consume(s, &impl->server_list, link)
server_free(s);
pw_properties_free(impl->capture_props);
pw_properties_free(impl->playback_props);
pw_properties_free(impl->props);
free(impl);
}
@ -755,21 +798,89 @@ static inline uint32_t channel_from_name(const char *name)
return SPA_AUDIO_CHANNEL_UNKNOWN;
}
static inline uint32_t parse_position(uint32_t *pos, const char *val, size_t len)
static void parse_position(struct spa_audio_info_raw *info, const char *val, size_t len)
{
uint32_t channels = 0;
struct spa_json it[2];
char v[256];
spa_json_init(&it[0], val, len);
if (spa_json_enter_array(&it[0], &it[1]) <= 0)
spa_json_init(&it[1], val, len);
if (spa_json_enter_array(&it[0], &it[1]) <= 0)
spa_json_init(&it[1], val, len);
info->channels = 0;
while (spa_json_get_string(&it[1], v, sizeof(v)) > 0 &&
channels < SPA_AUDIO_MAX_CHANNELS) {
pos[channels++] = channel_from_name(v);
info->channels < SPA_AUDIO_MAX_CHANNELS) {
info->position[info->channels++] = channel_from_name(v);
}
}
static int calc_frame_size(struct spa_audio_info_raw *info)
{
int res = info->channels;
switch (info->format) {
case SPA_AUDIO_FORMAT_U8:
case SPA_AUDIO_FORMAT_S8:
case SPA_AUDIO_FORMAT_ALAW:
case SPA_AUDIO_FORMAT_ULAW:
return res;
case SPA_AUDIO_FORMAT_S16:
case SPA_AUDIO_FORMAT_S16_OE:
case SPA_AUDIO_FORMAT_U16:
return res * 2;
case SPA_AUDIO_FORMAT_S24:
case SPA_AUDIO_FORMAT_S24_OE:
case SPA_AUDIO_FORMAT_U24:
return res * 3;
case SPA_AUDIO_FORMAT_S24_32:
case SPA_AUDIO_FORMAT_S24_32_OE:
case SPA_AUDIO_FORMAT_S32:
case SPA_AUDIO_FORMAT_S32_OE:
case SPA_AUDIO_FORMAT_U32:
case SPA_AUDIO_FORMAT_U32_OE:
case SPA_AUDIO_FORMAT_F32:
case SPA_AUDIO_FORMAT_F32_OE:
return res * 4;
case SPA_AUDIO_FORMAT_F64:
case SPA_AUDIO_FORMAT_F64_OE:
return res * 8;
default:
return 0;
}
}
static int parse_audio_info(const struct pw_properties *props, struct spa_audio_info_raw *info)
{
const char *str;
spa_zero(*info);
if ((str = pw_properties_get(props, PW_KEY_AUDIO_FORMAT)) == NULL)
str = DEFAULT_FORMAT;
info->format = format_from_name(str, strlen(str));
info->rate = pw_properties_get_uint32(props, PW_KEY_AUDIO_RATE, info->rate);
if (info->rate == 0)
info->rate = DEFAULT_RATE;
info->channels = pw_properties_get_uint32(props, PW_KEY_AUDIO_CHANNELS, info->channels);
info->channels = SPA_MIN(info->channels, SPA_AUDIO_MAX_CHANNELS);
if ((str = pw_properties_get(props, SPA_KEY_AUDIO_POSITION)) != NULL)
parse_position(info, str, strlen(str));
if (info->channels == 0)
parse_position(info, DEFAULT_POSITION, strlen(DEFAULT_POSITION));
return calc_frame_size(info);
}
static void copy_props(struct impl *impl, const char *key)
{
const char *str;
if ((str = pw_properties_get(impl->props, key)) != NULL) {
if (pw_properties_get(impl->capture_props, key) == NULL)
pw_properties_set(impl->capture_props, key, str);
if (pw_properties_get(impl->playback_props, key) == NULL)
pw_properties_set(impl->playback_props, key, str);
}
return channels;
}
static int parse_params(struct impl *impl)
@ -785,49 +896,62 @@ static int parse_params(struct impl *impl)
return -EINVAL;
}
if ((str = pw_properties_get(impl->props, "audio.format")) == NULL)
str = DEFAULT_FORMAT;
impl->info.format = format_from_name(str, strlen(str));
if (impl->info.format == SPA_AUDIO_FORMAT_UNKNOWN) {
pw_log_error("invalid format '%s'", str);
return -EINVAL;
if (pw_properties_get(impl->props, PW_KEY_NODE_VIRTUAL) == NULL)
pw_properties_set(impl->props, PW_KEY_NODE_VIRTUAL, "true");
if (pw_properties_get(impl->props, PW_KEY_NODE_NETWORK) == NULL)
pw_properties_set(impl->props, PW_KEY_NODE_NETWORK, "true");
impl->capture_props = pw_properties_new(
PW_KEY_TARGET_OBJECT, pw_properties_get(impl->props, "capture.node"),
PW_KEY_STREAM_CAPTURE_SINK, pw_properties_get(impl->props,
PW_KEY_STREAM_CAPTURE_SINK),
NULL);
impl->playback_props = pw_properties_new(
PW_KEY_TARGET_OBJECT, pw_properties_get(impl->props, "playback.node"),
NULL);
if (impl->capture_props == NULL || impl->playback_props == NULL) {
pw_log_error("can't create props: %m");
return -errno;
}
impl->info.rate = pw_properties_get_uint32(impl->props, "audio.rate", DEFAULT_RATE);
if (impl->info.rate == 0) {
pw_log_error("invalid rate '%s'", str);
return -EINVAL;
}
impl->info.channels = pw_properties_get_uint32(impl->props, "audio.channels", DEFAULT_CHANNELS);
if (impl->info.channels == 0) {
pw_log_error("invalid channels '%s'", str);
return -EINVAL;
}
if ((str = pw_properties_get(impl->props, "audio.position")) == NULL)
str = DEFAULT_POSITION;
if (parse_position(impl->info.position, str, strlen(str)) != impl->info.channels) {
pw_log_error("invalid position '%s'", str);
if ((str = pw_properties_get(impl->props, "capture.props")) != NULL)
pw_properties_update_string(impl->capture_props, str, strlen(str));
if ((str = pw_properties_get(impl->props, "playback.props")) != NULL)
pw_properties_update_string(impl->playback_props, str, strlen(str));
copy_props(impl, PW_KEY_AUDIO_FORMAT);
copy_props(impl, PW_KEY_AUDIO_RATE);
copy_props(impl, PW_KEY_AUDIO_CHANNELS);
copy_props(impl, SPA_KEY_AUDIO_POSITION);
copy_props(impl, PW_KEY_NODE_RATE);
copy_props(impl, PW_KEY_NODE_NAME);
copy_props(impl, PW_KEY_NODE_DESCRIPTION);
copy_props(impl, PW_KEY_NODE_GROUP);
copy_props(impl, PW_KEY_NODE_LATENCY);
copy_props(impl, PW_KEY_NODE_VIRTUAL);
copy_props(impl, PW_KEY_NODE_NETWORK);
impl->capture_frame_size = parse_audio_info(impl->capture_props, &impl->capture_info);
if (impl->capture_frame_size == 0) {
pw_log_error("unsupported capture audio format:%d channels:%d",
impl->capture_info.format, impl->capture_info.channels);
return -EINVAL;
}
switch (impl->info.format) {
case SPA_AUDIO_FORMAT_U8:
impl->frame_size = 1;
break;
case SPA_AUDIO_FORMAT_S16_LE:
case SPA_AUDIO_FORMAT_S16_BE:
case SPA_AUDIO_FORMAT_S16P:
impl->frame_size = 2;
break;
case SPA_AUDIO_FORMAT_S24_LE:
case SPA_AUDIO_FORMAT_S24_BE:
case SPA_AUDIO_FORMAT_S24P:
impl->frame_size = 3;
break;
default:
impl->frame_size = 4;
break;
impl->playback_frame_size = parse_audio_info(impl->playback_props, &impl->playback_info);
if (impl->playback_frame_size == 0) {
pw_log_error("unsupported playback audio format:%d channels:%d",
impl->playback_info.format, impl->playback_info.channels);
return -EINVAL;
}
impl->frame_size *= impl->info.channels;
if (impl->capture_info.rate != 0 &&
pw_properties_get(impl->capture_props, PW_KEY_NODE_RATE) == NULL)
pw_properties_setf(impl->capture_props, PW_KEY_NODE_RATE,
"1/%u", impl->capture_info.rate);
if (impl->playback_info.rate != 0 &&
pw_properties_get(impl->playback_props, PW_KEY_NODE_RATE) == NULL)
pw_properties_setf(impl->playback_props, PW_KEY_NODE_RATE,
"1/%u", impl->playback_info.rate);
if ((str = pw_properties_get(impl->props, "server.address")) == NULL)
str = DEFAULT_SERVER;