Improve state handling

Improve the state handling in v4l2
Send buffers in one message
Update design doc
This commit is contained in:
Wim Taymans 2016-08-25 17:07:40 +02:00
parent fbd6304663
commit 7e858ff694
8 changed files with 202 additions and 453 deletions

View file

@ -27,16 +27,9 @@ Daemon1: the main pinos daemon
Client1: a connected client, the result object from call
Daemon1.ConnectClient
/org/pinos/client*
Device1: a physical device on the machine, devices can provide
processing nodes
/org/pinos/device*
Node1: a processing node, this can be a source, sink or transform
element.
element. Nodes have ports
/org/pinos/node*
Port1: a port on a Node1, ports can be input or output ports
/org/pinos/node*/port*
Channel1: a communication channel between a client and port
/org/pinos/client/channel*
Link1: a link between 2 ports
/org/pinos/link*
@ -47,138 +40,23 @@ DBus protocol
The main daemon is registered on the session bus with name: org.pinos
Various Node1 objects are registered in the server based on the available
sources or sinks of content. Source1 has properties and has format descriptions of
what it can provide.
sources or sinks of content. Node1 has properties and its ports have format
descriptions of what it can provide or consume.
First a client needs to register with pinos by calling
org.pinos.Daemon1.ConnectClient(). This creates a new Client1 object that
the client must use for further communication.
First a client needs to register a Node1 with pinos by calling
org.pinos.Daemon1.CreateClientNode(). This creates a new Client1 object and
a Node 1 object that the client must use for further communication.
A client can then do org.pinos.Client1.CreateChannel() to create a
new Channel to retrieve/send data from/to a node. It can specify a node/port
explicitly or let the server choose a port. The client must provide a list
of formats it can handle along with extra properties that can help with
selecting an appropriate port.
A client can then call org.pinos.Channel1.Start() to negotiate the final
media format and start the data transfer. A new fd is returned to the client
along with the negotiated format and properties.
All further media transport is then performed on the fd. The client will read
from the fd to get data and metadata from the server. The wire format is
generic and extensible and allows for inline serialized events such as
property changes and format changes.
A client then needs to use the pinos protocol to control the Node1. It will
also receive commands and notifications from pinos.
fd management
-------------
Pinos shares data between clients by using fd passing. Sometimes the memory
referenced by the fd needs to be reused. It is important that all pinos
clients lost their reference to the fd before it can be reused.
What follows are some scenarios of how the lifecycle of the fds are
managed.
* server side
v4l2src pinossocketsink
| |
| |
make buffer |--------->|
| | (1)
| | (2) ----->
| | (3)
|... | ...
| |
| | (4) <-----
|<---------|
(1) pinossocketsink generates the pinos message from the v4l2 input
buffer. It is assumed in the next steps that the sink
receives fd-memory from v4l2src and that the memory is only
freed again when no clients are looking at the fd.
(2) pinossocketsink sends the buffer to N Pinos clients
(3) for each client that is sent a buffer, pinossocketsink uses the
fdmanager object to map the fd-index that was
sent, to the buffer and client. The buffer is reffed and kept
alive for as long as the client is using the buffer.
(4) when a message is received from a client, pinossocketsink
parses the message and instructs the fdmanager to release
the fd-index again. When all clients release the fd, the buffer
will be unreffed and v4l2src can reuse the memory.
* client consumer
pinossrc xvimagesink
| |
-------> (1)|------------------->| (2)
| |
| (3) |
|<-------------------|
<------- (4)| |
| |
(1) pinossrc receives a buffer from Pinos and wraps the fd with data
in an fd-memory. It remembers the fd-index as qdata on the memory.
It has a special DestroyNotify on the qdata.
(2) xvimagesink renders the buffer and frees the buffer.
(3) freeing the buffer causes the qdata DestoyNotify to be called.
(4) pinossrc constructs a release-fd message and sends it to Pinos
* client producer
videotestsrc pinossink
| |
(1)|------------------->|
| | (2) ----->
| |
| (4) | (3) <-----
|<-------------------|
(1) videotestsrc produces a buffer
(2) pinossink makes a PinosBuffer from the input buffer. It will also
keep the buffer in a hash table indexed by the fd-index.
(3) pinossink receives an fd-release message from Pinos. It removes
the fd-index from the hashtable and
the hashtable and the buffer is unreffed
(4) the fd is returned to videotestsrc for reuse.
* client producer, server side
socketsrc pinossocketsink
| |
------> (1) | (2)|
|----------->|
| | (3) -------->
| | ....
| (4)|
| | ....
| |
| (6)| (5) <--------
<------- (7)|<-----------|
| |
(1) pinos buffer arrives from a client. socketsrc wraps the
fd
(2) pinossocketsink sets a weak-ref on the buffer to know when it is
freed.
(3) pinossocketsink sends the buffer to the clients
(4) for each buffer that is sent, the sink uses the fdmanager to map the
fd-index to a buffer and a client. it keeps a ref on the buffer
(5) release-fd is received from a client
(6) pinossocketsink removes the fd-index from the fdmanager. If all
clients released the fd, the buffer will be freeds, triggering
the DestroyNotify. This will then trigger an event with a release-fd
message to the source.
(7) the source sends the release-fd message to Pinos
Clients receive fds with buffers and memory after a format was negotiated.
Updates to these buffers are notified by a message containing the id of the
buffer.
* client remove
@ -187,194 +65,11 @@ that it received. Pinos will force a release and will reuse the fd-indexes
when the client disconnects.
Wire
----
Fixed header
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Version |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Flags |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Version : 4 bytes : message version
Flags : 4 bytes : extra flags
Length : 4 bytes : total message length
Followed by 1 or more type-length-data sections
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Type | Len ... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Data .... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
<type> : 1 byte
<length> : variable length, 7 bits, high bit is continuation marker
<data> : <length> bytes, see below for contents based on <type>
Types:
1: continuation section
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Offset ... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| .... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Size ... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| .... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Rest of the commands can be found in the shared memory region at
@offset and @size. A shared memory region is negotiated when the client
connects to the server.
<offset> : 8 bytes : offset
<size> : 8 bytes : size
2: header
Header for payload
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| flags |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| seq |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| PTS ... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| .... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| DTS-offset ... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| .... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
<flags> : 4 bytes : buffer flags
<seq> : 4 bytes : sequence number
<pts> : 8 bytes : presentation time
<dts-offset> : 8 bytes : dts-offset
3: fd-payload section
Used to send a block of data between client and server. The type of fd and
the possible operations on it are negotiated when the client connects.
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| id |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| offset ... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| .... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| size ... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| .... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| fd-index |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
<id> : 4 bytes : id of the fd-payload
<offset> : 8 bytes : offset
<size> : 8 bytes : size
<fd-index> : 4 bytes : index of fd
4: release fd-payload
Release a fd-payload with <id>
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| id |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
<id> : 4 bytes : the id number of the released fd-payload
5: format change
Perform an in-band format change. The following data blocks will be in this
new format.
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| id | format ..... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| ...... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
<format-id> : 1 byte : format id
<format> : 0-terminated : contains serialized format
6: property changes
Notify a property change.
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| key .... .. |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| ...... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| value .... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| ...... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
<key> : 0-terminated : key
<value> : 0-terminated : value
... : more key/values to fill length, 0 key or
message length is end
7: refresh request
Request a new refresh point
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| last-id |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| request-type |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| PTS ... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| .... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
<last-id> : the last seen id
<request-type> : the type of request
0 = keyframe
1 = keyframe+full headers
<PTS> : the timestamp when the refresh should be,
0 for as soon as possible
communication channel
---------------------
The wire protocol for the node control channel is a serialization of
structures.
+-----+ +----+ +----+

