From 8d1ad2ea63cbc1c424461f32a53c8b02391b86e7 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 31 Aug 2015 16:47:32 +0200 Subject: [PATCH] More work on wire protocol Make separate payload for the header. Make release-fd payloads capture_buffer -> peek_buffer to avoid a copy remove release-buffer, we really need to release each fd in the buffer separately. provide_buffer -> send_buffer so that we can also use this to send the release-fd messages. in pinossrc, send back release-fd messages when the fd is no longer in use. --- doc/design.txt | 39 +++++++----- src/client/buffer.c | 136 +++++++++++++++++++++++++++++++--------- src/client/buffer.h | 117 ++++++++++++++++++++++++++++------ src/client/private.h | 1 - src/client/stream.c | 78 ++++++++--------------- src/client/stream.h | 9 +-- src/gst/gstpinosdepay.c | 24 +++++-- src/gst/gstpinospay.c | 4 +- src/gst/gstpinossink.c | 6 +- src/gst/gstpinossrc.c | 92 ++++++++++++++++++++------- 10 files changed, 351 insertions(+), 155 deletions(-) diff --git a/doc/design.txt b/doc/design.txt index 242320d31..682e3b847 100644 --- a/doc/design.txt +++ b/doc/design.txt @@ -55,11 +55,8 @@ Wire Fixed header - : 4 bytes : buffer flags - : 4 bytes : sequence number - : 8 bytes : presentation time - : 8 bytes : dts-offset - : 8 bytes : total message length + : 4 bytes : total message length + : 4 bytes : total message length Followed by 1 or more type-length-data sections @@ -69,28 +66,36 @@ Followed by 1 or more type-length-data sections Types: - 0: fd-payload section + 1: header + + Header for payload + + : 4 bytes : buffer flags + : 4 bytes : sequence number + : 8 bytes : presentation time + : 8 bytes : dts-offset + + 2: fd-payload section Used to send a buffer between client and server. - : 4 bytes : id of the fd-payload - : 8 bytes : offset - : 8 bytes : size - : 4 bytes : index of fd + : 4 bytes : id of the fd-payload + : 8 bytes : offset + : 8 bytes : size + : 4 bytes : index of fd - 1: release fd-payload + 3: release fd-payload Release a fd-payload with - : 4 bytes : the id number of the released fd-payload + : 4 bytes : the id number of the released fd-payload - 2: format change + 4: format change - : 1 byte : format id - : 0-terminated : contains serialized format + : 1 byte : format id + : 0-terminated : contains serialized format - - 3: property changes + 5: property changes : 0-terminated : key : 0-terminated : value diff --git a/src/client/buffer.c b/src/client/buffer.c index 76a43b662..b27568a84 100644 --- a/src/client/buffer.c +++ b/src/client/buffer.c @@ -72,8 +72,16 @@ pinos_buffer_clear (PinosBuffer *buffer) g_clear_object (&sb->message); } -const PinosBufferHeader * -pinos_buffer_get_header (PinosBuffer *buffer, guint32 *version) +/** + * pinos_buffer_get_version + * @buffer: a #PinosBuffer + * + * Get the buffer version + * + * Returns: the buffer version. + */ +guint32 +pinos_buffer_get_version (PinosBuffer *buffer) { PinosStackBuffer *sb = PSB (buffer); PinosStackHeader *hdr; @@ -82,10 +90,7 @@ pinos_buffer_get_header (PinosBuffer *buffer, guint32 *version) hdr = sb->data; - if (version) - *version = hdr->version; - - return (const PinosBufferHeader *) &hdr->header; + return hdr->version; } /** @@ -134,7 +139,7 @@ not_found: } /** - * pinos_buffer_steal_data: + * pinos_buffer_steal: * @buffer: a #PinosBuffer * @size: output size * @message: output #GSocketControlMessage @@ -431,25 +436,6 @@ pinos_buffer_builder_end (PinosBufferBuilder *builder, sb->magic = 0; } -/** - * pinos_buffer_builder_set_header: - * @builder: a #PinosBufferBuilder - * @header: a #PinosBufferHeader - * - * Set @header in @builder. - */ -void -pinos_buffer_builder_set_header (PinosBufferBuilder *builder, - const PinosBufferHeader *header) -{ - struct stack_builder *sb = PPSB (builder); - - g_return_if_fail (is_valid_builder (builder)); - g_return_if_fail (header != NULL); - - sb->sh->header = *header; -} - /** * pinos_buffer_builder_add_fd: * @builder: a #PinosBufferBuilder @@ -518,6 +504,58 @@ builder_add_packet (struct stack_builder *sb, PinosPacketType type, gsize size) return p; } +/* header packets */ +/** + * pinos_buffer_iter_get_header: + * @iter: a #PinosBufferIter + * @header: a #PinosPacketHeader + * + * Get the #PinosPacketHeader. @iter must be positioned on a packet of + * type #PINOS_PACKET_TYPE_HEADER + * + * Returns: %TRUE if @header contains valid data. + */ +gboolean +pinos_buffer_iter_parse_header (PinosBufferIter *iter, + PinosPacketHeader *header) +{ + struct stack_iter *si = PPSI (iter); + + g_return_val_if_fail (is_valid_iter (iter), FALSE); + g_return_val_if_fail (si->type == PINOS_PACKET_TYPE_HEADER, FALSE); + + if (si->size < sizeof (PinosPacketHeader)) + return FALSE; + + *header = *((PinosPacketHeader *) si->data); + + return TRUE; +} + +/** + * pinos_buffer_builder_add_header: + * @builder: a #PinosBufferBuilder + * @header: a #PinosPacketHeader + * + * Add a #PINOS_PACKET_TYPE_HEADER to @builder with data from @header. + * + * Returns: %TRUE on success. + */ +gboolean +pinos_buffer_builder_add_header (PinosBufferBuilder *builder, + PinosPacketHeader *header) +{ + struct stack_builder *sb = PPSB (builder); + PinosPacketHeader *h; + + g_return_val_if_fail (is_valid_builder (builder), FALSE); + + h = builder_add_packet (sb, PINOS_PACKET_TYPE_HEADER, sizeof (PinosPacketHeader)); + *h = *header; + + return TRUE; +} + /* fd-payload packets */ /** * pinos_buffer_iter_get_fd_payload: @@ -526,17 +564,24 @@ builder_add_packet (struct stack_builder *sb, PinosPacketType type, gsize size) * * Get the #PinosPacketFDPayload. @iter must be positioned on a packet of * type #PINOS_PACKET_TYPE_FD_PAYLOAD + * + * Returns: %TRUE if @payload contains valid data. */ -void +gboolean pinos_buffer_iter_parse_fd_payload (PinosBufferIter *iter, PinosPacketFDPayload *payload) { struct stack_iter *si = PPSI (iter); - g_return_if_fail (is_valid_iter (iter)); - g_return_if_fail (si->type == PINOS_PACKET_TYPE_FD_PAYLOAD); + g_return_val_if_fail (is_valid_iter (iter), FALSE); + g_return_val_if_fail (si->type == PINOS_PACKET_TYPE_FD_PAYLOAD, FALSE); + + if (si->size < sizeof (PinosPacketFDPayload)) + return FALSE; *payload = *((PinosPacketFDPayload *) si->data); + + return TRUE; } /** @@ -564,3 +609,36 @@ pinos_buffer_builder_add_fd_payload (PinosBufferBuilder *builder, return TRUE; } +gboolean +pinos_buffer_iter_parse_release_fd_payload (PinosBufferIter *iter, + PinosPacketReleaseFDPayload *payload) +{ + struct stack_iter *si = PPSI (iter); + + g_return_val_if_fail (is_valid_iter (iter), FALSE); + g_return_val_if_fail (si->type == PINOS_PACKET_TYPE_RELEASE_FD_PAYLOAD, FALSE); + + if (si->size < sizeof (PinosPacketReleaseFDPayload)) + return FALSE; + + *payload = *((PinosPacketReleaseFDPayload *) si->data); + + return TRUE; +} + +gboolean +pinos_buffer_builder_add_release_fd_payload (PinosBufferBuilder *builder, + PinosPacketReleaseFDPayload *payload) +{ + struct stack_builder *sb = PPSB (builder); + PinosPacketReleaseFDPayload *p; + + g_return_val_if_fail (is_valid_builder (builder), FALSE); + + p = builder_add_packet (sb, + PINOS_PACKET_TYPE_RELEASE_FD_PAYLOAD, + sizeof (PinosPacketReleaseFDPayload)); + *p = *payload; + + return TRUE; +} diff --git a/src/client/buffer.h b/src/client/buffer.h index b980d3cc7..167ac2ee5 100644 --- a/src/client/buffer.h +++ b/src/client/buffer.h @@ -26,19 +26,11 @@ G_BEGIN_DECLS typedef struct _PinosBuffer PinosBuffer; -typedef struct _PinosBufferInfo PinosBufferInfo; typedef struct _PinosBufferIter PinosBufferIter; typedef struct _PinosBufferBuilder PinosBufferBuilder; #define PINOS_BUFFER_VERSION 0 -typedef struct { - guint32 flags; - guint32 seq; - gint64 pts; - gint64 dts_offset; -} PinosBufferHeader; - struct _PinosBuffer { /*< private >*/ gsize x[16]; @@ -51,9 +43,7 @@ void pinos_buffer_init_data (PinosBuffer *buffer, void pinos_buffer_clear (PinosBuffer *buffer); -const PinosBufferHeader * - pinos_buffer_get_header (PinosBuffer *buffer, - guint32 *version); +guint32 pinos_buffer_get_version (PinosBuffer *buffer); int pinos_buffer_get_fd (PinosBuffer *buffer, gint index, GError **error); @@ -63,12 +53,27 @@ gpointer pinos_buffer_steal (PinosBuffer *buffer, GSocketControlMessage **message); +/** + * PinosPacketType: + * @PINOS_PACKET_TYPE_INVALID: invalid packet type, ignore + * @PINOS_PACKET_TYPE_HEADER: common packet header + * @PINOS_PACKET_TYPE_FD_PAYLOAD: packet contains fd-payload. An fd-payload contains + * the media data as a file descriptor + * @PINOS_PACKET_TYPE_RELEASE_FD_PAYLOAD: packet contains release fd-payload. Notifies + * that a previously received fd-payload is no longer in use. + * @PINOS_PACKET_TYPE_FORMAT_CHANGE: a format change. + * @PINOS_PACKET_TYPE_PROPERTY_CHANGE: one or more property changes. + * + * The possible packet types. + */ typedef enum { - PINOS_PACKET_TYPE_INVALID = 0, + PINOS_PACKET_TYPE_INVALID = 0, - PINOS_PACKET_TYPE_FD_PAYLOAD = 1, - PINOS_PACKET_TYPE_FORMAT_CHANGE = 2, - PINOS_PACKET_TYPE_PROPERTY_CHANGE = 3, + PINOS_PACKET_TYPE_HEADER = 1, + PINOS_PACKET_TYPE_FD_PAYLOAD = 2, + PINOS_PACKET_TYPE_RELEASE_FD_PAYLOAD = 3, + PINOS_PACKET_TYPE_FORMAT_CHANGE = 4, + PINOS_PACKET_TYPE_PROPERTY_CHANGE = 5, } PinosPacketType; @@ -104,11 +109,31 @@ void pinos_buffer_builder_clear (PinosBufferBuilder *builder) void pinos_buffer_builder_end (PinosBufferBuilder *builder, PinosBuffer *buffer); -void pinos_buffer_builder_set_header (PinosBufferBuilder *builder, - const PinosBufferHeader *header); gint pinos_buffer_builder_add_fd (PinosBufferBuilder *builder, int fd, GError **error); +/* header packets */ +/** + * PinosPacketHeader + * @flags: header flags + * @seq: sequence number + * @pts: presentation timestamp in nanoseconds + * @dts_offset: offset to presentation timestamp in nanoseconds to get decode timestamp + * + * A Packet that contains the header. + */ +typedef struct { + guint32 flags; + guint32 seq; + gint64 pts; + gint64 dts_offset; +} PinosPacketHeader; + +gboolean pinos_buffer_iter_parse_header (PinosBufferIter *iter, + PinosPacketHeader *header); +gboolean pinos_buffer_builder_add_header (PinosBufferBuilder *builder, + PinosPacketHeader *header); + /* fd-payload packets */ /** * PinosPacketFDPayload: @@ -127,10 +152,66 @@ typedef struct { guint64 size; } PinosPacketFDPayload; -void pinos_buffer_iter_parse_fd_payload (PinosBufferIter *iter, +gboolean pinos_buffer_iter_parse_fd_payload (PinosBufferIter *iter, PinosPacketFDPayload *payload); gboolean pinos_buffer_builder_add_fd_payload (PinosBufferBuilder *builder, PinosPacketFDPayload *payload); +/* release fd-payload packets */ +/** + * PinosPacketReleaseFDPayload: + * @id: the unique id of the fd-payload to release + * + * Release the payload with @id + */ +typedef struct { + guint32 id; +} PinosPacketReleaseFDPayload; + +gboolean pinos_buffer_iter_parse_release_fd_payload (PinosBufferIter *iter, + PinosPacketReleaseFDPayload *payload); +gboolean pinos_buffer_builder_add_release_fd_payload (PinosBufferBuilder *builder, + PinosPacketReleaseFDPayload *payload); + + +/* format change packets */ +/** + * PinosPacketFormatChange: + * @id: the id of the new format + * @format: the new format + * + * A new format. + */ +typedef struct { + guint8 id; + gchar *format; +} PinosPacketFormatChange; + +gboolean pinos_buffer_iter_parse_format_change (PinosBufferIter *iter, + PinosPacketFormatChange *payload); +gboolean pinos_buffer_builder_add_format_change (PinosBufferBuilder *builder, + PinosPacketFormatChange *payload); + + +/* property change packets */ +/** + * PinosPacketPropertyChange: + * @key: the key of the property + * @value: the new value + * + * A new property change. + */ +typedef struct { + gchar *key; + gchar *value; +} PinosPacketPropertyChange; + +gboolean pinos_buffer_iter_parse_property_change (PinosBufferIter *iter, + guint idx, + PinosPacketPropertyChange *payload); +gboolean pinos_buffer_builder_add_property_change (PinosBufferBuilder *builder, + PinosPacketPropertyChange *payload); + + #endif /* __PINOS_BUFFER_H__ */ diff --git a/src/client/private.h b/src/client/private.h index 43424e843..3d23f97ba 100644 --- a/src/client/private.h +++ b/src/client/private.h @@ -57,7 +57,6 @@ GDBusProxy * pinos_subscribe_get_proxy_finish (PinosSubscribe *subsc typedef struct { - PinosBufferHeader header; guint32 version; guint32 length; } PinosStackHeader; diff --git a/src/client/stream.c b/src/client/stream.c index 907e780ef..f7e453a1b 100644 --- a/src/client/stream.c +++ b/src/client/stream.c @@ -856,6 +856,10 @@ on_socket_condition (GSocket *socket, priv->buffer.magic = PSB_MAGIC; g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, NULL); + + priv->buffer.magic = 0; + priv->buffer.size = 0; + g_clear_object (&priv->buffer.message); break; } case G_IO_OUT: @@ -1131,18 +1135,18 @@ pinos_stream_stop (PinosStream *stream) } /** - * pinos_stream_capture_buffer: + * pinos_stream_peek_buffer: * @stream: a #PinosStream * @buffer: a #PinosBuffer * - * Capture the next buffer from @stream. This function should be called every - * time after the new-buffer callback has been emitted. + * Peek the next buffer from @stream. This function should be called from + * the new-buffer signal callback. * * Returns: %TRUE when @buffer contains valid information */ gboolean -pinos_stream_capture_buffer (PinosStream *stream, - PinosBuffer *buffer) +pinos_stream_peek_buffer (PinosStream *stream, + PinosBuffer **buffer) { PinosStreamPrivate *priv; @@ -1153,65 +1157,29 @@ pinos_stream_capture_buffer (PinosStream *stream, g_return_val_if_fail (priv->state == PINOS_STREAM_STATE_STREAMING, FALSE); g_return_val_if_fail (is_valid_buffer (&priv->buffer), FALSE); - memcpy (buffer, &priv->buffer, sizeof (PinosStackBuffer)); - - priv->buffer.data = NULL; - priv->buffer.allocated_size = 0; - priv->buffer.size = 0; - priv->buffer.message = NULL; - priv->buffer.magic = 0; + *buffer = (PinosBuffer *) &priv->buffer; return TRUE; } /** - * pinos_stream_release_buffer: + * pinos_stream_send_buffer: * @stream: a #PinosStream * @buffer: a #PinosBuffer * - * Release @buffer back to @stream. This function should be called whenever the - * buffer is processed. @buffer should not be used anymore after calling this - * function. - */ -void -pinos_stream_release_buffer (PinosStream *stream, - PinosBuffer *buffer) -{ - PinosStackBuffer *sb = (PinosStackBuffer *) buffer; - PinosStreamPrivate *priv; - - g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE); - g_return_val_if_fail (is_valid_buffer (buffer), FALSE); - - priv = stream->priv; - - if (priv->buffer.data == NULL) { - priv->buffer.data = sb->data; - priv->buffer.allocated_size = sb->allocated_size; - priv->buffer.size = 0; - } - else - g_free (sb->data); - - if (sb->message) - g_object_unref (sb->message); - - sb->magic = 0; -} - -/** - * pinos_stream_provide_buffer: - * @stream: a #PinosStream - * @buffer: a #PinosBuffer + * Send a buffer to @stream. * - * Provide the next buffer from @stream. This function should be called every - * time a new frame becomes available. + * For provider streams, this function should be called whenever there is a new frame + * available. + * + * For capture streams, this functions should be called for each fd-payload that + * should be released. * * Returns: %TRUE when @buffer was handled */ gboolean -pinos_stream_provide_buffer (PinosStream *stream, - PinosBuffer *buffer) +pinos_stream_send_buffer (PinosStream *stream, + PinosBuffer *buffer) { PinosStreamPrivate *priv; gssize len; @@ -1219,6 +1187,7 @@ pinos_stream_provide_buffer (PinosStream *stream, GOutputVector ovec[1]; gint flags = 0; GError *error = NULL; + gint n_msg; g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE); g_return_val_if_fail (buffer != NULL, FALSE); @@ -1229,12 +1198,17 @@ pinos_stream_provide_buffer (PinosStream *stream, ovec[0].buffer = sb->data; ovec[0].size = sb->size; + if (sb->message) + n_msg = 1; + else + n_msg = 0; + len = g_socket_send_message (priv->socket, NULL, ovec, 1, &sb->message, - 1, + n_msg, flags, NULL, &error); diff --git a/src/client/stream.h b/src/client/stream.h index 37a31808b..b5ee277ad 100644 --- a/src/client/stream.h +++ b/src/client/stream.h @@ -104,12 +104,9 @@ gboolean pinos_stream_start (PinosStream *stream, PinosStreamMode mode); gboolean pinos_stream_stop (PinosStream *stream); -gboolean pinos_stream_capture_buffer (PinosStream *stream, - PinosBuffer *buffer); -void pinos_stream_release_buffer (PinosStream *stream, - PinosBuffer *buffer); - -gboolean pinos_stream_provide_buffer (PinosStream *stream, +gboolean pinos_stream_peek_buffer (PinosStream *stream, + PinosBuffer **buffer); +gboolean pinos_stream_send_buffer (PinosStream *stream, PinosBuffer *buffer); G_END_DECLS diff --git a/src/gst/gstpinosdepay.c b/src/gst/gstpinosdepay.c index 757df8735..41ed8739b 100644 --- a/src/gst/gstpinosdepay.c +++ b/src/gst/gstpinosdepay.c @@ -86,17 +86,21 @@ gst_pinos_depay_chain (GstPad *pad, GstObject * parent, GstBuffer * buffer) PinosBufferIter it; GstNetControlMessageMeta * meta; GSocketControlMessage *msg = NULL; - const PinosBufferHeader *hdr; GError *err = NULL; meta = ((GstNetControlMessageMeta*) gst_buffer_get_meta ( buffer, GST_NET_CONTROL_MESSAGE_META_API_TYPE)); if (meta) { - msg = meta->message; + msg = g_object_ref (meta->message); gst_buffer_remove_meta (buffer, (GstMeta *) meta); meta = NULL; } + if (msg == NULL) { + gst_buffer_unref (buffer); + return GST_FLOW_OK; + } + outbuf = gst_buffer_new (); gst_buffer_map (buffer, &info, GST_MAP_READ); @@ -105,13 +109,24 @@ gst_pinos_depay_chain (GstPad *pad, GstObject * parent, GstBuffer * buffer) pinos_buffer_iter_init (&it, &pbuf); while (pinos_buffer_iter_next (&it)) { switch (pinos_buffer_iter_get_type (&it)) { + case PINOS_PACKET_TYPE_HEADER: + { + PinosPacketHeader hdr; + + if (!pinos_buffer_iter_parse_header (&it, &hdr)) + goto error; + + GST_BUFFER_OFFSET (outbuf) = hdr.seq; + break; + } case PINOS_PACKET_TYPE_FD_PAYLOAD: { GstMemory *fdmem = NULL; PinosPacketFDPayload p; int fd; - pinos_buffer_iter_parse_fd_payload (&it, &p); + if (!pinos_buffer_iter_parse_fd_payload (&it, &p)) + goto error; fd = pinos_buffer_get_fd (&pbuf, p.fd_index, &err); if (fd == -1) goto error; @@ -126,8 +141,6 @@ gst_pinos_depay_chain (GstPad *pad, GstObject * parent, GstBuffer * buffer) break; } } - hdr = pinos_buffer_get_header (&pbuf, NULL); - GST_BUFFER_OFFSET (buffer) = hdr->seq; pinos_buffer_clear (&pbuf); gst_buffer_unmap (buffer, &info); gst_buffer_unref (buffer); @@ -139,6 +152,7 @@ error: GST_ELEMENT_ERROR (depay, RESOURCE, SETTINGS, (NULL), ("can't get fd: %s", err->message)); g_clear_error (&err); + gst_buffer_unref (outbuf); return GST_FLOW_ERROR; } } diff --git a/src/gst/gstpinospay.c b/src/gst/gstpinospay.c index 9c016d548..c84e74290 100644 --- a/src/gst/gstpinospay.c +++ b/src/gst/gstpinospay.c @@ -155,7 +155,7 @@ gst_pinos_pay_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) GstBuffer *outbuf; PinosBuffer pbuf; PinosBufferBuilder builder; - PinosBufferHeader hdr; + PinosPacketHeader hdr; PinosPacketFDPayload p; gsize size; gpointer data; @@ -167,7 +167,7 @@ gst_pinos_pay_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) hdr.dts_offset = 0; pinos_buffer_builder_init (&builder); - pinos_buffer_builder_set_header (&builder, &hdr); + pinos_buffer_builder_add_header (&builder, &hdr); fdmem = gst_pinos_pay_get_fd_memory (pay, buffer); p.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (fdmem), &err); diff --git a/src/gst/gstpinossink.c b/src/gst/gstpinossink.c index a36e92c75..90a0b8db8 100644 --- a/src/gst/gstpinossink.c +++ b/src/gst/gstpinossink.c @@ -333,7 +333,7 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer) PinosBufferBuilder builder; GstMemory *mem = NULL; GstClockTime pts, dts, base; - PinosBufferHeader hdr; + PinosPacketHeader hdr; PinosPacketFDPayload p; gsize size; GError *err = NULL; @@ -376,7 +376,7 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer) } pinos_buffer_builder_init (&builder); - pinos_buffer_builder_set_header (&builder, &hdr); + pinos_buffer_builder_add_header (&builder, &hdr); p.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (mem), &err); if (p.fd_index == -1) @@ -393,7 +393,7 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer) pinos_main_loop_lock (pinossink->loop); if (pinos_stream_get_state (pinossink->stream) != PINOS_STREAM_STATE_STREAMING) goto streaming_error; - pinos_stream_provide_buffer (pinossink->stream, &pbuf); + pinos_stream_send_buffer (pinossink->stream, &pbuf); pinos_buffer_clear (&pbuf); pinos_main_loop_unlock (pinossink->loop); diff --git a/src/gst/gstpinossrc.c b/src/gst/gstpinossrc.c index 0cc24ec80..936816ec5 100644 --- a/src/gst/gstpinossrc.c +++ b/src/gst/gstpinossrc.c @@ -43,6 +43,7 @@ #include +static GQuark fdpayload_data_quark; GST_DEBUG_CATEGORY_STATIC (pinos_src_debug); #define GST_CAT_DEFAULT pinos_src_debug @@ -190,6 +191,8 @@ gst_pinos_src_class_init (GstPinosSrcClass * klass) GST_DEBUG_CATEGORY_INIT (pinos_src_debug, "pinossrc", 0, "Pinos Source"); + + fdpayload_data_quark = g_quark_from_static_string ("GstPinosSrcFDPayloadQuark"); } static void @@ -246,55 +249,100 @@ gst_pinos_src_src_fixate (GstBaseSrc * bsrc, GstCaps * caps) return caps; } +typedef struct { + GstPinosSrc *src; + PinosPacketFDPayload p; +} FDPayloadData; + +static void +fdpayload_data_destroy (gpointer user_data) +{ + FDPayloadData *data = user_data; + GstPinosSrc *pinossrc = data->src; + PinosBufferBuilder b; + PinosPacketReleaseFDPayload r; + PinosBuffer pbuf; + + r.id = data->p.id; + + GST_DEBUG_OBJECT (pinossrc, "destroy %d", r.id); + + pinos_buffer_builder_init (&b); + pinos_buffer_builder_add_release_fd_payload (&b, &r); + pinos_buffer_builder_end (&b, &pbuf); + + GST_OBJECT_LOCK (pinossrc); + if (pinossrc->stream) + pinos_stream_send_buffer (pinossrc->stream, &pbuf); + GST_OBJECT_UNLOCK (pinossrc); + + pinos_buffer_clear (&pbuf); + + gst_object_unref (pinossrc); + g_slice_free (FDPayloadData, data); +} + static void on_new_buffer (GObject *gobject, gpointer user_data) { GstPinosSrc *pinossrc = user_data; - PinosBuffer pbuf; - const PinosBufferHeader *hdr; + PinosBuffer *pbuf; PinosBufferIter it; GstBuffer *buf; GError *error = NULL; GST_LOG_OBJECT (pinossrc, "got new buffer"); - if (!pinos_stream_capture_buffer (pinossrc->stream, &pbuf)) { + if (!pinos_stream_peek_buffer (pinossrc->stream, &pbuf)) { g_warning ("failed to capture buffer"); return; } buf = gst_buffer_new (); - hdr = pinos_buffer_get_header (&pbuf, NULL); - - if (GST_CLOCK_TIME_IS_VALID (hdr->pts)) { - if (hdr->pts > GST_ELEMENT_CAST (pinossrc)->base_time) - GST_BUFFER_PTS (buf) = hdr->pts - GST_ELEMENT_CAST (pinossrc)->base_time; - - if (GST_BUFFER_PTS (buf) + hdr->dts_offset > 0) - GST_BUFFER_DTS (buf) = GST_BUFFER_PTS (buf) + hdr->dts_offset; - } - GST_BUFFER_OFFSET (buf) = hdr->seq; - - pinos_buffer_iter_init (&it, &pbuf); + pinos_buffer_iter_init (&it, pbuf); while (pinos_buffer_iter_next (&it)) { switch (pinos_buffer_iter_get_type (&it)) { + case PINOS_PACKET_TYPE_HEADER: + { + PinosPacketHeader hdr; + + if (!pinos_buffer_iter_parse_header (&it, &hdr)) + goto no_fds; + + if (GST_CLOCK_TIME_IS_VALID (hdr.pts)) { + if (hdr.pts > GST_ELEMENT_CAST (pinossrc)->base_time) + GST_BUFFER_PTS (buf) = hdr.pts - GST_ELEMENT_CAST (pinossrc)->base_time; + + if (GST_BUFFER_PTS (buf) + hdr.dts_offset > 0) + GST_BUFFER_DTS (buf) = GST_BUFFER_PTS (buf) + hdr.dts_offset; + } + GST_BUFFER_OFFSET (buf) = hdr.seq; + break; + } case PINOS_PACKET_TYPE_FD_PAYLOAD: { GstMemory *fdmem = NULL; - PinosPacketFDPayload p; + FDPayloadData data; int fd; GST_DEBUG ("got fd payload"); - pinos_buffer_iter_parse_fd_payload (&it, &p); - fd = pinos_buffer_get_fd (&pbuf, p.fd_index, &error); + if (!pinos_buffer_iter_parse_fd_payload (&it, &data.p)) + goto no_fds; + fd = pinos_buffer_get_fd (pbuf, data.p.fd_index, &error); if (fd == -1) goto no_fds; fdmem = gst_fd_allocator_alloc (pinossrc->fd_allocator, fd, - p.offset + p.size, GST_FD_MEMORY_FLAG_NONE); - gst_memory_resize (fdmem, p.offset, p.size); + data.p.offset + data.p.size, GST_FD_MEMORY_FLAG_NONE); + gst_memory_resize (fdmem, data.p.offset, data.p.size); gst_buffer_append_memory (buf, fdmem); + + data.src = gst_object_ref (pinossrc); + gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (fdmem), + fdpayload_data_quark, + g_slice_dup (FDPayloadData, &data), + fdpayload_data_destroy); break; } default: @@ -305,8 +353,6 @@ on_new_buffer (GObject *gobject, gst_buffer_unref (pinossrc->current); pinossrc->current = buf; - pinos_stream_release_buffer (pinossrc->stream, &pbuf); - pinos_main_loop_signal (pinossrc->loop, FALSE); return; @@ -678,7 +724,9 @@ gst_pinos_src_close (GstPinosSrc * pinossrc) g_clear_object (&pinossrc->loop); g_clear_object (&pinossrc->ctx); g_main_context_unref (pinossrc->context); + GST_OBJECT_LOCK (pinossrc); g_clear_object (&pinossrc->stream); + GST_OBJECT_UNLOCK (pinossrc); if (pinossrc->current) gst_buffer_unref (pinossrc->current);