From 9b0a880afb76ceb032b1ea4e7640839aafa01462 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 16 Mar 2018 16:55:25 +0100 Subject: [PATCH] simplify things with just 1 process function Make just one process function in spa node. With the io area states we can do more complicated io patterns. --- meson.build | 4 +- spa/include/spa/graph/graph-scheduler2.h | 30 +++++-- spa/include/spa/graph/graph.h | 56 +++++++------ spa/include/spa/node/node.h | 83 +++----------------- spa/plugins/alsa/alsa-sink.c | 20 ++--- spa/plugins/alsa/alsa-utils.c | 2 - spa/plugins/v4l2/v4l2-source.c | 32 +++++--- spa/plugins/v4l2/v4l2-utils.c | 11 +-- spa/tests/test-perf.c | 8 +- spa/tests/test-ringbuffer.c | 8 +- spa/tests/test-v4l2.c | 2 +- src/examples/export-sink.c | 4 +- src/examples/export-source.c | 4 +- src/examples/local-v4l2.c | 4 +- src/modules/module-audio-dsp.c | 65 +-------------- src/modules/module-client-node/client-node.c | 71 ++--------------- src/pipewire/link.c | 42 ++++++++-- src/pipewire/mem.c | 2 +- src/pipewire/node.c | 12 ++- src/pipewire/port.c | 51 ++---------- src/pipewire/stream.c | 50 ++++++------ 21 files changed, 202 insertions(+), 359 deletions(-) diff --git a/meson.build b/meson.build index 9c9210e64..f3ceee9c3 100644 --- a/meson.build +++ b/meson.build @@ -165,8 +165,8 @@ endif subdir('spa') subdir('src') subdir('pkgconfig') -#subdir('pipewire-jack') -#subdir('alsa-plugins') +subdir('pipewire-jack') +subdir('alsa-plugins') if get_option('enable_docs') doxygen = find_program('doxygen', required : false) diff --git a/spa/include/spa/graph/graph-scheduler2.h b/spa/include/spa/graph/graph-scheduler2.h index 6a8134b09..8d67664e0 100644 --- a/spa/include/spa/graph/graph-scheduler2.h +++ b/spa/include/spa/graph/graph-scheduler2.h @@ -136,21 +136,35 @@ static inline int spa_graph_impl_have_output(void *data, struct spa_graph_node * return 0; } +static inline void spa_graph_impl_add_graph(struct spa_graph *g, struct spa_list *pending) +{ + struct spa_graph_node *n; + struct spa_graph *sg; + + spa_list_for_each(n, &g->nodes, link) { + n->state->pending = n->state->required + 1; + spa_debug("graph %p node %p: add %d %d status %d", g, n, + n->state->pending, n->state->required, n->state->status); + spa_list_append(pending, &n->sched_link); + } + spa_list_for_each(sg, &g->subgraphs, link) + spa_graph_impl_add_graph(sg, pending); +} + static inline int spa_graph_impl_run(void *data) { struct spa_graph_data *d = (struct spa_graph_data *) data; struct spa_graph *g = d->graph; - struct spa_graph_node *n; + struct spa_graph_node *n, *tmp; + struct spa_list pending; spa_debug("graph %p run", d->graph); - spa_list_for_each(n, &g->nodes, link) { - n->state->pending = n->state->required + 1; - spa_debug("node %p: add %d %d status %d", n, - n->state->pending, n->state->required, - n->state->status); - } - spa_list_for_each(n, &g->nodes, link) + spa_list_init(&pending); + + spa_graph_impl_add_graph(g, &pending); + + spa_list_for_each_safe(n, tmp, &pending, sched_link) spa_graph_trigger(d->graph, n); return 0; diff --git a/spa/include/spa/graph/graph.h b/spa/include/spa/graph/graph.h index 4c34deab5..9f2710acf 100644 --- a/spa/include/spa/graph/graph.h +++ b/spa/include/spa/graph/graph.h @@ -48,6 +48,8 @@ struct spa_graph_callbacks { struct spa_graph { struct spa_list link; /* link for subgraph */ +#define SPA_GRAPH_FLAG_DRIVER (1 << 0) + uint32_t flags; /* flags */ struct spa_graph *parent; /* parent graph or NULL when driver */ struct spa_list nodes; /* list of nodes of this graph */ struct spa_list subgraphs; /* list of subgraphs */ @@ -55,25 +57,6 @@ struct spa_graph { void *callbacks_data; }; -static inline struct spa_graph * spa_graph_find_root(struct spa_graph *graph) -{ - while (graph->parent) - graph = graph->parent; - return graph; -} - -static inline void spa_graph_add_subgraph(struct spa_graph *graph, struct spa_graph *subgraph) -{ - subgraph->parent = graph; - spa_list_append(&graph->subgraphs, &subgraph->link); -} - -static inline void spa_graph_remove_subgraph(struct spa_graph *subgraph) -{ - subgraph->parent = NULL; - spa_list_remove(&subgraph->link); -} - #define spa_graph_need_input(g,n) ((g)->callbacks->need_input((g)->callbacks_data, (n))) #define spa_graph_have_output(g,n) ((g)->callbacks->have_output((g)->callbacks_data, (n))) #define spa_graph_run(g) ((g)->callbacks->run((g)->callbacks_data)) @@ -124,6 +107,8 @@ static inline void spa_graph_init(struct spa_graph *graph) { spa_list_init(&graph->nodes); spa_list_init(&graph->subgraphs); + graph->flags = 0; + spa_debug("graph %p init", graph); } static inline void @@ -135,6 +120,27 @@ spa_graph_set_callbacks(struct spa_graph *graph, graph->callbacks_data = data; } +static inline struct spa_graph *spa_graph_find_root(struct spa_graph *graph) +{ + while (graph->parent) + graph = graph->parent; + return graph; +} + +static inline void spa_graph_add_subgraph(struct spa_graph *graph, struct spa_graph *subgraph) +{ + subgraph->parent = graph; + spa_list_append(&graph->subgraphs, &subgraph->link); + spa_debug("graph %p add subgraph %p", graph, subgraph); +} + +static inline void spa_graph_remove_subgraph(struct spa_graph *subgraph) +{ + subgraph->parent = NULL; + spa_list_remove(&subgraph->link); + spa_debug("graph %p remove subgraph", subgraph); +} + static inline void spa_graph_node_init(struct spa_graph_node *node, struct spa_graph_state *state) { @@ -163,7 +169,7 @@ spa_graph_node_add(struct spa_graph *graph, node->graph = graph; node->sched_link.next = NULL; spa_list_append(&graph->nodes, &node->link); - spa_debug("node %p add", node); + spa_debug("node %p add to graph %p", node, graph); } static inline void @@ -241,15 +247,9 @@ static inline int spa_graph_node_impl_process(void *data, struct spa_graph_node { struct spa_graph *g = node->graph; struct spa_node *n = data; - //int old = node->state->status, res = 0; int res = 0; -// if (old == SPA_STATUS_NEED_BUFFER && n->process_input && - if (n->process_input && - !spa_list_is_empty(&node->ports[SPA_DIRECTION_INPUT])) - res = spa_node_process_input(n); - else - res = spa_node_process_output(n); + res = spa_node_process(n); spa_debug("node %p: process %d", node, res); @@ -258,8 +258,6 @@ static inline int spa_graph_node_impl_process(void *data, struct spa_graph_node if (res == SPA_STATUS_HAVE_BUFFER) spa_graph_have_output(g, node); - spa_debug("node %p: end %d", node, res); - return res; } diff --git a/spa/include/spa/node/node.h b/spa/include/spa/node/node.h index 4d9c68171..c08e49324 100644 --- a/spa/include/spa/node/node.h +++ b/spa/include/spa/node/node.h @@ -479,81 +479,21 @@ struct spa_node { uint32_t port_id, const struct spa_command *command); /** - * Process the input area of the node. + * Process the node * - * For synchronous nodes, this function is called to start processing data - * or when process_output returned SPA_STATUS_NEED_BUFFER + * Output io areas with SPA_STATUS_NEED_BUFFER will recycle the + * buffers if any. * - * For Asynchronous nodes, this function is called when a need_input event - * is received from the node. + * Input areas with SPA_STATUS_HAVE_BUFFER are consumed if possible + * and the status is set to SPA_STATUS_NEED_BUFFER or SPA_STATUS_OK. * - * Before calling this function, you must configure spa_port_io structures - * on the input ports you want to process data on. + * When the node has new output buffers, SPA_STATUS_HAVE_BUFFER + * is returned. * - * The node will loop through all spa_port_io structures and will - * process the buffers. For each port, the port io will be used as: - * - * - if status is set to SPA_STATUS_HAVE_BUFFER, buffer_id is read and processed. - * - * The spa_port_io of the port is then updated as follows. - * - * - buffer_id is set to a buffer id that should be reused. SPA_ID_INVALID - * is set when there is no buffer to reuse - * - * - status is set to SPA_STATUS_OK when no new buffer is needed on the port - * - * - status is set to SPA_STATUS_NEED_BUFFER when a new buffer is needed - * on the port. - * - * - status is set to a negative errno style error code when the buffer_id - * was invalid or any processing error happened on the port. - * - * This function must be called from the data thread. - * - * \param node a spa_node - * \return SPA_STATUS_OK on success or when the node is asynchronous - * SPA_STATUS_HAVE_BUFFER for synchronous nodes when output - * can be consumed. - * < 0 for errno style errors. One or more of the spa_port_io - * areas has an error. + * When no new output could be produced, SPA_STATUS_NEED_BUFFER is + * returned. */ - int (*process_input) (struct spa_node *node); - - /** - * Tell the node that output is consumed. - * - * For synchronous nodes, this function can be called when process_input - * returned SPA_STATUS_HAVE_BUFFER and the output on the spa_port_io - * areas has been consumed. - * - * For Asynchronous node, this function is called when a have_output event - * is received from the node. - * - * Before calling this function you must process the buffers - * in each of the output ports spa_port_io structure as follows: - * - * - use the buffer_id from the io for all the ports where the status is - * SPA_STATUS_HAVE_BUFFER - * - * - set buffer_id to a buffer_id you would like to reuse or SPA_ID_INVALID - * when no buffer is to be reused. - * - * - set the status to SPA_STATUS_NEED_BUFFER for all port you want more - * output from - * - * - set the status to SPA_STATUS_OK for the port you don't want more - * buffers from. - * - * This function must be called from the data thread. - * - * \param node a spa_node - * \return SPA_STATUS_OK on success or when the node is asynchronous - * SPA_STATUS_NEED_BUFFER for synchronous nodes when input - * is needed. - * < 0 for errno style errors. One or more of the spa_port_io - * areas has an error. - */ - int (*process_output) (struct spa_node *node); + int (*process) (struct spa_node *node); }; #define spa_node_enum_params(n,...) (n)->enum_params((n),__VA_ARGS__) @@ -572,8 +512,7 @@ struct spa_node { #define spa_node_port_set_io(n,...) (n)->port_set_io((n),__VA_ARGS__) #define spa_node_port_reuse_buffer(n,...) (n)->port_reuse_buffer((n),__VA_ARGS__) #define spa_node_port_send_command(n,...) (n)->port_send_command((n),__VA_ARGS__) -#define spa_node_process_input(n) (n)->process_input((n)) -#define spa_node_process_output(n) (n)->process_output((n)) +#define spa_node_process(n) (n)->process((n)) #ifdef __cplusplus } /* extern "C" */ diff --git a/spa/plugins/alsa/alsa-sink.c b/spa/plugins/alsa/alsa-sink.c index 0f27d87ce..3a6c8e0dd 100644 --- a/spa/plugins/alsa/alsa-sink.c +++ b/spa/plugins/alsa/alsa-sink.c @@ -571,7 +571,7 @@ impl_node_port_send_command(struct spa_node *node, return -ENOTSUP; } -static int impl_node_process_input(struct spa_node *node) +static int impl_node_process(struct spa_node *node) { struct state *this; struct spa_io_buffers *input; @@ -582,13 +582,15 @@ static int impl_node_process_input(struct spa_node *node) input = this->io; spa_return_val_if_fail(input != NULL, -EIO); - spa_log_trace(this->log, NAME " %p: process input %d %d", this, input->status, input->buffer_id); + spa_log_trace(this->log, NAME " %p: process %d %d", this, input->status, input->buffer_id); - if (input->status == SPA_STATUS_HAVE_BUFFER && input->buffer_id < this->n_buffers) { + if (input->status == SPA_STATUS_HAVE_BUFFER && + input->buffer_id < this->n_buffers) { struct buffer *b = &this->buffers[input->buffer_id]; if (!SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_OUT)) { - spa_log_warn(this->log, NAME " %p: buffer %u in use", this, input->buffer_id); + spa_log_warn(this->log, NAME " %p: buffer %u in use", + this, input->buffer_id); input->status = -EINVAL; return -EINVAL; } @@ -599,17 +601,12 @@ static int impl_node_process_input(struct spa_node *node) input->status = SPA_STATUS_OK; } - return SPA_STATUS_OK; } -static int impl_node_process_output(struct spa_node *node) -{ - return -ENOTSUP; -} - static const struct spa_dict_item node_info_items[] = { { "media.class", "Audio/Sink" }, + { "node.driver", "true" }, }; static const struct spa_dict node_info = { @@ -636,8 +633,7 @@ static const struct spa_node impl_node = { impl_node_port_set_io, impl_node_port_reuse_buffer, impl_node_port_send_command, - impl_node_process_input, - impl_node_process_output, + impl_node_process, }; static int impl_get_interface(struct spa_handle *handle, uint32_t interface_id, void **interface) diff --git a/spa/plugins/alsa/alsa-utils.c b/spa/plugins/alsa/alsa-utils.c index e8897e1c7..19f597292 100644 --- a/spa/plugins/alsa/alsa-utils.c +++ b/spa/plugins/alsa/alsa-utils.c @@ -343,8 +343,6 @@ static inline void try_pull(struct state *state, snd_pcm_uframes_t frames, if (spa_list_is_empty(&state->ready) && do_pull) { spa_log_trace(state->log, "alsa-util %p: %d %lu", state, io->status, state->filled + written); - if (io->status == SPA_STATUS_NEED_BUFFER) - return; io->status = SPA_STATUS_NEED_BUFFER; if (state->range) { state->range->offset = state->sample_count * state->frame_size; diff --git a/spa/plugins/v4l2/v4l2-source.c b/spa/plugins/v4l2/v4l2-source.c index 731d92f16..112ea7ae3 100644 --- a/spa/plugins/v4l2/v4l2-source.c +++ b/spa/plugins/v4l2/v4l2-source.c @@ -61,6 +61,7 @@ static void reset_props(struct props *props) #define BUFFER_FLAG_MAPPED (1<<2) struct buffer { + struct spa_list link; struct spa_buffer *outbuf; struct spa_meta_header *h; uint32_t flags; @@ -174,6 +175,7 @@ struct port { struct buffer buffers[MAX_BUFFERS]; uint32_t n_buffers; + struct spa_list queue; struct spa_source source; @@ -810,17 +812,13 @@ static int impl_node_port_send_command(struct spa_node *node, return -ENOTSUP; } -static int impl_node_process_input(struct spa_node *node) -{ - return -ENOTSUP; -} - -static int impl_node_process_output(struct spa_node *node) +static int impl_node_process(struct spa_node *node) { struct impl *this; - int i, res = SPA_STATUS_OK; + int i, res; struct spa_io_buffers *io; struct port *port; + struct buffer *b; spa_return_val_if_fail(node != NULL, -EINVAL); @@ -834,7 +832,9 @@ static int impl_node_process_output(struct spa_node *node) return SPA_STATUS_HAVE_BUFFER; if (io->buffer_id < port->n_buffers) { - res = spa_v4l2_buffer_recycle(this, io->buffer_id); + if ((res = spa_v4l2_buffer_recycle(this, io->buffer_id)) < 0) + return res; + io->buffer_id = SPA_ID_INVALID; } for (i = 0; i < port->n_controls; i++) { @@ -856,7 +856,16 @@ static int impl_node_process_output(struct spa_node *node) control->value = *control->io = c.value; } } - return res; + if (spa_list_is_empty(&port->queue)) + return -EPIPE; + + b = spa_list_first(&port->queue, struct buffer, link); + spa_list_remove(&b->link); + + io->buffer_id = b->outbuf->id; + io->status = SPA_STATUS_HAVE_BUFFER; + + return SPA_STATUS_HAVE_BUFFER; } static const struct spa_dict_item info_items[] = { @@ -889,8 +898,7 @@ static const struct spa_node impl_node = { impl_node_port_set_io, impl_node_port_reuse_buffer, impl_node_port_send_command, - impl_node_process_input, - impl_node_process_output, + impl_node_process, }; static int impl_clock_enum_params(struct spa_clock *clock, uint32_t id, uint32_t *index, @@ -1012,6 +1020,8 @@ impl_init(const struct spa_handle_factory *factory, reset_props(&this->props); + spa_list_init(&port->queue); + port->log = this->log; port->info.flags = SPA_PORT_INFO_FLAG_LIVE | SPA_PORT_INFO_FLAG_PHYSICAL | diff --git a/spa/plugins/v4l2/v4l2-utils.c b/spa/plugins/v4l2/v4l2-utils.c index 56e2c9bb9..7a0803920 100644 --- a/spa/plugins/v4l2/v4l2-utils.c +++ b/spa/plugins/v4l2/v4l2-utils.c @@ -118,6 +118,7 @@ static int spa_v4l2_buffer_recycle(struct impl *this, uint32_t buffer_id) spa_log_error(port->log, "VIDIOC_QBUF: %m"); return -err; } + return 0; } @@ -1145,7 +1146,6 @@ static int mmap_read(struct impl *this) struct buffer *b; struct spa_data *d; int64_t pts; - struct spa_io_buffers *io = port->io; spa_zero(buf); buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; @@ -1178,10 +1178,9 @@ static int mmap_read(struct impl *this) d[0].chunk->stride = port->fmt.fmt.pix.bytesperline; SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUTSTANDING); - io->buffer_id = b->outbuf->id; - io->status = SPA_STATUS_HAVE_BUFFER; + spa_list_append(&port->queue, &b->link); - spa_log_trace(port->log, "v4l2 %p: have output %d", this, io->buffer_id); + spa_log_trace(port->log, "v4l2 %p: have output %d", this, buf.index); this->callbacks->have_output(this->callbacks_data); return 0; @@ -1499,10 +1498,12 @@ static int spa_v4l2_stream_off(struct impl *this) struct buffer *b; b = &port->buffers[i]; - if (!SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_OUTSTANDING)) + if (!SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_OUTSTANDING)) { if (xioctl(port->fd, VIDIOC_QBUF, &b->v4l2_buffer) < 0) spa_log_warn(this->log, "VIDIOC_QBUF: %s", strerror(errno)); + } } + spa_list_init(&port->queue); port->started = false; return 0; diff --git a/spa/tests/test-perf.c b/spa/tests/test-perf.c index bc546270a..fa0174aa4 100644 --- a/spa/tests/test-perf.c +++ b/spa/tests/test-perf.c @@ -229,8 +229,8 @@ static void on_sink_pull(struct data *data) { spa_log_trace(data->log, "do sink pull"); if (data->mode & MODE_DIRECT) { - spa_node_process_output(data->source); - spa_node_process_input(data->sink); + spa_node_process(data->source); + spa_node_process(data->sink); } else { spa_graph_need_input(&data->graph, &data->sink_node); } @@ -240,8 +240,8 @@ static void on_source_push(struct data *data) { spa_log_trace(data->log, "do source push"); if (data->mode & MODE_DIRECT) { - spa_node_process_output(data->source); - spa_node_process_input(data->sink); + spa_node_process(data->source); + spa_node_process(data->sink); } else { spa_graph_have_output(&data->graph, &data->source_node); } diff --git a/spa/tests/test-ringbuffer.c b/spa/tests/test-ringbuffer.c index 8ba40bb45..30d911588 100644 --- a/spa/tests/test-ringbuffer.c +++ b/spa/tests/test-ringbuffer.c @@ -219,12 +219,12 @@ static void on_sink_need_input(void *_data) struct data *data = _data; int res; - res = spa_node_process_output(data->source); + res = spa_node_process(data->source); if (res != SPA_STATUS_HAVE_BUFFER) - printf("got process_output error from source %d\n", res); + printf("got process error from source %d\n", res); - if ((res = spa_node_process_input(data->sink)) < 0) - printf("got process_input error from sink %d\n", res); + if ((res = spa_node_process(data->sink)) < 0) + printf("got process error from sink %d\n", res); } static void diff --git a/spa/tests/test-v4l2.c b/spa/tests/test-v4l2.c index d7e8bdabe..5cba59be7 100644 --- a/spa/tests/test-v4l2.c +++ b/spa/tests/test-v4l2.c @@ -276,7 +276,7 @@ static void on_source_have_output(void *_data) io->status = SPA_STATUS_NEED_BUFFER; - if ((res = spa_node_process_output(data->source)) < 0) + if ((res = spa_node_process(data->source)) < 0) printf("got pull error %d\n", res); } diff --git a/src/examples/export-sink.c b/src/examples/export-sink.c index db482df2f..6afc1e118 100644 --- a/src/examples/export-sink.c +++ b/src/examples/export-sink.c @@ -539,7 +539,7 @@ static int do_render(struct spa_loop *loop, bool async, uint32_t seq, return 0; } -static int impl_node_process_input(struct spa_node *node) +static int impl_node_process(struct spa_node *node) { struct data *d = SPA_CONTAINER_OF(node, struct data, impl_node); struct spa_buffer *buf; @@ -574,7 +574,7 @@ static const struct spa_node impl_node = { .port_enum_params = impl_port_enum_params, .port_set_param = impl_port_set_param, .port_use_buffers = impl_port_use_buffers, - .process_input = impl_node_process_input, + .process = impl_node_process, }; static void make_node(struct data *data) diff --git a/src/examples/export-source.c b/src/examples/export-source.c index 5dc20bb75..eb2c4c370 100644 --- a/src/examples/export-source.c +++ b/src/examples/export-source.c @@ -467,7 +467,7 @@ static void fill_s16(struct data *d, void *dest, int avail) } } -static int impl_node_process_output(struct spa_node *node) +static int impl_node_process(struct spa_node *node) { struct data *d = SPA_CONTAINER_OF(node, struct data, impl_node); struct buffer *b; @@ -529,7 +529,7 @@ static const struct spa_node impl_node = { .port_set_param = impl_port_set_param, .port_use_buffers = impl_port_use_buffers, .port_reuse_buffer = impl_port_reuse_buffer, - .process_output = impl_node_process_output, + .process = impl_node_process, }; static void make_node(struct data *data) diff --git a/src/examples/local-v4l2.c b/src/examples/local-v4l2.c index b071af43a..387d2a356 100644 --- a/src/examples/local-v4l2.c +++ b/src/examples/local-v4l2.c @@ -433,7 +433,7 @@ static int do_render(struct spa_loop *loop, bool async, uint32_t seq, return 0; } -static int impl_node_process_input(struct spa_node *node) +static int impl_node_process(struct spa_node *node) { struct data *d = SPA_CONTAINER_OF(node, struct data, impl_node); int res; @@ -461,7 +461,7 @@ static const struct spa_node impl_node = { .port_enum_params = impl_port_enum_params, .port_set_param = impl_port_set_param, .port_use_buffers = impl_port_use_buffers, - .process_input = impl_node_process_input, + .process = impl_node_process, }; static void make_nodes(struct data *data) diff --git a/src/modules/module-audio-dsp.c b/src/modules/module-audio-dsp.c index baa7b4430..1c8eec6a9 100644 --- a/src/modules/module-audio-dsp.c +++ b/src/modules/module-audio-dsp.c @@ -279,7 +279,7 @@ static void dequeue_buffer(struct node *n, struct buffer *b) SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT); } -static int node_process_input(struct spa_node *node) +static int node_process(struct spa_node *node) { struct node *n = SPA_CONTAINER_OF(node, struct node, node_impl); struct pw_node *this = n->node; @@ -311,43 +311,6 @@ static int node_process_input(struct spa_node *node) return outio->status; } -static int node_process_output(struct spa_node *node) -{ - struct node *n = SPA_CONTAINER_OF(node, struct node, node_impl); - struct pw_node *this = n->node; - struct port *outp = GET_OUT_PORT(n, 0); - struct spa_io_buffers *outio = outp->io; - int i; - - pw_log_trace(NAME " %p: process output %d", this, outio->status); - - if (outio->status == SPA_STATUS_HAVE_BUFFER) - return SPA_STATUS_HAVE_BUFFER; - - if (outio->buffer_id < outp->n_buffers) { - recycle_buffer(n, outp, outio->buffer_id); - outio->buffer_id = SPA_ID_INVALID; - } - if (n->n_in_ports == 0) { - outio->buffer_id = SPA_ID_INVALID; - outio->status = SPA_STATUS_HAVE_BUFFER; - } - else { - for (i = 0; i < n->n_in_ports; i++) { - struct port *inp = GET_IN_PORT(n, i); - struct spa_io_buffers *inio = inp->io; - - if (inio == NULL) - continue; - - inio->status = SPA_STATUS_NEED_BUFFER; - pw_log_trace(NAME " %p: port %d %d", this, i, inio->buffer_id); - } - outio->status = SPA_STATUS_NEED_BUFFER; - } - return outio->status; -} - static int port_set_io(struct spa_node *node, enum spa_direction direction, uint32_t port_id, uint32_t id, void *data, size_t size) @@ -623,12 +586,11 @@ static const struct spa_node node_impl = { .port_set_io = port_set_io, .port_reuse_buffer = port_reuse_buffer, .port_send_command = port_send_command, - .process_input = node_process_input, - .process_output = node_process_output, + .process = node_process, }; -static int schedule_mix_input(struct spa_node *_node) +static int schedule_mix(struct spa_node *_node) { struct pw_port *port = SPA_CONTAINER_OF(_node, struct pw_port, mix_node); struct port *p = port->owner_data; @@ -678,24 +640,6 @@ static int schedule_mix_input(struct spa_node *_node) return SPA_STATUS_HAVE_BUFFER; } -static int schedule_mix_output(struct spa_node *_node) -{ - struct pw_port *port = SPA_CONTAINER_OF(_node, struct pw_port, mix_node); - struct spa_graph_node *node = &port->rt.mix_node; - struct spa_graph_port *gp; - struct spa_io_buffers *io = port->rt.mix_port.io; - - pw_log_trace("port %p", port); - - spa_list_for_each(gp, &node->ports[SPA_DIRECTION_INPUT], link) { - pw_log_trace("port %p: port %d %d %p->%p %d %d", port, - gp->port_id, gp->flags, io, gp->io, io->status, io->buffer_id); - gp->io->status = io->status; - } - io->status = SPA_STATUS_HAVE_BUFFER; - return io->status; -} - static int schedule_mix_use_buffers(struct spa_node *_node, enum spa_direction direction, uint32_t port_id, @@ -720,8 +664,7 @@ static const struct spa_node schedule_mix_node = { SPA_VERSION_NODE, NULL, .port_use_buffers = schedule_mix_use_buffers, - .process_input = schedule_mix_input, - .process_output = schedule_mix_output, + .process = schedule_mix, }; static void port_free(void *data) diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index 01c22aae9..b9fdfdfdf 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -842,63 +842,18 @@ impl_node_port_send_command(struct spa_node *node, return 0; } -static int impl_node_process_input(struct spa_node *node) +static int impl_node_process(struct spa_node *node) { struct node *this = SPA_CONTAINER_OF(node, struct node, node); struct impl *impl = this->impl; - struct spa_graph_node *n = &impl->this.node->rt.node; - bool client_reuse = impl->client_reuse; - struct spa_graph_port *p, *pp; int res; - if (impl->input_ready == 0) { - /* the client is not ready to receive our buffers, recycle them */ - pw_log_trace("node not ready, recycle buffers"); - spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) - p->io->status = SPA_STATUS_NEED_BUFFER; - res = SPA_STATUS_NEED_BUFFER; - } - else { - spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { - struct spa_io_buffers *io = p->io; - - pw_log_trace("set io status to %d %d", io->status, io->buffer_id); - - /* explicitly recycle buffers when the client is not going to do it */ - if (!client_reuse && (pp = p->peer)) - spa_graph_node_reuse_buffer(pp->node, - pp->port_id, io->buffer_id); - } - pw_log_trace("client-node %p: send process input", this); - pw_client_node_transport_add_message(impl->transport, - &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT)); - do_flush(this); - - impl->input_ready--; - res = SPA_STATUS_OK; - } - return res; -} - -static int impl_node_process_output(struct spa_node *node) -{ - struct node *this; - struct impl *impl; - - this = SPA_CONTAINER_OF(node, struct node, node); - impl = this->impl; - - pw_log_trace("client-node %p: process output %d", this, impl->out_pending); - if (impl->out_pending) - return SPA_STATUS_OK; - - impl->out_pending = true; - + pw_log_trace("client-node %p: send process input", this); pw_client_node_transport_add_message(impl->transport, - &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT)); + &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT)); do_flush(this); - - return SPA_STATUS_OK; + res = SPA_STATUS_OK; + return res; } static int handle_node_message(struct node *this, struct pw_client_node_message *message) @@ -1082,8 +1037,7 @@ static const struct spa_node impl_node = { impl_node_port_set_io, impl_node_port_reuse_buffer, impl_node_port_send_command, - impl_node_process_input, - impl_node_process_output, + impl_node_process, }; static int @@ -1285,7 +1239,7 @@ static int mix_port_set_io(struct spa_node *node, id, data, size); } -static int mix_port_process_input(struct spa_node *data) +static int mix_port_process(struct spa_node *data) { struct pw_port *p = SPA_CONTAINER_OF(data, struct pw_port, mix_node); struct spa_io_buffers *io = p->rt.mix_port.io; @@ -1293,22 +1247,13 @@ static int mix_port_process_input(struct spa_node *data) return SPA_STATUS_HAVE_BUFFER; } -static int mix_port_process_output(struct spa_node *data) -{ - struct pw_port *p = SPA_CONTAINER_OF(data, struct pw_port, mix_node); - struct spa_io_buffers *io = p->rt.mix_port.io; - pw_log_trace("client-node %p: pass %d %d", data, io->status, io->buffer_id); - return SPA_STATUS_NEED_BUFFER; -} - static void node_port_added(void *data, struct pw_port *port) { struct impl *impl = data; pw_log_debug("client-node %p: port %p added", &impl->this, port); port->mix_node.port_set_io = mix_port_set_io; - port->mix_node.process_input = mix_port_process_input; - port->mix_node.process_output = mix_port_process_output; + port->mix_node.process = mix_port_process; port->implementation = &port_impl; port->implementation_data = impl; diff --git a/src/pipewire/link.c b/src/pipewire/link.c index 2c59a062f..d2508e298 100644 --- a/src/pipewire/link.c +++ b/src/pipewire/link.c @@ -1154,6 +1154,30 @@ static const struct pw_node_events output_node_events = { .async_complete = output_node_async_complete, }; +static int +do_join_graphs(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct pw_link *this = user_data; + struct spa_graph *in_graph, *out_graph; + struct spa_graph *in_root, *out_root; + + in_graph = this->input->node->rt.node.graph; + out_graph = this->output->node->rt.node.graph; + + in_root = spa_graph_find_root(in_graph); + out_root = spa_graph_find_root(out_graph); + + if (out_root == in_root) + return 0; + + if (SPA_FLAG_CHECK(in_root->flags, SPA_GRAPH_FLAG_DRIVER)) + spa_graph_add_subgraph(in_root, out_root); + else + spa_graph_add_subgraph(out_root, in_root); + return 0; +} + struct pw_link *pw_link_new(struct pw_core *core, struct pw_port *output, struct pw_port *input, @@ -1166,6 +1190,7 @@ struct pw_link *pw_link_new(struct pw_core *core, struct pw_link *this; struct pw_node *input_node, *output_node; struct spa_graph *in_graph, *out_graph; + struct spa_graph *in_root, *out_root; if (output == input) goto same_ports; @@ -1179,7 +1204,16 @@ struct pw_link *pw_link_new(struct pw_core *core, in_graph = input_node->rt.node.graph; out_graph = output_node->rt.node.graph; - if (in_graph != NULL && out_graph != NULL && in_graph != out_graph) + pw_log_debug("link new %p %p", in_graph, out_graph); + + in_root = spa_graph_find_root(in_graph); + out_root = spa_graph_find_root(out_graph); + + pw_log_debug("link new %p %p", in_root, out_root); + + if (SPA_FLAG_CHECK(in_root->flags, SPA_GRAPH_FLAG_DRIVER) && + SPA_FLAG_CHECK(out_root->flags, SPA_GRAPH_FLAG_DRIVER) && + in_root != out_root) goto link_not_supported; impl = calloc(1, sizeof(struct impl) + user_data_size); @@ -1243,10 +1277,8 @@ struct pw_link *pw_link_new(struct pw_core *core, output_node, output->port_id, this->rt.mix[SPA_DIRECTION_OUTPUT].port.port_id, input_node, input->port_id, this->rt.mix[SPA_DIRECTION_INPUT].port.port_id); - if (out_graph != NULL) - pw_node_join_graph(input_node, out_graph); - else if (in_graph != NULL) - pw_node_join_graph(output_node, in_graph); + pw_loop_invoke(output->node->data_loop, + do_join_graphs, SPA_ID_INVALID, NULL, 0, false, this); spa_hook_list_call(&output->listener_list, struct pw_port_events, link_added, this); spa_hook_list_call(&input->listener_list, struct pw_port_events, link_added, this); diff --git a/src/pipewire/mem.c b/src/pipewire/mem.c index 80e6886b2..94cdfa15f 100644 --- a/src/pipewire/mem.c +++ b/src/pipewire/mem.c @@ -257,7 +257,7 @@ void pw_memblock_free(struct pw_memblock *mem) if (mem == NULL) return; - pw_log_debug("mem %p: free", mem); + pw_log_debug("mem %p: free %p %d", mem, mem->ptr, mem->fd); if (mem->flags & PW_MEMBLOCK_FLAG_WITH_FD) { if (mem->ptr) munmap(mem->ptr, mem->size); diff --git a/src/pipewire/node.c b/src/pipewire/node.c index 8b17cc388..f71b009ce 100644 --- a/src/pipewire/node.c +++ b/src/pipewire/node.c @@ -392,6 +392,11 @@ static void check_properties(struct pw_node *node) node->driver = pw_properties_parse_bool(str); else node->driver = false; + + if (node->driver) + SPA_FLAG_SET(impl->graph.flags, SPA_GRAPH_FLAG_DRIVER); + else + SPA_FLAG_UNSET(impl->graph.flags, SPA_GRAPH_FLAG_DRIVER); } static int @@ -458,7 +463,7 @@ struct pw_node *pw_node_new(struct pw_core *core, this->rt.activation = &impl->activation; spa_graph_node_init(&this->rt.node, &this->rt.activation->state); - pw_loop_invoke(this->data_loop, do_node_join, 1, NULL, 0, false, this); + pw_loop_invoke(this->data_loop, do_node_join, 1, NULL, 0, true, this); spa_list_init(&this->rt.links[SPA_DIRECTION_INPUT]); spa_list_init(&this->rt.links[SPA_DIRECTION_OUTPUT]); @@ -628,7 +633,12 @@ do_node_remove(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) { struct pw_node *this = user_data; + struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); + + if (impl->graph.parent) + spa_graph_remove_subgraph(&impl->graph); spa_graph_node_remove(&this->rt.node); + return 0; } diff --git a/src/pipewire/port.c b/src/pipewire/port.c index 1eb8d4ce1..451922f6e 100644 --- a/src/pipewire/port.c +++ b/src/pipewire/port.c @@ -64,23 +64,8 @@ static int schedule_tee_input(struct spa_node *data) *p->io = *io; } } - return io->status; -} -static int schedule_tee_output(struct spa_node *data) -{ - struct pw_port *this = SPA_CONTAINER_OF(data, struct pw_port, mix_node); - struct spa_graph_node *node = &this->rt.mix_node; - struct spa_graph_port *p; - struct spa_io_buffers *io = this->rt.mix_port.io; - - spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) { - pw_log_trace("port %p: port %d %d %p->%p %d %d", - this, p->port_id, p->flags, p->io, io, - p->io->status, p->io->buffer_id); - *io = *p->io; - } - pw_log_trace("port %p: tee output %d %d", this, io->status, io->buffer_id); - return io->status; + io->status = SPA_STATUS_NEED_BUFFER; + return SPA_STATUS_HAVE_BUFFER; } static int schedule_tee_reuse_buffer(struct spa_node *data, uint32_t port_id, uint32_t buffer_id) @@ -98,8 +83,7 @@ static int schedule_tee_reuse_buffer(struct spa_node *data, uint32_t port_id, ui static const struct spa_node schedule_tee_node = { SPA_VERSION_NODE, NULL, - .process_input = schedule_tee_input, - .process_output = schedule_tee_output, + .process = schedule_tee_input, .port_reuse_buffer = schedule_tee_reuse_buffer, }; @@ -119,24 +103,6 @@ static int schedule_mix_input(struct spa_node *data) return io->status; } -static int schedule_mix_output(struct spa_node *data) -{ - struct pw_port *this = SPA_CONTAINER_OF(data, struct pw_port, mix_node); - struct spa_graph_node *node = &this->rt.mix_node; - struct spa_graph_port *p; - struct spa_io_buffers *io = this->rt.mix_port.io; - - if (!spa_list_is_empty(&node->ports[SPA_DIRECTION_INPUT])) { - spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { - pw_log_trace("port %p: port %d %d %p->%p", this, - p->port_id, p->flags, io, p->io); - *p->io = *io; - } - } - pw_log_trace("port %p: output %d %d", this, io->status, io->buffer_id); - return io->status; -} - static int schedule_mix_reuse_buffer(struct spa_node *data, uint32_t port_id, uint32_t buffer_id) { struct pw_port *this = SPA_CONTAINER_OF(data, struct pw_port, mix_node); @@ -155,8 +121,7 @@ static int schedule_mix_reuse_buffer(struct spa_node *data, uint32_t port_id, ui static const struct spa_node schedule_mix_node = { SPA_VERSION_NODE, NULL, - .process_input = schedule_mix_input, - .process_output = schedule_mix_output, + .process = schedule_mix_input, .port_reuse_buffer = schedule_mix_reuse_buffer, }; @@ -335,9 +300,7 @@ static int do_add_port(struct spa_loop *loop, spa_graph_port_add(&this->node->rt.node, &this->rt.port); spa_graph_port_add(&this->rt.mix_node, &this->rt.mix_port); spa_graph_port_link(&this->rt.port, &this->rt.mix_port); - - if (this->rt.mix_node.graph) - spa_graph_node_add(this->rt.mix_node.graph, &this->rt.mix_node); + spa_graph_node_add(this->node->rt.node.graph, &this->rt.mix_node); return 0; } @@ -549,9 +512,7 @@ static int do_remove_port(struct spa_loop *loop, spa_graph_port_remove(p); spa_graph_port_remove(&this->rt.mix_port); - if (this->rt.mix_node.graph) - spa_graph_node_remove(&this->rt.mix_node); - this->rt.mix_node.graph = NULL; + spa_graph_node_remove(&this->rt.mix_node); return 0; } diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index da02bb146..ac04d5689 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -612,41 +612,37 @@ static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_nod struct buffer *b; uint32_t buffer_id; - buffer_id = io->buffer_id; + if (impl->direction == SPA_DIRECTION_INPUT) { + buffer_id = io->buffer_id; - pw_log_trace("stream %p: process input %d %d", stream, io->status, + pw_log_trace("stream %p: process input %d %d", stream, io->status, buffer_id); - if ((b = find_buffer(stream, buffer_id)) == NULL) - return; + if ((b = find_buffer(stream, buffer_id)) == NULL) + return; - if (impl->client_reuse) + if (impl->client_reuse) + io->buffer_id = SPA_ID_INVALID; + + if (io->status == SPA_STATUS_HAVE_BUFFER) { + SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT); + + impl->in_new_buffer = true; + spa_hook_list_call(&stream->listener_list, struct pw_stream_events, + new_buffer, buffer_id); + impl->in_new_buffer = false; + } + io->status = SPA_STATUS_NEED_BUFFER; + } else { + reuse_buffer(stream, io->buffer_id); io->buffer_id = SPA_ID_INVALID; - if (io->status == SPA_STATUS_HAVE_BUFFER) { - SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT); - - impl->in_new_buffer = true; + pw_log_trace("stream %p: process output", stream); + impl->in_need_buffer = true; spa_hook_list_call(&stream->listener_list, struct pw_stream_events, - new_buffer, buffer_id); - impl->in_new_buffer = false; + need_buffer); + impl->in_need_buffer = false; } - - io->status = SPA_STATUS_NEED_BUFFER; - send_need_input(stream); - break; - } - case PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT: - { - struct spa_io_buffers *io = impl->io; - - reuse_buffer(stream, io->buffer_id); - io->buffer_id = SPA_ID_INVALID; - - pw_log_trace("stream %p: process output", stream); - impl->in_need_buffer = true; - spa_hook_list_call(&stream->listener_list, struct pw_stream_events, need_buffer); - impl->in_need_buffer = false; break; } case PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER: