diff --git a/src/client/pv-stream.c b/src/client/pv-stream.c index 77df500e5..c4bd7ad70 100644 --- a/src/client/pv-stream.c +++ b/src/client/pv-stream.c @@ -688,13 +688,16 @@ on_socket_condition (GSocket *socket, if (priv->info.message) g_object_unref (priv->info.message); + if (num_messages == 0) + break; + priv->info.flags = msg.flags; priv->info.seq = msg.seq; priv->info.pts = msg.pts; priv->info.dts_offset = msg.dts_offset; priv->info.offset = msg.offset; priv->info.size = msg.size; - priv->info.message = num_messages > 0 ? messages[0] : NULL; + priv->info.message = messages[0]; g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, NULL); break; diff --git a/src/gst/gstpvsink.c b/src/gst/gstpvsink.c index 1a2cc6414..0ae63df76 100644 --- a/src/gst/gstpvsink.c +++ b/src/gst/gstpvsink.c @@ -257,6 +257,21 @@ gst_pulsevideo_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) g_mutex_unlock (&pvsink->lock); pv_stream_start (pvsink->stream, format, PV_STREAM_MODE_BUFFER); + + g_mutex_lock (&pvsink->lock); + while (TRUE) { + PvStreamState state = pv_stream_get_state (pvsink->stream); + + if (state == PV_STREAM_STATE_STREAMING) + break; + + if (state == PV_STREAM_STATE_ERROR) + goto connect_error; + + g_cond_wait (&pvsink->cond, &pvsink->lock); + } + g_mutex_unlock (&pvsink->lock); + pvsink->negotiated = TRUE; return TRUE; diff --git a/src/gst/gstpvsrc.c b/src/gst/gstpvsrc.c index 578127bdf..0fdc072e4 100644 --- a/src/gst/gstpvsrc.c +++ b/src/gst/gstpvsrc.c @@ -368,8 +368,16 @@ gst_pulsevideo_src_create (GstPushSrc * psrc, GstBuffer ** buffer) goto not_negotiated; g_mutex_lock (&pvsrc->lock); - g_cond_wait (&pvsrc->cond, &pvsrc->lock); - pv_stream_capture_buffer (pvsrc->stream, &info); + while (TRUE) { + g_cond_wait (&pvsrc->cond, &pvsrc->lock); + + if (pv_stream_get_state (pvsrc->stream) != PV_STREAM_STATE_STREAMING) + goto streaming_stopped; + + pv_stream_capture_buffer (pvsrc->stream, &info); + if (info.message != NULL) + break; + } g_mutex_unlock (&pvsrc->lock); *buffer = gst_buffer_new (); @@ -393,6 +401,11 @@ not_negotiated: { return GST_FLOW_NOT_NEGOTIATED; } +streaming_stopped: + { + g_mutex_unlock (&pvsrc->lock); + return GST_FLOW_FLUSHING; + } } static gboolean diff --git a/src/modules/v4l2/pv-v4l2-source.c b/src/modules/v4l2/pv-v4l2-source.c index 1d6fb3175..b21208577 100644 --- a/src/modules/v4l2/pv-v4l2-source.c +++ b/src/modules/v4l2/pv-v4l2-source.c @@ -141,7 +141,20 @@ v4l2_set_state (PvSource *source, PvSourceState state) static GBytes * v4l2_get_capabilities (PvSource *source, GBytes *filter) { - return NULL; + GstCaps *caps, *cfilter; + gchar *str; + + cfilter = gst_caps_from_string (g_bytes_get_data (filter, NULL)); + if (cfilter == NULL) + return NULL; + + caps = collect_caps (source, cfilter); + if (caps == NULL) + return NULL; + + str = gst_caps_to_string (caps); + + return g_bytes_new_take (str, strlen (str) + 1); } static void diff --git a/src/server/pv-client-source.c b/src/server/pv-client-source.c index f6bb74bfe..f7097e136 100644 --- a/src/server/pv-client-source.c +++ b/src/server/pv-client-source.c @@ -17,6 +17,7 @@ * Boston, MA 02110-1301, USA. */ +#include #include #include @@ -92,18 +93,20 @@ setup_pipeline (PvClientSource *source) gst_object_unref (bus); } -static void -collect_capabilities (PvSource * source) +static GstCaps * +collect_caps (PvSource * source, GstCaps *filter) { PvClientSourcePrivate *priv = PV_CLIENT_SOURCE (source)->priv; GstCaps *res; GstQuery *query; query = gst_query_new_caps (NULL); - gst_element_query (priv->src, query); + gst_element_query (priv->filter, query); gst_query_parse_caps_result (query, &res); - g_print ("client source caps: %s\n", gst_caps_to_string (res)); + gst_caps_ref (res); gst_query_unref (query); + + return res; } static gboolean @@ -118,7 +121,6 @@ client_set_state (PvSource *source, PvSourceState state) case PV_SOURCE_STATE_INIT: gst_element_set_state (priv->pipeline, GST_STATE_READY); - collect_capabilities (source); break; case PV_SOURCE_STATE_IDLE: @@ -139,7 +141,20 @@ client_set_state (PvSource *source, PvSourceState state) static GBytes * client_get_capabilities (PvSource *source, GBytes *filter) { - return NULL; + GstCaps *caps, *cfilter; + gchar *str; + + cfilter = gst_caps_from_string (g_bytes_get_data (filter, NULL)); + if (cfilter == NULL) + return NULL; + + caps = collect_caps (source, cfilter); + if (caps == NULL) + return NULL; + + str = gst_caps_to_string (caps); + + return g_bytes_new_take (str, strlen (str) + 1); } static void @@ -151,31 +166,34 @@ on_socket_notify (GObject *gobject, PvClientSourcePrivate *priv = source->priv; GSocket *socket; guint num_handles; - GBytes *requested_format; g_object_get (gobject, "socket", &socket, NULL); g_print ("source socket %p\n", socket); if (socket == NULL) { - if (priv->socket) - g_signal_emit_by_name (priv->sink, "remove", priv->socket); + GSocket *prev_socket = g_object_get_data (gobject, "last-socket"); + if (prev_socket) { + g_signal_emit_by_name (priv->sink, "remove", prev_socket); + } } else { g_signal_emit_by_name (priv->sink, "add", socket); } - priv->socket = socket; - - /* force format on input */ - g_object_get (priv->input, "format", &requested_format, NULL); - g_assert (requested_format != NULL); - g_print ("final format %s\n", (gchar *) g_bytes_get_data (requested_format, NULL)); - g_object_set (gobject, "format", requested_format, NULL); - g_bytes_unref (requested_format); + g_object_set_data (gobject, "last-socket", socket); g_object_get (priv->sink, "num-handles", &num_handles, NULL); + g_print ("num handles %d\n", num_handles); if (num_handles == 0) { gst_element_set_state (priv->pipeline, GST_STATE_READY); - } else { + } else if (socket) { + GBytes *format; + + /* suggest what we provide */ + g_object_get (priv->input, "format", &format, NULL); + g_print ("final format %s\n", (gchar *) g_bytes_get_data (format, NULL)); + g_object_set (gobject, "format", format, NULL); + g_bytes_unref (format); + gst_element_set_state (priv->pipeline, GST_STATE_PLAYING); } } @@ -237,17 +255,22 @@ on_input_socket_notify (GObject *gobject, g_object_get (gobject, "socket", &socket, NULL); g_print ("input socket %p\n", socket); - g_object_get (gobject, "requested-format", &requested_format, NULL); - g_assert (requested_format != NULL); - g_print ("final format %s\n", (gchar *) g_bytes_get_data (requested_format, NULL)); - g_object_set (gobject, "format", requested_format, NULL); - - caps = gst_caps_from_string (g_bytes_get_data (requested_format, NULL)); - g_assert (caps != NULL); - g_object_set (priv->filter, "caps", caps, NULL); - gst_caps_unref (caps); - g_bytes_unref (requested_format); + if (socket) { + /* requested format is final format */ + g_object_get (gobject, "requested-format", &requested_format, NULL); + g_assert (requested_format != NULL); + g_print ("final format %s\n", (gchar *) g_bytes_get_data (requested_format, NULL)); + g_object_set (gobject, "format", requested_format, NULL); + /* and set as caps on the filter */ + caps = gst_caps_from_string (g_bytes_get_data (requested_format, NULL)); + g_assert (caps != NULL); + g_object_set (priv->filter, "caps", caps, NULL); + gst_caps_unref (caps); + g_bytes_unref (requested_format); + } else { + g_object_set (priv->filter, "caps", NULL, NULL); + } g_object_set (priv->src, "socket", socket, NULL); }