Update some docs
Remove special structs for some events
Remove some unused flags
Update some plugins
This commit is contained in:
Wim Taymans 2017-01-19 18:10:00 +01:00
parent 7a9dc2c4fd
commit 0398f997d7
12 changed files with 390 additions and 352 deletions

View file

@ -1,7 +1,7 @@
Pinos
-----
The idea is to make a DBus service where you can provide
The idea is to make a server where you can provide
and consume media to/from.
Some of the requirements are:
@ -16,39 +16,16 @@ only and should be able to handle compressed video and other
streamable media as well.
The design is in some part inspired by pulseaudio, hence its original
name. Increasinly we also seem to add functionality of jack.
name. Increasinly we also seem to add functionality of jack and
GStreamer.
Objects
-------
Protocol
--------
Daemon1: the main pinos daemon
/org/pinos/server
Client1: a connected client, the result object from call
Daemon1.ConnectClient
/org/pinos/client*
Node1: a processing node, this can be a source, sink or transform
element. Nodes have ports
/org/pinos/node*
Link1: a link between 2 ports
/org/pinos/link*
DBus protocol
-------------
The main daemon is registered on the session bus with name: org.pinos
Various Node1 objects are registered in the server based on the available
sources or sinks of content. Node1 has properties and its ports have format
descriptions of what it can provide or consume.
First a client needs to register a Node1 with pinos by calling
org.pinos.Daemon1.CreateClientNode(). This creates a new Client1 object and
a Node 1 object that the client must use for further communication.
A client then needs to use the pinos protocol to control the Node1. It will
also receive commands and notifications from pinos.
The protocol is similar to wayland but with custom
serialization/deserialization of messages. This is because the datastructures
in the messages are more complicated.
fd management
@ -58,13 +35,6 @@ Clients receive fds with buffers and memory after a format was negotiated.
Updates to these buffers are notified by a message containing the id of the
buffer.
* client remove
When a client disconnects from pinos, it must have released all fd-indexes
that it received. Pinos will force a release and will reuse the fd-indexes
when the client disconnects.
Wire
----

View file

@ -170,12 +170,12 @@ static void
send_need_input (SpaProxy *this)
{
PinosNode *pnode = this->pnode;
SpaNodeEventNeedInput ni;
SpaNodeEvent event;
uint64_t cmd = 1;
ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT;
ni.event.size = sizeof (ni);
pinos_transport_add_event (pnode->transport, &ni.event);
event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT;
event.size = sizeof (event);
pinos_transport_add_event (pnode->transport, &event);
write (this->data_source.fd, &cmd, 8);
}
@ -183,12 +183,12 @@ static void
send_have_output (SpaProxy *this)
{
PinosNode *pnode = this->pnode;
SpaNodeEventHaveOutput ho;
SpaNodeEvent event;
uint64_t cmd = 1;
ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT;
ho.event.size = sizeof (ho);
pinos_transport_add_event (pnode->transport, &ho.event);
event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT;
event.size = sizeof (event);
pinos_transport_add_event (pnode->transport, &event);
write (this->data_source.fd, &cmd, 8);
}

View file

@ -41,8 +41,8 @@ typedef enum {
SPA_RESULT_NO_FORMAT = -4,
SPA_RESULT_INVALID_COMMAND = -5,
SPA_RESULT_INVALID_PORT = -6,
SPA_RESULT_HAVE_ENOUGH_INPUT = -7,
SPA_RESULT_NEED_MORE_INPUT = -8,
SPA_RESULT_HAVE_OUTPUT = -7,
SPA_RESULT_NEED_INPUT = -8,
SPA_RESULT_PORTS_CHANGED = -9,
SPA_RESULT_FORMAT_CHANGED = -10,
SPA_RESULT_PROPERTIES_CHANGED = -11,
@ -69,6 +69,7 @@ typedef enum {
SPA_RESULT_NO_MEMORY = -32,
SPA_RESULT_NO_PERMISSION = -33,
SPA_RESULT_SKIPPED = -34,
SPA_RESULT_OUT_OF_BUFFERS = -35,
} SpaResult;
#define SPA_ASYNC_MASK (3 << 30)

View file

@ -76,14 +76,6 @@ typedef struct {
SpaResult res;
} SpaNodeEventAsyncComplete;
typedef struct {
SpaNodeEvent event;
} SpaNodeEventHaveOutput;
typedef struct {
SpaNodeEvent event;
} SpaNodeEventNeedInput;
typedef struct {
SpaNodeEvent event;
uint32_t port_id;

View file

@ -84,7 +84,6 @@ typedef enum {
*/
typedef enum {
SPA_PORT_INPUT_FLAG_NONE = 0,
SPA_PORT_INPUT_FLAG_NEED_INPUT = 1 << 0,
} SpaPortInputFlags;
/**
@ -106,13 +105,9 @@ typedef struct {
/**
* SpaPortOutputFlags:
* @SPA_PORT_OUTPUT_FLAG_NONE: no flag
* @SPA_PORT_OUTPUT_FLAG_PULL: force a #SPA_NODE_EVENT_NEED_INPUT event on the
* peer input ports when no data is available.
*/
typedef enum {
SPA_PORT_OUTPUT_FLAG_NONE = 0,
SPA_PORT_OUTPUT_FLAG_HAVE_OUTPUT = (1 << 0),
SPA_PORT_OUTPUT_FLAG_PULL = (1 << 1),
} SpaPortOutputFlags;
/**
@ -152,7 +147,10 @@ typedef void (*SpaNodeEventCallback) (SpaNode *node,
/**
* SpaNode:
*
* The main processing nodes.
* A SpaNode is a component that can comsume and produce buffers.
*
*
*
*/
struct _SpaNode {
/* the total size of this node. This can be used to expand this
@ -559,18 +557,65 @@ struct _SpaNode {
uint32_t port_id,
SpaNodeCommand *command);
/**
* SpaNode::process:
* SpaNode::process_input:
* @node: a #SpaNode
* @flags: process flags
*
* Process the node.
* Process the input area of the node.
*
* Returns: #SPA_RESULT_OK on success
* #SPA_RESULT_HAVE_ENOUGH_INPUT when output can be produced.
* For synchronous nodes, this function is called to start processing data
* or when process_output returned SPA_RESULT_NEED_MORE_INPUT
*
* For Asynchronous node, this function is called when a NEED_INPUT event
* is received from the node.
*
* Before calling this function, you must configure SpaPortInput structures
* configured on the input ports.
*
* The node will loop through all SpaPortInput structures and will
* process the buffers. For each port, the port info will be updated as:
*
* - The buffer_id is set to SPA_ID_INVALID and the status is set to
* SPA_RESULT_OK when the buffer was successfully consumed
*
* - The buffer_id is untouched and the status is set to an error when
* the buffer was invalid.
*
* - The buffer_id is untouched and the status is set to SPA_RESULT_OK
* when no input was consumed. This can happen when the node does not
* need input on this port.
*
* Returns: #SPA_RESULT_OK on success or when the node is asynchronous
* #SPA_RESULT_HAVE_OUTPUT for synchronous nodes when output
* can be consumed.
* #SPA_RESULT_OUT_OF_BUFFERS for synchronous nodes when buffers
* should be released with port_reuse_buffer
* #SPA_RESULT_ERROR when one of the inputs is in error
*/
SpaResult (*process_input) (SpaNode *node);
SpaResult (*process_output) (SpaNode *node);
/**
* SpaNode::process_output:
* @node: a #SpaNode
*
* Tell the node to produce more output.
*
* Before calling this function you must process the buffers and events
* in the SpaPortOutput structure and set the buffer_id to SPA_ID_INVALID
* for all consumed buffers. Buffers that you do not want to consume should
* be returned to the node with port_reuse_buffer.
*
* For synchronous nodes, this function can be called when process_input
* returned #SPA_RESULT_HAVE_ENOUGH_INPUT.
*
* For Asynchronous node, this function is called when a HAVE_OUTPUT event
* is received from the node.
*
* Returns: #SPA_RESULT_OK on success or when the node is asynchronous
* #SPA_RESULT_NEED_INPUT for synchronous nodes when input
* is needed.
* #SPA_RESULT_ERROR when one of the outputs is in error
*/
SpaResult (*process_output) (SpaNode *node);
};
#define spa_node_get_props(n,...) (n)->get_props((n),__VA_ARGS__)

View file

@ -132,31 +132,6 @@ typedef struct {
SpaDict *extra;
} SpaPortInfo;
/**
* SpaPortStatusFlags:
* @SPA_PORT_STATUS_FLAG_NONE: no status flags
* @SPA_PORT_STATUS_FLAG_HAVE_FORMAT: port has a format
* @SPA_PORT_STATUS_FLAG_HAVE_BUFFERS: port has buffers
* @SPA_PORT_STATUS_FLAG_HAVE_OUTPUT: port has output
* @SPA_PORT_STATUS_FLAG_NEED_INPUT: port needs input
*/
typedef enum {
SPA_PORT_STATUS_FLAG_NONE = 0,
SPA_PORT_STATUS_FLAG_HAVE_FORMAT = 1 << 0,
SPA_PORT_STATUS_FLAG_HAVE_BUFFERS = 1 << 1,
SPA_PORT_STATUS_FLAG_HAVE_OUTPUT = 1 << 2,
SPA_PORT_STATUS_FLAG_NEED_INPUT = 1 << 3,
} SpaPortStatusFlags;
/**
* SpaPortStatus:
* @flags: port status flags
*/
typedef struct {
SpaPortStatusFlags flags;
} SpaPortStatus;
#ifdef __cplusplus
} /* extern "C" */
#endif

View file

@ -266,11 +266,11 @@ pull_frames_queue (SpaALSAState *state,
snd_pcm_uframes_t frames)
{
if (spa_list_is_empty (&state->ready)) {
SpaNodeEventNeedInput ni;
SpaNodeEvent event;
ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT;
ni.event.size = sizeof (ni);
state->event_cb (&state->node, &ni.event, state->user_data);
event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT;
event.size = sizeof (event);
state->event_cb (&state->node, &event, state->user_data);
}
if (!spa_list_is_empty (&state->ready)) {
uint8_t *src, *dst;
@ -487,7 +487,7 @@ mmap_read (SpaALSAState *state)
}
if (b) {
SpaNodeEventHaveOutput ho;
SpaNodeEvent event;
SpaData *d;
SpaPortOutput *output;
@ -501,9 +501,9 @@ mmap_read (SpaALSAState *state)
output->buffer_id = b->outbuf->id;
output->status = SPA_RESULT_OK;
}
ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT;
ho.event.size = sizeof (ho);
state->event_cb (&state->node, &ho.event, state->user_data);
event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT;
event.size = sizeof (event);
state->event_cb (&state->node, &event, state->user_data);
}
return 0;
}

View file

@ -300,8 +300,6 @@ spa_audiomixer_node_remove_port (SpaNode *node,
if (input && input->buffer_id) {
this->port_queued--;
}
if (this->port_count == this->port_queued)
((SpaPortOutput*)this->out_ports[0].io)->flags |= SPA_PORT_STATUS_FLAG_HAVE_OUTPUT;
return SPA_RESULT_OK;
}
@ -543,9 +541,6 @@ spa_audiomixer_node_process_input (SpaNode *node)
if ((output = this->out_ports[0].io) == NULL)
return SPA_RESULT_OK;
if (output->flags & SPA_PORT_STATUS_FLAG_HAVE_OUTPUT)
return SPA_RESULT_HAVE_ENOUGH_INPUT;
this->port_queued = 0;
for (i = 0; i < MAX_PORTS; i++) {
@ -560,28 +555,17 @@ spa_audiomixer_node_process_input (SpaNode *node)
have_error = true;
continue;
}
port->buffer = port->buffers[port->n_buffers];
if (port->buffer == NULL) {
port->buffer = port->buffers[input->buffer_id];
input->buffer_id = SPA_ID_INVALID;
}
input->status = SPA_RESULT_OK;
this->port_queued++;
}
if (this->port_queued == this->port_count)
output->flags |= SPA_PORT_STATUS_FLAG_HAVE_OUTPUT;
if (have_error)
return SPA_RESULT_ERROR;
return SPA_RESULT_OK;
}
static void
pull_port (SpaAudioMixer *this, uint32_t port_id, SpaPortOutput *output, size_t pull_size)
{
SpaNodeEventNeedInput ni;
ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT;
ni.event.size = sizeof (ni);
this->event_cb (&this->node, &ni.event, this->user_data);
return this->port_queued == this->port_count ? SPA_RESULT_HAVE_OUTPUT : SPA_RESULT_OK;
}
static void
@ -615,8 +599,6 @@ add_port_data (SpaAudioMixer *this, SpaBuffer *out, SpaAudioMixerPort *port)
if ((is -= chunk) == 0) {
if (++port->buffer_index == port->buffer->n_datas) {
port->buffer = NULL;
((SpaPortInput*)port->io)->flags = SPA_PORT_STATUS_FLAG_NEED_INPUT;
((SpaPortOutput*)this->out_ports[0].io)->flags = SPA_PORT_STATUS_FLAG_HAVE_OUTPUT;
break;
}
port->buffer_offset = 0;
@ -637,24 +619,17 @@ add_port_data (SpaAudioMixer *this, SpaBuffer *out, SpaAudioMixerPort *port)
static SpaResult
mix_data (SpaAudioMixer *this, SpaPortOutput *output)
{
int i, min_size, min_port, pull_size;
int i, min_size, min_port;
SpaBuffer *buf;
pull_size = 0;
min_size = 0;
min_port = 0;
for (i = 0; i < MAX_PORTS; i++) {
if (!this->in_ports[i].valid)
continue;
if (this->in_ports[i].buffer == NULL) {
if (pull_size && output->flags & SPA_PORT_OUTPUT_FLAG_PULL) {
pull_port (this, i, output, pull_size);
}
if (this->in_ports[i].buffer == NULL)
return SPA_RESULT_NEED_MORE_INPUT;
}
if (this->in_ports[i].buffer == NULL)
return SPA_RESULT_NEED_INPUT;
if (min_size == 0 || this->in_ports[i].buffer_queued < min_size) {
min_size = this->in_ports[i].buffer_queued;
@ -662,15 +637,12 @@ mix_data (SpaAudioMixer *this, SpaPortOutput *output)
}
}
if (min_port == 0)
return SPA_RESULT_NEED_MORE_INPUT;
return SPA_RESULT_NEED_INPUT;
buf = this->in_ports[min_port].buffer;
output->buffer_id = buf->id;
this->in_ports[min_port].buffer = NULL;
((SpaPortInput*)this->in_ports[min_port].io)->flags = SPA_PORT_STATUS_FLAG_NEED_INPUT;
((SpaPortOutput*)this->out_ports[0].io)->flags &= ~SPA_PORT_STATUS_FLAG_HAVE_OUTPUT;
for (i = 0; i < MAX_PORTS; i++) {
if (!this->in_ports[i].valid || this->in_ports[i].buffer == NULL)
continue;
@ -699,9 +671,6 @@ spa_audiomixer_node_process_output (SpaNode *node)
if (!port->have_format)
return SPA_RESULT_NO_FORMAT;
// if (!(output->flags & SPA_PORT_STATUS_FLAG_HAVE_OUTPUT))
// return SPA_RESULT_NEED_MORE_INPUT;
if ((output->status = mix_data (this, output)) < 0)
return SPA_RESULT_ERROR;

View file

@ -25,8 +25,8 @@
#include <spa/id-map.h>
#include <spa/log.h>
#include <spa/node.h>
#include <spa/loop.h>
#include <spa/node.h>
#include <spa/list.h>
#include <spa/audio/format.h>
#include <lib/props.h>
@ -35,6 +35,11 @@
#define BYTES_TO_SAMPLES(this,b) ((b)/(this)->bpf)
#define BYTES_TO_TIME(this,b) SAMPLES_TO_TIME(this, BYTES_TO_SAMPLES (this, b))
typedef struct {
uint32_t node;
uint32_t clock;
} URI;
typedef struct _SpaAudioTestSrc SpaAudioTestSrc;
typedef struct {
@ -58,11 +63,6 @@ struct _ATSBuffer {
SpaList link;
};
typedef struct {
uint32_t node;
uint32_t clock;
} URI;
struct _SpaAudioTestSrc {
SpaHandle handle;
SpaNode node;
@ -78,7 +78,6 @@ struct _SpaAudioTestSrc {
SpaNodeEventCallback event_cb;
void *user_data;
bool timer_enabled;
SpaSource timer_source;
struct itimerspec timerspec;
@ -135,15 +134,21 @@ static const SpaPropRangeInfo freq_range[] = {
};
enum {
PROP_ID_LIVE,
PROP_ID_WAVE,
PROP_ID_FREQ,
PROP_ID_VOLUME,
PROP_ID_LIVE,
PROP_ID_LAST,
};
static const SpaPropInfo prop_info[] =
{
{ PROP_ID_LIVE, offsetof (SpaAudioTestSrcProps, live),
"live",
SPA_PROP_FLAG_READWRITE,
SPA_PROP_TYPE_BOOL, sizeof (bool),
SPA_PROP_RANGE_TYPE_NONE, 0, NULL,
NULL },
{ PROP_ID_WAVE, offsetof (SpaAudioTestSrcProps, wave),
"wave",
SPA_PROP_FLAG_READWRITE,
@ -162,12 +167,6 @@ static const SpaPropInfo prop_info[] =
SPA_PROP_TYPE_DOUBLE, sizeof (double),
SPA_PROP_RANGE_TYPE_MIN_MAX, 2, volume_range,
NULL },
{ PROP_ID_LIVE, offsetof (SpaAudioTestSrcProps, live),
"live",
SPA_PROP_FLAG_READWRITE,
SPA_PROP_TYPE_BOOL, sizeof (bool),
SPA_PROP_RANGE_TYPE_NONE, 0, NULL,
NULL },
};
static SpaResult
@ -229,12 +228,12 @@ spa_audiotestsrc_node_set_props (SpaNode *node,
static SpaResult
send_have_output (SpaAudioTestSrc *this)
{
SpaNodeEventHaveOutput ho;
SpaNodeEvent event;
if (this->event_cb) {
ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT;
ho.event.size = sizeof (ho);
this->event_cb (&this->node, &ho.event, this->user_data);
event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT;
event.size = sizeof (event);
this->event_cb (&this->node, &event, this->user_data);
}
return SPA_RESULT_OK;
@ -253,8 +252,24 @@ fill_buffer (SpaAudioTestSrc *this, ATSBuffer *b)
return SPA_RESULT_OK;
}
static SpaResult update_loop_enabled (SpaAudioTestSrc *this, bool enabled);
static void
set_timer (SpaAudioTestSrc *this, bool enabled)
{
if (enabled) {
if (this->props[1].live) {
uint64_t next_time = this->start_time + this->elapsed_time;
this->timerspec.it_value.tv_sec = next_time / SPA_NSEC_PER_SEC;
this->timerspec.it_value.tv_nsec = next_time % SPA_NSEC_PER_SEC;
} else {
this->timerspec.it_value.tv_sec = 0;
this->timerspec.it_value.tv_nsec = 1;
}
} else {
this->timerspec.it_value.tv_sec = 0;
this->timerspec.it_value.tv_nsec = 0;
}
timerfd_settime (this->timer_source.fd, TFD_TIMER_ABSTIME, &this->timerspec, NULL);
}
static void
audiotestsrc_on_output (SpaSource *source)
@ -262,9 +277,13 @@ audiotestsrc_on_output (SpaSource *source)
SpaAudioTestSrc *this = source->data;
ATSBuffer *b;
SpaPortOutput *output;
uint64_t expirations;
if (read (this->timer_source.fd, &expirations, sizeof (uint64_t)) < sizeof (uint64_t))
perror ("read timerfd");
if (spa_list_is_empty (&this->empty)) {
update_loop_enabled (this, false);
set_timer (this, false);
return;
}
b = spa_list_first (&this->empty, ATSBuffer, link);
@ -280,18 +299,7 @@ audiotestsrc_on_output (SpaSource *source)
this->sample_count += b->size / this->bpf;
this->elapsed_time = SAMPLES_TO_TIME (this, this->sample_count);
if (this->props[1].live) {
uint64_t expirations, next_time;
if (read (this->timer_source.fd, &expirations, sizeof (uint64_t)) < sizeof (uint64_t))
perror ("read timerfd");
next_time = this->start_time + this->elapsed_time;
this->timerspec.it_value.tv_sec = next_time / SPA_NSEC_PER_SEC;
this->timerspec.it_value.tv_nsec = next_time % SPA_NSEC_PER_SEC;
timerfd_settime (this->timer_source.fd, TFD_TIMER_ABSTIME, &this->timerspec, NULL);
}
set_timer (this, true);
if ((output = this->output)) {
b->outstanding = true;
@ -307,33 +315,6 @@ update_state (SpaAudioTestSrc *this, SpaNodeState state)
this->node.state = state;
}
static SpaResult
update_loop_enabled (SpaAudioTestSrc *this, bool enabled)
{
if (this->event_cb && this->timer_enabled != enabled) {
this->timer_enabled = enabled;
if (enabled)
this->timer_source.mask = SPA_IO_IN;
else
this->timer_source.mask = 0;
if (this->props[1].live) {
if (enabled) {
uint64_t next_time = this->start_time + this->elapsed_time;
this->timerspec.it_value.tv_sec = next_time / SPA_NSEC_PER_SEC;
this->timerspec.it_value.tv_nsec = next_time % SPA_NSEC_PER_SEC;
}
else {
this->timerspec.it_value.tv_sec = 0;
this->timerspec.it_value.tv_nsec = 0;
}
timerfd_settime (this->timer_source.fd, TFD_TIMER_ABSTIME, &this->timerspec, NULL);
}
spa_loop_update_source (this->data_loop, &this->timer_source);
}
return SPA_RESULT_OK;
}
static SpaResult
spa_audiotestsrc_node_send_command (SpaNode *node,
SpaNodeCommand *command)
@ -371,7 +352,7 @@ spa_audiotestsrc_node_send_command (SpaNode *node,
this->elapsed_time = 0;
this->started = true;
update_loop_enabled (this, true);
set_timer (this, true);
update_state (this, SPA_NODE_STATE_STREAMING);
break;
}
@ -387,7 +368,7 @@ spa_audiotestsrc_node_send_command (SpaNode *node,
return SPA_RESULT_OK;
this->started = false;
update_loop_enabled (this, false);
set_timer (this, false);
update_state (this, SPA_NODE_STATE_PAUSED);
break;
}
@ -728,9 +709,6 @@ spa_audiotestsrc_node_port_alloc_buffers (SpaNode *node,
if (!this->have_format)
return SPA_RESULT_NO_FORMAT;
if (this->n_buffers == 0)
return SPA_RESULT_NO_BUFFERS;
return SPA_RESULT_NOT_IMPLEMENTED;
}
@ -792,7 +770,7 @@ spa_audiotestsrc_node_port_reuse_buffer (SpaNode *node,
spa_list_insert (this->empty.prev, &b->link);
if (!this->props[1].live)
update_loop_enabled (this, true);
set_timer (this, true);
return SPA_RESULT_OK;
}

View file

@ -893,7 +893,7 @@ static void
v4l2_on_fd_events (SpaSource *source)
{
SpaV4l2Source *this = source->data;
SpaNodeEventHaveOutput ho;
SpaNodeEvent event;
if (source->rmask & SPA_IO_ERR)
return;
@ -904,9 +904,9 @@ v4l2_on_fd_events (SpaSource *source)
if (mmap_read (this) < 0)
return;
ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT;
ho.event.size = sizeof (ho);
this->event_cb (&this->node, &ho.event, this->user_data);
event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT;
event.size = sizeof (event);
this->event_cb (&this->node, &event, this->user_data);
}
static SpaResult

View file

@ -77,7 +77,6 @@ struct _SpaVideoTestSrc {
void *user_data;
SpaSource timer_source;
bool timer_enabled;
struct itimerspec timerspec;
SpaPortInfo info;
@ -194,12 +193,12 @@ spa_videotestsrc_node_set_props (SpaNode *node,
static SpaResult
send_have_output (SpaVideoTestSrc *this)
{
SpaNodeEventHaveOutput ho;
SpaNodeEvent event;
if (this->event_cb) {
ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT;
ho.event.size = sizeof (ho);
this->event_cb (&this->node, &ho.event, this->user_data);
event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT;
event.size = sizeof (event);
this->event_cb (&this->node, &event, this->user_data);
}
return SPA_RESULT_OK;
@ -213,7 +212,24 @@ fill_buffer (SpaVideoTestSrc *this, VTSBuffer *b)
return draw (this, b->ptr);
}
static SpaResult update_loop_enabled (SpaVideoTestSrc *this, bool enabled);
static void
set_timer (SpaVideoTestSrc *this, bool enabled)
{
if (enabled) {
if (this->props[1].live) {
uint64_t next_time = this->start_time + this->elapsed_time;
this->timerspec.it_value.tv_sec = next_time / SPA_NSEC_PER_SEC;
this->timerspec.it_value.tv_nsec = next_time % SPA_NSEC_PER_SEC;
} else {
this->timerspec.it_value.tv_sec = 0;
this->timerspec.it_value.tv_nsec = 1;
}
} else {
this->timerspec.it_value.tv_sec = 0;
this->timerspec.it_value.tv_nsec = 0;
}
timerfd_settime (this->timer_source.fd, TFD_TIMER_ABSTIME, &this->timerspec, NULL);
}
static void
videotestsrc_on_output (SpaSource *source)
@ -221,9 +237,13 @@ videotestsrc_on_output (SpaSource *source)
SpaVideoTestSrc *this = source->data;
VTSBuffer *b;
SpaPortOutput *output;
uint64_t expirations;
if (read (this->timer_source.fd, &expirations, sizeof (uint64_t)) < sizeof (uint64_t))
perror ("read timerfd");
if (spa_list_is_empty (&this->empty)) {
update_loop_enabled (this, false);
set_timer (this, false);
return;
}
b = spa_list_first (&this->empty, VTSBuffer, link);
@ -239,18 +259,7 @@ videotestsrc_on_output (SpaSource *source)
this->frame_count++;
this->elapsed_time = FRAMES_TO_TIME (this, this->frame_count);
if (this->props[1].live) {
uint64_t expirations, next_time;
if (read (this->timer_source.fd, &expirations, sizeof (uint64_t)) < sizeof (uint64_t))
perror ("read timerfd");
next_time = this->start_time + this->elapsed_time;
this->timerspec.it_value.tv_sec = next_time / SPA_NSEC_PER_SEC;
this->timerspec.it_value.tv_nsec = next_time % SPA_NSEC_PER_SEC;
timerfd_settime (this->timer_source.fd, TFD_TIMER_ABSTIME, &this->timerspec, NULL);
}
set_timer (this, true);
if ((output = this->output)) {
b->outstanding = true;
@ -266,33 +275,6 @@ update_state (SpaVideoTestSrc *this, SpaNodeState state)
this->node.state = state;
}
static SpaResult
update_loop_enabled (SpaVideoTestSrc *this, bool enabled)
{
if (this->event_cb && this->timer_enabled != enabled) {
this->timer_enabled = enabled;
if (enabled)
this->timer_source.mask = SPA_IO_IN;
else
this->timer_source.mask = 0;
if (this->props[1].live) {
if (enabled) {
uint64_t next_time = this->start_time + this->elapsed_time;
this->timerspec.it_value.tv_sec = next_time / SPA_NSEC_PER_SEC;
this->timerspec.it_value.tv_nsec = next_time % SPA_NSEC_PER_SEC;
}
else {
this->timerspec.it_value.tv_sec = 0;
this->timerspec.it_value.tv_nsec = 0;
}
timerfd_settime (this->timer_source.fd, TFD_TIMER_ABSTIME, &this->timerspec, NULL);
}
spa_loop_update_source (this->data_loop, &this->timer_source);
}
return SPA_RESULT_OK;
}
static SpaResult
spa_videotestsrc_node_send_command (SpaNode *node,
SpaNodeCommand *command)
@ -330,7 +312,7 @@ spa_videotestsrc_node_send_command (SpaNode *node,
this->elapsed_time = 0;
this->started = true;
update_loop_enabled (this, true);
set_timer (this, true);
update_state (this, SPA_NODE_STATE_STREAMING);
break;
}
@ -346,7 +328,7 @@ spa_videotestsrc_node_send_command (SpaNode *node,
return SPA_RESULT_OK;
this->started = false;
update_loop_enabled (this, false);
set_timer (this, false);
update_state (this, SPA_NODE_STATE_PAUSED);
break;
}
@ -699,9 +681,6 @@ spa_videotestsrc_node_port_alloc_buffers (SpaNode *node,
if (!this->have_format)
return SPA_RESULT_NO_FORMAT;
if (this->n_buffers == 0)
return SPA_RESULT_NO_BUFFERS;
return SPA_RESULT_NOT_IMPLEMENTED;
}
@ -763,7 +742,7 @@ spa_videotestsrc_node_port_reuse_buffer (SpaNode *node,
spa_list_insert (this->empty.prev, &b->link);
if (!this->props[1].live)
update_loop_enabled (this, true);
set_timer (this, true);
return SPA_RESULT_OK;
}

View file

@ -23,9 +23,12 @@
#include <spa/log.h>
#include <spa/id-map.h>
#include <spa/node.h>
#include <spa/list.h>
#include <spa/audio/format.h>
#include <lib/props.h>
#define MAX_BUFFERS 16
typedef struct _SpaVolume SpaVolume;
typedef struct {
@ -35,12 +38,28 @@ typedef struct {
} SpaVolumeProps;
typedef struct {
bool have_format;
SpaPortInfo info;
SpaBuffer **buffers;
unsigned int n_buffers;
SpaBuffer *buffer;
void *io;
SpaBuffer *outbuf;
bool outstanding;
SpaMetaHeader *h;
void *ptr;
size_t size;
SpaList link;
} SpaVolumeBuffer;
typedef struct {
bool have_format;
SpaPortInfo info;
SpaAllocParam *params[2];
SpaAllocParamBuffers param_buffers;
SpaAllocParamMetaEnable param_meta;
SpaVolumeBuffer buffers[MAX_BUFFERS];
unsigned int n_buffers;
void *io;
SpaList empty;
} SpaVolumePort;
typedef struct {
@ -60,7 +79,6 @@ struct _SpaVolume {
SpaNodeEventCallback event_cb;
void *user_data;
bool have_format;
SpaFormatAudio query_format;
SpaFormatAudio current_format;
@ -114,7 +132,7 @@ reset_volume_props (SpaVolumeProps *props)
static void
update_state (SpaVolume *this, SpaNodeState state)
{
this->node.state = state;
this->node.state = state;
}
static SpaResult
@ -300,6 +318,17 @@ spa_volume_node_port_enum_formats (SpaNode *node,
return SPA_RESULT_OK;
}
static SpaResult
clear_buffers (SpaVolume *this, SpaVolumePort *port)
{
if (port->n_buffers > 0) {
spa_log_info (this->log, "volume %p: clear buffers", this);
port->n_buffers = 0;
spa_list_init (&port->empty);
}
return SPA_RESULT_OK;
}
static SpaResult
spa_volume_node_port_set_format (SpaNode *node,
SpaDirection direction,
@ -311,7 +340,7 @@ spa_volume_node_port_set_format (SpaNode *node,
SpaVolumePort *port;
SpaResult res;
if (node == NULL || format == NULL)
if (node == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
this = SPA_CONTAINER_OF (node, SpaVolume, node);
@ -323,13 +352,37 @@ spa_volume_node_port_set_format (SpaNode *node,
if (format == NULL) {
port->have_format = false;
return SPA_RESULT_OK;
clear_buffers (this, port);
} else {
if ((res = spa_format_audio_parse (format, &this->current_format)) < 0)
return res;
port->have_format = true;
}
if ((res = spa_format_audio_parse (format, &this->current_format)) < 0)
return res;
if (port->have_format) {
port->info.maxbuffering = -1;
port->info.latency = 0;
port->have_format = true;
port->info.n_params = 2;
port->info.params = port->params;
port->params[0] = &port->param_buffers.param;
port->param_buffers.param.type = SPA_ALLOC_PARAM_TYPE_BUFFERS;
port->param_buffers.param.size = sizeof (port->param_buffers);
port->param_buffers.minsize = 16;
port->param_buffers.stride = 16;
port->param_buffers.min_buffers = 2;
port->param_buffers.max_buffers = MAX_BUFFERS;
port->param_buffers.align = 16;
port->params[1] = &port->param_meta.param;
port->param_meta.param.type = SPA_ALLOC_PARAM_TYPE_META_ENABLE;
port->param_meta.param.size = sizeof (port->param_meta);
port->param_meta.type = SPA_META_TYPE_HEADER;
port->info.extra = NULL;
update_state (this, SPA_NODE_STATE_READY);
}
else
update_state (this, SPA_NODE_STATE_CONFIGURE);
return SPA_RESULT_OK;
}
@ -409,7 +462,58 @@ spa_volume_node_port_use_buffers (SpaNode *node,
SpaBuffer **buffers,
uint32_t n_buffers)
{
return SPA_RESULT_NOT_IMPLEMENTED;
SpaVolume *this;
SpaVolumePort *port;
unsigned int i;
if (node == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
this = SPA_CONTAINER_OF (node, SpaVolume, node);
if (!CHECK_PORT (this, direction, port_id))
return SPA_RESULT_INVALID_PORT;
port = direction == SPA_DIRECTION_INPUT ? &this->in_ports[port_id] : &this->out_ports[port_id];
if (!port->have_format)
return SPA_RESULT_NO_FORMAT;
clear_buffers (this, port);
for (i = 0; i < n_buffers; i++) {
SpaVolumeBuffer *b;
SpaData *d = buffers[i]->datas;
b = &port->buffers[i];
b->outbuf = buffers[i];
b->outstanding = true;
b->h = spa_buffer_find_meta (buffers[i], SPA_META_TYPE_HEADER);
switch (d[0].type) {
case SPA_DATA_TYPE_MEMPTR:
case SPA_DATA_TYPE_MEMFD:
case SPA_DATA_TYPE_DMABUF:
if (d[0].data == NULL) {
spa_log_error (this->log, "volume %p: invalid memory on buffer %p", this, buffers[i]);
continue;
}
b->ptr = d[0].data;
b->size = d[0].maxsize;
break;
default:
break;
}
spa_list_insert (port->empty.prev, &b->link);
}
port->n_buffers = n_buffers;
if (port->n_buffers > 0) {
update_state (this, SPA_NODE_STATE_PAUSED);
} else {
update_state (this, SPA_NODE_STATE_READY);
}
return SPA_RESULT_OK;
}
static SpaResult
@ -469,7 +573,34 @@ spa_volume_node_port_reuse_buffer (SpaNode *node,
uint32_t port_id,
uint32_t buffer_id)
{
return SPA_RESULT_NOT_IMPLEMENTED;
SpaVolume *this;
SpaVolumeBuffer *b;
SpaVolumePort *port;
if (node == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
this = SPA_CONTAINER_OF (node, SpaVolume, node);
if (!CHECK_PORT (this, SPA_DIRECTION_OUTPUT, port_id))
return SPA_RESULT_INVALID_PORT;
port = &this->out_ports[port_id];
if (port->n_buffers == 0)
return SPA_RESULT_NO_BUFFERS;
if (buffer_id >= port->n_buffers)
return SPA_RESULT_INVALID_BUFFER_ID;
b = &port->buffers[buffer_id];
if (!b->outstanding)
return SPA_RESULT_OK;
b->outstanding = false;
spa_list_insert (port->empty.prev, &b->link);
return SPA_RESULT_OK;
}
static SpaResult
@ -481,47 +612,19 @@ spa_volume_node_port_send_command (SpaNode *node,
return SPA_RESULT_NOT_IMPLEMENTED;
}
static SpaResult
spa_volume_node_process_input (SpaNode *node)
{
SpaVolume *this;
SpaPortInput *input;
SpaPortOutput *output;
SpaVolumePort *port;
if (node == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
this = SPA_CONTAINER_OF (node, SpaVolume, node);
port = &this->in_ports[0];
if ((input = port->io) == NULL)
return SPA_RESULT_ERROR;
if ((output = this->out_ports[0].io) == NULL)
return SPA_RESULT_ERROR;
if (input->buffer_id >= port->n_buffers) {
input->status = SPA_RESULT_INVALID_BUFFER_ID;
return SPA_RESULT_ERROR;
}
if (!port->have_format) {
input->status = SPA_RESULT_NO_FORMAT;
return SPA_RESULT_ERROR;
}
port->buffer = port->buffers[input->buffer_id];
input->status = SPA_RESULT_HAVE_ENOUGH_INPUT;
input->flags &= ~SPA_PORT_STATUS_FLAG_NEED_INPUT;
output->flags |= SPA_PORT_STATUS_FLAG_HAVE_OUTPUT;
return SPA_RESULT_HAVE_ENOUGH_INPUT;
}
static SpaBuffer *
find_free_buffer (SpaVolume *this, SpaVolumePort *port)
{
return NULL;
SpaVolumeBuffer *b;
if (spa_list_is_empty (&port->empty))
return NULL;
b = spa_list_first (&port->empty, SpaVolumeBuffer, link);
spa_list_remove (&b->link);
b->outstanding = true;
return b->outbuf;
}
static void
@ -536,42 +639,16 @@ release_buffer (SpaVolume *this, SpaBuffer *buffer)
this->event_cb (&this->node, &rb.event, this->user_data);
}
static SpaResult
spa_volume_node_process_output (SpaNode *node)
static void
do_volume (SpaVolume *this, SpaBuffer *dbuf, SpaBuffer *sbuf)
{
SpaVolume *this;
SpaVolumePort *port;
unsigned int si, di, i, n_samples, n_bytes, soff, doff ;
SpaBuffer *sbuf, *dbuf;
SpaData *sd, *dd;
uint16_t *src, *dst;
double volume;
SpaPortInput *input;
SpaPortInput *output;
if (node == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
this = SPA_CONTAINER_OF (node, SpaVolume, node);
port = &this->out_ports[0];
if ((output = port->io) == NULL)
return SPA_RESULT_ERROR;
if ((input = this->in_ports[0].io) == NULL)
return SPA_RESULT_ERROR;
if (!port->have_format)
return SPA_RESULT_NO_FORMAT;
if (this->in_ports[0].buffer == NULL)
return SPA_RESULT_NEED_MORE_INPUT;
volume = this->props[1].volume;
sbuf = this->in_ports[0].buffer;
dbuf = find_free_buffer (this, port);
si = di = 0;
soff = doff = 0;
@ -603,17 +680,67 @@ spa_volume_node_process_output (SpaNode *node)
doff = 0;
}
}
if (sbuf != dbuf)
release_buffer (this, sbuf);
}
static SpaResult
spa_volume_node_process_input (SpaNode *node)
{
SpaVolume *this;
SpaPortInput *input;
SpaPortOutput *output;
SpaVolumePort *in_port, *out_port;
SpaBuffer *dbuf, *sbuf;
if (node == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
this = SPA_CONTAINER_OF (node, SpaVolume, node);
in_port = &this->in_ports[0];
out_port = &this->out_ports[0];
if ((input = in_port->io) == NULL)
return SPA_RESULT_ERROR;
if ((output = out_port->io) == NULL)
return SPA_RESULT_ERROR;
if (!in_port->have_format) {
input->status = SPA_RESULT_NO_FORMAT;
return SPA_RESULT_ERROR;
}
if (input->buffer_id >= in_port->n_buffers) {
input->status = SPA_RESULT_INVALID_BUFFER_ID;
return SPA_RESULT_ERROR;
}
if (output->buffer_id >= out_port->n_buffers) {
dbuf = find_free_buffer (this, out_port);
} else {
dbuf = out_port->buffers[output->buffer_id].outbuf;
}
if (dbuf == NULL)
return SPA_RESULT_OUT_OF_BUFFERS;
sbuf = in_port->buffers[input->buffer_id].outbuf;
input->buffer_id = SPA_ID_INVALID;
input->status = SPA_RESULT_OK;
do_volume (this, sbuf, dbuf);
this->in_ports[0].buffer = NULL;
output->buffer_id = dbuf->id;
output->status = SPA_RESULT_OK;
input->flags |= SPA_PORT_STATUS_FLAG_NEED_INPUT;
output->flags &= ~SPA_PORT_STATUS_FLAG_HAVE_OUTPUT;
if (sbuf != dbuf)
release_buffer (this, sbuf);
return SPA_RESULT_OK;
return SPA_RESULT_HAVE_OUTPUT;
}
static SpaResult
spa_volume_node_process_output (SpaNode *node)
{
return SPA_RESULT_NEED_INPUT;
}
static const SpaNode volume_node = {
@ -707,9 +834,11 @@ volume_init (const SpaHandleFactory *factory,
this->in_ports[0].info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS |
SPA_PORT_INFO_FLAG_IN_PLACE;
this->out_ports[0].info.flags = SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS |
SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS |
spa_list_init (&this->in_ports[0].empty);
this->out_ports[0].info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS |
SPA_PORT_INFO_FLAG_NO_REF;
spa_list_init (&this->out_ports[0].empty);
return SPA_RESULT_OK;
}