Rework buffer API a little

Use BufferIter and BufferBuilder instead of Packet*
Make separate method to set the header so you can set it later and not
only at builder init time.
Add method to clear the builder if you want to abort.
Add method to add fd to builder instead of in _add_fd_payload. This
would make it easier to add multiple fd-payloads using data from the
same fd.
Pass PinosPacketFDPayload to add_fd_payload to make it symetric with the
parsing code. We should be able to get the size from the VERSION passed
when the builder was made.
Add ideas about releasing the fds back to the server.
This commit is contained in:
Wim Taymans 2015-08-26 12:46:28 +02:00
parent 4e92d32540
commit 0e03080002
8 changed files with 211 additions and 100 deletions

View file

@ -71,17 +71,26 @@ Types:
0: fd-payload section
Used to send a buffer between client and server.
<id> : 4 bytes : id of the fd-payload
<offset> : 8 bytes : offset
<size> : 8 bytes : size
<fd-index> : 4 bytes : index of fd
1: format change
1: release fd-payload
Release a fd-payload with <id>
<id> : 4 bytes : the id number of the released fd-payload
2: format change
<format-id> : 1 byte : format id
<format> : 0-terminated : contains serialized format
2: property changes
3: property changes
<key> : 0-terminated : key
<value> : 0-terminated : value

View file