View file

@ -38,7 +38,7 @@
#include "pinos/client/private.h"
#define MAX_BUFFER_SIZE 4096
#define MAX_FDS 16
#define MAX_FDS 32
typedef struct {
bool cleanup;
@ -808,9 +808,6 @@ parse_control (PinosStream *stream,
mem->fd = fd;
mem->ptr = NULL;
mem->size = p.size;
} else {
g_debug ("duplicated mem %d,%d, %d, %d", p.mem.pool_id, p.mem.id, fd, p.flags);
close (fd);
}
break;
}

View file

@ -68,10 +68,6 @@
<property name='Sender' type='s' access='read' />
<!-- Name: Properties of the client -->
<property name='Properties' type='a{sv}' access='read' />
</interface>
<!--

View file

@ -58,6 +58,7 @@ typedef enum {
SPA_RESULT_UNEXPECTED = -26,
SPA_RESULT_NO_BUFFERS = -27,
SPA_RESULT_INVALID_BUFFER_ID = -28,
SPA_RESULT_WRONG_STATE = -29,
} SpaResult;
typedef enum {

View file

@ -723,11 +723,16 @@ spa_control_builder_add_fd (SpaControlBuilder *builder,
bool close)
{
struct stack_builder *sb = SCSB (builder);
int index;
int index, i;
if (!is_valid_builder (builder) || fd < 0)
return -1;
for (i = 0; i < sb->control.n_fds; i++) {
if (sb->control.fds[i] == fd || sb->control.fds[i] == -fd)
return i;
}
if (sb->control.n_fds >= sb->control.max_fds) {
int new_size = sb->control.max_fds + 8;
fprintf (stderr, "builder %p: realloc control fds %d -> %d\n",

View file

@ -626,21 +626,14 @@ spa_proxy_node_port_get_status (SpaNode *node,
}
static SpaResult
add_buffer (SpaProxy *this, uint32_t port_id, SpaBuffer *buffer)
add_buffer (SpaProxy *this, SpaControlBuilder *builder, uint32_t port_id, SpaBuffer *buffer)
{
SpaControl control;
SpaControlBuilder builder;
uint8_t buf[1024];
int fds[16];
SpaControlCmdAddMem am;
SpaControlCmdAddBuffer ab;
int i;
SpaResult res;
SpaBuffer *b;
SpaMemory *bmem;
spa_control_builder_init_into (&builder, buf, sizeof (buf), fds, sizeof (fds));
if (buffer->mem.mem.id == SPA_ID_INVALID) {
fprintf (stderr, "proxy %p: alloc buffer space %zd\n", this, buffer->mem.size);
bmem = spa_memory_alloc_with_fd (SPA_MEMORY_POOL_SHARED, buffer, buffer->mem.size);
@ -655,10 +648,10 @@ add_buffer (SpaProxy *this, uint32_t port_id, SpaBuffer *buffer)
am.port_id = port_id;
am.mem = bmem->mem;
am.mem_type = 0;
am.fd_index = spa_control_builder_add_fd (&builder, bmem->fd, false);
am.fd_index = spa_control_builder_add_fd (builder, bmem->fd, false);
am.flags = bmem->flags;
am.size = bmem->size;
spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_ADD_MEM, &am);
spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_ADD_MEM, &am);
for (i = 0; i < b->n_datas; i++) {
SpaData *d = &SPA_BUFFER_DATAS (b)[i];
@ -672,60 +665,42 @@ add_buffer (SpaProxy *this, uint32_t port_id, SpaBuffer *buffer)
am.port_id = port_id;
am.mem = mem->mem;
am.mem_type = 0;
am.fd_index = spa_control_builder_add_fd (&builder, mem->fd, false);
am.fd_index = spa_control_builder_add_fd (builder, mem->fd, false);
am.flags = mem->flags;
am.size = mem->size;
spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_ADD_MEM, &am);
spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_ADD_MEM, &am);
}
ab.port_id = port_id;
ab.buffer_id = b->id;
ab.mem.mem = bmem->mem;
ab.mem.offset = b->mem.offset;
ab.mem.size = b->mem.size;
spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_ADD_BUFFER, &ab);
spa_control_builder_end (&builder, &control);
if ((res = spa_control_write (&control, this->fds[0].fd)) < 0)
fprintf (stderr, "proxy %p: error writing control\n", this);
spa_control_clear (&control);
spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_ADD_BUFFER, &ab);
return SPA_RESULT_OK;
}
static SpaResult
remove_buffer (SpaProxy *this, uint32_t port_id, SpaBuffer *buffer)
remove_buffer (SpaProxy *this, SpaControlBuilder *builder, uint32_t port_id, SpaBuffer *buffer)
{
SpaControl control;
SpaControlBuilder builder;
uint8_t buf[1024];
SpaControlCmdRemoveBuffer rb;
SpaControlCmdRemoveMem rm;
unsigned int i;
SpaResult res;
spa_control_builder_init_into (&builder, buf, sizeof (buf), NULL, 0);
rb.port_id = port_id;
rb.buffer_id = buffer->id;
spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_REMOVE_BUFFER, &rb);
spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_REMOVE_BUFFER, &rb);
rm.port_id = port_id;
rm.mem = buffer->mem.mem;
spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_REMOVE_MEM, &rm);
spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_REMOVE_MEM, &rm);
for (i = 0; i < buffer->n_datas; i++) {
SpaData *d = &SPA_BUFFER_DATAS (buffer)[i];
rm.port_id = port_id;
rm.mem = d->mem.mem;
spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_REMOVE_MEM, &rm);
spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_REMOVE_MEM, &rm);
}
spa_control_builder_end (&builder, &control);
if ((res = spa_control_write (&control, this->fds[0].fd)) < 0)
fprintf (stderr, "proxy %p: error writing control\n", this);
spa_control_clear (&control);
return SPA_RESULT_OK;
}
@ -740,6 +715,11 @@ spa_proxy_node_port_use_buffers (SpaNode *node,
SpaProxy *this;
SpaProxyPort *port;
unsigned int i;
SpaControl control;
SpaControlBuilder builder;
uint8_t buf[4096];
int fds[32];
SpaResult res;
if (node == NULL || node->handle == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
@ -755,8 +735,10 @@ spa_proxy_node_port_use_buffers (SpaNode *node,
if (!port->format)
return SPA_RESULT_NO_FORMAT;
spa_control_builder_init_into (&builder, buf, sizeof (buf), fds, sizeof (fds));
for (i = 0; i < port->n_buffers; i++)
remove_buffer (this, port_id, port->buffers[i]);
remove_buffer (this, &builder, port_id, port->buffers[i]);
if (buffers == NULL || n_buffers == 0) {
port->buffers = NULL;
@ -766,7 +748,14 @@ spa_proxy_node_port_use_buffers (SpaNode *node,
port->n_buffers = n_buffers;
}
for (i = 0; i < port->n_buffers; i++)
add_buffer (this, port_id, port->buffers[i]);
add_buffer (this, &builder, port_id, port->buffers[i]);
spa_control_builder_end (&builder, &control);
if ((res = spa_control_write (&control, this->fds[0].fd)) < 0)
fprintf (stderr, "proxy %p: error writing control\n", this);
spa_control_clear (&control);
return SPA_RESULT_OK;
}

View file

@ -119,6 +119,7 @@ typedef struct {
struct _SpaV4l2Source {
SpaHandle handle;
SpaNode node;
SpaNodeState node_state;
SpaV4l2SourceProps props[2];
@ -201,11 +202,16 @@ spa_v4l2_source_node_set_props (SpaNode *node,
}
static void
send_state_change (SpaV4l2Source *this, SpaNodeState state)
update_state (SpaV4l2Source *this, SpaNodeState state)
{
SpaEvent event;
SpaEventStateChange sc;
if (this->node_state == state)
return;
this->node_state = state;
event.type = SPA_EVENT_TYPE_STATE_CHANGE;
event.port_id = -1;
event.data = &sc;
@ -219,6 +225,7 @@ spa_v4l2_source_node_send_command (SpaNode *node,
SpaCommand *command)
{
SpaV4l2Source *this;
SpaResult res;
if (node == NULL || node->handle == NULL || command == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
@ -230,15 +237,37 @@ spa_v4l2_source_node_send_command (SpaNode *node,
return SPA_RESULT_INVALID_COMMAND;
case SPA_COMMAND_START:
spa_v4l2_start (this);
{
SpaV4l2State *state = &this->state[0];
send_state_change (this, SPA_NODE_STATE_STREAMING);
if (state->current_format == NULL)
return SPA_RESULT_NO_FORMAT;
if (!state->have_buffers)
return SPA_RESULT_NO_BUFFERS;
if ((res = spa_v4l2_start (this)) < 0)
return res;
update_state (this, SPA_NODE_STATE_STREAMING);
break;
}
case SPA_COMMAND_PAUSE:
spa_v4l2_stop (this);
{
SpaV4l2State *state = &this->state[0];
send_state_change (this, SPA_NODE_STATE_PAUSED);
if (state->current_format == NULL)
return SPA_RESULT_NO_FORMAT;
if (!state->have_buffers)
return SPA_RESULT_NO_BUFFERS;
if ((res = spa_v4l2_pause (this)) < 0)
return res;
update_state (this, SPA_NODE_STATE_PAUSED);
break;
}
case SPA_COMMAND_FLUSH:
case SPA_COMMAND_DRAIN:
@ -263,7 +292,7 @@ spa_v4l2_source_node_set_event_callback (SpaNode *node,
this->event_cb = event;
this->user_data = user_data;
send_state_change (this, SPA_NODE_STATE_CONFIGURE);
update_state (this, SPA_NODE_STATE_CONFIGURE);
return SPA_RESULT_OK;
}
@ -385,6 +414,8 @@ spa_v4l2_source_node_port_set_format (SpaNode *node,
state = &this->state[port_id];
if (format == NULL) {
spa_v4l2_clear_buffers (this);
spa_v4l2_close (this);
state->current_format = NULL;
return SPA_RESULT_OK;
}
@ -410,6 +441,11 @@ spa_v4l2_source_node_port_set_format (SpaNode *node,
f->size.width == state->current_format->size.width &&
f->size.height == state->current_format->size.height)
return SPA_RESULT_OK;
if (!(flags & SPA_PORT_FORMAT_FLAG_TEST_ONLY)) {
spa_v4l2_use_buffers (this, NULL, 0);
state->current_format = NULL;
}
}
if (spa_v4l2_set_format (this, f, flags & SPA_PORT_FORMAT_FLAG_TEST_ONLY) < 0)
@ -419,7 +455,7 @@ spa_v4l2_source_node_port_set_format (SpaNode *node,
memcpy (tf, f, fs);
state->current_format = tf;
send_state_change (this, SPA_NODE_STATE_READY);
update_state (this, SPA_NODE_STATE_READY);
}
return SPA_RESULT_OK;
@ -494,6 +530,8 @@ spa_v4l2_source_node_port_use_buffers (SpaNode *node,
uint32_t n_buffers)
{
SpaV4l2Source *this;
SpaV4l2State *state;
SpaResult res;
if (node == NULL || node->handle == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
@ -503,9 +541,26 @@ spa_v4l2_source_node_port_use_buffers (SpaNode *node,
if (port_id != 0)
return SPA_RESULT_INVALID_PORT;
spa_v4l2_import_buffers (this, buffers, n_buffers);
state = &this->state[port_id];
return SPA_RESULT_OK;
if (state->current_format == NULL)
return SPA_RESULT_NO_FORMAT;
if (state->have_buffers) {
if ((res = spa_v4l2_clear_buffers (this)) < 0)
return res;
}
if (buffers != NULL) {
if ((res = spa_v4l2_use_buffers (this, buffers, n_buffers)) < 0)
return res;
}
if (state->have_buffers)
update_state (this, SPA_NODE_STATE_PAUSED);
else
update_state (this, SPA_NODE_STATE_READY);
return res;
}
static SpaResult
@ -517,6 +572,8 @@ spa_v4l2_source_node_port_alloc_buffers (SpaNode *node,
unsigned int *n_buffers)
{
SpaV4l2Source *this;
SpaV4l2State *state;
SpaResult res;
if (node == NULL || node->handle == NULL || buffers == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
@ -526,9 +583,17 @@ spa_v4l2_source_node_port_alloc_buffers (SpaNode *node,
if (port_id != 0)
return SPA_RESULT_INVALID_PORT;
spa_v4l2_alloc_buffers (this, params, n_params, buffers, n_buffers);
state = &this->state[port_id];
return SPA_RESULT_OK;
if (state->current_format == NULL)
return SPA_RESULT_NO_FORMAT;
res = spa_v4l2_alloc_buffers (this, params, n_params, buffers, n_buffers);
if (state->have_buffers)
update_state (this, SPA_NODE_STATE_PAUSED);
return res;
}
static SpaResult
@ -537,6 +602,8 @@ spa_v4l2_source_node_port_reuse_buffer (SpaNode *node,
uint32_t buffer_id)
{
SpaV4l2Source *this;
SpaV4l2State *state;
SpaResult res;
if (node == NULL || node->handle == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
@ -546,9 +613,17 @@ spa_v4l2_source_node_port_reuse_buffer (SpaNode *node,
if (port_id != 0)
return SPA_RESULT_INVALID_PORT;
spa_v4l2_buffer_recycle (this, buffer_id);
state = &this->state[port_id];
return SPA_RESULT_OK;
if (!state->have_buffers)
return SPA_RESULT_NO_BUFFERS;
if (buffer_id >= state->reqbuf.count)
return SPA_RESULT_INVALID_BUFFER_ID;
res = spa_v4l2_buffer_recycle (this, buffer_id);
return res;
}
static SpaResult
@ -710,6 +785,8 @@ v4l2_source_init (const SpaHandleFactory *factory,
this->state[0].export_buf = true;
this->node_state = SPA_NODE_STATE_INIT;
return SPA_RESULT_OK;
}

View file

@ -66,6 +66,57 @@ spa_v4l2_open (SpaV4l2Source *this)
return 0;
}
static SpaResult
spa_v4l2_buffer_recycle (SpaV4l2Source *this, uint32_t buffer_id)
{
SpaV4l2State *state = &this->state[0];
V4l2Buffer *b = &state->alloc_buffers[buffer_id];
if (!b->outstanding)
return SPA_RESULT_OK;
b->outstanding = false;
if (xioctl (state->fd, VIDIOC_QBUF, &b->v4l2_buffer) < 0) {
perror ("VIDIOC_QBUF");
}
return SPA_RESULT_OK;
}
static SpaResult
spa_v4l2_clear_buffers (SpaV4l2Source *this)
{
SpaV4l2State *state = &this->state[0];
int i;
if (!state->have_buffers)
return SPA_RESULT_OK;
for (i = 0; i < state->reqbuf.count; i++) {
V4l2Buffer *b;
SpaMemory *mem;
b = &state->alloc_buffers[i];
if (b->outstanding) {
fprintf (stderr, "queueing outstanding buffer %p\n", b);
spa_v4l2_buffer_recycle (this, i);
}
mem = spa_memory_find (&b->datas[0].mem.mem);
if (state->export_buf) {
close (mem->fd);
} else {
munmap (mem->ptr, mem->size);
}
spa_memory_unref (&mem->mem);
}
if (state->alloc_mem)
spa_memory_unref (&state->alloc_mem->mem);
state->have_buffers = false;
return SPA_RESULT_OK;
}
static int
spa_v4l2_close (SpaV4l2Source *this)
{
@ -74,6 +125,9 @@ spa_v4l2_close (SpaV4l2Source *this)
if (!state->opened)
return 0;
if (state->have_buffers)
return 0;
fprintf (stderr, "close\n");
if (close(state->fd))
perror ("close");
@ -491,66 +545,13 @@ v4l2_on_fd_events (SpaPollNotifyData *data)
return 0;
}
static void
spa_v4l2_buffer_recycle (SpaV4l2Source *this, uint32_t buffer_id)
{
SpaV4l2State *state = &this->state[0];
V4l2Buffer *b = &state->alloc_buffers[buffer_id];
if (!b->outstanding)
return;
b->outstanding = false;
if (xioctl (state->fd, VIDIOC_QBUF, &b->v4l2_buffer) < 0) {
perror ("VIDIOC_QBUF");
}
}
static void
clear_buffers (SpaV4l2Source *this)
{
SpaV4l2State *state = &this->state[0];
int i;
if (!state->have_buffers)
return;
for (i = 0; i < state->reqbuf.count; i++) {
V4l2Buffer *b;
SpaMemory *mem;
b = &state->alloc_buffers[i];
if (b->outstanding) {
fprintf (stderr, "queueing outstanding buffer %p\n", b);
spa_v4l2_buffer_recycle (this, i);
}
mem = spa_memory_find (&b->datas[0].mem.mem);
if (state->export_buf) {
close (mem->fd);
} else {
munmap (mem->ptr, mem->size);
}
spa_memory_unref (&mem->mem);
}
if (state->alloc_mem)
spa_memory_unref (&state->alloc_mem->mem);
state->have_buffers = false;
}
static SpaResult
spa_v4l2_import_buffers (SpaV4l2Source *this, SpaBuffer **buffers, uint32_t n_buffers)
spa_v4l2_use_buffers (SpaV4l2Source *this, SpaBuffer **buffers, uint32_t n_buffers)
{
SpaV4l2State *state = &this->state[0];
struct v4l2_requestbuffers reqbuf;
int i;
if (buffers == NULL) {
clear_buffers (this);
return SPA_RESULT_OK;
}
state->memtype = V4L2_MEMORY_USERPTR;
CLEAR(reqbuf);
@ -797,11 +798,11 @@ spa_v4l2_start (SpaV4l2Source *this)
enum v4l2_buf_type type;
SpaEvent event;
if (spa_v4l2_open (this) < 0)
type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
if (xioctl (state->fd, VIDIOC_STREAMON, &type) < 0) {
perror ("VIDIOC_STREAMON");
return SPA_RESULT_ERROR;
if (!state->have_buffers)
return SPA_RESULT_NO_BUFFERS;
}
event.type = SPA_EVENT_TYPE_ADD_POLL;
event.port_id = 0;
@ -821,39 +822,27 @@ spa_v4l2_start (SpaV4l2Source *this)
state->poll.user_data = this;
this->event_cb (&this->node, &event, this->user_data);
type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
if (xioctl (state->fd, VIDIOC_STREAMON, &type) < 0) {
perror ("VIDIOC_STREAMON");
return SPA_RESULT_ERROR;
}
return SPA_RESULT_OK;
}
static SpaResult
spa_v4l2_stop (SpaV4l2Source *this)
spa_v4l2_pause (SpaV4l2Source *this)
{
SpaV4l2State *state = &this->state[0];
enum v4l2_buf_type type;
SpaEvent event;
if (!state->opened)
return SPA_RESULT_OK;
type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
if (xioctl (state->fd, VIDIOC_STREAMOFF, &type) < 0) {
perror ("VIDIOC_STREAMOFF");
return SPA_RESULT_ERROR;
}
clear_buffers (this);
event.type = SPA_EVENT_TYPE_REMOVE_POLL;
event.port_id = 0;
event.data = &state->poll;
event.size = sizeof (state->poll);
this->event_cb (&this->node, &event, this->user_data);
spa_v4l2_close (this);
type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
if (xioctl (state->fd, VIDIOC_STREAMOFF, &type) < 0) {
perror ("VIDIOC_STREAMOFF");
return SPA_RESULT_ERROR;
}
return SPA_RESULT_OK;
}