impl-node: implement async scheduling

When node.async is set, make the node async.

Advertize SPA_IO_AsyncBuffers on mixer ports when supported. Set a new
port flag when AsyncBuffer is supported on the port.

When making a link and if one of the nodes is async and the linked ports
support AsyncBuffer, make the link async and set this as a property on
the link. For async nodes we will use SPA_IO_AsyncBuffers on the mixer
ports.

Nodes that are async will not increment the peer required counters. This
ensures that the peer can start immediately before the async node is
ready.

On an async link, writers will write to the (cycle+1 & 1) async buffers
entry and readers will read from (cycle & 1). This makes the readers read
from the previously filled area.

We need to have two very controlled areas with specific rules for who
reads and who writes where because the two nodes will run concurrently
and no special synchronization is possible otherwise.

These async nodes can be paused and blocked without blocking or xrunning
the rest of graph. If the node didn't produce anything when the next
cycle starts, the graph will run with silence.

See #3509
This commit is contained in:
Wim Taymans 2024-04-09 17:45:34 +02:00
parent e8ac4e6a34
commit 68916e062b
11 changed files with 324 additions and 112 deletions

View file

@ -228,7 +228,8 @@ struct mix {
struct port *port;
struct port *peer_port;
struct spa_io_buffers *io;
struct spa_io_buffers *io[2];
struct spa_io_buffers *io_data;
struct buffer buffers[MAX_BUFFERS];
uint32_t n_buffers;
@ -254,7 +255,7 @@ struct port {
#define N_PORT_PARAMS 5
struct spa_param_info params[N_PORT_PARAMS];
struct spa_io_buffers io;
struct spa_io_buffers io[2];
struct spa_list mix;
uint32_t n_mix;
struct mix *global_mix;
@ -455,6 +456,7 @@ struct client {
uint32_t max_ports;
unsigned int fill_aliases:1;
unsigned int writable_input:1;
unsigned int async:1;
uint32_t max_frames;
@ -564,6 +566,7 @@ static inline jack_port_t *object_to_port(struct object *o)
struct io_info {
struct mix *mix;
void *data;
size_t size;
};
static int
@ -572,20 +575,36 @@ do_mix_set_io(struct spa_loop *loop, bool async, uint32_t seq,
{
const struct io_info *info = data;
struct port *port = info->mix->port;
info->mix->io = info->data;
if (info->mix->io) {
if (port->n_mix++ == 0 && port->global_mix != NULL)
port->global_mix->io = &port->io;
info->mix->io_data = info->data;
if (info->mix->io_data) {
if (info->size >= sizeof(struct spa_io_async_buffers)) {
info->mix->io[0] = &info->mix->io_data[0];
info->mix->io[1] = &info->mix->io_data[1];
} else if (info->size >= sizeof(struct spa_io_buffers)) {
info->mix->io[0] = &info->mix->io_data[0];
info->mix->io[1] = &info->mix->io_data[0];
} else {
info->mix->io[0] = NULL;
info->mix->io[1] = NULL;
}
if (port->n_mix++ == 0 && port->global_mix != NULL) {
port->global_mix->io_data = port->io;
port->global_mix->io[0] = &port->io[0];
port->global_mix->io[1] = &port->io[1];
}
} else {
if (--port->n_mix == 0 && port->global_mix != NULL)
port->global_mix->io = NULL;
if (--port->n_mix == 0 && port->global_mix != NULL) {
port->global_mix->io_data = NULL;
port->global_mix->io[0] = NULL;
port->global_mix->io[1] = NULL;
}
}
return 0;
}
static inline void mix_set_io(struct mix *mix, void *data)
static inline void mix_set_io(struct mix *mix, void *data, size_t size)
{
struct io_info info = { .mix = mix, .data = data };
struct io_info info = { .mix = mix, .data = data, .size = size };
pw_data_loop_invoke(mix->port->client->loop,
do_mix_set_io, SPA_ID_INVALID, &info, sizeof(info), false, NULL);
}
@ -597,13 +616,14 @@ static void init_mix(struct mix *mix, uint32_t mix_id, struct port *port, uint32
mix->peer_id = peer_id;
mix->port = port;
mix->peer_port = NULL;
mix->io = NULL;
mix->io_data = NULL;
mix->io[0] = mix->io[1] = NULL;
mix->n_buffers = 0;
spa_list_init(&mix->queue);
if (mix_id == SPA_ID_INVALID) {
port->global_mix = mix;
if (port->n_mix > 0)
mix_set_io(port->global_mix, &port->io);
mix_set_io(port->global_mix, &port->io, sizeof(port->io));
}
}
static struct mix *find_mix_peer(struct client *c, uint32_t peer_id)
@ -1492,6 +1512,7 @@ static inline void *get_buffer_output(struct port *p, uint32_t frames, uint32_t
struct buffer *b;
struct spa_data *d;
struct spa_io_buffers *io;
uint32_t cycle = (p->client->rt.position->clock.cycle + 1) & 1;
if (frames == 0 || !p->valid)
return NULL;
@ -1503,7 +1524,7 @@ static inline void *get_buffer_output(struct port *p, uint32_t frames, uint32_t
c, p->object->port.name, p->port_id, frames,
mix->n_buffers, mix->io);
if (SPA_UNLIKELY((io = mix->io) == NULL || mix->n_buffers == 0))
if (SPA_UNLIKELY((io = mix->io[cycle]) == NULL || mix->n_buffers == 0))
return NULL;
if (io->status == SPA_STATUS_HAVE_DATA &&
@ -1579,16 +1600,17 @@ static void prepare_output(struct port *p, uint32_t frames)
{
struct mix *mix;
struct spa_io_buffers *io;
uint32_t cycle = (p->client->rt.position->clock.cycle + 1) & 1;
if (SPA_UNLIKELY(p->empty_out || p->tied))
process_empty(p, frames);
if (p->global_mix == NULL || (io = p->global_mix->io) == NULL)
if (p->global_mix == NULL || (io = p->global_mix->io[cycle]) == NULL)
return;
spa_list_for_each(mix, &p->mix, port_link) {
if (SPA_LIKELY(mix->io != NULL))
*mix->io = *io;
*mix->io[cycle] = *io;
}
}
@ -1597,6 +1619,7 @@ static void complete_process(struct client *c, uint32_t frames)
struct port *p;
struct mix *mix;
union pw_map_item *item;
uint32_t cycle = (c->rt.position->clock.cycle + 1) & 1;
pw_array_for_each(item, &c->ports[SPA_DIRECTION_OUTPUT].items) {
if (pw_map_item_is_free(item))
@ -1605,7 +1628,7 @@ static void complete_process(struct client *c, uint32_t frames)
if (!p->valid)
continue;
prepare_output(p, frames);
p->io.status = SPA_STATUS_NEED_DATA;
p->io[cycle].status = SPA_STATUS_NEED_DATA;
}
pw_array_for_each(item, &c->ports[SPA_DIRECTION_INPUT].items) {
if (pw_map_item_is_free(item))
@ -1614,8 +1637,8 @@ static void complete_process(struct client *c, uint32_t frames)
if (!p->valid)
continue;
spa_list_for_each(mix, &p->mix, port_link) {
if (SPA_LIKELY(mix->io != NULL))
mix->io->status = SPA_STATUS_NEED_DATA;
if (SPA_LIKELY(mix->io[cycle] != NULL))
mix->io[cycle]->status = SPA_STATUS_NEED_DATA;
}
}
}
@ -1880,6 +1903,9 @@ static inline void signal_sync(struct client *c)
activation->status = PW_NODE_ACTIVATION_FINISHED;
activation->finish_time = nsec;
if (c->async)
return;
cmd = 1;
spa_list_for_each(l, &c->rt.target_links, target_link) {
struct pw_node_activation_state *state;
@ -2329,6 +2355,15 @@ static int param_io(struct client *c, struct port *p,
return 1;
}
static int param_io_async(struct client *c, struct port *p,
struct spa_pod **param, struct spa_pod_builder *b)
{
*param = spa_pod_builder_add_object(b,
SPA_TYPE_OBJECT_ParamIO, SPA_PARAM_IO,
SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_AsyncBuffers),
SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_async_buffers)));
return 1;
}
static int param_latency(struct client *c, struct port *p,
struct spa_pod **param, struct spa_pod_builder *b)
{
@ -2349,7 +2384,7 @@ static int param_latency_other(struct client *c, struct port *p,
static int port_set_format(struct client *c, struct port *p,
uint32_t flags, const struct spa_pod *param)
{
struct spa_pod *params[6];
struct spa_pod *params[7];
uint8_t buffer[4096];
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
@ -2410,8 +2445,9 @@ static int port_set_format(struct client *c, struct port *p,
param_format(c, p, &params[1], &b);
param_buffers(c, p, &params[2], &b);
param_io(c, p, &params[3], &b);
param_latency(c, p, &params[4], &b);
param_latency_other(c, p, &params[5], &b);
param_io_async(c, p, &params[4], &b);
param_latency(c, p, &params[5], &b);
param_latency_other(c, p, &params[6], &b);
pw_client_node_port_update(c->node,
p->direction,
@ -2429,7 +2465,7 @@ static int port_set_format(struct client *c, struct port *p,
static void port_update_latency(struct port *p)
{
struct client *c = p->client;
struct spa_pod *params[6];
struct spa_pod *params[7];
uint8_t buffer[4096];
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
@ -2437,8 +2473,9 @@ static void port_update_latency(struct port *p)
param_format(c, p, &params[1], &b);
param_buffers(c, p, &params[2], &b);
param_io(c, p, &params[3], &b);
param_latency(c, p, &params[4], &b);
param_latency_other(c, p, &params[5], &b);
param_io_async(c, p, &params[4], &b);
param_latency(c, p, &params[5], &b);
param_latency_other(c, p, &params[6], &b);
pw_log_info("port %s: update", p->object->port.name);
@ -2825,8 +2862,8 @@ static int client_node_port_set_io(void *data,
if (mem_id == SPA_ID_INVALID) {
mm = ptr = NULL;
}
else {
size = 0;
} else {
mm = pw_mempool_map_id(c->pool, mem_id,
PW_MEMMAP_FLAG_READWRITE, offset, size, tag);
if (mm == NULL) {
@ -2842,7 +2879,8 @@ static int client_node_port_set_io(void *data,
switch (id) {
case SPA_IO_Buffers:
mix_set_io(mix, ptr);
case SPA_IO_AsyncBuffers:
mix_set_io(mix, ptr, size);
if (old != NULL) {
old->tag[0] = SPA_ID_INVALID;
pw_core_set_paused(c->core, true);
@ -4051,6 +4089,7 @@ jack_client_t * jack_client_open (const char *client_name,
client->max_ports = pw_properties_get_uint32(client->props, "jack.max-client-ports", MAX_CLIENT_PORTS);
client->fill_aliases = pw_properties_get_bool(client->props, "jack.fill-aliases", false);
client->writable_input = pw_properties_get_bool(client->props, "jack.writable-input", true);
client->async = pw_properties_get_bool(client->props, PW_KEY_NODE_ASYNC, false);
client->self_connect_mode = SELF_CONNECT_ALLOW;
if ((str = pw_properties_get(client->props, "jack.self-connect-mode")) != NULL) {
@ -4986,7 +5025,7 @@ jack_port_t * jack_port_register (jack_client_t *client,
jack_port_type_id_t type_id;
uint8_t buffer[1024];
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
struct spa_pod *params[6];
struct spa_pod *params[7];
uint32_t n_params = 0;
struct port *p;
int res, len;
@ -5098,6 +5137,7 @@ jack_port_t * jack_port_register (jack_client_t *client,
param_enum_format(c, p, &params[n_params++], &b);
param_buffers(c, p, &params[n_params++], &b);
param_io(c, p, &params[n_params++], &b);
param_io_async(c, p, &params[n_params++], &b);
param_latency(c, p, &params[n_params++], &b);
param_latency_other(c, p, &params[n_params++], &b);
@ -5204,14 +5244,15 @@ done:
return res;
}
static struct buffer *get_mix_buffer(struct mix *mix, jack_nframes_t frames)
static struct buffer *get_mix_buffer(struct port *p, struct mix *mix, jack_nframes_t frames)
{
struct spa_io_buffers *io;
uint32_t cycle = p->client->rt.position->clock.cycle & 1;
if (mix->peer_port != NULL)
prepare_output(mix->peer_port, frames);
io = mix->io;
io = mix->io[cycle];
if (io == NULL ||
io->status != SPA_STATUS_HAVE_DATA ||
io->buffer_id >= mix->n_buffers)
@ -5249,7 +5290,7 @@ static void *get_buffer_input_float(struct port *p, jack_nframes_t frames)
pw_log_trace_fp("%p: port %s mix %d.%d get buffer %d",
p->client, p->object->port.name, p->port_id, mix->id, frames);
if ((b = get_mix_buffer(mix, frames)) == NULL)
if ((b = get_mix_buffer(p, mix, frames)) == NULL)
continue;
if ((np = get_buffer_data(b, frames)) == NULL)
@ -5293,7 +5334,7 @@ static void *get_buffer_input_midi(struct port *p, jack_nframes_t frames)
pw_log_trace_fp("%p: port %p mix %d.%d get buffer %d",
p->client, p, p->port_id, mix->id, frames);
if ((b = get_mix_buffer(mix, frames)) == NULL)
if ((b = get_mix_buffer(p, mix, frames)) == NULL)
continue;
d = &b->datas[0];
@ -5374,7 +5415,7 @@ void * jack_port_get_buffer (jack_port_t *port, jack_nframes_t frames)
pw_log_trace("peer mix: %p %d", mix, mix->peer_id);
if ((b = get_mix_buffer(mix, frames)) == NULL)
if ((b = get_mix_buffer(p, mix, frames)) == NULL)
goto done;
if (o->port.type_id == TYPE_ID_MIDI) {

View file

@ -65,7 +65,7 @@ struct port {
struct port_props props;
struct spa_io_buffers *io;
struct spa_io_buffers *io[2];
uint64_t info_all;
struct spa_port_info info;
@ -100,6 +100,8 @@ struct impl {
struct spa_node_info info;
struct spa_param_info params[8];
struct spa_io_position *position;
struct spa_hook_list hooks;
uint32_t port_count;
@ -146,7 +148,16 @@ static int impl_node_set_param(void *object, uint32_t id, uint32_t flags,
static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size)
{
return -ENOTSUP;
struct impl *this = object;
switch (id) {
case SPA_IO_Position:
this->position = data;
break;
default:
return -ENOTSUP;
}
return 0;
}
static int impl_node_send_command(void *object, const struct spa_command *command)
@ -436,6 +447,12 @@ impl_node_port_enum_params(void *object, int seq,
SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Buffers),
SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_buffers)));
break;
case 1:
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamIO, id,
SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_AsyncBuffers),
SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_async_buffers)));
break;
default:
return 0;
}
@ -683,13 +700,24 @@ impl_node_port_use_buffers(void *object,
struct io_info {
struct port *port;
void *data;
size_t size;
};
static int do_port_set_io(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
struct io_info *info = user_data;
info->port->io = info->data;
if (info->size >= sizeof(struct spa_io_async_buffers)) {
struct spa_io_async_buffers *ab = info->data;
info->port->io[0] = &ab->buffers[0];
info->port->io[1] = &ab->buffers[1];
} else if (info->size >= sizeof(struct spa_io_buffers)) {
info->port->io[0] = info->data;
info->port->io[1] = info->data;
} else {
info->port->io[0] = NULL;
info->port->io[1] = NULL;
}
return 0;
}
@ -712,9 +740,11 @@ impl_node_port_set_io(void *object,
port = GET_PORT(this, direction, port_id);
info.port = port;
info.data = data;
info.size = size;
switch (id) {
case SPA_IO_Buffers:
case SPA_IO_AsyncBuffers:
spa_loop_invoke(this->data_loop,
do_port_set_io, SPA_ID_INVALID, NULL, 0, true, &info);
break;
@ -748,11 +778,12 @@ static int impl_node_process(void *object)
struct buffer **buffers;
struct buffer *outb;
const void **datas;
uint32_t cycle = this->position->clock.cycle & 1;
spa_return_val_if_fail(this != NULL, -EINVAL);
outport = GET_OUT_PORT(this, 0);
if ((outio = outport->io) == NULL)
if ((outio = outport->io[cycle]) == NULL)
return -EIO;
spa_log_trace_fp(this->log, "%p: status %p %d %d",
@ -780,16 +811,17 @@ static int impl_node_process(void *object)
struct spa_data *bd;
uint32_t size, offs;
if (SPA_UNLIKELY(!PORT_VALID(inport) ||
(inio = inport->io) == NULL ||
inio->buffer_id >= inport->n_buffers ||
inio->status != SPA_STATUS_HAVE_DATA)) {
spa_log_trace_fp(this->log, "%p: skip input idx:%d valid:%d "
if (SPA_UNLIKELY(!PORT_VALID(inport) || (inio = inport->io[cycle]) == NULL)) {
spa_log_trace_fp(this->log, "%p: skip input idx:%d valid:%d io:%p/%p/%d",
this, i, PORT_VALID(inport),
inport->io[0], inport->io[1], cycle);
continue;
}
if (inio->buffer_id >= inport->n_buffers ||
inio->status != SPA_STATUS_HAVE_DATA) {
spa_log_trace_fp(this->log, "%p: skip input idx:%d "
"io:%p status:%d buf_id:%d n_buffers:%d", this,
i, PORT_VALID(inport), inio,
inio ? inio->status : -1,
inio ? inio->buffer_id : SPA_ID_INVALID,
inport->n_buffers);
i, inio, inio->status, inio->buffer_id, inport->n_buffers);
continue;
}

View file

@ -61,7 +61,7 @@ struct port {
struct port_props props;
struct spa_io_buffers *io;
struct spa_io_buffers *io[2];
uint64_t info_all;
struct spa_port_info info;
@ -96,6 +96,8 @@ struct impl {
struct spa_node_info info;
struct spa_param_info params[8];
struct spa_io_position *position;
struct spa_hook_list hooks;
uint32_t port_count;
@ -141,7 +143,16 @@ static int impl_node_set_param(void *object, uint32_t id, uint32_t flags,
static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size)
{
return -ENOTSUP;
struct impl *this = object;
switch (id) {
case SPA_IO_Position:
this->position = data;
break;
default:
return -ENOTSUP;
}
return 0;
}
static int impl_node_send_command(void *object, const struct spa_command *command)
@ -413,6 +424,12 @@ next:
SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Buffers),
SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_buffers)));
break;
case 1:
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamIO, id,
SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_AsyncBuffers),
SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_async_buffers)));
break;
default:
return 0;
}
@ -618,13 +635,24 @@ impl_node_port_use_buffers(void *object,
struct io_info {
struct port *port;
void *data;
size_t size;
};
static int do_port_set_io(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
struct io_info *info = user_data;
info->port->io = info->data;
if (info->size >= sizeof(struct spa_io_async_buffers)) {
struct spa_io_async_buffers *ab = info->data;
info->port->io[0] = &ab->buffers[0];
info->port->io[1] = &ab->buffers[1];
} else if (info->size >= sizeof(struct spa_io_buffers)) {
info->port->io[0] = info->data;
info->port->io[1] = info->data;
} else {
info->port->io[0] = NULL;
info->port->io[1] = NULL;
}
return 0;
}
@ -647,9 +675,11 @@ impl_node_port_set_io(void *object,
port = GET_PORT(this, direction, port_id);
info.port = port;
info.data = data;
info.size = size;
switch (id) {
case SPA_IO_Buffers:
case SPA_IO_AsyncBuffers:
spa_loop_invoke(this->data_loop,
do_port_set_io, SPA_ID_INVALID, NULL, 0, true, &info);
break;
@ -683,11 +713,12 @@ static int impl_node_process(void *object)
struct buffer **buffers;
struct buffer *outb;
const void **datas;
uint32_t cycle = this->position->clock.cycle & 1;
spa_return_val_if_fail(this != NULL, -EINVAL);
outport = GET_OUT_PORT(this, 0);
if ((outio = outport->io) == NULL)
if ((outio = outport->io[cycle]) == NULL)
return -EIO;
spa_log_trace_fp(this->log, "%p: status %p %d %d",
@ -715,16 +746,17 @@ static int impl_node_process(void *object)
struct spa_data *bd;
uint32_t size, offs;
if (SPA_UNLIKELY(!PORT_VALID(inport) ||
(inio = inport->io) == NULL ||
inio->buffer_id >= inport->n_buffers ||
inio->status != SPA_STATUS_HAVE_DATA)) {
spa_log_trace_fp(this->log, "%p: skip input idx:%d valid:%d "
if (SPA_UNLIKELY(!PORT_VALID(inport) || (inio = inport->io[cycle]) == NULL)) {
spa_log_trace_fp(this->log, "%p: skip input idx:%d valid:%d io:%p/%p/%d",
this, i, PORT_VALID(inport),
inport->io[0], inport->io[1], cycle);
continue;
}
if (inio->buffer_id >= inport->n_buffers ||
inio->status != SPA_STATUS_HAVE_DATA) {
spa_log_trace_fp(this->log, "%p: skip input idx:%d "
"io:%p status:%d buf_id:%d n_buffers:%d", this,
i, PORT_VALID(inport), inio,
inio ? inio->status : -1,
inio ? inio->buffer_id : SPA_ID_INVALID,
inport->n_buffers);
i, inio, inio->status, inio->buffer_id, inport->n_buffers);
continue;
}
@ -735,8 +767,8 @@ static int impl_node_process(void *object)
size = SPA_MIN(bd->maxsize - offs, bd->chunk->size);
maxsize = SPA_MIN(maxsize, size);
spa_log_trace_fp(this->log, "%p: mix input %d %p->%p %d %d %d:%d/%d %u", this,
i, inio, outio, inio->status, inio->buffer_id,
spa_log_trace_fp(this->log, "%p: mix input %d %p->%p %d %d/%d %d:%d/%d %u", this,
i, inio, outio, inio->status, inio->buffer_id, inport->n_buffers,
offs, size, (int)sizeof(float),
bd->chunk->flags);

View file

@ -41,7 +41,7 @@ struct port {
uint32_t direction;
uint32_t id;
struct spa_io_buffers *io;
struct spa_io_buffers *io[2];
uint64_t info_all;
struct spa_port_info info;
@ -68,6 +68,8 @@ struct impl {
struct spa_node_info info;
struct spa_param_info params[8];
struct spa_io_position *position;
struct spa_hook_list hooks;
uint32_t port_count;
@ -111,7 +113,16 @@ static int impl_node_set_param(void *object, uint32_t id, uint32_t flags,
static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size)
{
return -ENOTSUP;
struct impl *this = object;
switch (id) {
case SPA_IO_Position:
this->position = data;
break;
default:
return -ENOTSUP;
}
return 0;
}
static int impl_node_send_command(void *object, const struct spa_command *command)
@ -353,6 +364,12 @@ next:
SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Buffers),
SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_buffers)));
break;
case 1:
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamIO, id,
SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_AsyncBuffers),
SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_async_buffers)));
break;
default:
return 0;
}
@ -531,13 +548,24 @@ impl_node_port_use_buffers(void *object,
struct io_info {
struct port *port;
void *data;
size_t size;
};
static int do_port_set_io(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
struct io_info *info = user_data;
info->port->io = info->data;
if (info->size >= sizeof(struct spa_io_async_buffers)) {
struct spa_io_async_buffers *ab = info->data;
info->port->io[0] = &ab->buffers[0];
info->port->io[1] = &ab->buffers[1];
} else if (info->size >= sizeof(struct spa_io_buffers)) {
info->port->io[0] = info->data;
info->port->io[1] = info->data;
} else {
info->port->io[0] = NULL;
info->port->io[1] = NULL;
}
return 0;
}
@ -560,9 +588,11 @@ impl_node_port_set_io(void *object,
port = GET_PORT(this, direction, port_id);
info.port = port;
info.data = data;
info.size = size;
switch (id) {
case SPA_IO_Buffers:
case SPA_IO_AsyncBuffers:
spa_loop_invoke(this->data_loop,
do_port_set_io, SPA_ID_INVALID, NULL, 0, true, &info);
break;
@ -631,11 +661,12 @@ static int impl_node_process(void *object)
struct spa_pod_frame f;
struct buffer *outb;
struct spa_data *d;
uint32_t cycle = this->position->clock.cycle & 1;
spa_return_val_if_fail(this != NULL, -EINVAL);
outport = GET_OUT_PORT(this, 0);
if ((outio = outport->io) == NULL)
if ((outio = outport->io[cycle]) == NULL)
return -EIO;
spa_log_trace_fp(this->log, "%p: status %p %d %d",
@ -668,16 +699,17 @@ static int impl_node_process(void *object)
struct spa_io_buffers *inio = NULL;
void *pod;
if (!inport->valid ||
(inio = inport->io) == NULL ||
inio->buffer_id >= inport->n_buffers ||
if (SPA_UNLIKELY(!PORT_VALID(inport) || (inio = inport->io[cycle]) == NULL)) {
spa_log_trace_fp(this->log, "%p: skip input idx:%d valid:%d io:%p/%p/%d",
this, i, PORT_VALID(inport),
inport->io[0], inport->io[1], cycle);
continue;
}
if (inio->buffer_id >= inport->n_buffers ||
inio->status != SPA_STATUS_HAVE_DATA) {
spa_log_trace_fp(this->log, "%p: skip input idx:%d valid:%d "
spa_log_trace_fp(this->log, "%p: skip input idx:%d "
"io:%p status:%d buf_id:%d n_buffers:%d", this,
i, inport->valid, inio,
inio ? inio->status : -1,
inio ? inio->buffer_id : SPA_ID_INVALID,
inport->n_buffers);
i, inio, inio->status, inio->buffer_id, inport->n_buffers);
continue;
}

View file

@ -31,7 +31,8 @@ PW_LOG_TOPIC_EXTERN(mod_topic);
#define MAX_BUFFERS 64
#define MAX_METAS 16u
#define MAX_DATAS 64u
#define AREA_SIZE (4096u / sizeof(struct spa_io_buffers))
#define AREA_SLOT (sizeof(struct spa_io_async_buffers))
#define AREA_SIZE (4096u / AREA_SLOT)
#define MAX_AREAS 32
#define CHECK_FREE_PORT(impl,d,p) (p <= pw_map_get_size(&impl->ports[d]) && !CHECK_PORT(impl,d,p))
@ -1363,7 +1364,7 @@ static int add_area(struct impl *impl)
size_t size;
struct pw_memblock *area;
size = sizeof(struct spa_io_buffers) * AREA_SIZE;
size = AREA_SLOT * AREA_SIZE;
area = pw_mempool_alloc(impl->context_pool,
PW_MEMBLOCK_FLAG_READWRITE |
@ -1449,6 +1450,7 @@ static int port_init_mix(void *data, struct pw_impl_port_mix *mix)
struct mix *m;
uint32_t idx, pos, len;
struct pw_memblock *area;
struct spa_io_async_buffers *ab;
if ((m = create_mix(port, mix->port.port_id)) == NULL)
return -ENOMEM;
@ -1472,9 +1474,12 @@ static int port_init_mix(void *data, struct pw_impl_port_mix *mix)
}
area = *pw_array_get_unchecked(&impl->io_areas, idx, struct pw_memblock*);
mix->io = SPA_PTROFF(area->map->ptr,
pos * sizeof(struct spa_io_buffers), void);
*mix->io = SPA_IO_BUFFERS_INIT;
ab = SPA_PTROFF(area->map->ptr, pos * AREA_SLOT, void);
mix->io_data = ab;
mix->io[0] = &ab->buffers[0];
mix->io[1] = &ab->buffers[1];
*mix->io[0] = SPA_IO_BUFFERS_INIT;
*mix->io[1] = SPA_IO_BUFFERS_INIT;
m->peer_id = mix->peer_id;
m->impl_mix_id = mix->id;
@ -1484,8 +1489,8 @@ static int port_init_mix(void *data, struct pw_impl_port_mix *mix)
mix->port.direction, mix->p->port_id,
mix->port.port_id, mix->peer_id, NULL);
pw_log_debug("%p: init mix id:%d io:%p base:%p", impl,
mix->id, mix->io, area->map->ptr);
pw_log_debug("%p: init mix id:%d io:%p/%p base:%p", impl,
mix->id, mix->io[0], mix->io[1], area->map->ptr);
return 0;
no_mem:
@ -1606,11 +1611,24 @@ static int impl_mix_port_set_io(void *object,
if (mix == NULL)
return -EINVAL;
if (id == SPA_IO_Buffers) {
switch (id) {
case SPA_IO_Buffers:
if (data && size >= sizeof(struct spa_io_buffers))
mix->io = data;
mix->io[0] = mix->io[1] = data;
else
mix->io = NULL;
mix->io[0] = mix->io[1] = NULL;
break;
case SPA_IO_AsyncBuffers:
if (data && size >= sizeof(struct spa_io_async_buffers)) {
struct spa_io_async_buffers *ab = data;
mix->io[0] = &ab->buffers[0];
mix->io[1] = &ab->buffers[1];
}
else
mix->io[0] = mix->io[1] = NULL;
break;
default:
break;
}
return do_port_set_io(impl,
direction, port->port_id, mix->port.port_id,

View file

@ -44,9 +44,10 @@ struct impl {
struct spa_hook output_node_listener;
struct spa_hook output_global_listener;
struct spa_io_buffers io;
struct spa_io_buffers io[2];
struct pw_impl_node *inode, *onode;
bool async;
};
/** \endcond */
@ -98,7 +99,8 @@ static void pw_node_peer_activate(struct pw_node_peer *peer)
if (peer->active_count++ == 0) {
spa_list_append(&peer->output->rt.target_list, &peer->target.link);
if (!peer->target.active && peer->output->rt.driver_target.node != NULL) {
state->required++;
if (!peer->output->async)
state->required++;
peer->target.active = true;
}
}
@ -115,7 +117,8 @@ static void pw_node_peer_deactivate(struct pw_node_peer *peer)
spa_list_remove(&peer->target.link);
if (peer->target.active) {
state->required--;
if (!peer->output->async)
state->required--;
peer->target.active = false;
}
}
@ -534,14 +537,15 @@ static void select_io(struct pw_impl_link *this)
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
struct spa_io_buffers *io;
io = this->rt.in_mix.io;
io = this->rt.in_mix.io_data;
if (io == NULL)
io = this->rt.out_mix.io;
io = this->rt.out_mix.io_data;
if (io == NULL)
io = &impl->io;
io = impl->io;
this->io = io;
*this->io = SPA_IO_BUFFERS_INIT;
this->io[0] = SPA_IO_BUFFERS_INIT;
this->io[1] = SPA_IO_BUFFERS_INIT;
}
static int do_allocation(struct pw_impl_link *this)
@ -678,6 +682,7 @@ int pw_impl_link_activate(struct pw_impl_link *this)
{
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
int res;
uint32_t io_type, io_size;
pw_log_debug("%p: activate activated:%d state:%s", this, impl->activated,
pw_link_state_as_string(this->info.state));
@ -686,11 +691,19 @@ int pw_impl_link_activate(struct pw_impl_link *this)
!impl->inode->runnable || !impl->onode->runnable)
return 0;
if ((res = port_set_io(this, this->input, SPA_IO_Buffers, this->io,
sizeof(struct spa_io_buffers), &this->rt.in_mix)) < 0)
if (impl->async) {
io_type = SPA_IO_AsyncBuffers;
io_size = sizeof(struct spa_io_async_buffers);
} else {
io_type = SPA_IO_Buffers;
io_size = sizeof(struct spa_io_buffers);
}
if ((res = port_set_io(this, this->input, io_type, this->io,
io_size, &this->rt.in_mix)) < 0)
goto error;
if ((res = port_set_io(this, this->output, SPA_IO_Buffers, this->io,
sizeof(struct spa_io_buffers), &this->rt.out_mix)) < 0)
if ((res = port_set_io(this, this->output, io_type, this->io,
io_size, &this->rt.out_mix)) < 0)
goto error_clean;
pw_loop_invoke(this->output->node->data_loop,
@ -703,7 +716,7 @@ int pw_impl_link_activate(struct pw_impl_link *this)
return 0;
error_clean:
port_set_io(this, this->input, SPA_IO_Buffers, NULL, 0, &this->rt.in_mix);
port_set_io(this, this->input, io_type, NULL, 0, &this->rt.in_mix);
error:
pw_log_error("%p: can't activate link: %s", this, spa_strerror(res));
return res;
@ -892,7 +905,7 @@ int pw_impl_link_deactivate(struct pw_impl_link *this)
impl->activated = false;
pw_log_info("(%s) deactivated", this->name);
if (this->info.state < PW_LINK_STATE_PAUSED || this->destroyed)
link_update_state(this, PW_LINK_STATE_INIT, 0, NULL);
else
@ -1385,6 +1398,13 @@ struct pw_impl_link *pw_context_create_link(struct pw_context *context,
if (this->passive && str == NULL)
pw_properties_set(properties, PW_KEY_LINK_PASSIVE, "true");
impl->async = (output_node->async || input_node->async) &&
SPA_FLAG_IS_SET(output->flags, PW_IMPL_PORT_FLAG_ASYNC) &&
SPA_FLAG_IS_SET(input->flags, PW_IMPL_PORT_FLAG_ASYNC);
if (impl->async)
pw_properties_set(properties, PW_KEY_LINK_ASYNC, "true");
spa_hook_list_init(&this->listener_list);
impl->format_filter = format_filter;
@ -1414,8 +1434,6 @@ struct pw_impl_link *pw_context_create_link(struct pw_context *context,
spa_list_append(&output->links, &this->output_link);
spa_list_append(&input->links, &this->input_link);
impl->io = SPA_IO_BUFFERS_INIT;
select_io(this);
if (this->feedback) {
@ -1434,7 +1452,8 @@ struct pw_impl_link *pw_context_create_link(struct pw_context *context,
this->name = spa_aprintf("%d.%d.%d -> %d.%d.%d",
output_node->info.id, output->port_id, this->rt.out_mix.port.port_id,
input_node->info.id, input->port_id, this->rt.in_mix.port.port_id);
pw_log_info("(%s) (%s) -> (%s)", this->name, output_node->name, input_node->name);
pw_log_info("(%s) (%s) -> (%s) async:%04x:%04x:%d", this->name, output_node->name,
input_node->name, output->flags, input->flags, impl->async);
pw_impl_port_emit_link_added(output, this);
pw_impl_port_emit_link_added(input, this);

View file

@ -113,7 +113,8 @@ static void add_node(struct pw_impl_node *this, struct pw_impl_node *driver)
spa_list_for_each(t, &this->rt.target_list, link) {
dstate = &t->activation->state[0];
if (!t->active) {
dstate->required++;
if (!this->async)
dstate->required++;
t->active = true;
}
pw_log_trace("%p: driver state:%p pending:%d/%d, node state:%p pending:%d/%d",
@ -146,7 +147,8 @@ static void remove_node(struct pw_impl_node *this)
spa_list_for_each(t, &this->rt.target_list, link) {
dstate = &t->activation->state[0];
if (t->active) {
dstate->required--;
if (!this->async)
dstate->required--;
t->active = false;
}
pw_log_trace("%p: driver state:%p pending:%d/%d, node state:%p pending:%d/%d",
@ -973,7 +975,7 @@ static void check_properties(struct pw_impl_node *node)
const char *str, *recalc_reason = NULL;
struct spa_fraction frac;
uint32_t value;
bool driver, trigger, transport, sync;
bool driver, trigger, transport, sync, async;
struct match match;
match = MATCH_INIT(node);
@ -1082,6 +1084,11 @@ static void check_properties(struct pw_impl_node *node)
node->transport = transport;
recalc_reason = "transport changed";
}
async = pw_properties_get_bool(node->properties, PW_KEY_NODE_ASYNC, false);
if (async != node->async) {
pw_log_info("%p: async %d -> %d", node, node->async, async);
node->async = async;
}
if ((str = pw_properties_get(node->properties, PW_KEY_MEDIA_CLASS)) != NULL &&
(strstr(str, "/Sink") != NULL || strstr(str, "/Source") != NULL)) {
@ -1360,7 +1367,8 @@ static inline int process_node(void *data)
/* we don't need to trigger targets when the node was driving the
* graph because that means we finished the graph. */
if (SPA_LIKELY(!this->driving)) {
trigger_targets(&this->rt.target, status, nsec);
if (!this->async)
trigger_targets(&this->rt.target, status, nsec);
} else {
/* calculate CPU time when finished */
a->signal_time = this->driver_start;

View file

@ -168,6 +168,12 @@ next:
SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Buffers),
SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_buffers)));
break;
case 1:
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamIO, id,
SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_AsyncBuffers),
SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_buffers)));
break;
default:
return 0;
}
@ -242,13 +248,22 @@ static int port_set_io(void *object,
if (mix == NULL)
return -ENOENT;
if (id == SPA_IO_Buffers) {
switch (id) {
case SPA_IO_Buffers:
case SPA_IO_AsyncBuffers:
if (data == NULL || size == 0) {
pw_loop_invoke(this->node->data_loop,
do_remove_mix, SPA_ID_INVALID, NULL, 0, true, mix);
mix->io = NULL;
mix->io_data = mix->io[0] = mix->io[1] = NULL;
} else if (data != NULL && size >= sizeof(struct spa_io_buffers)) {
mix->io = data;
if (size >= sizeof(struct spa_io_async_buffers)) {
struct spa_io_async_buffers *ab = data;
mix->io_data = data;
mix->io[0] = &ab->buffers[0];
mix->io[1] = &ab->buffers[1];
} else {
mix->io_data = mix->io[0] = mix->io[1] = data;
}
pw_loop_invoke(this->node->data_loop,
do_add_mix, SPA_ID_INVALID, NULL, 0, false, mix);
}
@ -262,12 +277,13 @@ static int tee_process(void *object)
struct pw_impl_port *this = &impl->this;
struct pw_impl_port_mix *mix;
struct spa_io_buffers *io = &this->rt.io;
uint32_t cycle = (this->node->rt.position->clock.cycle + 1) & 1;
pw_log_trace_fp("%p: tee input %d %d", this, io->status, io->buffer_id);
spa_list_for_each(mix, &impl->mix_list, rt_link) {
pw_log_trace_fp("%p: port %d %p->%p %d", this,
mix->port.port_id, io, mix->io, mix->io->buffer_id);
*mix->io = *io;
mix->port.port_id, io, mix->io[cycle], mix->io[cycle]->buffer_id);
*mix->io[cycle] = *io;
}
io->status = SPA_STATUS_NEED_DATA;
@ -299,15 +315,17 @@ static int schedule_mix_input(void *object)
struct pw_impl_port *this = &impl->this;
struct spa_io_buffers *io = &this->rt.io;
struct pw_impl_port_mix *mix;
uint32_t cycle = (this->node->rt.position->clock.cycle + 1) & 1;
if (SPA_UNLIKELY(PW_IMPL_PORT_IS_CONTROL(this)))
return SPA_STATUS_HAVE_DATA | SPA_STATUS_NEED_DATA;
spa_list_for_each(mix, &impl->mix_list, rt_link) {
pw_log_trace_fp("%p: mix input %d %p->%p %d %d", this,
mix->port.port_id, mix->io, io, mix->io->status, mix->io->buffer_id);
*io = *mix->io;
mix->io->status = SPA_STATUS_NEED_DATA;
mix->port.port_id, mix->io[cycle], io,
mix->io[cycle]->status, mix->io[cycle]->buffer_id);
*io = *mix->io[cycle];
mix->io[cycle]->status = SPA_STATUS_NEED_DATA;
break;
}
return SPA_STATUS_HAVE_DATA | SPA_STATUS_NEED_DATA;
@ -541,6 +559,9 @@ static int check_param_io(void *data, int seq, uint32_t id,
pw_control_new(node->context, port, pid, psize, 0);
SPA_FLAG_SET(port->flags, PW_IMPL_PORT_FLAG_CONTROL);
break;
case SPA_IO_AsyncBuffers:
SPA_FLAG_SET(port->flags, PW_IMPL_PORT_FLAG_ASYNC);
SPA_FALLTHROUGH;
case SPA_IO_Buffers:
SPA_FLAG_SET(port->flags, PW_IMPL_PORT_FLAG_BUFFERS);
break;
@ -615,6 +636,7 @@ static void check_params(struct pw_impl_port *port)
port->info.params[i].user = 0;
port->flags &= ~(PW_IMPL_PORT_FLAG_CONTROL |
PW_IMPL_PORT_FLAG_ASYNC |
PW_IMPL_PORT_FLAG_BUFFERS);
pw_impl_port_for_each_param(port, 0, SPA_PARAM_IO, 0, 0, NULL, check_param_io, port);

View file

@ -182,6 +182,7 @@ extern "C" {
#define PW_KEY_NODE_CACHE_PARAMS "node.cache-params" /**< cache the node params */
#define PW_KEY_NODE_TRANSPORT_SYNC "node.transport.sync" /**< the node handles transport sync */
#define PW_KEY_NODE_DRIVER "node.driver" /**< node can drive the graph */
#define PW_KEY_NODE_ASYNC "node.async" /**< the node wants async scheduling */
#define PW_KEY_NODE_STREAM "node.stream" /**< node is a stream, the server side should
* add a converter */
#define PW_KEY_NODE_VIRTUAL "node.virtual" /**< the node is some sort of virtual
@ -230,6 +231,7 @@ extern "C" {
#define PW_KEY_LINK_FEEDBACK "link.feedback" /**< indicate that a link is a feedback
* link and the target will receive data
* in the next cycle */
#define PW_KEY_LINK_ASYNC "link.async" /**< the link is using async io */
/** device properties */
#define PW_KEY_DEVICE_ID "device.id" /**< device id */

View file

@ -688,6 +688,7 @@ struct pw_impl_node {
unsigned int checked; /**< for sorting */
unsigned int sync:1; /**< the sync-groups are active */
unsigned int transport:1; /**< the transport is active */
unsigned int async:1; /**< async processing, one cycle latency */
uint32_t port_user_data_size; /**< extra size for port user data */
@ -755,7 +756,8 @@ struct pw_impl_port_mix {
enum spa_direction direction;
uint32_t port_id;
} port;
struct spa_io_buffers *io;
struct spa_io_buffers *io[2];
void *io_data;
uint32_t id;
uint32_t peer_id;
unsigned int have_buffers:1;
@ -811,6 +813,7 @@ struct pw_impl_port {
#define PW_IMPL_PORT_FLAG_BUFFERS (1<<1) /**< port has data */
#define PW_IMPL_PORT_FLAG_CONTROL (1<<2) /**< port has control */
#define PW_IMPL_PORT_FLAG_NO_MIXER (1<<3) /**< don't try to add mixer to port */
#define PW_IMPL_PORT_FLAG_ASYNC (1<<4) /**< port support async io */
uint32_t flags;
uint64_t spa_flags;

View file

@ -2033,6 +2033,9 @@ pw_stream_connect(struct pw_stream *stream,
impl->media_subtype == SPA_MEDIA_SUBTYPE_control)
pw_properties_set(impl->port_props, PW_KEY_FORMAT_DSP, "8 bit raw midi");
if ((str = pw_properties_get(stream->properties, PW_KEY_NODE_ASYNC)) != NULL && spa_atob(str))
SPA_FLAG_SET(impl->info.flags, SPA_NODE_FLAG_ASYNC);
match = MATCH_INIT(stream);
pw_context_conf_section_match_rules(impl->context, "stream.rules",
&stream->properties->dict, execute_match, &match);