@ -176,9 +176,9 @@ pinos_buffer_store (PinosBuffer *buffer,
}
/**
* PinosPacketIter:
* PinosBufferIter:
*
* #PinosPacketIter is an opaque data structure and can only be accessed
* #PinosBufferIter is an opaque data structure and can only be accessed
* using the following functions.
*/
struct stack_iter {
@ -194,7 +194,7 @@ struct stack_iter {
guint item;
};
G_STATIC_ASSERT (sizeof (struct stack_iter) <= sizeof (PinosPacketIter));
G_STATIC_ASSERT (sizeof (struct stack_iter) <= sizeof (PinosBufferIter));
#define PPSI(i) ((struct stack_iter *) (i))
#define PPSI_MAGIC ((gsize) 6739527471u)
@ -202,14 +202,14 @@ G_STATIC_ASSERT (sizeof (struct stack_iter) <= sizeof (PinosPacketIter));
PPSI(i)->magic == PPSI_MAGIC)
/**
* pinos_packet_iter_init:
* @iter: a #PinosPacketIter
* pinos_buffer_iter_init:
* @iter: a #PinosBufferIter
* @buffer: a #PinosBuffer
*
* Initialize @iter to iterate the packets in @buffer.
*/
void
pinos_packet_iter_init_full (PinosPacketIter *iter,
pinos_buffer_iter_init_full (PinosBufferIter *iter,
PinosBuffer *buffer,
guint32 version)
{
@ -255,15 +255,15 @@ read_length (guint8 * data, guint size, gsize * length, gsize * skip)
/**
* pinos_packet_iter_next:
* @iter: a #PinosPacketIter
* pinos_buffer_iter_next:
* @iter: a #PinosBufferIter
*
* Move to the next packet in @iter.
*
* Returns: %TRUE if more packets are available.
*/
gboolean
pinos_packet_iter_next (PinosPacketIter *iter)
pinos_buffer_iter_next (PinosBufferIter *iter)
{
struct stack_iter *si = PPSI (iter);
gsize len, size, skip;
@ -302,7 +302,7 @@ pinos_packet_iter_next (PinosPacketIter *iter)
}
PinosPacketType
pinos_packet_iter_get_type (PinosPacketIter *iter)
pinos_buffer_iter_get_type (PinosBufferIter *iter)
{
struct stack_iter *si = PPSI (iter);
@ -312,7 +312,7 @@ pinos_packet_iter_get_type (PinosPacketIter *iter)
}
gpointer
pinos_packet_iter_get_data (PinosPacketIter *iter, gsize *size)
pinos_buffer_iter_get_data (PinosBufferIter *iter, gsize *size)
{
struct stack_iter *si = PPSI (iter);
@ -326,7 +326,7 @@ pinos_packet_iter_get_data (PinosPacketIter *iter, gsize *size)
/**
* PinosPacketBuilder:
* PinosBufferBuilder:
* @buffer: owner #PinosBuffer
*/
struct stack_builder {
@ -338,10 +338,10 @@ struct stack_builder {
PinosPacketType type;
gsize offset;
guint n_sockets;
guint n_fds;
};
G_STATIC_ASSERT (sizeof (struct stack_builder) <= sizeof (PinosPacketBuilder));
G_STATIC_ASSERT (sizeof (struct stack_builder) <= sizeof (PinosBufferBuilder));
#define PPSB(b) ((struct stack_builder *) (b))
#define PPSB_MAGIC ((gsize) 8103647428u)
@ -349,10 +349,16 @@ G_STATIC_ASSERT (sizeof (struct stack_builder) <= sizeof (PinosPacketBuilder));
PPSB(b)->magic == PPSB_MAGIC)
/**
* pinos_buffer_builder_init_full:
* @builder: a #PinosBufferBuilder
* @version: a version
*
* Initialize a stack allocated @builder and set the @version.
*/
void
pinos_packet_builder_init_full (PinosPacketBuilder *builder,
guint32 version,
const PinosBufferHeader *header)
pinos_buffer_builder_init_full (PinosBufferBuilder *builder,
guint32 version)
{
struct stack_builder *sb = PPSB (builder);
PinosStackHeader *sh;
@ -367,15 +373,47 @@ pinos_packet_builder_init_full (PinosPacketBuilder *builder,
sh = sb->sh = sb->buf.data;
sh->version = version;
sh->header = *header;
sh->length = 0;
sb->type = 0;
sb->offset = 0;
sb->n_fds = 0;
}
/**
* pinos_buffer_builder_clear:
* @builder: a #PinosBufferBuilder
*
* Clear the memory used by @builder. This can be used to abort building the
* buffer.
*
* @builder becomes invalid after this function and can be reused with
* pinos_buffer_builder_init()
*/
void
pinos_packet_builder_end (PinosPacketBuilder *builder,
pinos_buffer_builder_clear (PinosBufferBuilder *builder)
{
struct stack_builder *sb = PPSB (builder);
g_return_if_fail (is_valid_builder (builder));
sb->magic = 0;
g_free (sb->buf.data);
}
/**
* pinos_buffer_builder_end:
* @builder: a #PinosBufferBuilder
* @buffer: a #PinosBuffer
*
* Ends the building process and fills @buffer with the constructed
* #PinosBuffer.
*
* @builder becomes invalid after this function and can be reused with
* pinos_buffer_builder_init()
*/
void
pinos_buffer_builder_end (PinosBufferBuilder *builder,
PinosBuffer *buffer)
{
struct stack_builder *sb = PPSB (builder);
@ -396,7 +434,58 @@ pinos_packet_builder_end (PinosPacketBuilder *builder,
sb->buf.size = 0;
sb->buf.allocated_size = 0;
sb->buf.message = NULL;
sb->buf.magic = 0;
sb->magic = 0;
}
/**
* pinos_buffer_builder_set_header:
* @builder: a #PinosBufferBuilder
* @header: a #PinosBufferHeader
*
* Set @header in @builder.
*/
void
pinos_buffer_builder_set_header (PinosBufferBuilder *builder,
const PinosBufferHeader *header)
{
struct stack_builder *sb = PPSB (builder);
g_return_if_fail (is_valid_builder (builder));
g_return_if_fail (header != NULL);
sb->sh->header = *header;
}
/**
* pinos_buffer_builder_add_fd:
* @builder: a #PinosBufferBuilder
* @fd: a valid fd
* @error: a #GError or %NULL
*
* Add the file descriptor @fd to @builder.
*
* Returns: the index of the file descriptor in @builder. The file descriptor
* is duplicated using dup(). You keep your copy of the descriptor and the copy
* contained in @buffer will be closed when @buffer is freed.
* -1 is returned on error and @error is set.
*/
gint
pinos_buffer_builder_add_fd (PinosBufferBuilder *builder,
int fd,
GError **error)
{
struct stack_builder *sb = PPSB (builder);
g_return_val_if_fail (is_valid_builder (builder), -1);
g_return_val_if_fail (fd > 0, -1);
if (sb->buf.message == NULL)
sb->buf.message = g_unix_fd_message_new ();
if (!g_unix_fd_message_append_fd ((GUnixFDMessage*)sb->buf.message, fd, error))
return -1;
return sb->n_fds++;
}
static gpointer
@ -437,15 +526,15 @@ builder_add_packet (struct stack_builder *sb, PinosPacketType type, gsize size)
/* fd-payload packets */
/**
* pinos_packet_iter_get_fd_payload:
* @iter: a #PinosPacketIter
* pinos_buffer_iter_get_fd_payload:
* @iter: a #PinosBufferIter
* @payload: a #PinosPacketFDPayload
*
* Get the #PinosPacketFDPayload. @iter must be positioned on a packet of
* type #PINOS_PACKET_TYPE_FD_PAYLOAD
*/
void
pinos_packet_iter_parse_fd_payload (PinosPacketIter *iter,
pinos_buffer_iter_parse_fd_payload (PinosBufferIter *iter,
PinosPacketFDPayload *payload)
{
struct stack_iter *si = PPSI (iter);
@ -457,42 +546,26 @@ pinos_packet_iter_parse_fd_payload (PinosPacketIter *iter,
}
/**
* pinos_packet_builder_add_fd_payload:
* @builder: a #PinosPacketBuilder
* @offset: an offset
* @size: a size
* @fd: a file descriptor
* @error: a #GError or %NULL
* pinos_buffer_builder_add_fd_payload:
* @builder: a #PinosBufferBuilder
* @payload: a #PinosPacketFDPayload
*
* Add a #PINOS_PACKET_TYPE_FD_PAYLOAD to @builder.
* Add a #PINOS_PACKET_TYPE_FD_PAYLOAD to @builder with data from @payload.
*
* Returns: %TRUE on success. When %FALSE is returned, @error contains more
* information.
* Returns: %TRUE on success.
*/
gboolean
pinos_packet_builder_add_fd_payload (PinosPacketBuilder *builder,
gint64 offset, gint64 size, int fd,
GError **error)
pinos_buffer_builder_add_fd_payload (PinosBufferBuilder *builder,
PinosPacketFDPayload *payload)
{
struct stack_builder *sb = PPSB (builder);
PinosPacketFDPayload *p;
g_return_if_fail (is_valid_builder (builder));
g_return_if_fail (size > 0);
g_return_if_fail (offset >= 0);
g_return_if_fail (fd != -1);
if (sb->buf.message == NULL) {
sb->buf.message = g_unix_fd_message_new ();
sb->n_sockets = 0;
}
if (!g_unix_fd_message_append_fd ((GUnixFDMessage*)sb->buf.message, fd, error))
return FALSE;
g_return_val_if_fail (is_valid_builder (builder), FALSE);
g_return_val_if_fail (payload->size > 0, FALSE);
p = builder_add_packet (sb, PINOS_PACKET_TYPE_FD_PAYLOAD, sizeof (PinosPacketFDPayload));
p->offset = offset;
p->size = size;
p->fd_index = sb->n_sockets++;
*p = *payload;
return TRUE;
}

View file

@ -27,8 +27,8 @@ G_BEGIN_DECLS
typedef struct _PinosBuffer PinosBuffer;
typedef struct _PinosBufferInfo PinosBufferInfo;
typedef struct _PinosPacketIter PinosPacketIter;
typedef struct _PinosPacketBuilder PinosPacketBuilder;
typedef struct _PinosBufferIter PinosBufferIter;
typedef struct _PinosBufferBuilder PinosBufferBuilder;
#define PINOS_BUFFER_VERSION 0
@ -75,40 +75,46 @@ typedef enum {
/* iterating packets */
struct _PinosPacketIter {
struct _PinosBufferIter {
/*< private >*/
gsize x[16];
};
void pinos_packet_iter_init_full (PinosPacketIter *iter,
void pinos_buffer_iter_init_full (PinosBufferIter *iter,
PinosBuffer *buffer,
guint32 version);
#define pinos_packet_iter_init(i,b) pinos_packet_iter_init_full(i,b, PINOS_BUFFER_VERSION);
#define pinos_buffer_iter_init(i,b) pinos_buffer_iter_init_full(i,b, PINOS_BUFFER_VERSION);
gboolean pinos_packet_iter_next (PinosPacketIter *iter);
gboolean pinos_buffer_iter_next (PinosBufferIter *iter);
PinosPacketType pinos_packet_iter_get_type (PinosPacketIter *iter);
gpointer pinos_packet_iter_get_data (PinosPacketIter *iter, gsize *size);
PinosPacketType pinos_buffer_iter_get_type (PinosBufferIter *iter);
gpointer pinos_buffer_iter_get_data (PinosBufferIter *iter, gsize *size);
/**
* PinosPacketBuilder:
* PinosBufferBuilder:
*/
struct _PinosPacketBuilder {
struct _PinosBufferBuilder {
/*< private >*/
gsize x[16];
};
void pinos_packet_builder_init_full (PinosPacketBuilder *builder,
guint32 version,
const PinosBufferHeader *header);
#define pinos_packet_builder_init(b,h) pinos_packet_builder_init_full(b, PINOS_BUFFER_VERSION,h);
void pinos_buffer_builder_init_full (PinosBufferBuilder *builder,
guint32 version);
#define pinos_buffer_builder_init(b) pinos_buffer_builder_init_full(b, PINOS_BUFFER_VERSION);
void pinos_packet_builder_end (PinosPacketBuilder *builder,
PinosBuffer *buffer);
void pinos_buffer_builder_clear (PinosBufferBuilder *builder);
void pinos_buffer_builder_end (PinosBufferBuilder *builder,
PinosBuffer *buffer);
void pinos_buffer_builder_set_header (PinosBufferBuilder *builder,
const PinosBufferHeader *header);
gint pinos_buffer_builder_add_fd (PinosBufferBuilder *builder,
int fd,
GError **error);
/* fd-payload packets */
/**
* PinosPacketFDPayload:
* @id: the unique id of this payload
* @fd_index: the index of the fd with the data
* @offset: the offset of the data
* @size: the size of the data
@ -117,16 +123,16 @@ void pinos_packet_builder_end (PinosPacketBuilder *builder,
* @size.
*/
typedef struct {
guint32 id;
gint32 fd_index;
gint64 offset;
gint64 size;
guint64 offset;
guint64 size;
} PinosPacketFDPayload;
void pinos_packet_iter_parse_fd_payload (PinosPacketIter *iter,
void pinos_buffer_iter_parse_fd_payload (PinosBufferIter *iter,
PinosPacketFDPayload *payload);
gboolean pinos_buffer_builder_add_fd_payload (PinosBufferBuilder *builder,
PinosPacketFDPayload *payload);
gboolean pinos_packet_builder_add_fd_payload (PinosPacketBuilder *builder,
gint64 offset, gint64 size, int fd,
GError **error);
#endif /* __PINOS_BUFFER_H__ */

View file

@ -1210,7 +1210,7 @@ pinos_stream_provide_buffer (PinosStream *stream,
if (len == -1)
goto send_error;
g_assert (len == sb->size);
g_assert (len == (gssize) sb->size);
return TRUE;

View file

@ -169,7 +169,7 @@ gst_fddepay_transform_ip (GstBaseTransform * trans, GstBuffer * buf)
{
GstFddepay *fddepay = GST_FDDEPAY (trans);
PinosBuffer pbuf;
PinosPacketIter it;
PinosBufferIter it;
GstNetControlMessageMeta * meta;
GSocketControlMessage *msg = NULL;
const PinosBufferHeader *hdr;
@ -194,16 +194,16 @@ gst_fddepay_transform_ip (GstBaseTransform * trans, GstBuffer * buf)
gst_buffer_remove_all_memory (buf);
pinos_packet_iter_init (&it, &pbuf);
while (pinos_packet_iter_next (&it)) {
switch (pinos_packet_iter_get_type (&it)) {
pinos_buffer_iter_init (&it, &pbuf);
while (pinos_buffer_iter_next (&it)) {
switch (pinos_buffer_iter_get_type (&it)) {
case PINOS_PACKET_TYPE_FD_PAYLOAD:
{
GstMemory *fdmem = NULL;
PinosPacketFDPayload p;
int fd;
pinos_packet_iter_parse_fd_payload (&it, &p);
pinos_buffer_iter_parse_fd_payload (&it, &p);
fd = pinos_buffer_get_fd (&pbuf, p.fd_index, &err);
if (fd == -1)
goto error;

View file

@ -242,8 +242,9 @@ gst_fdpay_transform (GstBaseTransform * trans, GstBuffer * inbuf,
GstMapInfo info;
GError *err = NULL;
PinosBuffer pbuf;
PinosPacketBuilder builder;
PinosBufferBuilder builder;
PinosBufferHeader hdr;
PinosPacketFDPayload p;
gsize size;
GST_DEBUG_OBJECT (fdpay, "transform_ip");
@ -255,15 +256,18 @@ gst_fdpay_transform (GstBaseTransform * trans, GstBuffer * inbuf,
hdr.pts = GST_BUFFER_TIMESTAMP (inbuf) + GST_ELEMENT_CAST (trans)->base_time;
hdr.dts_offset = 0;
pinos_packet_builder_init (&builder, &hdr);
if (!pinos_packet_builder_add_fd_payload (&builder,
fdmem->offset,
fdmem->size,
gst_fd_memory_get_fd (fdmem),
&err))
goto append_fd_failed;
pinos_buffer_builder_init (&builder);
pinos_buffer_builder_set_header (&builder, &hdr);
pinos_packet_builder_end (&builder, &pbuf);
p.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (fdmem), &err);
if (p.fd_index == -1)
goto add_fd_failed;
p.id = 0;
p.offset = fdmem->offset;
p.size = fdmem->size;
pinos_buffer_builder_add_fd_payload (&builder, &p);
pinos_buffer_builder_end (&builder, &pbuf);
gst_memory_unref(fdmem);
fdmem = NULL;
@ -282,9 +286,9 @@ gst_fdpay_transform (GstBaseTransform * trans, GstBuffer * inbuf,
return GST_FLOW_OK;
/* ERRORS */
append_fd_failed:
add_fd_failed:
{
GST_WARNING_OBJECT (trans, "Appending fd failed: %s", err->message);
GST_WARNING_OBJECT (trans, "Adding fd failed: %s", err->message);
gst_memory_unref(fdmem);
g_clear_error (&err);

View file

@ -330,11 +330,13 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
{
GstPinosSink *pinossink;
PinosBuffer pbuf;
PinosPacketBuilder builder;
PinosBufferBuilder builder;
GstMemory *mem = NULL;
GstClockTime pts, dts, base;
PinosBufferHeader hdr;
PinosPacketFDPayload p;
gsize size;
GError *err = NULL;
pinossink = GST_PINOS_SINK (bsink);
@ -357,8 +359,6 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
size = gst_buffer_get_size (buffer);
pinos_packet_builder_init (&builder, &hdr);
if (gst_buffer_n_memory (buffer) == 1
&& gst_is_fd_memory (gst_buffer_peek_memory (buffer, 0))) {
mem = gst_buffer_get_memory (buffer, 0);
@ -375,10 +375,20 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
gst_memory_unmap (mem, &minfo);
}
pinos_packet_builder_add_fd_payload (&builder, 0, size, gst_fd_memory_get_fd (mem), NULL);
pinos_buffer_builder_init (&builder);
pinos_buffer_builder_set_header (&builder, &hdr);
p.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (mem), &err);
if (p.fd_index == -1)
goto add_fd_failed;
p.id = 0;
p.offset = 0;
p.size = size;
pinos_buffer_builder_add_fd_payload (&builder, &p);
gst_memory_unref (mem);
pinos_packet_builder_end (&builder, &pbuf);
pinos_buffer_builder_end (&builder, &pbuf);
pinos_main_loop_lock (pinossink->loop);
if (pinos_stream_get_state (pinossink->stream) != PINOS_STREAM_STATE_STREAMING)
@ -394,6 +404,15 @@ not_negotiated:
}
map_error:
{
GST_ELEMENT_ERROR (pinossink, RESOURCE, FAILED,
("failed to map buffer"), (NULL));
return GST_FLOW_ERROR;
}
add_fd_failed:
{
GST_ELEMENT_ERROR (pinossink, RESOURCE, FAILED,
("failed to add fd: %s", err->message), (NULL));
pinos_buffer_builder_clear (&builder);
return GST_FLOW_ERROR;
}
streaming_error:

View file

@ -253,7 +253,7 @@ on_new_buffer (GObject *gobject,
GstPinosSrc *pinossrc = user_data;
PinosBuffer pbuf;
const PinosBufferHeader *hdr;
PinosPacketIter it;
PinosBufferIter it;
GstBuffer *buf;
GError *error = NULL;
@ -276,9 +276,9 @@ on_new_buffer (GObject *gobject,
}
GST_BUFFER_OFFSET (buf) = hdr->seq;
pinos_packet_iter_init (&it, &pbuf);
while (pinos_packet_iter_next (&it)) {
switch (pinos_packet_iter_get_type (&it)) {
pinos_buffer_iter_init (&it, &pbuf);
while (pinos_buffer_iter_next (&it)) {
switch (pinos_buffer_iter_get_type (&it)) {
case PINOS_PACKET_TYPE_FD_PAYLOAD:
{
GstMemory *fdmem = NULL;
@ -286,7 +286,7 @@ on_new_buffer (GObject *gobject,
int fd;
GST_DEBUG ("got fd payload");
pinos_packet_iter_parse_fd_payload (&it, &p);
pinos_buffer_iter_parse_fd_payload (&it, &p);
fd = pinos_buffer_get_fd (&pbuf, p.fd_index, &error);
if (fd == -1)
goto no_fds;