socketsink: track buffers with the fdmanager

This commit is contained in:
Wim Taymans 2016-04-27 16:05:02 +02:00
parent 45c5e4c47f
commit 1c16fd5533

View file

@ -35,6 +35,7 @@
#include <gst/allocators/gstfdmemory.h>
#include <gst/net/gstnetcontrolmessagemeta.h>
#include <gst/video/video.h>
#include "gstpinossocketsink.h"
#include "gsttmpfileallocator.h"
@ -497,6 +498,107 @@ open_failed:
}
}
static void
myreader_receive_buffer (GstPinosSocketSink *this, MyReader *myreader)
{
MySource *mysource = myreader->source;
gssize navail, nread, maxmem;
GstEvent *ev;
gchar *mem;
PinosBuffer pbuf;
PinosBufferIter it;
PinosBufferBuilder b;
const gchar *client_path;
gboolean have_out = FALSE;
navail = g_socket_get_available_bytes (myreader->socket);
maxmem = MAX (navail, 1);
mem = g_malloc (maxmem);
nread = g_socket_receive (myreader->socket, mem, maxmem, NULL, NULL);
if (nread <= 0) {
GST_DEBUG ("client closed");
mysource->condition &= ~G_IO_IN;
g_source_modify_unix_fd ((GSource *)mysource, mysource->tag, mysource->condition);
g_free (mem);
return;
}
client_path = g_object_get_data (G_OBJECT (myreader->socket), "pinos-client-path");
if (client_path == NULL)
return;
if (this->pinos_input) {
pinos_buffer_builder_init (&b);
}
pinos_buffer_init_data (&pbuf, mem, maxmem, NULL);
pinos_buffer_iter_init (&it, &pbuf);
while (pinos_buffer_iter_next (&it)) {
switch (pinos_buffer_iter_get_type (&it)) {
case PINOS_PACKET_TYPE_RELEASE_FD_PAYLOAD:
{
PinosPacketReleaseFDPayload p;
gint id;
if (!pinos_buffer_iter_parse_release_fd_payload (&it, &p))
continue;
id = p.id;
GST_LOG ("fd index %d for client %s is released", id, client_path);
pinos_fd_manager_remove (this->fdmanager, client_path, id);
break;
}
case PINOS_PACKET_TYPE_REFRESH_REQUEST:
{
PinosPacketRefreshRequest p;
if (!pinos_buffer_iter_parse_refresh_request (&it, &p))
continue;
GST_LOG ("refresh request");
if (!this->pinos_input) {
gst_pad_push_event (GST_BASE_SINK_PAD (this),
gst_video_event_new_upstream_force_key_unit (p.pts,
p.request_type == 1, 0));
} else {
pinos_buffer_builder_add_refresh_request (&b, &p);
have_out = TRUE;
}
break;
}
default:
break;
}
}
pinos_buffer_clear (&pbuf);
g_free (mem);
if (this->pinos_input) {
GstBuffer *outbuf;
gsize size;
gpointer data;
if (have_out) {
pinos_buffer_builder_end (&b, &pbuf);
data = pinos_buffer_steal (&pbuf, &size, NULL);
outbuf = gst_buffer_new_wrapped (data, size);
ev = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
gst_structure_new ("GstNetworkMessage",
"object", G_TYPE_OBJECT, this,
"buffer", GST_TYPE_BUFFER, outbuf, NULL));
gst_buffer_unref (outbuf);
gst_pad_push_event (GST_BASE_SINK_PAD (this), ev);
} else {
pinos_buffer_builder_clear (&b);
}
}
}
static void
myreader_callback (GstBurstCache *cache,
GstBurstCacheReader *reader,
@ -510,56 +612,73 @@ myreader_callback (GstBurstCache *cache,
g_source_modify_unix_fd ((GSource *)mysource, mysource->tag, mysource->condition);
}
static int
map_n_memory_output_vector (GstBuffer * buf, GOutputVector * vectors,
GstMapInfo * mapinfo, guint num_vectors)
#define VEC_MAX 8
#define CMSG_MAX 255
static void
myreader_send_buffer (GstPinosSocketSink *this, MyReader *myreader, GstBuffer *buf)
{
guint mem_len;
guint i;
GstMapInfo maps[VEC_MAX];
GOutputVector vec[VEC_MAX];
GSocketControlMessage *cmsgs[CMSG_MAX];
guint i, mem_len;
gpointer iter_state = NULL;
GstMeta *meta;
gsize msg_count = 0;
gssize wrote;
mem_len = gst_buffer_n_memory (buf);
mem_len = MIN (gst_buffer_n_memory (buf), VEC_MAX);
for (i = 0; i < mem_len && i < num_vectors; i++) {
for (i = 0; i < mem_len; i++) {
GstMapInfo map = { 0 };
GstMemory *mem = gst_buffer_peek_memory (buf, i);
if (!gst_memory_map (mem, &map, GST_MAP_READ))
g_error ("Unable to map memory %p. This should never happen.", mem);
vectors[i].buffer = map.data;
vectors[i].size = map.size;
vec[i].buffer = map.data;
vec[i].size = map.size;
mapinfo[i] = map;
maps[i] = map;
}
return i;
}
static void
unmap_n_memorys (GstMapInfo * mapinfo, int num_mappings)
{
gint i;
for (i = 0; i < num_mappings; i++)
gst_memory_unmap (mapinfo[i].memory, &mapinfo[i]);
}
static gsize
gst_buffer_get_cmsg_list (GstBuffer * buf, GSocketControlMessage ** msgs,
gsize msg_space)
{
gpointer iter_state = NULL;
GstMeta *meta;
gsize msg_count = 0;
while ((meta = gst_buffer_iterate_meta (buf, &iter_state)) != NULL
&& msg_count < msg_space) {
&& msg_count < CMSG_MAX) {
if (meta->info->api == GST_NET_CONTROL_MESSAGE_META_API_TYPE)
msgs[msg_count++] = ((GstNetControlMessageMeta *) meta)->message;
cmsgs[msg_count++] = ((GstNetControlMessageMeta *) meta)->message;
}
return msg_count;
}
#define CMSG_MAX 255
wrote = g_socket_send_message (myreader->socket, NULL, vec, mem_len, cmsgs, msg_count, 0,
NULL, NULL);
for (i = 0; i < mem_len; i++)
gst_memory_unmap (maps[i].memory, &maps[i]);
if (wrote < 0) {
GST_DEBUG_OBJECT (this, "error sending to reader");
} else {
GArray *fdids;
const gchar *client_path;
fdids = gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (buf), fdids_quark);
if (fdids == NULL)
return;
/* get the client path of this socket */
client_path = g_object_get_data (G_OBJECT (myreader->socket), "pinos-client-path");
if (client_path == NULL)
return;
for (i = 0; i < fdids->len; i++) {
gint id = g_array_index (fdids, guint32, i);
/* now store the id/client-path/buffer in the fdmanager */
GST_LOG ("fd index %d, client %s increment refcount of buffer %p", id, client_path, buf);
pinos_fd_manager_add (this->fdmanager,
client_path, id,
gst_buffer_ref (buf),
(GDestroyNotify) gst_buffer_unref);
}
}
}
static gboolean
myreader_source_func (GstBurstCacheReader *reader, GIOCondition condition, gpointer user_data)
@ -575,31 +694,7 @@ myreader_source_func (GstBurstCacheReader *reader, GIOCondition condition, gpoin
return FALSE;
}
if (condition & G_IO_IN) {
gssize navail, nread, maxmem;
GstBuffer *buf;
GstEvent *ev;
gchar *mem;
navail = g_socket_get_available_bytes (myreader->socket);
maxmem = MAX (navail, 1);
mem = g_malloc (maxmem);
nread = g_socket_receive (myreader->socket, mem, maxmem, NULL, NULL);
if (nread > 0) {
buf = gst_buffer_new_wrapped (mem, navail);
ev = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
gst_structure_new ("GstNetworkMessage",
"object", G_TYPE_OBJECT, myreader->socket,
"buffer", GST_TYPE_BUFFER, buf, NULL));
gst_buffer_unref (buf);
gst_pad_push_event (GST_BASE_SINK_PAD (this), ev);
} else {
GST_DEBUG ("client closed");
mysource->condition &= ~G_IO_IN;
g_source_modify_unix_fd ((GSource *)mysource, mysource->tag, mysource->condition);
g_free (mem);
}
myreader_receive_buffer (this, myreader);
}
if (condition & G_IO_OUT) {
GstBuffer *buf = NULL;
@ -621,19 +716,7 @@ myreader_source_func (GstBurstCacheReader *reader, GIOCondition condition, gpoin
break;
}
if (buf) {
GstMapInfo maps[8];
GOutputVector vec[8];
guint mems_mapped;
GSocketControlMessage *cmsgs[CMSG_MAX];
gsize msg_count;
mems_mapped = map_n_memory_output_vector (buf, vec, maps, 8);
msg_count = gst_buffer_get_cmsg_list (buf, cmsgs, CMSG_MAX);
g_socket_send_message (myreader->socket, NULL, vec, mems_mapped, cmsgs, msg_count, 0,
NULL, NULL);
unmap_n_memorys (maps, mems_mapped);
myreader_send_buffer (this, myreader, buf);
gst_buffer_unref (buf);
}
}