diff --git a/doc/design.txt b/doc/design.txt index a5ea69922..838705c7b 100644 --- a/doc/design.txt +++ b/doc/design.txt @@ -197,10 +197,13 @@ Fixed header +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Version | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Flags | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Length | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ Version : 4 bytes : message version +Flags : 4 bytes : extra flags Length : 4 bytes : total message length diff --git a/pinos/Makefile.am b/pinos/Makefile.am index 1201c86a9..88ffdf52e 100644 --- a/pinos/Makefile.am +++ b/pinos/Makefile.am @@ -238,6 +238,8 @@ libgstpinos_la_SOURCES = \ gst/gstburstcache.c \ gst/gstpinos.c \ gst/gstpinossocketsink.c \ + gst/gstpinosportsink.c \ + gst/gstpinosportsrc.c \ gst/gstpinospay.c \ gst/gstpinosdepay.c \ gst/gstpinosdeviceprovider.c \ @@ -251,7 +253,9 @@ libgstpinos_la_LIBADD = $(GST_BASE_LIBS) $(GST_LIBS) $(GLIB_LIBS) $(LIBM) -lgst libgstpinos_la_LIBTOOLFLAGS = $(GST_PLUGIN_LIBTOOLFLAGS) noinst_HEADERS = gst/gstburstcache.h gst/gstpinossrc.h \ - gst/gstpinossocketsink.h gst/gstpinossink.h gst/gstpinospay.h \ + gst/gstpinossocketsink.h gst/gstpinosportsink.h \ + gst/gstpinosportsrc.h \ + gst/gstpinossink.h gst/gstpinospay.h \ gst/gstpinosdepay.h gst/gstpinosdeviceprovider.h ################################### diff --git a/pinos/client/buffer.c b/pinos/client/buffer.c index 7d1943d91..3bef337ae 100644 --- a/pinos/client/buffer.c +++ b/pinos/client/buffer.c @@ -65,12 +65,16 @@ void pinos_buffer_clear (PinosBuffer *buffer) { PinosStackBuffer *sb = PSB (buffer); + gint i; g_return_if_fail (is_valid_buffer (buffer)); sb->magic = 0; g_free (sb->free_data); + for (i = 0; i < sb->n_fds; i++) + close (sb->fds[i]); g_free (sb->free_fds); + sb->n_fds = 0; } /** @@ -94,6 +98,27 @@ pinos_buffer_get_version (PinosBuffer *buffer) return hdr->version; } +/** + * pinos_buffer_get_flags + * @buffer: a #PinosBuffer + * + * Get the buffer flags + * + * Returns: the buffer flags. + */ +PinosBufferFlags +pinos_buffer_get_flags (PinosBuffer *buffer) +{ + PinosStackBuffer *sb = PSB (buffer); + PinosStackHeader *hdr; + + g_return_val_if_fail (is_valid_buffer (buffer), -1); + + hdr = sb->data; + + return hdr->flags; +} + /** * pinos_buffer_get_fd: * @buffer: a #PinosBuffer @@ -397,12 +422,30 @@ pinos_buffer_builder_init_full (PinosBufferBuilder *builder, sh = sb->sh = sb->buf.data; sh->version = version; + sh->flags = 0; sh->length = 0; sb->type = 0; sb->offset = 0; } +/** + * pinos_buffer_builder_set_flags: + * @builder: a #PinosBufferBuilder + * @flags: flags to set + * + * Set the flags on the buffer from @builder. + */ +void +pinos_buffer_builder_set_flags (PinosBufferBuilder *builder, PinosBufferFlags flags) +{ + struct stack_builder *sb = PPSB (builder); + + g_return_if_fail (is_valid_builder (builder)); + + sb->sh->flags = flags; +} + /** * pinos_buffer_builder_clear: * @builder: a #PinosBufferBuilder @@ -742,6 +785,7 @@ pinos_buffer_builder_add_format_change (PinosBufferBuilder *builder, len); *p++ = payload->id; strcpy (p, payload->format); + sb->sh->flags |= PINOS_BUFFER_FLAG_CONTROL; return TRUE; } diff --git a/pinos/client/buffer.h b/pinos/client/buffer.h index b828ad029..abfcfec64 100644 --- a/pinos/client/buffer.h +++ b/pinos/client/buffer.h @@ -31,6 +31,19 @@ typedef struct _PinosBufferBuilder PinosBufferBuilder; #define PINOS_BUFFER_VERSION 0 +/** + * PinosBufferFlags: + * @PINOS_BUFFER_FLAG_NONE: no flags + * @PINOS_BUFFER_FLAG_CONTROL: the buffer contains control info such + * as new format or properties. + * + * The possible buffer flags. + */ +typedef enum { + PINOS_BUFFER_FLAG_NONE = 0, + PINOS_BUFFER_FLAG_CONTROL = (1 << 0), +} PinosBufferFlags; + struct _PinosBuffer { /*< private >*/ gsize x[16]; @@ -45,6 +58,7 @@ void pinos_buffer_init_data (PinosBuffer *buffer, void pinos_buffer_clear (PinosBuffer *buffer); guint32 pinos_buffer_get_version (PinosBuffer *buffer); +PinosBufferFlags pinos_buffer_get_flags (PinosBuffer *buffer); int pinos_buffer_get_fd (PinosBuffer *buffer, gint index); @@ -116,6 +130,8 @@ void pinos_buffer_builder_init_full (PinosBufferBuilder *bui #define pinos_buffer_builder_init_into(b,d,md,f,mf) pinos_buffer_builder_init_full(b, PINOS_BUFFER_VERSION,d,md,f,mf); #define pinos_buffer_builder_init(b) pinos_buffer_builder_init_into(b, NULL, 0, NULL, 0); +void pinos_buffer_builder_set_flags (PinosBufferBuilder *builder, + PinosBufferFlags flags); void pinos_buffer_builder_clear (PinosBufferBuilder *builder); void pinos_buffer_builder_end (PinosBufferBuilder *builder, PinosBuffer *buffer); diff --git a/pinos/client/client-node.c b/pinos/client/client-node.c index 8c3156362..9fe97f081 100644 --- a/pinos/client/client-node.c +++ b/pinos/client/client-node.c @@ -18,11 +18,14 @@ */ #include +#include #include "pinos/client/pinos.h" +#include "pinos/client/subscribe.h" #include "pinos/client/enumtypes.h" #include "pinos/client/context.h" +#include "pinos/client/private.h" #include "pinos/client/client-node.h" #include "pinos/client/client-port.h" @@ -51,25 +54,163 @@ client_node_set_state (PinosNode *node, return FALSE; } +typedef struct { + PinosDirection direction; + gchar *name; + PinosProperties *properties; + GBytes *possible_formats; + GSocket *socket; +} CreatePortData; + +static void +create_port_data_free (CreatePortData *data) +{ + g_free (data->name); + if (data->properties) + pinos_properties_free (data->properties); + g_clear_object (&data->socket); + g_slice_free (CreatePortData, data); +} + +static void +on_port_proxy (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + GTask *task = user_data; + CreatePortData *data = g_task_get_task_data (task); + PinosClientNode *node = g_task_get_source_object (task); + PinosClientNodePrivate *priv = node->priv; + PinosContext *context = priv->context; + GError *error = NULL; + GDBusProxy *proxy; + PinosClientPort *port; + + proxy = pinos_subscribe_get_proxy_finish (context->priv->subscribe, + res, + &error); + if (proxy == NULL) + goto port_failed; + + port = pinos_client_port_new (node, + proxy, + data->socket); + g_task_return_pointer (task, port, (GDestroyNotify) g_object_unref); + g_object_unref (task); + + return; + +port_failed: + { + g_warning ("failed to get port proxy: %s", error->message); + g_task_return_error (task, error); + g_object_unref (task); + return; + } +} + +static void +on_port_created (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + GTask *task = user_data; + PinosClientNode *node = g_task_get_source_object (task); + CreatePortData *data = g_task_get_task_data (task); + PinosClientNodePrivate *priv = node->priv; + PinosContext *context = priv->context; + GVariant *ret; + GError *error = NULL; + const gchar *port_path; + GUnixFDList *fdlist; + gint fd, fd_idx; + + g_assert (priv->proxy == G_DBUS_PROXY (source_object)); + + ret = g_dbus_proxy_call_with_unix_fd_list_finish (priv->proxy, &fdlist, res, &error); + if (ret == NULL) + goto create_failed; + + g_variant_get (ret, "(&oh)", &port_path, &fd_idx); + + fd = g_unix_fd_list_get (fdlist, fd_idx, &error); + g_object_unref (fdlist); + if (fd == -1) + goto create_failed; + + data->socket = g_socket_new_from_fd (fd, &error); + if (data->socket == NULL) + goto create_failed; + + pinos_subscribe_get_proxy (context->priv->subscribe, + PINOS_DBUS_SERVICE, + port_path, + "org.pinos.Port1", + NULL, + on_port_proxy, + task); + g_variant_unref (ret); + + return; + + /* ERRORS */ +create_failed: + { + g_warning ("failed to create port: %s", error->message); + g_task_return_error (task, error); + g_object_unref (task); + return; + } +} + + +static gboolean +do_create_port (GTask *task) +{ + PinosClientNode *node = g_task_get_source_object (task); + PinosClientNodePrivate *priv = node->priv; + CreatePortData *data = g_task_get_task_data (task); + + g_dbus_proxy_call (priv->proxy, + "CreatePort", + g_variant_new ("(us@a{sv}s)", + data->direction, + data->name, + pinos_properties_to_variant (data->properties), + g_bytes_get_data (data->possible_formats, NULL)), + G_DBUS_CALL_FLAGS_NONE, + -1, + NULL, /* GCancellable *cancellable */ + on_port_created, + task); + return FALSE; +} + + static void client_node_create_port (PinosNode *node, PinosDirection direction, const gchar *name, GBytes *possible_formats, - PinosProperties *props, + PinosProperties *properties, GTask *task) { - PinosPort *port; + PinosClientNodePrivate *priv = PINOS_CLIENT_NODE (node)->priv; + PinosContext *context = priv->context; + CreatePortData *data; - port = g_object_new (PINOS_TYPE_CLIENT_PORT, - "node", node, - "direction", direction, - "name", name, - "possible-formats", possible_formats, - "properties", props, - NULL); + data = g_slice_new (CreatePortData); + data->direction = direction; + data->name = g_strdup (name); + data->possible_formats = possible_formats ? g_bytes_ref (possible_formats) : NULL; + data->properties = properties ? pinos_properties_copy (properties) : NULL; + + g_task_set_task_data (task, data, (GDestroyNotify) create_port_data_free); + + g_main_context_invoke (context->priv->context, + (GSourceFunc) do_create_port, + task); - g_task_return_pointer (task, port, (GDestroyNotify) g_object_unref); } static void diff --git a/pinos/client/client-port.c b/pinos/client/client-port.c index f5701f861..c9f5609c7 100644 --- a/pinos/client/client-port.c +++ b/pinos/client/client-port.c @@ -32,7 +32,7 @@ struct _PinosClientPortPrivate { - gint foo; + GDBusProxy *proxy; }; G_DEFINE_TYPE (PinosClientPort, pinos_client_port, PINOS_TYPE_PORT); @@ -40,6 +40,7 @@ G_DEFINE_TYPE (PinosClientPort, pinos_client_port, PINOS_TYPE_PORT); enum { PROP_0, + PROP_PROXY, }; static void @@ -49,8 +50,13 @@ pinos_client_port_get_property (GObject *_object, GParamSpec *pspec) { PinosClientPort *port = PINOS_CLIENT_PORT (_object); + PinosClientPortPrivate *priv = port->priv; switch (prop_id) { + case PROP_PROXY: + g_value_set_object (value, priv->proxy); + break; + default: G_OBJECT_WARN_INVALID_PROPERTY_ID (port, prop_id, pspec); break; @@ -64,20 +70,126 @@ pinos_client_port_set_property (GObject *_object, GParamSpec *pspec) { PinosClientPort *port = PINOS_CLIENT_PORT (_object); + PinosClientPortPrivate *priv = port->priv; switch (prop_id) { + case PROP_PROXY: + priv->proxy = g_value_dup_object (value); + break; + default: G_OBJECT_WARN_INVALID_PROPERTY_ID (port, prop_id, pspec); break; } } +static void +proxy_g_properties_changed (GDBusProxy *_proxy, + GVariant *changed_properties, + GStrv invalidated_properties, + gpointer user_data) +{ + PinosClientPort *port = user_data; + GVariantIter *iter; + gchar *key; + + g_variant_get (changed_properties, "a{sv}", &iter); + while (g_variant_iter_next (iter, "{&sv}", &key, NULL)) { + GVariant *variant; + gsize size; + gpointer data; + GBytes *bytes; + + g_debug ("changed %s", key); + + variant = g_dbus_proxy_get_cached_property (_proxy, key); + if (variant == NULL) + continue; + + if (strcmp (key, "Format") == 0) { + data = g_variant_dup_string (variant, &size); + bytes = g_bytes_new_take (data, size); + g_object_set (port, "format", bytes, NULL); + } + g_variant_unref (variant); + } + g_variant_iter_free (iter); +} + +#if 0 +static void +proxy_set_property_cb (GDBusProxy *proxy, + GAsyncResult *res, + gpointer user_data) +{ + GVariant *ret; + GError *error = NULL; + + ret = g_dbus_proxy_call_finish (proxy, res, &error); + if (ret == NULL) { + g_warning ("Error setting property: %s", error->message); + g_error_free (error); + } else + g_variant_unref (ret); +} + + +static void +on_property_notify (GObject *obj, + GParamSpec *pspec, + gpointer user_data) +{ + PinosPort *port = PINOS_PORT (obj); + PinosClientPortPrivate *priv = PINOS_CLIENT_PORT (port)->priv; + const gchar *prop_name = NULL; + GVariant *variant; + + g_debug ("update %s", pspec ? g_param_spec_get_name (pspec) : "NULL"); + + if (pspec == NULL || strcmp (g_param_spec_get_name (pspec), "properties") == 0) { + PinosProperties *props = pinos_port_get_properties (port); + prop_name = "Properties"; + variant = props ? pinos_properties_to_variant (props) : NULL; + } + if (pspec == NULL || strcmp (g_param_spec_get_name (pspec), "possible-formats") == 0) { + GBytes *bytes = pinos_port_get_possible_formats (port); + prop_name = "PossibleFormats"; + variant = bytes ? g_variant_new_string (g_bytes_get_data (bytes, NULL)) : NULL; + } + if (pspec == NULL || strcmp (g_param_spec_get_name (pspec), "format") == 0) { + GBytes *bytes = pinos_port_get_format (port); + prop_name = "Format"; + variant = bytes ? g_variant_new_string (g_bytes_get_data (bytes, NULL)) : NULL; + } + if (prop_name) { + g_dbus_proxy_call (G_DBUS_PROXY (priv->proxy), + "org.freedesktop.DBus.Properties.Set", + g_variant_new ("(ssv)", + "org.pinos.Port1", + prop_name, + variant), + G_DBUS_CALL_FLAGS_NONE, + -1, + NULL, + (GAsyncReadyCallback) proxy_set_property_cb, + port); + if (variant) + g_variant_unref (variant); + } +} +#endif + static void pinos_client_port_constructed (GObject * object) { PinosClientPort *port = PINOS_CLIENT_PORT (object); + PinosClientPortPrivate *priv = port->priv; g_debug ("client-port %p: constructed", port); + g_signal_connect (priv->proxy, + "g-properties-changed", + (GCallback) proxy_g_properties_changed, + port); G_OBJECT_CLASS (pinos_client_port_parent_class)->constructed (object); } @@ -92,12 +204,15 @@ pinos_client_port_dispose (GObject * object) G_OBJECT_CLASS (pinos_client_port_parent_class)->dispose (object); } + static void pinos_client_port_finalize (GObject * object) { PinosClientPort *port = PINOS_CLIENT_PORT (object); + PinosClientPortPrivate *priv = port->priv; g_debug ("client-port %p: finalize", port); + g_clear_object (&priv->proxy); G_OBJECT_CLASS (pinos_client_port_parent_class)->finalize (object); } @@ -114,6 +229,16 @@ pinos_client_port_class_init (PinosClientPortClass * klass) gobject_class->finalize = pinos_client_port_finalize; gobject_class->set_property = pinos_client_port_set_property; gobject_class->get_property = pinos_client_port_get_property; + + g_object_class_install_property (gobject_class, + PROP_PROXY, + g_param_spec_object ("proxy", + "Proxy", + "The Proxy", + G_TYPE_DBUS_PROXY, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY | + G_PARAM_STATIC_STRINGS)); } static void @@ -121,3 +246,72 @@ pinos_client_port_init (PinosClientPort * port) { port->priv = PINOS_CLIENT_PORT_GET_PRIVATE (port); } + +/** + * pinos_client_port_new: + * @node: a #PinosClientNode + * @id: an id + * @socket: a socket with the server port + * + * Create a new client port. + * + * Returns: a new client port + */ +PinosClientPort * +pinos_client_port_new (PinosClientNode *node, + gpointer id, + GSocket *socket) +{ + PinosClientPort *port; + GDBusProxy *proxy = id; + GVariant *variant; + PinosDirection direction = PINOS_DIRECTION_INVALID; + const gchar *name = "unknown"; + GBytes *possible_formats = NULL; + GBytes *format = NULL; + PinosProperties *properties = NULL; + + variant = g_dbus_proxy_get_cached_property (G_DBUS_PROXY (proxy), "Direction"); + if (variant != NULL) { + direction = g_variant_get_uint32 (variant); + g_variant_unref (variant); + } + variant = g_dbus_proxy_get_cached_property (G_DBUS_PROXY (proxy), "Name"); + if (variant != NULL) { + name = g_variant_get_string (variant, NULL); + g_variant_unref (variant); + } + variant = g_dbus_proxy_get_cached_property (G_DBUS_PROXY (proxy), "PossibleFormats"); + if (variant != NULL) { + gsize size; + gpointer data; + data = g_variant_dup_string (variant, &size); + possible_formats = g_bytes_new_take (data, size); + g_variant_unref (variant); + } + variant = g_dbus_proxy_get_cached_property (G_DBUS_PROXY (proxy), "Format"); + if (variant != NULL) { + gsize size; + gpointer data; + data = g_variant_dup_string (variant, &size); + format = g_bytes_new_take (data, size); + g_variant_unref (variant); + } + variant = g_dbus_proxy_get_cached_property (G_DBUS_PROXY (proxy), "Properties"); + if (variant != NULL) { + properties = pinos_properties_from_variant (variant); + g_variant_unref (variant); + } + + port = g_object_new (PINOS_TYPE_CLIENT_PORT, + "node", node, + "direction", direction, + "name", name, + "possible-formats", possible_formats, + "format", format, + "properties", properties, + "proxy", proxy, + "socket", socket, + NULL); + return port; +} diff --git a/pinos/client/client-port.h b/pinos/client/client-port.h index 84ceade7e..2d856e4fd 100644 --- a/pinos/client/client-port.h +++ b/pinos/client/client-port.h @@ -30,6 +30,7 @@ typedef struct _PinosClientPortPrivate PinosClientPortPrivate; #include #include +#include #define PINOS_TYPE_CLIENT_PORT (pinos_client_port_get_type ()) #define PINOS_IS_CLIENT_PORT(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), PINOS_TYPE_CLIENT_PORT)) @@ -66,6 +67,10 @@ struct _PinosClientPortClass { /* normal GObject stuff */ GType pinos_client_port_get_type (void); +PinosClientPort * pinos_client_port_new (PinosClientNode *node, + gpointer id, + GSocket *socket); + GBytes * pinos_client_port_get_formats (PinosClientPort *port, GBytes *filter, GError **error); diff --git a/pinos/client/context.c b/pinos/client/context.c index 316b1dfd9..b018075fe 100644 --- a/pinos/client/context.c +++ b/pinos/client/context.c @@ -792,6 +792,7 @@ on_node_proxy (GObject *source_object, NULL); g_task_return_pointer (task, node, (GDestroyNotify) g_object_unref); + g_object_unref (task); return; @@ -799,6 +800,7 @@ node_failed: { g_warning ("failed to get node proxy: %s", error->message); g_task_return_error (task, error); + g_object_unref (task); return; } } @@ -839,6 +841,7 @@ create_failed: { g_warning ("failed to create node: %s", error->message); g_task_return_error (task, error); + g_object_unref (task); return; } } diff --git a/pinos/client/introspect.c b/pinos/client/introspect.c index 3adee04d1..041f3bc2e 100644 --- a/pinos/client/introspect.c +++ b/pinos/client/introspect.c @@ -433,6 +433,7 @@ port_fill_info (PinosPortInfo *info, GDBusProxy *proxy) SET_STRING ("Name", name, 0); SET_PROPERTIES ("Properties", properties, 1); SET_BYTES ("PossibleFormats", possible_formats, 2); + SET_BYTES ("Format", format, 3); if (changed) g_hash_table_remove_all (changed); @@ -445,6 +446,8 @@ port_clear_info (PinosPortInfo *info) pinos_properties_free (info->properties); if (info->possible_formats) g_bytes_unref (info->possible_formats); + if (info->format) + g_bytes_unref (info->format); } /** diff --git a/pinos/client/introspect.h b/pinos/client/introspect.h index 654126cb3..04fb5f4ac 100644 --- a/pinos/client/introspect.h +++ b/pinos/client/introspect.h @@ -267,7 +267,8 @@ void pinos_context_get_node_info_by_id (PinosContext *context, * @change_mask: bitfield of changed fields since last call * @name: name the port, suitable for display * @properties: the properties of the port - * @possible formats: the possible formats this port can consume + * @possible_formats: the possible formats this port can consume + * @format: the current format on this port * * The port information. Extra information can be added in later * versions. @@ -281,6 +282,7 @@ typedef struct { const char *name; PinosProperties *properties; GBytes *possible_formats; + GBytes *format; } PinosPortInfo; /** diff --git a/pinos/client/mainloop.c b/pinos/client/mainloop.c index 912e79872..ddb5d2ce0 100644 --- a/pinos/client/mainloop.c +++ b/pinos/client/mainloop.c @@ -109,6 +109,7 @@ pinos_main_loop_constructed (GObject * object) PinosMainLoopPrivate *priv = loop->priv; priv->mainloop = g_main_loop_new (priv->maincontext, FALSE); + g_debug ("mainloop %p: contructed %p %p", loop, priv->maincontext, priv->mainloop); G_OBJECT_CLASS (pinos_main_loop_parent_class)->constructed (object); } @@ -267,7 +268,9 @@ handle_mainloop (PinosMainLoop *loop) g_main_context_set_poll_func (priv->maincontext, do_poll); g_main_context_push_thread_default (priv->maincontext); + g_debug ("mainloop %p: run mainloop %p context %p", loop, priv->mainloop, priv->maincontext); g_main_loop_run (priv->mainloop); + g_debug ("mainloop %p: done", loop); g_main_context_pop_thread_default (priv->maincontext); g_main_context_set_poll_func (priv->maincontext, priv->poll_func); diff --git a/pinos/client/node.c b/pinos/client/node.c index dc9261add..9bc6e05d4 100644 --- a/pinos/client/node.c +++ b/pinos/client/node.c @@ -130,6 +130,7 @@ pinos_node_dispose (GObject * obj) PinosNodePrivate *priv = node->priv; g_debug ("node %p: dispose", node); + pinos_node_set_state (node, PINOS_NODE_STATE_SUSPENDED); g_list_free_full (priv->ports, (GDestroyNotify) g_object_unref); priv->ports = NULL; @@ -215,6 +216,63 @@ pinos_node_init (PinosNode * node) priv->state = PINOS_NODE_STATE_SUSPENDED; } +/** + * pinos_node_get_name: + * @node: a #PinosNode + * + * Get the name of @node + * + * Returns: the name of @node + */ +const gchar * +pinos_node_get_name (PinosNode *node) +{ + PinosNodePrivate *priv; + + g_return_val_if_fail (PINOS_IS_NODE (node), NULL); + priv = node->priv; + + return priv->name; +} + +/** + * pinos_node_get_state: + * @node: a #PinosNode + * + * Get the state of @node + * + * Returns: the state of @node + */ +PinosNodeState +pinos_node_get_state (PinosNode *node) +{ + PinosNodePrivate *priv; + + g_return_val_if_fail (PINOS_IS_NODE (node), PINOS_NODE_STATE_ERROR); + priv = node->priv; + + return priv->state; +} + +/** + * pinos_node_get_properties: + * @node: a #PinosNode + * + * Get the properties of @node + * + * Returns: the properties of @node + */ +PinosProperties * +pinos_node_get_properties (PinosNode *node) +{ + PinosNodePrivate *priv; + + g_return_val_if_fail (PINOS_IS_NODE (node), NULL); + priv = node->priv; + + return priv->properties; +} + /** * pinos_node_remove: * @node: a #PinosNode @@ -227,7 +285,7 @@ pinos_node_remove (PinosNode *node) { g_return_if_fail (PINOS_IS_NODE (node)); - g_debug ("node %p: remove", node); + g_debug ("node %p: remove %d", node, G_OBJECT (node)->ref_count); g_signal_emit (node, signals[SIGNAL_REMOVE], 0, NULL); } @@ -269,7 +327,9 @@ pinos_node_create_port (PinosNode *node, if (!klass->create_port) return; + g_debug ("node %p %d: create port", node, G_OBJECT (node)->ref_count); task = g_task_new (node, cancellable, callback, user_data); + g_debug ("node %p %d: create port", node, G_OBJECT (node)->ref_count); klass->create_port (node, direction, name, possible_formats, props, task); } @@ -289,6 +349,7 @@ pinos_node_create_port_finish (PinosNode *node, priv->ports = g_list_append (priv->ports, port); g_signal_connect (port, "remove", (GCallback) handle_remove_port, node); } + g_debug ("node %p %d: created port %p", node, G_OBJECT (node)->ref_count, port); return port; } diff --git a/pinos/client/node.h b/pinos/client/node.h index ff899e2be..46f15dab4 100644 --- a/pinos/client/node.h +++ b/pinos/client/node.h @@ -79,6 +79,10 @@ struct _PinosNodeClass { /* normal GObject stuff */ GType pinos_node_get_type (void); +const gchar * pinos_node_get_name (PinosNode *node); +PinosNodeState pinos_node_get_state (PinosNode *node); +PinosProperties * pinos_node_get_properties (PinosNode *node); + void pinos_node_remove (PinosNode *node); void pinos_node_create_port (PinosNode *node, diff --git a/pinos/client/pinos.c b/pinos/client/pinos.c index 89a86b9c0..a169a1e78 100644 --- a/pinos/client/pinos.c +++ b/pinos/client/pinos.c @@ -104,3 +104,13 @@ pinos_fill_stream_properties (PinosProperties *properties) { g_return_if_fail (properties != NULL); } + +PinosDirection +pinos_direction_reverse (PinosDirection direction) +{ + if (direction == PINOS_DIRECTION_INPUT) + return PINOS_DIRECTION_OUTPUT; + else if (direction == PINOS_DIRECTION_OUTPUT) + return PINOS_DIRECTION_INPUT; + return direction; +} diff --git a/pinos/client/pinos.h b/pinos/client/pinos.h index 3d7d233fb..f0a3c52cd 100644 --- a/pinos/client/pinos.h +++ b/pinos/client/pinos.h @@ -43,4 +43,6 @@ gchar *pinos_client_name (void); void pinos_fill_context_properties (PinosProperties *properties); void pinos_fill_stream_properties (PinosProperties *properties); +PinosDirection pinos_direction_reverse (PinosDirection direction); + #endif /* __PINOS_H__ */ diff --git a/pinos/client/port.c b/pinos/client/port.c index cf7609cc1..21d98bc08 100644 --- a/pinos/client/port.c +++ b/pinos/client/port.c @@ -17,13 +17,18 @@ * Boston, MA 02110-1301, USA. */ +#include #include #include #include +#include +#include + #include "pinos/client/pinos.h" #include "pinos/client/enumtypes.h" +#include "pinos/client/private.h" #include "pinos/client/port.h" #include "pinos/client/node.h" @@ -36,9 +41,23 @@ struct _PinosPortPrivate PinosNode *node; gchar *name; + GSocket *sockets[2]; PinosDirection direction; GBytes *possible_formats; + GBytes *format; PinosProperties *properties; + + GSource *socket_source; + + PinosBuffer net_buffer; + + PinosBuffer *buffer; + PinosPort *peers[16]; + gint n_peers; + + PinosReceivedBufferCallback received_buffer_cb; + gpointer received_buffer_data; + GDestroyNotify received_buffer_notify; }; G_DEFINE_ABSTRACT_TYPE (PinosPort, pinos_port, G_TYPE_OBJECT); @@ -47,230 +66,44 @@ enum { PROP_0, PROP_NODE, + PROP_SOCKET, + PROP_MAIN_CONTEXT, PROP_NAME, PROP_DIRECTION, PROP_POSSIBLE_FORMATS, - PROP_PROPERTIES + PROP_FORMAT, + PROP_PROPERTIES, }; enum { SIGNAL_FORMAT_REQUEST, SIGNAL_REMOVE, + SIGNAL_LINKED, + SIGNAL_UNLINKED, LAST_SIGNAL }; static guint signals[LAST_SIGNAL] = { 0 }; -static void -pinos_port_get_property (GObject *_object, - guint prop_id, - GValue *value, - GParamSpec *pspec) +void +pinos_port_set_received_buffer_cb (PinosPort *port, + PinosReceivedBufferCallback cb, + gpointer user_data, + GDestroyNotify notify) { - PinosPort *port = PINOS_PORT (_object); - PinosPortPrivate *priv = port->priv; + PinosPortPrivate *priv; - switch (prop_id) { - case PROP_NODE: - g_value_set_object (value, priv->node); - break; + g_return_if_fail (PINOS_IS_PORT (port)); + priv = port->priv; - case PROP_NAME: - g_value_set_string (value, priv->name); - break; + g_debug ("port %p: set callback", port); - case PROP_DIRECTION: - g_value_set_enum (value, priv->direction); - break; - - case PROP_POSSIBLE_FORMATS: - g_value_set_boxed (value, priv->possible_formats); - break; - - case PROP_PROPERTIES: - g_value_set_boxed (value, priv->properties); - break; - - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (port, prop_id, pspec); - break; - } -} - -static void -pinos_port_set_property (GObject *_object, - guint prop_id, - const GValue *value, - GParamSpec *pspec) -{ - PinosPort *port = PINOS_PORT (_object); - PinosPortPrivate *priv = port->priv; - - switch (prop_id) { - case PROP_NODE: - priv->node = g_value_get_object (value); - break; - - case PROP_NAME: - priv->name = g_value_dup_string (value); - break; - - case PROP_DIRECTION: - priv->direction = g_value_get_enum (value); - break; - - case PROP_POSSIBLE_FORMATS: - { - if (priv->possible_formats) - g_bytes_unref (priv->possible_formats); - priv->possible_formats = g_value_dup_boxed (value); - break; - } - - case PROP_PROPERTIES: - if (priv->properties) - pinos_properties_free (priv->properties); - priv->properties = g_value_dup_boxed (value); - break; - - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (port, prop_id, pspec); - break; - } -} - -static void -pinos_port_constructed (GObject * object) -{ - PinosPort *port = PINOS_PORT (object); - - g_debug ("port %p: constructed", port); - - G_OBJECT_CLASS (pinos_port_parent_class)->constructed (object); -} - -static void -pinos_port_dispose (GObject * object) -{ - PinosPort *port = PINOS_PORT (object); - - g_debug ("port %p: dispose", port); - - G_OBJECT_CLASS (pinos_port_parent_class)->dispose (object); -} - -static void -pinos_port_finalize (GObject * object) -{ - PinosPort *port = PINOS_PORT (object); - PinosPortPrivate *priv = port->priv; - - g_debug ("port %p: finalize", port); - - g_free (priv->name); - if (priv->possible_formats) - g_bytes_unref (priv->possible_formats); - if (priv->properties) - pinos_properties_free (priv->properties); - - G_OBJECT_CLASS (pinos_port_parent_class)->finalize (object); -} - -static void -pinos_port_class_init (PinosPortClass * klass) -{ - GObjectClass *gobject_class = G_OBJECT_CLASS (klass); - - g_type_class_add_private (klass, sizeof (PinosPortPrivate)); - - gobject_class->constructed = pinos_port_constructed; - gobject_class->dispose = pinos_port_dispose; - gobject_class->finalize = pinos_port_finalize; - gobject_class->set_property = pinos_port_set_property; - gobject_class->get_property = pinos_port_get_property; - - g_object_class_install_property (gobject_class, - PROP_NODE, - g_param_spec_object ("node", - "Node", - "The Node", - PINOS_TYPE_NODE, - G_PARAM_READWRITE | - G_PARAM_CONSTRUCT_ONLY | - G_PARAM_STATIC_STRINGS)); - - g_object_class_install_property (gobject_class, - PROP_NAME, - g_param_spec_string ("name", - "Name", - "The port name", - NULL, - G_PARAM_READWRITE | - G_PARAM_CONSTRUCT_ONLY | - G_PARAM_STATIC_STRINGS)); - - g_object_class_install_property (gobject_class, - PROP_DIRECTION, - g_param_spec_enum ("direction", - "Direction", - "The direction of the port", - PINOS_TYPE_DIRECTION, - PINOS_DIRECTION_INVALID, - G_PARAM_READWRITE | - G_PARAM_CONSTRUCT_ONLY | - G_PARAM_STATIC_STRINGS)); - - g_object_class_install_property (gobject_class, - PROP_POSSIBLE_FORMATS, - g_param_spec_boxed ("possible-formats", - "Possible Formats", - "The possbile formats of the port", - G_TYPE_BYTES, - G_PARAM_READWRITE | - G_PARAM_CONSTRUCT | - G_PARAM_STATIC_STRINGS)); - - g_object_class_install_property (gobject_class, - PROP_PROPERTIES, - g_param_spec_boxed ("properties", - "Properties", - "The properties of the port", - PINOS_TYPE_PROPERTIES, - G_PARAM_READWRITE | - G_PARAM_CONSTRUCT | - G_PARAM_STATIC_STRINGS)); - - - signals[SIGNAL_FORMAT_REQUEST] = g_signal_new ("format-request", - G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST, - 0, - NULL, - NULL, - g_cclosure_marshal_generic, - G_TYPE_NONE, - 0, - G_TYPE_NONE); - - signals[SIGNAL_REMOVE] = g_signal_new ("remove", - G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST, - 0, - NULL, - NULL, - g_cclosure_marshal_generic, - G_TYPE_NONE, - 0, - G_TYPE_NONE); -} - -static void -pinos_port_init (PinosPort * port) -{ - PinosPortPrivate *priv = port->priv = PINOS_PORT_GET_PRIVATE (port); - - priv->direction = PINOS_DIRECTION_INVALID; + if (priv->received_buffer_notify) + priv->received_buffer_notify (priv->received_buffer_data);; + priv->received_buffer_cb = cb; + priv->received_buffer_data = user_data; + priv->received_buffer_notify = notify; } /** @@ -308,7 +141,122 @@ pinos_port_get_node (PinosPort *port) } /** - * pinos_port_get_formats: + * pinos_port_get_socket: + * @port: a #PinosPort + * + * Get the socket of @port + * + * Returns: the socket or %NULL + */ +GSocket * +pinos_port_get_socket (PinosPort *port) +{ + PinosPortPrivate *priv; + + g_return_val_if_fail (PINOS_IS_PORT (port), NULL); + priv = port->priv; + + return priv->sockets[0]; +} + +/** + * pinos_port_get_name: + * @port: a #PinosPort + * + * Get the name of @port + * + * Returns: the name or %NULL + */ +const gchar * +pinos_port_get_name (PinosPort *port) +{ + PinosPortPrivate *priv; + + g_return_val_if_fail (PINOS_IS_PORT (port), NULL); + priv = port->priv; + + return priv->name; +} + +/** + * pinos_port_get_direction: + * @port: a #PinosPort + * + * Get the direction of @port + * + * Returns: the direction or %NULL + */ +PinosDirection +pinos_port_get_direction (PinosPort *port) +{ + PinosPortPrivate *priv; + + g_return_val_if_fail (PINOS_IS_PORT (port), PINOS_DIRECTION_INVALID); + priv = port->priv; + + return priv->direction; +} + +/** + * pinos_port_get_possible_formats: + * @port: a #PinosPort + * + * Get the possible formats of @port + * + * Returns: the possible formats or %NULL + */ +GBytes * +pinos_port_get_possible_formats (PinosPort *port) +{ + PinosPortPrivate *priv; + + g_return_val_if_fail (PINOS_IS_PORT (port), NULL); + priv = port->priv; + + g_signal_emit (port, signals[SIGNAL_FORMAT_REQUEST], 0, NULL); + return priv->possible_formats; +} + +/** + * pinos_port_get_format: + * @port: a #PinosPort + * + * Get the format of @port + * + * Returns: the format or %NULL + */ +GBytes * +pinos_port_get_format (PinosPort *port) +{ + PinosPortPrivate *priv; + + g_return_val_if_fail (PINOS_IS_PORT (port), NULL); + priv = port->priv; + + return priv->format; +} + +/** + * pinos_port_get_properties: + * @port: a #PinosPort + * + * Get the properties of @port + * + * Returns: the properties or %NULL + */ +PinosProperties * +pinos_port_get_properties (PinosPort *port) +{ + PinosPortPrivate *priv; + + g_return_val_if_fail (PINOS_IS_PORT (port), NULL); + priv = port->priv; + + return priv->properties; +} + +/** + * pinos_port_filter_formats: * @port: a #PinosPort * @filter: a #GBytes * @error: a #GError or %NULL @@ -320,13 +268,14 @@ pinos_port_get_node (PinosPort *port) * be set. */ GBytes * -pinos_port_get_formats (PinosPort *port, - GBytes *filter, - GError **error) +pinos_port_filter_formats (PinosPort *port, + GBytes *filter, + GError **error) { GstCaps *tmp, *caps, *cfilter; gchar *str; PinosPortPrivate *priv; + GBytes *res; g_return_val_if_fail (PINOS_IS_PORT (port), NULL); priv = port->priv; @@ -348,31 +297,43 @@ pinos_port_get_formats (PinosPort *port, if (caps && cfilter) { tmp = gst_caps_intersect_full (caps, cfilter, GST_CAPS_INTERSECT_FIRST); - g_clear_pointer (&cfilter, gst_caps_unref); gst_caps_take (&caps, tmp); } + g_clear_pointer (&cfilter, gst_caps_unref); + if (caps == NULL || gst_caps_is_empty (caps)) goto no_format; str = gst_caps_to_string (caps); gst_caps_unref (caps); + res = g_bytes_new_take (str, strlen (str) + 1); - return g_bytes_new_take (str, strlen (str) + 1); + if (priv->direction == PINOS_DIRECTION_OUTPUT) { + gint i; + for (i = 0; i < priv->n_peers; i++) { + PinosPort *peer = priv->peers[i]; + if (peer == NULL) + continue; + res = pinos_port_filter_formats (peer, res, error); + } + } + + return res; invalid_filter: { - if (error) - *error = g_error_new (G_IO_ERROR, - G_IO_ERROR_INVALID_ARGUMENT, - "Invalid filter received"); + g_set_error (error, + G_IO_ERROR, + G_IO_ERROR_INVALID_ARGUMENT, + "Invalid filter received"); return NULL; } no_format: { - if (error) - *error = g_error_new (G_IO_ERROR, - G_IO_ERROR_NOT_FOUND, - "No compatible format found"); + g_set_error (error, + G_IO_ERROR, + G_IO_ERROR_NOT_FOUND, + "No compatible format found"); if (cfilter) gst_caps_unref (cfilter); if (caps) @@ -380,3 +341,867 @@ no_format: return NULL; } } + +/** + * pinos_port_link: + * @source: a source #PinosPort + * @destination: a destination #PinosPort + * + * Link two ports together. + * + * Returns: %TRUE if ports could be linked. + */ +gboolean +pinos_port_link (PinosPort *source, PinosPort *destination) +{ + g_return_val_if_fail (PINOS_IS_PORT (source), FALSE); + g_return_val_if_fail (PINOS_IS_PORT (destination), FALSE); + g_return_val_if_fail (source->priv->direction != destination->priv->direction, FALSE); + + if (source->priv->direction != PINOS_DIRECTION_OUTPUT) { + PinosPort *tmp; + tmp = source; + source = destination; + destination = tmp; + } + + source->priv->peers[source->priv->n_peers++] = destination; + destination->priv->peers[destination->priv->n_peers++] = source; + g_object_set (destination, "format", source->priv->format, NULL); + + g_debug ("port %p: linked to %p", source, destination); + g_signal_emit (source, signals[SIGNAL_LINKED], 0, destination); + g_signal_emit (destination, signals[SIGNAL_LINKED], 0, source); + + return TRUE; +} + +/** + * pinos_port_unlink: + * @source: a source #PinosPort + * @destination: a destination #PinosPort + * + * Link two ports together. + * + * Returns: %TRUE if ports could be linked. + */ +gboolean +pinos_port_unlink (PinosPort *source, PinosPort *destination) +{ + gint i; + + g_return_val_if_fail (PINOS_IS_PORT (source), FALSE); + g_return_val_if_fail (PINOS_IS_PORT (destination), FALSE); + + for (i = 0; i < source->priv->n_peers; i++) { + if (source->priv->peers[i] == destination) + source->priv->peers[i] = NULL; + } + for (i = 0; i < destination->priv->n_peers; i++) { + if (destination->priv->peers[i] == source) + destination->priv->peers[i] = NULL; + } + + g_debug ("port %p: unlinked from %p", source, destination); + g_signal_emit (source, signals[SIGNAL_UNLINKED], 0, destination); + g_signal_emit (destination, signals[SIGNAL_UNLINKED], 0, source); + + return TRUE; +} + +static void +pinos_port_unlink_all (PinosPort *port) +{ + gint i; + + for (i = 0; i < port->priv->n_peers; i++) { + PinosPort *peer = port->priv->peers[i]; + if (peer == NULL) + continue; + if (peer->priv->peers[i] == port) + peer->priv->peers[i] = NULL; + port->priv->peers[i] = NULL; + peer->priv->n_peers--; + g_signal_emit (port, signals[SIGNAL_UNLINKED], 0, peer); + g_signal_emit (peer, signals[SIGNAL_UNLINKED], 0, port); + } + port->priv->n_peers = 0; +} + +/** + * pinos_port_get_n_links: + * @port: a #PinosPort + * + * Get the number of links on this port + * + * Returns: the number of links + */ +gint +pinos_port_get_n_links (PinosPort *port) +{ + g_return_val_if_fail (PINOS_IS_PORT (port), -1); + + return port->priv->n_peers; +} + +static gboolean +read_buffer (GSocket *socket, + PinosBuffer *buffer, + GError **error) +{ + gssize len; + GInputVector ivec; + PinosStackHeader *hdr; + GSocketControlMessage **messages = NULL; + PinosStackBuffer *sb = (PinosStackBuffer *) buffer; + gint num_messages = 0; + gint flags = 0; + gsize need; + gint i; + + need = sizeof (PinosStackHeader); + + if (sb->max_size < need) { + sb->max_size = need; + sb->data = sb->free_data = g_realloc (sb->free_data, need); + } + + hdr = sb->data; + + /* read header first */ + ivec.buffer = hdr; + ivec.size = sizeof (PinosStackHeader); + + len = g_socket_receive_message (socket, + NULL, + &ivec, + 1, + &messages, + &num_messages, + &flags, + NULL, + error); + if (len == -1) + return FALSE; + + g_assert (len == sizeof (PinosStackHeader)); + + /* now we know the total length */ + need += hdr->length; + + if (sb->max_size < need) { + sb->max_size = need; + hdr = sb->data = sb->free_data = g_realloc (sb->free_data, need); + } + sb->size = need; + + if (hdr->length > 0) { + /* read data */ + len = g_socket_receive (socket, + (gchar *)sb->data + sizeof (PinosStackHeader), + hdr->length, + NULL, + error); + if (len == -1) + return FALSE; + + g_assert (len == hdr->length); + } + + if (sb->max_fds < num_messages) { + sb->max_fds = num_messages; + sb->fds = sb->free_fds = g_realloc (sb->free_fds, num_messages * sizeof (int)); + } + + /* handle control messages */ + for (i = 0; i < num_messages; i++) { + GSocketControlMessage *msg = messages[i]; + gint *fds, n_fds, j; + + if (g_socket_control_message_get_msg_type (msg) != SCM_RIGHTS) + continue; + + fds = g_unix_fd_message_steal_fds (G_UNIX_FD_MESSAGE (msg), &n_fds); + for (j = 0; j < n_fds; j++) + sb->fds[i] = fds[i]; + g_free (fds); + g_object_unref (msg); + } + g_free (messages); + + sb->magic = PSB_MAGIC; + + return TRUE; +} + + +static gboolean +write_buffer (GSocket *socket, + PinosBuffer *buffer, + GError **error) +{ + gssize len; + PinosStackBuffer *sb = (PinosStackBuffer *) buffer; + GOutputVector ovec[1]; + GSocketControlMessage *msg = NULL; + gint n_msg, i, flags = 0; + + g_return_val_if_fail (buffer != NULL, FALSE); + + ovec[0].buffer = sb->data; + ovec[0].size = sb->size; + + if (sb->n_fds) { + msg = g_unix_fd_message_new (); + for (i = 0; i < sb->n_fds; i++) { + if (!g_unix_fd_message_append_fd (G_UNIX_FD_MESSAGE (msg), sb->fds[i], error)) + goto append_failed; + } + n_msg = 1; + } + else { + n_msg = 0; + } + + len = g_socket_send_message (socket, + NULL, + ovec, + 1, + &msg, + n_msg, + flags, + NULL, + error); + g_clear_object (&msg); + + if (len == -1) + goto send_error; + + g_assert (len == (gssize) sb->size); + + return TRUE; + +append_failed: + { + g_warning ("failed to append fd: %s", error ? (*error)->message : "unknown reason" ); + return FALSE; + } +send_error: + { + g_warning ("failed to send message: %s", error ? (*error)->message : "unknown reason" ); + return FALSE; + } +} + + +static gboolean +on_socket_condition (GSocket *socket, + GIOCondition condition, + gpointer user_data) +{ + PinosPort *port = user_data; + PinosPortPrivate *priv = port->priv; + GError *error = NULL; + + switch (condition) { + case G_IO_IN: + if (!read_buffer (socket, &priv->net_buffer, &error)) + goto read_failed; + g_debug ("port %p: received buffer", port); + + if (priv->direction == PINOS_DIRECTION_INPUT) { + priv->buffer = &priv->net_buffer; + if (priv->received_buffer_cb) + priv->received_buffer_cb (port, priv->received_buffer_data); + } else { + gint i; + for (i = 0; i < priv->n_peers; i++) { + PinosPort *peer = priv->peers[i]; + if (peer == NULL) + continue; + + g_debug ("port %p: send buffer %p to peer %p", port, &priv->net_buffer, peer); + if (!pinos_port_receive_buffer (peer, &priv->net_buffer, &error)) + goto read_failed; + } + } + break; + + case G_IO_OUT: + g_warning ("can do IO OUT\n"); + break; + + default: + break; + } + return TRUE; + +read_failed: + { + g_warning ("failed to read buffer: %s", error->message); + g_clear_error (&error); + return TRUE; + } +} + + +static void +handle_socket (PinosPort *port, GSocket *socket) +{ + PinosPortPrivate *priv = port->priv; + GMainContext *context = g_main_context_get_thread_default(); + + g_debug ("port %p: handle socket in context %p", port, context); + priv->socket_source = g_socket_create_source (socket, G_IO_IN, NULL); + g_source_set_callback (priv->socket_source, (GSourceFunc) on_socket_condition, port, NULL); + g_source_attach (priv->socket_source, context); +} + +static void +unhandle_socket (PinosPort *port) +{ + PinosPortPrivate *priv = port->priv; + + g_debug ("port %p: unhandle socket", port); + if (priv->socket_source) { + g_source_destroy (priv->socket_source); + g_clear_pointer (&priv->socket_source, g_source_unref); + } +} + +/** + * pinos_port_recieve_buffer: + * @port: a #PinosPort + * @buffer: a #PinosBuffer + * @error: a #GError or %NULL + * + * Receive a buffer on @port. + * + * Returns: %TRUE if the buffer could be accepted. %FALSE if port + * already has an unconsumed buffer. + */ +gboolean +pinos_port_receive_buffer (PinosPort *port, + PinosBuffer *buffer, + GError **error) +{ + PinosPortPrivate *priv; + gboolean res; + + g_return_val_if_fail (PINOS_IS_PORT (port), FALSE); + priv = port->priv; + + if (priv->buffer) + goto buffer_queued; + + if (priv->sockets[0]) { + g_debug ("port %p: receive buffer %p write to socket", port, buffer); + res = write_buffer (priv->sockets[0], buffer, error); + } + else { + g_debug ("port %p: receive buffer %p signal", port, buffer); + res = TRUE; + priv->buffer = buffer; + if (priv->received_buffer_cb) + priv->received_buffer_cb (port, priv->received_buffer_data); + } + + return res; + + /* ERRORS */ +buffer_queued: + { + g_set_error (error, + G_IO_ERROR, + G_IO_ERROR_NOT_FOUND, + "buffer was already queued on port"); + return FALSE; + } +} + +/** + * pinos_port_peek_buffer: + * @port: a #PinosPort + * + * Check if there is a buffer on @port and peek it without dequeueing it + * from the port. + * + * Returns: a #PinosBuffer or %NULL when no buffer has arrived on the pad. + */ +PinosBuffer * +pinos_port_peek_buffer (PinosPort *port) +{ + PinosPortPrivate *priv; + + g_return_val_if_fail (PINOS_IS_PORT (port), NULL); + priv = port->priv; + + return priv->buffer; +} + +/** + * pinos_port_get_buffer: + * @port: a #PinosPort + * + * Get the buffer on @port. The buffer will no longer be queued on the port. + * + * Returns: a #PinosBuffer or %NULL when no buffer has arrived on the pad. + */ +PinosBuffer * +pinos_port_get_buffer (PinosPort *port) +{ + PinosPortPrivate *priv; + PinosBuffer *res; + + g_return_val_if_fail (PINOS_IS_PORT (port), NULL); + priv = port->priv; + + res = priv->buffer; + priv->buffer = NULL; + + return res; +} + +/** + * pinos_port_send_buffer: + * @port: a #PinosPort + * @buffer: a #PinosBuffer + * @error: a #GError or %NULL + * + * Send @buffer to ports connected to @port + * + * Returns: %TRUE on success. @error is set when %FALSE is returned. + */ +gboolean +pinos_port_send_buffer (PinosPort *port, + PinosBuffer *buffer, + GError **error) +{ + PinosPortPrivate *priv; + PinosPort *peer; + gboolean res = TRUE; + gint i; + + g_return_val_if_fail (PINOS_IS_PORT (port), FALSE); + priv = port->priv; + + if (priv->direction == PINOS_DIRECTION_OUTPUT && priv->sockets[0]) { + g_debug ("port %p: send buffer %p write to socket", port, buffer); + res = write_buffer (priv->sockets[0], buffer, error); + } + for (i = 0; i < priv->n_peers; i++) { + peer = priv->peers[i]; + if (peer == NULL) + continue; + g_debug ("port %p: send buffer %p to peer %p", port, buffer, peer); + res = pinos_port_receive_buffer (peer, buffer, error); + } + return res; +} + +/** + * pinos_port_get_socket_pair: + * @port: a #PinosPort + * @error: a #GError + * + * Create or return a previously create socket pair for @port. The + * Socket for the other end is returned. + * + * Returns: a #GSocket that can be used to send buffers to port. + */ +GSocket * +pinos_port_get_socket_pair (PinosPort *port, + GError **error) +{ + PinosPortPrivate *priv; + + g_return_val_if_fail (PINOS_IS_PORT (port), FALSE); + priv = port->priv; + + if (priv->sockets[1] == NULL) { + int fd[2]; + + if (socketpair (AF_UNIX, SOCK_STREAM, 0, fd) != 0) + goto no_sockets; + + priv->sockets[0] = g_socket_new_from_fd (fd[0], error); + if (priv->sockets[0] == NULL) + goto create_failed; + + priv->sockets[1] = g_socket_new_from_fd (fd[1], error); + if (priv->sockets[1] == NULL) + goto create_failed; + + handle_socket (port, priv->sockets[0]); + } + return g_object_ref (priv->sockets[1]); + + /* ERRORS */ +no_sockets: + { + g_set_error (error, + G_IO_ERROR, + g_io_error_from_errno (errno), + "could not create socketpair: %s", strerror (errno)); + return NULL; + } +create_failed: + { + g_clear_object (&priv->sockets[0]); + g_clear_object (&priv->sockets[1]); + return NULL; + } +} + +static void +set_format (PinosPort *port, GBytes *format) +{ + PinosPortPrivate *priv = port->priv; + + if (priv->format) + g_bytes_unref (priv->format); + priv->format = format; + + g_debug ("port %p: set format", port); + if (priv->direction == PINOS_DIRECTION_OUTPUT) { + gint i; + + for (i = 0; i < priv->n_peers; i++) { + PinosPort *peer = priv->peers[i]; + if (peer == NULL) + continue; + set_format (peer, g_bytes_ref (format)); + g_object_notify (G_OBJECT (peer), "format"); + } + } +} + +/** + * pinos_port_update_format: + * @port: a #PinosPort + * @format: a new format + * @error: a #GError or %NULL + * + * Update the format on @port. + * + * Returns: %TRUE on succes, @error is set when %FALSE is returned. + */ +gboolean +pinos_port_update_format (PinosPort *port, GBytes *format, GError **error) +{ + PinosPortPrivate *priv; + gboolean res = TRUE; + + g_return_val_if_fail (PINOS_IS_PORT (port), FALSE); + priv = port->priv; + + if (priv->format) + g_bytes_unref (priv->format); + priv->format = format; + + g_debug ("port %p: update format", port); + if (priv->direction == PINOS_DIRECTION_INPUT && priv->sockets[0]) { + PinosBufferBuilder builder; + PinosBuffer pbuf; + PinosPacketFormatChange fc; + + pinos_buffer_builder_init (&builder); + fc.id = 0; + fc.format = g_bytes_get_data (format, NULL); + pinos_buffer_builder_add_format_change (&builder, &fc); + pinos_buffer_builder_end (&builder, &pbuf); + + g_debug ("port %p: send format message %s", port, fc.format); + res = write_buffer (priv->sockets[0], &pbuf, error); + pinos_buffer_clear (&pbuf); + } else if (priv->direction == PINOS_DIRECTION_OUTPUT) { + gint i; + + for (i = 0; i < priv->n_peers; i++) { + PinosPort *peer = priv->peers[i]; + if (peer == NULL) + continue; + res = pinos_port_update_format (peer, g_bytes_ref (format), error); + } + } + return res; +} + +static void +pinos_port_get_property (GObject *_object, + guint prop_id, + GValue *value, + GParamSpec *pspec) +{ + PinosPort *port = PINOS_PORT (_object); + PinosPortPrivate *priv = port->priv; + + switch (prop_id) { + case PROP_NODE: + g_value_set_object (value, priv->node); + break; + + case PROP_SOCKET: + g_value_set_object (value, priv->sockets[0]); + break; + + case PROP_NAME: + g_value_set_string (value, priv->name); + break; + + case PROP_DIRECTION: + g_value_set_enum (value, priv->direction); + break; + + case PROP_POSSIBLE_FORMATS: + g_value_set_boxed (value, priv->possible_formats); + break; + + case PROP_FORMAT: + g_value_set_boxed (value, priv->format); + break; + + case PROP_PROPERTIES: + g_value_set_boxed (value, priv->properties); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (port, prop_id, pspec); + break; + } +} + +static void +pinos_port_set_property (GObject *_object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + PinosPort *port = PINOS_PORT (_object); + PinosPortPrivate *priv = port->priv; + + switch (prop_id) { + case PROP_NODE: + priv->node = g_value_get_object (value); + g_debug ("port %p: set node %p %d", port, priv->node, G_OBJECT (priv->node)->ref_count); + break; + + case PROP_SOCKET: + priv->sockets[0] = g_value_dup_object (value); + break; + + case PROP_NAME: + priv->name = g_value_dup_string (value); + break; + + case PROP_DIRECTION: + priv->direction = g_value_get_enum (value); + break; + + case PROP_POSSIBLE_FORMATS: + if (priv->possible_formats) + g_bytes_unref (priv->possible_formats); + priv->possible_formats = g_value_dup_boxed (value); + break; + + case PROP_FORMAT: + set_format (port, g_value_dup_boxed (value)); + break; + + case PROP_PROPERTIES: + if (priv->properties) + pinos_properties_free (priv->properties); + priv->properties = g_value_dup_boxed (value); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (port, prop_id, pspec); + break; + } +} + +static void +pinos_port_constructed (GObject * object) +{ + PinosPort *port = PINOS_PORT (object); + PinosPortPrivate *priv = port->priv; + + g_debug ("port %p: %s port constructed, node %p %d", + port, pinos_direction_as_string (priv->direction), priv->node, G_OBJECT (priv->node)->ref_count); + + if (priv->sockets[0]) + handle_socket (port, priv->sockets[0]); + + G_OBJECT_CLASS (pinos_port_parent_class)->constructed (object); +} + +static void +pinos_port_dispose (GObject * object) +{ + PinosPort *port = PINOS_PORT (object); + PinosPortPrivate *priv = port->priv; + + g_debug ("port %p: dispose", port); + if (priv->sockets[0]) { + unhandle_socket (port); + g_clear_object (&priv->sockets[0]); + } + g_clear_object (&priv->sockets[1]); + pinos_port_unlink_all (port); + + G_OBJECT_CLASS (pinos_port_parent_class)->dispose (object); +} + +static void +pinos_port_finalize (GObject * object) +{ + PinosPort *port = PINOS_PORT (object); + PinosPortPrivate *priv = port->priv; + + g_debug ("port %p: finalize", port); + + g_free (priv->name); + g_clear_pointer (&priv->possible_formats, g_bytes_unref); + g_clear_pointer (&priv->format, g_bytes_unref); + g_clear_pointer (&priv->properties, pinos_properties_free); + if (priv->received_buffer_notify) + priv->received_buffer_notify (priv->received_buffer_data); + + G_OBJECT_CLASS (pinos_port_parent_class)->finalize (object); +} + +static void +pinos_port_class_init (PinosPortClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + + g_type_class_add_private (klass, sizeof (PinosPortPrivate)); + + gobject_class->constructed = pinos_port_constructed; + gobject_class->dispose = pinos_port_dispose; + gobject_class->finalize = pinos_port_finalize; + gobject_class->set_property = pinos_port_set_property; + gobject_class->get_property = pinos_port_get_property; + + g_object_class_install_property (gobject_class, + PROP_NODE, + g_param_spec_object ("node", + "Node", + "The Node", + PINOS_TYPE_NODE, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY | + G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_SOCKET, + g_param_spec_object ("socket", + "Socket", + "The socket for this port", + G_TYPE_SOCKET, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY | + G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_NAME, + g_param_spec_string ("name", + "Name", + "The port name", + NULL, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY | + G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_DIRECTION, + g_param_spec_enum ("direction", + "Direction", + "The direction of the port", + PINOS_TYPE_DIRECTION, + PINOS_DIRECTION_INVALID, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY | + G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_POSSIBLE_FORMATS, + g_param_spec_boxed ("possible-formats", + "Possible Formats", + "The possbile formats of the port", + G_TYPE_BYTES, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT | + G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_FORMAT, + g_param_spec_boxed ("format", + "Format", + "The format of the port", + G_TYPE_BYTES, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT | + G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_PROPERTIES, + g_param_spec_boxed ("properties", + "Properties", + "The properties of the port", + PINOS_TYPE_PROPERTIES, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT | + G_PARAM_STATIC_STRINGS)); + + + signals[SIGNAL_FORMAT_REQUEST] = g_signal_new ("format-request", + G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, + 0, + NULL, + NULL, + g_cclosure_marshal_generic, + G_TYPE_NONE, + 0, + G_TYPE_NONE); + + signals[SIGNAL_REMOVE] = g_signal_new ("remove", + G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, + 0, + NULL, + NULL, + g_cclosure_marshal_generic, + G_TYPE_NONE, + 0, + G_TYPE_NONE); + signals[SIGNAL_LINKED] = g_signal_new ("linked", + G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, + 0, + NULL, + NULL, + g_cclosure_marshal_generic, + G_TYPE_NONE, + 1, + PINOS_TYPE_PORT); + signals[SIGNAL_UNLINKED] = g_signal_new ("unlinked", + G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, + 0, + NULL, + NULL, + g_cclosure_marshal_generic, + G_TYPE_NONE, + 1, + PINOS_TYPE_PORT); +} + +static void +pinos_port_init (PinosPort * port) +{ + PinosPortPrivate *priv = port->priv = PINOS_PORT_GET_PRIVATE (port); + + priv->direction = PINOS_DIRECTION_INVALID; +} diff --git a/pinos/client/port.h b/pinos/client/port.h index 736f785ad..7d48e399b 100644 --- a/pinos/client/port.h +++ b/pinos/client/port.h @@ -29,6 +29,7 @@ typedef struct _PinosPortClass PinosPortClass; typedef struct _PinosPortPrivate PinosPortPrivate; #include +#include #define PINOS_TYPE_PORT (pinos_port_get_type ()) #define PINOS_IS_PORT(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), PINOS_TYPE_PORT)) @@ -59,15 +60,52 @@ struct _PinosPortClass { GObjectClass parent_class; }; +typedef void (*PinosReceivedBufferCallback) (PinosPort *port, gpointer user_data); + + /* normal GObject stuff */ GType pinos_port_get_type (void); -void pinos_port_remove (PinosPort *port); +void pinos_port_set_received_buffer_cb (PinosPort *port, + PinosReceivedBufferCallback cb, + gpointer user_data, + GDestroyNotify notify); -PinosNode * pinos_port_get_node (PinosPort *port); -GBytes * pinos_port_get_formats (PinosPort *port, - GBytes *filter, - GError **error); +void pinos_port_remove (PinosPort *port); + +PinosNode * pinos_port_get_node (PinosPort *port); +GSocket * pinos_port_get_socket (PinosPort *port); +const gchar * pinos_port_get_name (PinosPort *port); +PinosDirection pinos_port_get_direction (PinosPort *port); +GBytes * pinos_port_get_possible_formats (PinosPort *port); +GBytes * pinos_port_get_format (PinosPort *port); +PinosProperties * pinos_port_get_properties (PinosPort *port); + +GBytes * pinos_port_filter_formats (PinosPort *port, + GBytes *filter, + GError **error); +gboolean pinos_port_update_format (PinosPort *port, + GBytes *format, + GError **error); + +GSocket * pinos_port_get_socket_pair (PinosPort *port, + GError **error); + +gboolean pinos_port_link (PinosPort *source, + PinosPort *destination); +gboolean pinos_port_unlink (PinosPort *source, + PinosPort *destination); +gint pinos_port_get_n_links (PinosPort *port); + +gboolean pinos_port_receive_buffer (PinosPort *port, + PinosBuffer *buffer, + GError **error); +PinosBuffer * pinos_port_peek_buffer (PinosPort *port); +PinosBuffer * pinos_port_get_buffer (PinosPort *port); + +gboolean pinos_port_send_buffer (PinosPort *port, + PinosBuffer *buffer, + GError **error); G_END_DECLS diff --git a/pinos/client/private.h b/pinos/client/private.h index ce9df41b1..011e9fced 100644 --- a/pinos/client/private.h +++ b/pinos/client/private.h @@ -59,6 +59,7 @@ GDBusProxy * pinos_subscribe_get_proxy_finish (PinosSubscribe *subsc typedef struct { guint32 version; + guint32 flags; guint32 length; } PinosStackHeader; diff --git a/pinos/client/stream.c b/pinos/client/stream.c index 3de28d4ec..b491fe95f 100644 --- a/pinos/client/stream.c +++ b/pinos/client/stream.c @@ -54,10 +54,6 @@ struct _PinosStreamPrivate gboolean disconnecting; PinosStreamMode mode; - GSocket *socket; - GSource *socket_source; - - PinosStackBuffer buffer; }; #define PINOS_STREAM_GET_PRIVATE(obj) \ @@ -74,7 +70,6 @@ enum PROP_STATE, PROP_POSSIBLE_FORMATS, PROP_FORMAT, - PROP_SOCKET, }; enum @@ -119,10 +114,6 @@ pinos_stream_get_property (GObject *_object, g_value_set_boxed (value, priv->format); break; - case PROP_SOCKET: - g_value_set_object (value, priv->socket); - break; - default: G_OBJECT_WARN_INVALID_PROPERTY_ID (stream, prop_id, pspec); break; @@ -153,6 +144,13 @@ pinos_stream_set_property (GObject *_object, priv->properties = g_value_dup_boxed (value); break; + case PROP_FORMAT: + if (priv->format) + g_bytes_unref (priv->format); + priv->format = g_value_dup_boxed (value); + g_object_set (priv->port, "format", priv->format, NULL); + break; + default: G_OBJECT_WARN_INVALID_PROPERTY_ID (stream, prop_id, pspec); break; @@ -210,6 +208,9 @@ subscription_cb (PinosSubscribe *subscribe, } break; + case PINOS_SUBSCRIPTION_FLAG_PORT: + break; + default: break; } @@ -237,9 +238,7 @@ pinos_stream_finalize (GObject * object) g_debug ("free stream %p", stream); - g_clear_object (&priv->socket); g_clear_object (&priv->node); - g_clear_object (&priv->port); if (priv->possible_formats) g_bytes_unref (priv->possible_formats); @@ -258,9 +257,6 @@ pinos_stream_finalize (GObject * object) g_clear_object (&priv->context); g_free (priv->name); - g_free (priv->buffer.free_data); - g_free (priv->buffer.free_fds); - G_OBJECT_CLASS (pinos_stream_parent_class)->finalize (object); } @@ -358,25 +354,8 @@ pinos_stream_class_init (PinosStreamClass * klass) "Format", "The format of the stream", G_TYPE_BYTES, - G_PARAM_READABLE | + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - /** - * PinosStream:socket - * - * The socket of the stream. When doing pinos_stream_start() with - * #PINOS_STREAM_MODE_SOCKET, the socket will contain a data stream with - * meta data and anciliary data containing fds with the data. - */ - g_object_class_install_property (gobject_class, - PROP_SOCKET, - g_param_spec_object ("socket", - "Socket", - "The stream socket", - G_TYPE_SOCKET, - G_PARAM_READABLE | - G_PARAM_STATIC_STRINGS)); - - /** * PinosStream:new-buffer * @@ -494,6 +473,37 @@ pinos_stream_get_error (PinosStream *stream) return stream->priv->error; } +static void +on_received_buffer (PinosPort *port, + gpointer user_data) +{ + PinosStream *stream = user_data; + + g_debug ("buffer received"); + g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, NULL); +} + +static void +on_port_notify (GObject *object, + GParamSpec *pspec, + gpointer user_data) +{ + PinosPort *port = PINOS_PORT (object); + PinosStream *stream = user_data; + PinosStreamPrivate *priv = stream->priv; + + if (pspec == NULL || strcmp (g_param_spec_get_name (pspec), "format")) { + g_clear_pointer (&priv->format, g_bytes_unref); + g_object_get (port, "format", &priv->format, NULL); + g_object_notify (G_OBJECT (stream), "format"); + } + if (pspec == NULL || strcmp (g_param_spec_get_name (pspec), "possible-formats")) { + g_clear_pointer (&priv->possible_formats, g_bytes_unref); + g_object_get (port, "possible-formats", &priv->possible_formats, NULL); + g_object_notify (G_OBJECT (stream), "possible-formats"); + } +} + static void on_port_created (GObject *source_object, GAsyncResult *res, @@ -511,6 +521,11 @@ on_port_created (GObject *source_object, if (priv->port == NULL) goto create_failed; + on_port_notify (G_OBJECT (priv->port), NULL, stream); + g_signal_connect (priv->port, "notify", (GCallback) on_port_notify, stream); + + pinos_port_set_received_buffer_cb (priv->port, on_received_buffer, stream, NULL); + stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL); g_object_unref (stream); @@ -808,186 +823,19 @@ pinos_stream_disconnect (PinosStream *stream) return TRUE; } -static gboolean -on_socket_condition (GSocket *socket, - GIOCondition condition, - gpointer user_data) -{ - PinosStream *stream = user_data; - PinosStreamPrivate *priv = stream->priv; - - switch (condition) { - case G_IO_IN: - { - gssize len; - GInputVector ivec; - PinosStackHeader *hdr; - GSocketControlMessage **messages = NULL; - gint num_messages = 0; - gint flags = 0; - gsize need; - GError *error = NULL; - gint i; - - need = sizeof (PinosStackHeader); - - if (priv->buffer.max_size < need) { - priv->buffer.max_size = need; - priv->buffer.data = priv->buffer.free_data = g_realloc (priv->buffer.free_data, need); - } - - hdr = priv->buffer.data; - - /* read header first */ - ivec.buffer = hdr; - ivec.size = sizeof (PinosStackHeader); - - len = g_socket_receive_message (socket, - NULL, - &ivec, - 1, - &messages, - &num_messages, - &flags, - NULL, - &error); - - g_assert (len == sizeof (PinosStackHeader)); - - /* now we know the total length */ - need += hdr->length; - - if (priv->buffer.max_size < need) { - priv->buffer.max_size = need; - hdr = priv->buffer.data = priv->buffer.free_data = g_realloc (priv->buffer.free_data, need); - } - priv->buffer.size = need; - - if (hdr->length > 0) { - /* read data */ - len = g_socket_receive (socket, - (gchar *)priv->buffer.data + sizeof (PinosStackHeader), - hdr->length, - NULL, - &error); - g_assert (len == hdr->length); - } - - if (priv->buffer.max_fds < num_messages) { - priv->buffer.max_fds = num_messages; - priv->buffer.fds = priv->buffer.free_fds = g_realloc (priv->buffer.free_fds, - num_messages * sizeof (int)); - } - - /* handle control messages */ - for (i = 0; i < num_messages; i++) { - GSocketControlMessage *msg = messages[i]; - gint *fds, n_fds, j; - - if (g_socket_control_message_get_msg_type (msg) != SCM_RIGHTS) - continue; - - fds = g_unix_fd_message_steal_fds (G_UNIX_FD_MESSAGE (msg), &n_fds); - for (j = 0; j < n_fds; j++) - priv->buffer.fds[i] = fds[i]; - g_free (fds); - g_object_unref (msg); - } - g_free (messages); - - priv->buffer.magic = PSB_MAGIC; - - g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, NULL); - - priv->buffer.magic = 0; - priv->buffer.size = 0; - priv->buffer.n_fds = 0; - break; - } - case G_IO_OUT: - g_warning ("can do IO\n"); - break; - - default: - break; - } - return TRUE; -} - - -static void -handle_socket (PinosStream *stream, gint fd) -{ - PinosStreamPrivate *priv = stream->priv; - GError *error = NULL; - - priv->socket = g_socket_new_from_fd (fd, &error); - if (priv->socket == NULL) - goto socket_failed; - - switch (priv->mode) { - case PINOS_STREAM_MODE_SOCKET: - g_object_notify (G_OBJECT (stream), "socket"); - break; - - case PINOS_STREAM_MODE_BUFFER: - { - priv->socket_source = g_socket_create_source (priv->socket, G_IO_IN, NULL); - g_source_set_callback (priv->socket_source, (GSourceFunc) on_socket_condition, stream, NULL); - g_source_attach (priv->socket_source, priv->context->priv->context); - break; - } - - default: - break; - } - return; - - /* ERRORS */ -socket_failed: - { - g_warning ("failed to create socket: %s", error->message); - stream_set_state (stream, PINOS_STREAM_STATE_ERROR, error); - return; - } -} - -static void -unhandle_socket (PinosStream *stream) -{ - PinosStreamPrivate *priv = stream->priv; - - switch (priv->mode) { - case PINOS_STREAM_MODE_SOCKET: - g_clear_object (&priv->socket); - g_object_notify (G_OBJECT (stream), "socket"); - break; - - case PINOS_STREAM_MODE_BUFFER: - if (priv->socket_source) { - g_source_destroy (priv->socket_source); - g_clear_pointer (&priv->socket_source, g_source_unref); - } - break; - - default: - break; - } -} - /** - * pinos_stream_peek_buffer: + * pinos_stream_get_buffer: * @stream: a #PinosStream * @buffer: a #PinosBuffer * - * Peek the next buffer from @stream. This function should be called from + * Get the next buffer from @stream. This function should be called from * the new-buffer signal callback. * * Returns: %TRUE when @buffer contains valid information */ gboolean -pinos_stream_peek_buffer (PinosStream *stream, - PinosBuffer **buffer) +pinos_stream_get_buffer (PinosStream *stream, + PinosBuffer **buffer) { PinosStreamPrivate *priv; @@ -995,10 +843,9 @@ pinos_stream_peek_buffer (PinosStream *stream, g_return_val_if_fail (buffer != NULL, FALSE); priv = stream->priv; - g_return_val_if_fail (priv->state == PINOS_STREAM_STATE_STREAMING, FALSE); - g_return_val_if_fail (is_valid_buffer (&priv->buffer), FALSE); + //g_return_val_if_fail (priv->state == PINOS_STREAM_STATE_STREAMING, FALSE); - *buffer = (PinosBuffer *) &priv->buffer; + *buffer = pinos_port_get_buffer (priv->port); return TRUE; } @@ -1023,13 +870,7 @@ pinos_stream_send_buffer (PinosStream *stream, PinosBuffer *buffer) { PinosStreamPrivate *priv; - gssize len; - PinosStackBuffer *sb = (PinosStackBuffer *) buffer; - GOutputVector ovec[1]; - GSocketControlMessage *msg = NULL; - gint flags = 0; GError *error = NULL; - gint i, n_msg; g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE); g_return_val_if_fail (buffer != NULL, FALSE); @@ -1037,47 +878,17 @@ pinos_stream_send_buffer (PinosStream *stream, priv = stream->priv; g_return_val_if_fail (priv->state == PINOS_STREAM_STATE_STREAMING, FALSE); - ovec[0].buffer = sb->data; - ovec[0].size = sb->size; - - if (sb->n_fds) { - n_msg = 1; - msg = g_unix_fd_message_new (); - for (i = 0; i < sb->n_fds; i++) - if (!g_unix_fd_message_append_fd (G_UNIX_FD_MESSAGE (msg), sb->fds[i], &error)) - goto append_failed; - } - else { - n_msg = 0; - } - - len = g_socket_send_message (priv->socket, - NULL, - ovec, - 1, - &msg, - n_msg, - flags, - NULL, - &error); - if (len == -1) + if (!pinos_port_send_buffer (priv->port, buffer, &error)) goto send_error; - g_assert (len == (gssize) sb->size); - return TRUE; -append_failed: - { - g_warning ("failed to append fd: %s", error->message); - g_object_unref (msg); - stream_set_state (stream, PINOS_STREAM_STATE_ERROR, error); - return FALSE; - } + /* ERRORS */ send_error: { - g_warning ("failed to send_message: %s", error->message); + g_warning ("failed to send message: %s", error->message); stream_set_state (stream, PINOS_STREAM_STATE_ERROR, error); + g_clear_error (&error); return FALSE; } } diff --git a/pinos/client/stream.h b/pinos/client/stream.h index 5e6edc35d..187b54b44 100644 --- a/pinos/client/stream.h +++ b/pinos/client/stream.h @@ -106,7 +106,7 @@ gboolean pinos_stream_start (PinosStream *stream, PinosStreamMode mode); gboolean pinos_stream_stop (PinosStream *stream); -gboolean pinos_stream_peek_buffer (PinosStream *stream, +gboolean pinos_stream_get_buffer (PinosStream *stream, PinosBuffer **buffer); gboolean pinos_stream_send_buffer (PinosStream *stream, PinosBuffer *buffer); diff --git a/pinos/client/subscribe.c b/pinos/client/subscribe.c index 37a63e8a8..3b235df10 100644 --- a/pinos/client/subscribe.c +++ b/pinos/client/subscribe.c @@ -786,10 +786,12 @@ pinos_subscribe_get_proxy (PinosSubscribe *subscribe, if (data->pending) { data->tasks = g_list_prepend (data->tasks, task); - } else if (data->proxy) { - g_task_return_pointer (task, g_object_ref (data->proxy), g_object_unref); - } else { - g_task_return_error (task, NULL); + } else { + if (data->proxy) + g_task_return_pointer (task, g_object_ref (data->proxy), g_object_unref); + else + g_task_return_error (task, NULL); + g_object_unref (task); } break; } diff --git a/pinos/gst/gstpinos.c b/pinos/gst/gstpinos.c index aa1901bd3..23c833762 100644 --- a/pinos/gst/gstpinos.c +++ b/pinos/gst/gstpinos.c @@ -33,6 +33,8 @@ #endif #include "gstpinossocketsink.h" +#include "gstpinosportsink.h" +#include "gstpinosportsrc.h" #include "gstpinossrc.h" #include "gstpinossink.h" #include "gstpinosdeviceprovider.h" @@ -54,6 +56,10 @@ plugin_init (GstPlugin * plugin) GST_TYPE_PINOS_SINK); gst_element_register (plugin, "pinossocketsink", GST_RANK_NONE, GST_TYPE_PINOS_SOCKET_SINK); + gst_element_register (plugin, "pinosportsink", GST_RANK_NONE, + GST_TYPE_PINOS_PORT_SINK); + gst_element_register (plugin, "pinosportsrc", GST_RANK_NONE, + GST_TYPE_PINOS_PORT_SRC); if (!gst_device_provider_register (plugin, "pinosdeviceprovider", GST_RANK_PRIMARY + 1, GST_TYPE_PINOS_DEVICE_PROVIDER)) diff --git a/pinos/gst/gstpinosportsink.c b/pinos/gst/gstpinosportsink.c new file mode 100644 index 000000000..2f079b705 --- /dev/null +++ b/pinos/gst/gstpinosportsink.c @@ -0,0 +1,388 @@ +/* GStreamer + * Copyright (C) <2016> Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-pinosportsink + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include +#include +#include +#include +#include + +#include + +#include +#include +#include + +#include "gstpinosportsink.h" +#include "gsttmpfileallocator.h" + +static GQuark fdids_quark; +static GQuark orig_buffer_quark; + +GST_DEBUG_CATEGORY_STATIC (pinos_port_sink_debug); +#define GST_CAT_DEFAULT pinos_port_sink_debug + +static GstStaticPadTemplate gst_pinos_port_sink_template = +GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY + ); + +enum +{ + PROP_0, + PROP_PORT, +}; + +#define gst_pinos_port_sink_parent_class parent_class +G_DEFINE_TYPE (GstPinosPortSink, gst_pinos_port_sink, GST_TYPE_BASE_SINK); + +static gboolean +gst_pinos_port_sink_propose_allocation (GstBaseSink * bsink, GstQuery * query) +{ + GstPinosPortSink *this = GST_PINOS_PORT_SINK (bsink); + + gst_query_add_allocation_param (query, this->allocator, NULL); + gst_query_add_allocation_meta (query, GST_NET_CONTROL_MESSAGE_META_API_TYPE, + NULL); + + return TRUE; +} + +static void +on_received_buffer (PinosPort *port, gpointer user_data) +{ + GstPinosPortSink *this = user_data; + GstEvent *ev; + PinosBuffer *pbuf; + PinosBufferIter it; + PinosBufferBuilder b; + gboolean have_out = FALSE; + + pbuf = pinos_port_get_buffer (port); + + if (this->pinos_input) { + pinos_buffer_builder_init (&b); + } + + pinos_buffer_iter_init (&it, pbuf); + while (pinos_buffer_iter_next (&it)) { + switch (pinos_buffer_iter_get_type (&it)) { + case PINOS_PACKET_TYPE_REFRESH_REQUEST: + { + PinosPacketRefreshRequest p; + + if (!pinos_buffer_iter_parse_refresh_request (&it, &p)) + continue; + + GST_LOG ("refresh request"); + if (!this->pinos_input) { + gst_pad_push_event (GST_BASE_SINK_PAD (this), + gst_video_event_new_upstream_force_key_unit (p.pts, + p.request_type == 1, 0)); + } else { + pinos_buffer_builder_add_refresh_request (&b, &p); + have_out = TRUE; + } + break; + } + default: + break; + } + } + pinos_buffer_clear (pbuf); + + if (this->pinos_input) { + GstBuffer *outbuf; + gsize size; + gpointer data; + + if (have_out) { + pinos_buffer_builder_end (&b, pbuf); + + data = pinos_buffer_steal_data (pbuf, &size); + + outbuf = gst_buffer_new_wrapped (data, size); + ev = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, + gst_structure_new ("GstNetworkMessage", + "object", G_TYPE_OBJECT, this, + "buffer", GST_TYPE_BUFFER, outbuf, NULL)); + gst_buffer_unref (outbuf); + + gst_pad_push_event (GST_BASE_SINK_PAD (this), ev); + } else { + pinos_buffer_builder_clear (&b); + } + } +} + +static void +set_port (GstPinosPortSink *this, PinosPort *port) +{ + if (this->port) + g_object_unref (this->port); + this->port = port; + + pinos_port_set_received_buffer_cb (port, on_received_buffer, this, NULL); +} + +static void +gst_pinos_port_sink_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstPinosPortSink *this = GST_PINOS_PORT_SINK (object); + + switch (prop_id) { + case PROP_PORT: + set_port (this, g_value_dup_object (value)); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_pinos_port_sink_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstPinosPortSink *this = GST_PINOS_PORT_SINK (object); + + switch (prop_id) { + case PROP_PORT: + g_value_set_object (value, this->port); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static GstCaps * +gst_pinos_port_sink_getcaps (GstBaseSink * bsink, GstCaps * filter) +{ + GstPinosPortSink *this = GST_PINOS_PORT_SINK (bsink); + GBytes *filt, *formats; + gchar *cstr; + GstCaps *result; + + if (filter) { + cstr = gst_caps_to_string (filter); + filt = g_bytes_new_take (cstr, strlen (cstr) + 1); + } else { + filt = NULL; + } + + formats = pinos_port_filter_formats (this->port, filt, NULL); + + if (filt) + g_bytes_unref (filt); + + if (formats) { + result = gst_caps_from_string (g_bytes_get_data (formats, NULL)); + g_bytes_unref (formats); + } else { + result = gst_caps_new_empty (); + } + + return result; +} + +static gboolean +gst_pinos_port_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) +{ + GstPinosPortSink *this = GST_PINOS_PORT_SINK (bsink); + GstStructure *str; + gchar *cstr; + + str = gst_caps_get_structure (caps, 0); + this->pinos_input = gst_structure_has_name (str, "application/x-pinos"); + if (!this->pinos_input) { + GBytes *format; + GError *error = NULL; + + cstr = gst_caps_to_string (caps); + format = g_bytes_new_take (cstr, strlen (cstr) + 1); + + if (!pinos_port_update_format (this->port, format, &error)) { + GST_WARNING ("update failed: %s", error->message); + g_clear_error (&error); + } + } + + return GST_BASE_SINK_CLASS (parent_class)->set_caps (bsink, caps); +} + +static GstFlowReturn +gst_pinos_port_sink_render_pinos (GstPinosPortSink * this, GstBuffer * buffer) +{ + GstMapInfo info; + PinosBuffer pbuf; + GError *error = NULL; + + gst_buffer_map (buffer, &info, GST_MAP_READ); + pinos_buffer_init_data (&pbuf, info.data, info.size, NULL, 0); + gst_buffer_unmap (buffer, &info); + + if (!pinos_port_send_buffer (this->port, &pbuf, &error)) { + GST_WARNING ("send failed: %s", error->message); + g_clear_error (&error); + } + + return GST_FLOW_OK; +} + +static GstMemory * +gst_pinos_port_sink_get_fd_memory (GstPinosPortSink * this, GstBuffer * buffer, gboolean *tmpfile) +{ + GstMemory *mem = NULL; + + if (gst_buffer_n_memory (buffer) == 1 + && gst_is_fd_memory (gst_buffer_peek_memory (buffer, 0))) { + mem = gst_buffer_get_memory (buffer, 0); + *tmpfile = gst_is_tmpfile_memory (mem); + } else { + GstMapInfo info; + GstAllocationParams params = {0, 0, 0, 0, { NULL, }}; + gsize size = gst_buffer_get_size (buffer); + GST_INFO_OBJECT (this, "Buffer cannot be sent without copying"); + mem = gst_allocator_alloc (this->allocator, size, ¶ms); + if (!gst_memory_map (mem, &info, GST_MAP_WRITE)) + return NULL; + gst_buffer_extract (buffer, 0, info.data, size); + gst_memory_unmap (mem, &info); + *tmpfile = TRUE; + } + return mem; +} + +static GstFlowReturn +gst_pinos_port_sink_render_other (GstPinosPortSink * this, GstBuffer * buffer) +{ + GstMemory *fdmem = NULL; + GError *error = NULL; + PinosBuffer pbuf; + PinosBufferBuilder builder; + PinosPacketHeader hdr; + PinosPacketFDPayload p; + gboolean tmpfile = TRUE; + + hdr.flags = 0; + hdr.seq = GST_BUFFER_OFFSET (buffer); + hdr.pts = GST_BUFFER_PTS (buffer) + GST_ELEMENT_CAST (this)->base_time; + hdr.dts_offset = 0; + + pinos_buffer_builder_init (&builder); + pinos_buffer_builder_add_header (&builder, &hdr); + + fdmem = gst_pinos_port_sink_get_fd_memory (this, buffer, &tmpfile); + p.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (fdmem)); + p.id = pinos_fd_manager_get_id (this->fdmanager); + p.offset = fdmem->offset; + p.size = fdmem->size; + pinos_buffer_builder_add_fd_payload (&builder, &p); + + GST_LOG ("send %d %"G_GUINT64_FORMAT" %"G_GUINT64_FORMAT" %"G_GUINT64_FORMAT, + p.id, hdr.pts, GST_BUFFER_PTS (buffer), GST_ELEMENT_CAST (this)->base_time); + + pinos_buffer_builder_end (&builder, &pbuf); + + if (!pinos_port_send_buffer (this->port, &pbuf, &error)) { + GST_WARNING ("send failed: %s", error->message); + g_clear_error (&error); + } + + gst_memory_unref(fdmem); + + return GST_FLOW_OK; +} + +static GstFlowReturn +gst_pinos_port_sink_render (GstBaseSink * bsink, GstBuffer * buffer) +{ + GstPinosPortSink *this = GST_PINOS_PORT_SINK (bsink); + + if (this->pinos_input) + return gst_pinos_port_sink_render_pinos (this, buffer); + else + return gst_pinos_port_sink_render_other (this, buffer); +} + +static void +gst_pinos_port_sink_finalize (GObject * object) +{ + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static void +gst_pinos_port_sink_class_init (GstPinosPortSinkClass * klass) +{ + GObjectClass *gobject_class; + GstElementClass *gstelement_class; + GstBaseSinkClass *gstbasesink_class; + + gobject_class = (GObjectClass *) klass; + gstelement_class = (GstElementClass *) klass; + gstbasesink_class = (GstBaseSinkClass *) klass; + + gobject_class->finalize = gst_pinos_port_sink_finalize; + gobject_class->set_property = gst_pinos_port_sink_set_property; + gobject_class->get_property = gst_pinos_port_sink_get_property; + + g_object_class_install_property (gobject_class, PROP_PORT, + g_param_spec_object ("port", "Port", + "The pinos port object", PINOS_TYPE_PORT, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + gst_element_class_set_static_metadata (gstelement_class, + "Pinos Port sink", "Sink/Video", + "Send data to pinos port", "Wim Taymans "); + + gst_element_class_add_pad_template (gstelement_class, + gst_static_pad_template_get (&gst_pinos_port_sink_template)); + + gstbasesink_class->get_caps = gst_pinos_port_sink_getcaps; + gstbasesink_class->set_caps = gst_pinos_port_sink_setcaps; + gstbasesink_class->propose_allocation = gst_pinos_port_sink_propose_allocation; + gstbasesink_class->render = gst_pinos_port_sink_render; + + fdids_quark = g_quark_from_static_string ("GstPinosPortSinkFDIds"); + orig_buffer_quark = g_quark_from_static_string ("GstPinosPortSinkOrigBuffer"); + + GST_DEBUG_CATEGORY_INIT (pinos_port_sink_debug, "pinosportsink", 0, + "Pinos Socket Sink"); +} + +static void +gst_pinos_port_sink_init (GstPinosPortSink * this) +{ + this->allocator = gst_tmpfile_allocator_new (); + this->fdmanager = pinos_fd_manager_get (PINOS_FD_MANAGER_DEFAULT); +} diff --git a/pinos/gst/gstpinosportsink.h b/pinos/gst/gstpinosportsink.h new file mode 100644 index 000000000..57a251a3d --- /dev/null +++ b/pinos/gst/gstpinosportsink.h @@ -0,0 +1,65 @@ +/* GStreamer + * Copyright (C) <2016> Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_PINOS_PORT_SINK_H__ +#define __GST_PINOS_PORT_SINK_H__ + +#include + +#include + +#include +#include + +G_BEGIN_DECLS + +#define GST_TYPE_PINOS_PORT_SINK (gst_pinos_port_sink_get_type()) +#define GST_PINOS_PORT_SINK(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_PINOS_PORT_SINK,GstPinosPortSink)) +#define GST_PINOS_PORT_SINK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_PINOS_PORT_SINK,GstPinosPortSinkClass)) +#define GST_IS_PINOS_PORT_SINK(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_PINOS_PORT_SINK)) +#define GST_IS_PINOS_PORT_SINK_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_PINOS_PORT_SINK)) +#define GST_PINOS_PORT_SINK_CAST(obj) ((GstPinosPortSink *) (obj)) + +typedef struct _GstPinosPortSink GstPinosPortSink; +typedef struct _GstPinosPortSinkClass GstPinosPortSinkClass; + +/** + * GstPinosPortSink: + * + * Opaque data structure. + */ +struct _GstPinosPortSink { + GstBaseSink element; + + gboolean pinos_input; + GstAllocator *allocator; + + PinosPort *port; + PinosFdManager *fdmanager; +}; + +struct _GstPinosPortSinkClass { + GstBaseSinkClass parent_class; +}; + +GType gst_pinos_port_sink_get_type (void); + +G_END_DECLS + +#endif /* __GST_PINOS_PORT_SINK_H__ */ diff --git a/pinos/gst/gstpinosportsrc.c b/pinos/gst/gstpinosportsrc.c new file mode 100644 index 000000000..d9c7415a9 --- /dev/null +++ b/pinos/gst/gstpinosportsrc.c @@ -0,0 +1,635 @@ +/* GStreamer + * Copyright (C) <2015> Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-pinosportsrc + * + * + * Example launch line + * |[ + * gst-launch -v pinosportsrc ! videoconvert ! ximagesink + * ]| Shows pinos output in an X window. + * + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif +#include "gstpinosportsrc.h" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + + +static GQuark fdpayload_data_quark; + +GST_DEBUG_CATEGORY_STATIC (pinos_port_src_debug); +#define GST_CAT_DEFAULT pinos_port_src_debug + +enum +{ + PROP_0, + PROP_PORT, +}; + + +#define PINOSS_VIDEO_CAPS GST_VIDEO_CAPS_MAKE (GST_VIDEO_FORMATS_ALL) + +static GstStaticPadTemplate gst_pinos_port_src_template = +GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY + ); + +#define gst_pinos_port_src_parent_class parent_class +G_DEFINE_TYPE (GstPinosPortSrc, gst_pinos_port_src, GST_TYPE_PUSH_SRC); + +static GstStateChangeReturn +gst_pinos_port_src_change_state (GstElement * element, GstStateChange transition); + +static GstCaps *gst_pinos_port_src_getcaps (GstBaseSrc * bsrc, GstCaps * filter); + +static GstFlowReturn gst_pinos_port_src_create (GstPushSrc * psrc, + GstBuffer ** buffer); +static gboolean gst_pinos_port_src_start (GstBaseSrc * basesrc); +static gboolean gst_pinos_port_src_stop (GstBaseSrc * basesrc); +static gboolean gst_pinos_port_src_event (GstBaseSrc * src, GstEvent * event); +static gboolean gst_pinos_port_src_query (GstBaseSrc * src, GstQuery * query); + +typedef struct { + GstPinosPortSrc *src; + PinosPacketFDPayload p; +} FDPayloadData; + +static void +fdpayload_data_destroy (gpointer user_data) +{ + FDPayloadData *data = user_data; + GstPinosPortSrc *this = data->src; + PinosBufferBuilder b; + PinosPacketReleaseFDPayload r; + PinosBuffer pbuf; + + r.id = data->p.id; + + GST_DEBUG_OBJECT (this, "destroy %d", r.id); + + pinos_buffer_builder_init (&b); + pinos_buffer_builder_add_release_fd_payload (&b, &r); + pinos_buffer_builder_end (&b, &pbuf); + + pinos_port_send_buffer (this->port, &pbuf, NULL); + pinos_buffer_clear (&pbuf); + + gst_object_unref (this); + g_slice_free (FDPayloadData, data); +} + +static void +on_received_buffer (PinosPort *port, + gpointer user_data) +{ + GstPinosPortSrc *this = user_data; + PinosBuffer *pbuf; + PinosBufferIter it; + GstBuffer *buf = NULL; + + GST_LOG_OBJECT (this, "got new buffer"); + pbuf = pinos_port_get_buffer (port); + + pinos_buffer_iter_init (&it, pbuf); + while (pinos_buffer_iter_next (&it)) { + switch (pinos_buffer_iter_get_type (&it)) { + case PINOS_PACKET_TYPE_HEADER: + { + PinosPacketHeader hdr; + + if (!pinos_buffer_iter_parse_header (&it, &hdr)) + goto parse_failed; + + if (buf == NULL) + buf = gst_buffer_new (); + + GST_INFO ("pts %" G_GUINT64_FORMAT ", dts_offset %"G_GUINT64_FORMAT, hdr.pts, hdr.dts_offset); + + if (GST_CLOCK_TIME_IS_VALID (hdr.pts)) { + GST_BUFFER_PTS (buf) = hdr.pts; + if (GST_BUFFER_PTS (buf) + hdr.dts_offset > 0) + GST_BUFFER_DTS (buf) = GST_BUFFER_PTS (buf) + hdr.dts_offset; + } + GST_BUFFER_OFFSET (buf) = hdr.seq; + break; + } + case PINOS_PACKET_TYPE_FD_PAYLOAD: + { + GstMemory *fdmem = NULL; + FDPayloadData data; + int fd; + + if (!pinos_buffer_iter_parse_fd_payload (&it, &data.p)) + goto parse_failed; + + GST_DEBUG ("got fd payload id %d", data.p.id); + fd = pinos_buffer_get_fd (pbuf, data.p.fd_index); + if (fd == -1) + goto no_fds; + + if (buf == NULL) + buf = gst_buffer_new (); + + fdmem = gst_fd_allocator_alloc (this->fd_allocator, fd, + data.p.offset + data.p.size, GST_FD_MEMORY_FLAG_NONE); + gst_memory_resize (fdmem, data.p.offset, data.p.size); + gst_buffer_append_memory (buf, fdmem); + + data.src = gst_object_ref (this); + gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (fdmem), + fdpayload_data_quark, + g_slice_dup (FDPayloadData, &data), + fdpayload_data_destroy); + break; + } + case PINOS_PACKET_TYPE_FORMAT_CHANGE: + { + PinosPacketFormatChange change; + GstCaps *caps; + + if (!pinos_buffer_iter_parse_format_change (&it, &change)) + goto parse_failed; + GST_DEBUG ("got format change %d %s", change.id, change.format); + + caps = gst_caps_from_string (change.format); + gst_base_src_set_caps (GST_BASE_SRC (this), caps); + gst_caps_unref (caps); + break; + } + default: + break; + } + } + if (buf) { + g_queue_push_tail (&this->queue, buf); + g_cond_signal (&this->cond); + } + + return; + + /* ERRORS */ +parse_failed: + { + gst_buffer_unref (buf); + GST_ELEMENT_ERROR (this, RESOURCE, FAILED, ("buffer parse failure"), (NULL)); + return; + } +no_fds: + { + gst_buffer_unref (buf); + GST_ELEMENT_ERROR (this, RESOURCE, FAILED, ("fd not found in buffer"), (NULL)); + return; + } + +} + +static void +set_port (GstPinosPortSrc *this, PinosPort *port) +{ + g_debug ("set port %p", port); + + if (this->port) + g_object_unref (this->port); + this->port = port; + + pinos_port_set_received_buffer_cb (port, on_received_buffer, this, NULL); +} + +static void +gst_pinos_port_src_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstPinosPortSrc *this = GST_PINOS_PORT_SRC (object); + + switch (prop_id) { + case PROP_PORT: + set_port (this, g_value_dup_object (value)); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_pinos_port_src_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstPinosPortSrc *this = GST_PINOS_PORT_SRC (object); + + switch (prop_id) { + case PROP_PORT: + g_value_set_object (value, this->port); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static GstClock * +gst_pinos_port_src_provide_clock (GstElement * elem) +{ + GstPinosPortSrc *this = GST_PINOS_PORT_SRC (elem); + GstClock *clock; + + GST_OBJECT_LOCK (this); + if (!GST_OBJECT_FLAG_IS_SET (this, GST_ELEMENT_FLAG_PROVIDE_CLOCK)) + goto clock_disabled; + + if (this->clock) + clock = GST_CLOCK_CAST (gst_object_ref (this->clock)); + else + clock = NULL; + GST_OBJECT_UNLOCK (this); + + return clock; + + /* ERRORS */ +clock_disabled: + { + GST_DEBUG_OBJECT (this, "clock provide disabled"); + GST_OBJECT_UNLOCK (this); + return NULL; + } +} + +static gboolean +gst_pinos_port_src_unlock (GstBaseSrc * basesrc) +{ + GstPinosPortSrc *this = GST_PINOS_PORT_SRC (basesrc); + + GST_DEBUG_OBJECT (this, "setting flushing"); + + GST_OBJECT_LOCK (this); + this->flushing = TRUE; + g_cond_signal (&this->cond); + GST_OBJECT_UNLOCK (this); + + return TRUE; +} + +static gboolean +gst_pinos_port_src_unlock_stop (GstBaseSrc * basesrc) +{ + GstPinosPortSrc *this = GST_PINOS_PORT_SRC (basesrc); + + GST_DEBUG_OBJECT (this, "unsetting flushing"); + this->flushing = FALSE; + + return TRUE; +} + +static void +gst_pinos_port_src_finalize (GObject * object) +{ + GstPinosPortSrc *this = GST_PINOS_PORT_SRC (object); + + g_queue_foreach (&this->queue, (GFunc) gst_mini_object_unref, NULL); + g_queue_clear (&this->queue); + g_cond_clear (&this->cond); + g_object_unref (this->fd_allocator); + g_object_unref (this->port); + if (this->clock) + gst_object_unref (this->clock); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static void +gst_pinos_port_src_class_init (GstPinosPortSrcClass * klass) +{ + GObjectClass *gobject_class; + GstElementClass *gstelement_class; + GstBaseSrcClass *gstbasesrc_class; + GstPushSrcClass *gstpushsrc_class; + + gobject_class = (GObjectClass *) klass; + gstelement_class = (GstElementClass *) klass; + gstbasesrc_class = (GstBaseSrcClass *) klass; + gstpushsrc_class = (GstPushSrcClass *) klass; + + gobject_class->finalize = gst_pinos_port_src_finalize; + gobject_class->set_property = gst_pinos_port_src_set_property; + gobject_class->get_property = gst_pinos_port_src_get_property; + + g_object_class_install_property (gobject_class, PROP_PORT, + g_param_spec_object ("port", "Port", + "The pinos port object", + PINOS_TYPE_PORT, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + + gstelement_class->provide_clock = gst_pinos_port_src_provide_clock; + gstelement_class->change_state = gst_pinos_port_src_change_state; + + gst_element_class_set_static_metadata (gstelement_class, + "Pinos source", "Source/Video", + "Uses pinos to create video", "Wim Taymans "); + + gst_element_class_add_pad_template (gstelement_class, + gst_static_pad_template_get (&gst_pinos_port_src_template)); + + gstbasesrc_class->get_caps = gst_pinos_port_src_getcaps; + gstbasesrc_class->unlock = gst_pinos_port_src_unlock; + gstbasesrc_class->unlock_stop = gst_pinos_port_src_unlock_stop; + gstbasesrc_class->start = gst_pinos_port_src_start; + gstbasesrc_class->stop = gst_pinos_port_src_stop; + gstbasesrc_class->event = gst_pinos_port_src_event; + gstbasesrc_class->query = gst_pinos_port_src_query; + gstpushsrc_class->create = gst_pinos_port_src_create; + + GST_DEBUG_CATEGORY_INIT (pinos_port_src_debug, "pinosportsrc", 0, + "Pinos Source"); + + fdpayload_data_quark = g_quark_from_static_string ("GstPinosPortSrcFDPayloadQuark"); +} + +static void +gst_pinos_port_src_init (GstPinosPortSrc * src) +{ + /* we operate in time */ + gst_base_src_set_format (GST_BASE_SRC (src), GST_FORMAT_TIME); + + GST_OBJECT_FLAG_SET (src, GST_ELEMENT_FLAG_PROVIDE_CLOCK); + + g_cond_init (&src->cond); + g_queue_init (&src->queue); + + src->fd_allocator = gst_fd_allocator_new (); +} + +static void +parse_stream_properties (GstPinosPortSrc *this, PinosProperties *props) +{ + const gchar *var; + + var = pinos_properties_get (props, "pinos.latency.is-live"); + this->is_live = var ? (atoi (var) == 1) : FALSE; + gst_base_src_set_live (GST_BASE_SRC (this), this->is_live); + + var = pinos_properties_get (props, "pinos.latency.min"); + this->min_latency = var ? (GstClockTime) atoi (var) : 0; + + var = pinos_properties_get (props, "pinos.latency.max"); + this->max_latency = var ? (GstClockTime) atoi (var) : GST_CLOCK_TIME_NONE; + + var = pinos_properties_get (props, "pinos.clock.type"); + if (var != NULL) { + GST_DEBUG_OBJECT (this, "got clock type %s", var); + if (strcmp (var, "gst.net.time.provider") == 0) { + const gchar *address; + gint port; + GstClockTime base_time; + + address = pinos_properties_get (props, "pinos.clock.address"); + port = atoi (pinos_properties_get (props, "pinos.clock.port")); + base_time = atoll (pinos_properties_get (props, "pinos.clock.base-time")); + + GST_DEBUG_OBJECT (this, "making net clock for %s:%d %" G_GUINT64_FORMAT, address, port, base_time); + if (this->clock) + gst_object_unref (this->clock); + this->clock = gst_net_client_clock_new ("pinosclock", address, port, base_time); + + gst_element_post_message (GST_ELEMENT_CAST (this), + gst_message_new_clock_provide (GST_OBJECT_CAST (this), + this->clock, TRUE)); + } + } +} + +static GstCaps * +gst_pinos_port_src_getcaps (GstBaseSrc * bsrc, GstCaps * filter) +{ + GstPinosPortSrc *this = GST_PINOS_PORT_SRC (bsrc); + GBytes *format; + GstCaps *caps = NULL; + + GST_DEBUG ("getting caps"); + + g_object_get (this->port, "format", &format, NULL); + if (format) { + GST_DEBUG ("have format %s", g_bytes_get_data (format, NULL)); + caps = gst_caps_from_string (g_bytes_get_data (format, NULL)); + g_bytes_unref (format); + } + return caps; +} + +static gboolean +gst_pinos_port_src_event (GstBaseSrc * src, GstEvent * event) +{ + gboolean res = FALSE; + GstPinosPortSrc *this; + + this = GST_PINOS_PORT_SRC (src); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_CUSTOM_UPSTREAM: + if (gst_video_event_is_force_key_unit (event)) { + GstClockTime running_time; + gboolean all_headers; + guint count; + PinosPacketRefreshRequest refresh; + PinosBufferBuilder b; + PinosBuffer pbuf; + + gst_video_event_parse_upstream_force_key_unit (event, + &running_time, &all_headers, &count); + + refresh.last_id = 0; + refresh.request_type = all_headers ? 1 : 0; + refresh.pts = running_time; + + pinos_buffer_builder_init (&b); + pinos_buffer_builder_add_refresh_request (&b, &refresh); + pinos_buffer_builder_end (&b, &pbuf); + + GST_DEBUG_OBJECT (this, "send refresh request"); + pinos_port_send_buffer (this->port, &pbuf, NULL); + pinos_buffer_clear (&pbuf); + res = TRUE; + } else { + res = GST_BASE_SRC_CLASS (parent_class)->event (src, event); + } + break; + default: + res = GST_BASE_SRC_CLASS (parent_class)->event (src, event); + break; + } + return res; +} + +static gboolean +gst_pinos_port_src_query (GstBaseSrc * src, GstQuery * query) +{ + gboolean res = FALSE; + GstPinosPortSrc *this; + + this = GST_PINOS_PORT_SRC (src); + + switch (GST_QUERY_TYPE (query)) { + case GST_QUERY_LATENCY: + gst_query_set_latency (query, this->is_live, this->min_latency, this->max_latency); + res = TRUE; + break; + default: + res = GST_BASE_SRC_CLASS (parent_class)->query (src, query); + break; + } + return res; +} + +static GstFlowReturn +gst_pinos_port_src_create (GstPushSrc * psrc, GstBuffer ** buffer) +{ + GstPinosPortSrc *this; + GstClockTime pts, dts, base_time; + + this = GST_PINOS_PORT_SRC (psrc); + + GST_OBJECT_LOCK (this); + while (TRUE) { + if (this->flushing) + goto streaming_stopped; + + *buffer = g_queue_pop_head (&this->queue); + if (*buffer != NULL) + break; + + g_cond_wait (&this->cond, GST_OBJECT_GET_LOCK (this)); + } + GST_OBJECT_UNLOCK (this); + + base_time = GST_ELEMENT_CAST (psrc)->base_time; + pts = GST_BUFFER_PTS (*buffer); + dts = GST_BUFFER_DTS (*buffer); + + if (GST_CLOCK_TIME_IS_VALID (pts)) + pts = (pts >= base_time ? pts - base_time : 0); + if (GST_CLOCK_TIME_IS_VALID (dts)) + dts = (dts >= base_time ? dts - base_time : 0); + + GST_INFO ("pts %" G_GUINT64_FORMAT ", dts %"G_GUINT64_FORMAT + ", base-time %"GST_TIME_FORMAT" -> %"GST_TIME_FORMAT", %"GST_TIME_FORMAT, + GST_BUFFER_PTS (*buffer), GST_BUFFER_DTS (*buffer), GST_TIME_ARGS (base_time), + GST_TIME_ARGS (pts), GST_TIME_ARGS (dts)); + + GST_BUFFER_PTS (*buffer) = pts; + GST_BUFFER_DTS (*buffer) = dts; + + return GST_FLOW_OK; + +streaming_stopped: + { + GST_OBJECT_UNLOCK (this); + return GST_FLOW_FLUSHING; + } +} + +static gboolean +gst_pinos_port_src_start (GstBaseSrc * basesrc) +{ + PinosProperties *props; + GstPinosPortSrc *this; + + this = GST_PINOS_PORT_SRC (basesrc); + + props = pinos_port_get_properties (this->port); + if (props) + parse_stream_properties (this, props); + + return TRUE; +} + +static void +clear_queue (GstPinosPortSrc *this) +{ + g_queue_foreach (&this->queue, (GFunc) gst_mini_object_unref, NULL); + g_queue_clear (&this->queue); +} + +static gboolean +gst_pinos_port_src_stop (GstBaseSrc * basesrc) +{ + GstPinosPortSrc *this; + + this = GST_PINOS_PORT_SRC (basesrc); + + clear_queue (this); + + return TRUE; +} + +static GstStateChangeReturn +gst_pinos_port_src_change_state (GstElement * element, GstStateChange transition) +{ + GstStateChangeReturn ret; + + switch (transition) { + case GST_STATE_CHANGE_NULL_TO_READY: + break; + case GST_STATE_CHANGE_READY_TO_PAUSED: + break; + case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + /* uncork and start recording */ + break; + case GST_STATE_CHANGE_PLAYING_TO_PAUSED: + /* stop recording ASAP by corking */ + break; + default: + break; + } + + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + + switch (transition) { + case GST_STATE_CHANGE_READY_TO_PAUSED: + if (gst_base_src_is_live (GST_BASE_SRC (element))) + ret = GST_STATE_CHANGE_NO_PREROLL; + break; + case GST_STATE_CHANGE_PLAYING_TO_PAUSED: + break; + case GST_STATE_CHANGE_PAUSED_TO_READY: + break; + case GST_STATE_CHANGE_READY_TO_NULL: + break; + default: + break; + } + return ret; +} diff --git a/pinos/gst/gstpinosportsrc.h b/pinos/gst/gstpinosportsrc.h new file mode 100644 index 000000000..a135e59b9 --- /dev/null +++ b/pinos/gst/gstpinosportsrc.h @@ -0,0 +1,80 @@ +/* GStreamer + * Copyright (C) <2015> Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_PINOS_PORT_SRC_H__ +#define __GST_PINOS_PORT_SRC_H__ + +#include +#include + +#include + +G_BEGIN_DECLS + +#define GST_TYPE_PINOS_PORT_SRC \ + (gst_pinos_port_src_get_type()) +#define GST_PINOS_PORT_SRC(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_PINOS_PORT_SRC,GstPinosPortSrc)) +#define GST_PINOS_PORT_SRC_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_PINOS_PORT_SRC,GstPinosPortSrcClass)) +#define GST_IS_PINOS_PORT_SRC(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_PINOS_PORT_SRC)) +#define GST_IS_PINOS_PORT_SRC_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_PINOS_PORT_SRC)) +#define GST_PINOS_PORT_SRC_CAST(obj) \ + ((GstPinosPortSrc *) (obj)) + +typedef struct _GstPinosPortSrc GstPinosPortSrc; +typedef struct _GstPinosPortSrcClass GstPinosPortSrcClass; + +/** + * GstPinosPortSrc: + * + * Opaque data structure. + */ +struct _GstPinosPortSrc { + GstPushSrc element; + + /*< private >*/ + PinosPort *port; + + gboolean negotiated; + gboolean flushing; + gboolean started; + + gboolean is_live; + GstClockTime min_latency; + GstClockTime max_latency; + GstClock *clock; + + GstAllocator *fd_allocator; + + GQueue queue; + GCond cond; +}; + +struct _GstPinosPortSrcClass { + GstPushSrcClass parent_class; +}; + +GType gst_pinos_port_src_get_type (void); + +G_END_DECLS + +#endif /* __GST_PINOS_PORT_SRC_H__ */ diff --git a/pinos/gst/gstpinossink.c b/pinos/gst/gstpinossink.c index e06409eea..da9894947 100644 --- a/pinos/gst/gstpinossink.c +++ b/pinos/gst/gstpinossink.c @@ -337,7 +337,7 @@ on_new_buffer (GObject *gobject, return; } - if (!pinos_stream_peek_buffer (pinossink->stream, &pbuf)) { + if (!pinos_stream_get_buffer (pinossink->stream, &pbuf)) { g_warning ("failed to capture buffer"); return; } @@ -430,7 +430,7 @@ gst_pinos_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) g_bytes_ref (format)); } else { pinos_stream_connect (pinossink->stream, - PINOS_DIRECTION_INPUT, + PINOS_DIRECTION_OUTPUT, pinossink->path, 0, g_bytes_ref (format)); @@ -449,21 +449,7 @@ gst_pinos_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) } } - if (state == PINOS_STREAM_STATE_STREAMING) { - PinosBufferBuilder builder; - PinosPacketFormatChange change; - PinosBuffer pbuf; - - pinos_buffer_builder_init (&builder); - - change.id = 1; - change.format = g_bytes_get_data (format, NULL); - pinos_buffer_builder_add_format_change (&builder, &change); - pinos_buffer_builder_end (&builder, &pbuf); - - res = pinos_stream_send_buffer (pinossink->stream, &pbuf); - pinos_buffer_clear (&pbuf); - } else { + if (state != PINOS_STREAM_STATE_STREAMING) { res = pinos_stream_start (pinossink->stream, g_bytes_ref (format), PINOS_STREAM_MODE_BUFFER); @@ -480,6 +466,23 @@ gst_pinos_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) pinos_main_loop_wait (pinossink->loop); } } + { + PinosBufferBuilder builder; + PinosPacketFormatChange change; + PinosBuffer pbuf; + + pinos_buffer_builder_init (&builder); + + change.id = 1; + change.format = g_bytes_get_data (format, NULL); + pinos_buffer_builder_add_format_change (&builder, &change); + pinos_buffer_builder_end (&builder, &pbuf); + + g_debug ("sending format"); + res = pinos_stream_send_buffer (pinossink->stream, &pbuf); + pinos_buffer_clear (&pbuf); + } + pinos_main_loop_unlock (pinossink->loop); g_bytes_unref (format); @@ -508,6 +511,7 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer) PinosPacketFDPayload p; gsize size; gboolean tmpfile, res; + gint fd; pinossink = GST_PINOS_SINK (bsink); @@ -551,14 +555,15 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer) pinos_buffer_builder_init (&builder); pinos_buffer_builder_add_header (&builder, &hdr); - p.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (mem)); + fd = dup (gst_fd_memory_get_fd (mem)); + p.fd_index = pinos_buffer_builder_add_fd (&builder, fd); p.id = pinossink->id_counter++; p.offset = 0; p.size = size; pinos_buffer_builder_add_fd_payload (&builder, &p); pinos_buffer_builder_end (&builder, &pbuf); - GST_LOG ("sending fd index %d", p.id); + GST_LOG ("sending fd index %d %d %d", p.id, p.fd_index, fd); pinos_main_loop_lock (pinossink->loop); if (pinos_stream_get_state (pinossink->stream) != PINOS_STREAM_STATE_STREAMING) diff --git a/pinos/gst/gstpinossrc.c b/pinos/gst/gstpinossrc.c index 144cd4e0e..012b74371 100644 --- a/pinos/gst/gstpinossrc.c +++ b/pinos/gst/gstpinossrc.c @@ -75,7 +75,6 @@ static GstStateChangeReturn gst_pinos_src_change_state (GstElement * element, GstStateChange transition); static gboolean gst_pinos_src_negotiate (GstBaseSrc * basesrc); -static GstCaps *gst_pinos_src_getcaps (GstBaseSrc * bsrc, GstCaps * filter); static GstCaps *gst_pinos_src_src_fixate (GstBaseSrc * bsrc, GstCaps * caps); @@ -175,7 +174,8 @@ gst_pinos_src_finalize (GObject * object) { GstPinosSrc *pinossrc = GST_PINOS_SRC (object); - g_queue_free_full (&pinossrc->queue, (GDestroyNotify) gst_mini_object_unref); + g_queue_foreach (&pinossrc->queue, (GFunc) gst_mini_object_unref, NULL); + g_queue_clear (&pinossrc->queue); if (pinossrc->properties) gst_structure_free (pinossrc->properties); g_object_unref (pinossrc->fd_allocator); @@ -242,7 +242,6 @@ gst_pinos_src_class_init (GstPinosSrcClass * klass) gst_static_pad_template_get (&gst_pinos_src_template)); gstbasesrc_class->negotiate = gst_pinos_src_negotiate; - gstbasesrc_class->get_caps = gst_pinos_src_getcaps; gstbasesrc_class->fixate = gst_pinos_src_src_fixate; gstbasesrc_class->unlock = gst_pinos_src_unlock; gstbasesrc_class->unlock_stop = gst_pinos_src_unlock_stop; @@ -360,7 +359,7 @@ on_new_buffer (GObject *gobject, GstBuffer *buf = NULL; GST_LOG_OBJECT (pinossrc, "got new buffer"); - if (!pinos_stream_peek_buffer (pinossrc->stream, &pbuf)) { + if (!pinos_stream_get_buffer (pinossrc->stream, &pbuf)) { g_warning ("failed to capture buffer"); return; } @@ -435,6 +434,7 @@ on_new_buffer (GObject *gobject, break; } } + if (buf) { g_queue_push_tail (&pinossrc->queue, buf); @@ -534,12 +534,14 @@ static gboolean gst_pinos_src_stream_start (GstPinosSrc *pinossrc, GstCaps * caps) { gchar *str; - GBytes *format; + GBytes *format = NULL; gboolean res; PinosProperties *props; - str = gst_caps_to_string (caps); - format = g_bytes_new_take (str, strlen (str) + 1); + if (caps) { + str = gst_caps_to_string (caps); + format = g_bytes_new_take (str, strlen (str) + 1); + } pinos_main_loop_lock (pinossrc->loop); res = pinos_stream_start (pinossrc->stream, format, PINOS_STREAM_MODE_BUFFER); @@ -617,7 +619,7 @@ gst_pinos_src_negotiate (GstBaseSrc * basesrc) caps = thiscaps; } if (caps && !gst_caps_is_empty (caps)) { - GBytes *accepted, *possible; + GBytes *accepted; gchar *str; GST_DEBUG_OBJECT (basesrc, "have caps: %" GST_PTR_FORMAT, caps); @@ -647,7 +649,7 @@ gst_pinos_src_negotiate (GstBaseSrc * basesrc) } GST_DEBUG_OBJECT (basesrc, "connect capture with path %s", pinossrc->path); - pinos_stream_connect (pinossrc->stream, PINOS_DIRECTION_OUTPUT, pinossrc->path, 0, accepted); + pinos_stream_connect (pinossrc->stream, PINOS_DIRECTION_INPUT, pinossrc->path, 0, accepted); while (TRUE) { PinosStreamState state = pinos_stream_get_state (pinossrc->stream); @@ -662,36 +664,7 @@ gst_pinos_src_negotiate (GstBaseSrc * basesrc) } pinos_main_loop_unlock (pinossrc->loop); - g_object_get (pinossrc->stream, "possible-formats", &possible, NULL); - if (possible) { - GstCaps *newcaps; - - newcaps = gst_caps_from_string (g_bytes_get_data (possible, NULL)); - if (newcaps) - caps = newcaps; - - g_bytes_unref (possible); - } - /* now fixate */ - GST_DEBUG_OBJECT (basesrc, "server fixated caps: %" GST_PTR_FORMAT, caps); - if (gst_caps_is_any (caps)) { - GST_DEBUG_OBJECT (basesrc, "any caps, we stop"); - /* hmm, still anything, so element can do anything and - * nego is not needed */ - result = TRUE; - } else { - caps = gst_pinos_src_src_fixate (basesrc, caps); - GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps); - if (gst_caps_is_fixed (caps)) { - /* yay, fixed caps, use those then, it's possible that the subclass does - * not accept this caps after all and we have to fail. */ - result = gst_base_src_set_caps (basesrc, caps); - if (result) { - result = gst_pinos_src_stream_start (pinossrc, caps); - } - } - } - gst_caps_unref (caps); + result = gst_pinos_src_stream_start (pinossrc, NULL); } else { if (caps) gst_caps_unref (caps); @@ -724,12 +697,6 @@ connect_error: } } -static GstCaps * -gst_pinos_src_getcaps (GstBaseSrc * bsrc, GstCaps * filter) -{ - return GST_BASE_SRC_CLASS (parent_class)->get_caps (bsrc, filter); -} - static gboolean gst_pinos_src_unlock (GstBaseSrc * basesrc) { diff --git a/pinos/modules/gst/gst-sink.c b/pinos/modules/gst/gst-sink.c index 9527559bf..178fa3caf 100644 --- a/pinos/modules/gst/gst-sink.c +++ b/pinos/modules/gst/gst-sink.c @@ -115,24 +115,23 @@ setup_pipeline (PinosGstSink *sink, GError **error) { PinosGstSinkPrivate *priv = sink->priv; GstBus *bus; - GstCaps *caps; g_debug ("gst-sink %p: setup pipeline", sink); priv->pipeline = gst_pipeline_new (NULL); - priv->src = gst_element_factory_make ("socketsrc", NULL); - caps = gst_caps_new_empty_simple ("application/x-pinos"); - g_object_set (priv->src, "send-messages", TRUE, - "caps", caps, NULL); - gst_caps_unref (caps); + priv->src = gst_element_factory_make ("pinosportsrc", NULL); + g_object_set (priv->src, "port", priv->input, + NULL); + gst_bin_add (GST_BIN (priv->pipeline), priv->src); - priv->depay = gst_element_factory_make ("pinosdepay", NULL); - gst_bin_add (GST_BIN (priv->pipeline), priv->depay); - gst_element_link (priv->src, priv->depay); +// priv->depay = gst_element_factory_make ("pinosdepay", NULL); +// gst_bin_add (GST_BIN (priv->pipeline), priv->depay); +// gst_element_link (priv->src, priv->depay); + g_object_set (priv->element, "sync", FALSE, NULL); gst_bin_add (GST_BIN (priv->pipeline), priv->element); - gst_element_link (priv->depay, priv->element); + gst_element_link (priv->src, priv->element); bus = gst_pipeline_get_bus (GST_PIPELINE (priv->pipeline)); gst_bus_add_watch (bus, bus_handler, sink); @@ -240,80 +239,6 @@ set_state (PinosNode *node, return TRUE; } -#if 0 -static void -on_socket_notify (GObject *gobject, - GParamSpec *pspec, - gpointer user_data) -{ - PinosNode *node = user_data; - PinosGstSink *sink = user_data; - PinosGstSinkPrivate *priv = sink->priv; - GSocket *socket; - guint num_handles; - GstCaps *caps; - GBytes *requested_format, *format = NULL; - gchar *str; - gpointer state = NULL; - const gchar *key, *val; - PinosProperties *props; - - g_object_get (gobject, "socket", &socket, NULL); - GST_DEBUG ("got socket %p", socket); - - if (socket == NULL) { - g_object_set (priv->src, "socket", NULL, NULL); - num_handles = 0; - } else { - g_object_set (priv->src, "socket", socket, NULL); - num_handles = 1; - } - - if (num_handles == 0) { - pinos_node_report_idle (node); - g_object_set (priv->depay, "caps", NULL, NULL); - - str = gst_caps_to_string (priv->possible_formats); - format = g_bytes_new_take (str, strlen (str) + 1); - } else if (socket) { - /* what client requested */ - g_object_get (gobject, "requested-format", &requested_format, NULL); - g_assert (requested_format != NULL); - - if (num_handles == 1) { - /* first client, we set the requested format as the format */ - format = requested_format; - - /* set on the filter */ - caps = gst_caps_from_string (g_bytes_get_data (format, NULL)); - g_assert (caps != NULL); - g_object_set (priv->depay, "caps", caps, NULL); - gst_caps_unref (caps); - } else { - /* we already have a client, format is whatever is configured already */ - g_bytes_unref (requested_format); - - g_object_get (priv->depay, "caps", &caps, NULL); - str = gst_caps_to_string (caps); - format = g_bytes_new_take (str, strlen (str) + 1); - gst_caps_unref (caps); - } - /* this is what we use as the final format for the output */ - g_object_set (gobject, "format", format, NULL); - pinos_node_report_busy (node); - g_object_unref (socket); - } - - g_object_get (gobject, "properties", &props, NULL); - while ((key = pinos_properties_iterate (priv->props, &state))) { - val = pinos_properties_get (priv->props, key); - pinos_properties_set (props, key, val); - } - g_object_set (gobject, "properties", props, NULL); - pinos_properties_free (props); -} -#endif - static void get_property (GObject *object, guint prop_id, @@ -362,19 +287,48 @@ set_property (GObject *object, } } +static void +on_linked (PinosPort *port, PinosPort *peer, gpointer user_data) +{ + PinosNode *node = user_data; + gint n_peers; + + g_debug ("port %p: linked", port); + + n_peers = pinos_port_get_n_links (port); + if (n_peers == 1) + pinos_node_report_busy (node); +} + +static void +on_unlinked (PinosPort *port, PinosPort *peer, gpointer user_data) +{ + PinosNode *node = user_data; + gint n_peers; + + g_debug ("port %p: unlinked", port); + n_peers = pinos_port_get_n_links (port); + if (n_peers == 0) + pinos_node_report_idle (node); +} + static void on_input_port_created (GObject *source_object, GAsyncResult *res, gpointer user_data) { PinosNode *node = PINOS_NODE (source_object); - PinosGstSinkPrivate *priv = PINOS_GST_SINK (node)->priv; + PinosGstSink *sink = PINOS_GST_SINK (node); + PinosGstSinkPrivate *priv = sink->priv; priv->input = pinos_node_create_port_finish (node, res, NULL); + + g_signal_connect (priv->input, "linked", (GCallback) on_linked, node); + g_signal_connect (priv->input, "unlinked", (GCallback) on_unlinked, node); + + setup_pipeline (sink, NULL); } - - static void sink_constructed (GObject * object) { @@ -382,32 +336,32 @@ sink_constructed (GObject * object) PinosGstSink *sink = PINOS_GST_SINK (object); PinosGstSinkPrivate *priv = sink->priv; gchar *str; - GBytes *format; + GBytes *possible_formats; G_OBJECT_CLASS (pinos_gst_sink_parent_class)->constructed (object); str = gst_caps_to_string (priv->possible_formats); - format = g_bytes_new_take (str, strlen (str) + 1); + possible_formats = g_bytes_new_take (str, strlen (str) + 1); pinos_node_create_port (PINOS_NODE (node), PINOS_DIRECTION_INPUT, "input", - format, + possible_formats, NULL, NULL, on_input_port_created, node); - g_bytes_unref (format); - - setup_pipeline (sink, NULL); + g_bytes_unref (possible_formats); } static void sink_finalize (GObject * object) { + PinosServerNode *node = PINOS_SERVER_NODE (object); PinosGstSink *sink = PINOS_GST_SINK (object); PinosGstSinkPrivate *priv = sink->priv; + pinos_node_remove_port (PINOS_NODE (node), priv->input); destroy_pipeline (sink); g_clear_pointer (&priv->possible_formats, gst_caps_unref); pinos_properties_free (priv->props); diff --git a/pinos/modules/gst/gst-source.c b/pinos/modules/gst/gst-source.c index fdd93d6cc..c62e3eb20 100644 --- a/pinos/modules/gst/gst-source.c +++ b/pinos/modules/gst/gst-source.c @@ -32,7 +32,7 @@ struct _PinosGstSourcePrivate { GstElement *pipeline; GstElement *element; - GstElement *filter; + GstElement *pay; GstElement *sink; PinosPort *output; @@ -122,18 +122,21 @@ setup_pipeline (PinosGstSource *source, GError **error) gst_bin_add (GST_BIN (priv->pipeline), priv->element); - priv->filter = gst_element_factory_make ("capsfilter", NULL); - gst_bin_add (GST_BIN (priv->pipeline), priv->filter); - gst_element_link (priv->element, priv->filter); +#if 0 + priv->pay = gst_element_factory_make ("pinospay", NULL); + gst_bin_add (GST_BIN (priv->pipeline), priv->pay); + gst_element_link (priv->element, priv->pay); +#endif - priv->sink = gst_element_factory_make ("pinossocketsink", NULL); + priv->sink = gst_element_factory_make ("pinosportsink", NULL); g_object_set (priv->sink, "sync", TRUE, "enable-last-sample", FALSE, "qos", FALSE, + "port", priv->output, NULL); gst_bin_add (GST_BIN (priv->pipeline), priv->sink); - gst_element_link (priv->filter, priv->sink); + gst_element_link (priv->element, priv->sink); bus = gst_pipeline_get_bus (GST_PIPELINE (priv->pipeline)); gst_bus_add_watch (bus, bus_handler, source); @@ -286,86 +289,6 @@ set_state (PinosNode *node, return TRUE; } -#if 0 -static void -on_socket_notify (GObject *gobject, - GParamSpec *pspec, - gpointer user_data) -{ - PinosGstSource *source = user_data; - PinosGstSourcePrivate *priv = source->priv; - GSocket *socket; - guint num_handles; - GstCaps *caps; - GBytes *requested_format, *format = NULL; - gchar *str; - gpointer state = NULL; - const gchar *key, *val; - PinosProperties *props; - - g_object_get (gobject, "socket", &socket, NULL); - GST_DEBUG ("got socket %p", socket); - - if (socket == NULL) { - GSocket *prev_socket = g_object_steal_data (gobject, "last-socket"); - if (prev_socket) { - g_signal_emit_by_name (priv->sink, "remove", prev_socket); - g_object_unref (prev_socket); - } - } else { - g_signal_emit_by_name (priv->sink, "add", socket); - g_object_set_data_full (gobject, "last-socket", socket, g_object_unref); - } - - g_object_get (priv->sink, "num-handles", &num_handles, NULL); - if (num_handles == 0) { - pinos_node_report_idle (PINOS_NODE (source)); - g_object_set (priv->filter, "caps", NULL, NULL); - - str = gst_caps_to_string (priv->possible_formats); - format = g_bytes_new_take (str, strlen (str) + 1); - } else if (socket) { - /* what client requested */ - g_object_get (gobject, "requested-format", &requested_format, NULL); - g_assert (requested_format != NULL); - - if (num_handles == 1) { - /* first client, we set the requested format as the format */ - format = requested_format; - - /* set on the filter */ - caps = gst_caps_from_string (g_bytes_get_data (format, NULL)); - g_assert (caps != NULL); - g_object_set (priv->filter, "caps", caps, NULL); - gst_caps_unref (caps); - } else { - /* we already have a client, format is whatever is configured already */ - g_bytes_unref (requested_format); - - g_object_get (priv->filter, "caps", &caps, NULL); - str = gst_caps_to_string (caps); - format = g_bytes_new_take (str, strlen (str) + 1); - gst_caps_unref (caps); - } - /* this is what we use as the final format for the output */ - g_object_set (gobject, "format", format, NULL); - pinos_node_report_busy (PINOS_NODE (source)); - } - if (format) { - g_object_set (priv->output, "possible-formats", format, NULL); - g_bytes_unref (format); - } - - g_object_get (gobject, "properties", &props, NULL); - while ((key = pinos_properties_iterate (priv->props, &state))) { - val = pinos_properties_get (priv->props, key); - pinos_properties_set (props, key, val); - } - g_object_set (gobject, "properties", props, NULL); - pinos_properties_free (props); -} -#endif - static void get_property (GObject *object, guint prop_id, @@ -414,15 +337,43 @@ set_property (GObject *object, } } +static void +on_linked (PinosPort *port, PinosPort *peer, gpointer user_data) +{ + PinosNode *node = user_data; + gint n_peers; + + n_peers = pinos_port_get_n_links (port); + if (n_peers == 1) + pinos_node_report_busy (node); +} + +static void +on_unlinked (PinosPort *port, PinosPort *peer, gpointer user_data) +{ + PinosNode *node = user_data; + gint n_peers; + + n_peers = pinos_port_get_n_links (port); + if (n_peers == 0) + pinos_node_report_idle (node); +} + static void on_output_port_created (GObject *source_object, GAsyncResult *res, gpointer user_data) { PinosNode *node = PINOS_NODE (source_object); - PinosGstSourcePrivate *priv = PINOS_GST_SOURCE (node)->priv; + PinosGstSource *source = PINOS_GST_SOURCE (node); + PinosGstSourcePrivate *priv = source->priv; priv->output = pinos_node_create_port_finish (node, res, NULL); + + g_signal_connect (priv->output, "linked", (GCallback) on_linked, node); + g_signal_connect (priv->output, "unlinked", (GCallback) on_unlinked, node); + + setup_pipeline (source, NULL); } static void @@ -432,24 +383,22 @@ source_constructed (GObject * object) PinosGstSource *source = PINOS_GST_SOURCE (object); PinosGstSourcePrivate *priv = source->priv; gchar *str; - GBytes *format; + GBytes *possible_formats; G_OBJECT_CLASS (pinos_gst_source_parent_class)->constructed (object); str = gst_caps_to_string (priv->possible_formats); - format = g_bytes_new_take (str, strlen (str) + 1); + possible_formats = g_bytes_new_take (str, strlen (str) + 1); pinos_node_create_port (PINOS_NODE (node), PINOS_DIRECTION_OUTPUT, "output", - format, + possible_formats, NULL, NULL, on_output_port_created, node); - g_bytes_unref (format); - - setup_pipeline (source, NULL); + g_bytes_unref (possible_formats); } static void @@ -459,8 +408,9 @@ source_finalize (GObject * object) PinosGstSource *source = PINOS_GST_SOURCE (object); PinosGstSourcePrivate *priv = source->priv; - pinos_node_remove_port (PINOS_NODE (node), priv->output); + g_debug ("gst-source %p: dispose", node); destroy_pipeline (source); + pinos_node_remove_port (PINOS_NODE (node), priv->output); g_clear_pointer (&priv->possible_formats, gst_caps_unref); pinos_properties_free (priv->props); diff --git a/pinos/server/client.c b/pinos/server/client.c index a55f5f748..da15d1b6c 100644 --- a/pinos/server/client.c +++ b/pinos/server/client.c @@ -174,7 +174,7 @@ handle_create_node (PinosClient1 *interface, client); object_path = pinos_server_node_get_object_path (PINOS_SERVER_NODE (node)); - g_debug ("client %p: add node %p, %s", client, node, object_path); + g_debug ("client %p: add node %p %d, %s", client, node, G_OBJECT (node)->ref_count, object_path); g_dbus_method_invocation_return_value (invocation, g_variant_new ("(o)", object_path)); diff --git a/pinos/server/daemon.c b/pinos/server/daemon.c index 8f4485f88..c669ca27b 100644 --- a/pinos/server/daemon.c +++ b/pinos/server/daemon.c @@ -439,7 +439,11 @@ pinos_daemon_find_port (PinosDaemon *daemon, break; } - format = pinos_port_get_formats (PINOS_PORT (p), format_filter, NULL); + g_debug ("port %s with filter %s", + pinos_server_port_get_object_path (p), + format_filter ? (gchar*)g_bytes_get_data (format_filter, NULL) : "ANY"); + + format = pinos_port_filter_formats (PINOS_PORT (p), format_filter, NULL); if (format != NULL) { g_debug ("port %s with format %s matches filter %s", pinos_server_port_get_object_path (p), diff --git a/pinos/server/server-node.c b/pinos/server/server-node.c index 68e0898c3..dd778837b 100644 --- a/pinos/server/server-node.c +++ b/pinos/server/server-node.c @@ -20,6 +20,7 @@ #include #include +#include #include "pinos/client/pinos.h" #include "pinos/client/enumtypes.h" @@ -80,6 +81,7 @@ server_node_create_port (PinosNode *node, NULL); g_task_return_pointer (task, port, (GDestroyNotify) g_object_unref); + g_object_unref (task); } static void @@ -94,19 +96,55 @@ on_port_created (GObject *source_object, gpointer user_data) { PinosNode *node = PINOS_NODE (source_object); + PinosServerNodePrivate *priv = PINOS_SERVER_NODE (node)->priv; GDBusMethodInvocation *invocation = user_data; - PinosPort *port; + PinosPort *port, *peer; const gchar *object_path; GError *error = NULL; + GUnixFDList *fdlist; + GSocket *socket; + int fd, fdidx; + PinosDirection direction; port = pinos_node_create_port_finish (node, res, &error); if (port == NULL) goto no_port; + g_debug ("server-node %p %d: port %p created", node, G_OBJECT (node)->ref_count, port); + + socket = pinos_port_get_socket_pair (port, &error); + if (socket == NULL) + goto no_sockets; + + fd = g_socket_get_fd (socket); + fdlist = g_unix_fd_list_new (); + fdidx = g_unix_fd_list_append (fdlist, fd, &error); + g_object_unref (socket); + + if (fdidx == -1) + goto no_fdlist; + + direction = pinos_port_get_direction (port); + direction = pinos_direction_reverse (direction); + + peer = pinos_daemon_find_port (priv->daemon, + direction, + NULL, + pinos_port_get_properties (port), + pinos_port_get_possible_formats (port), + &error); + if (peer == NULL) + goto no_port_found; + + pinos_port_link (port, peer); + object_path = pinos_server_port_get_object_path (PINOS_SERVER_PORT (port)); - g_debug ("node %p: add port %p, %s", node, port, object_path); - g_dbus_method_invocation_return_value (invocation, - g_variant_new ("(o)", object_path)); + g_debug ("server-node %p: add port %p, remote fd %d, %s", node, port, fd, object_path); + g_dbus_method_invocation_return_value_with_unix_fd_list (invocation, + g_variant_new ("(oh)", + object_path, + fdidx), + fdlist); return; @@ -117,6 +155,31 @@ no_port: "org.pinos.Error", "can't create port"); return; } +no_sockets: + { + g_debug ("server-node %p: could create socketpair %s", node, error->message); + g_dbus_method_invocation_return_gerror (invocation, error); + g_clear_error (&error); + g_object_unref (port); + return; + } +no_fdlist: + { + g_debug ("server-node %p: could add to fdlist", node); + g_dbus_method_invocation_return_gerror (invocation, error); + g_clear_error (&error); + g_object_unref (fdlist); + g_object_unref (port); + return; + } +no_port_found: + { + g_debug ("server-node %p: could not find matching port", node); + g_dbus_method_invocation_return_gerror (invocation, error); + g_clear_error (&error); + g_object_unref (fdlist); + return; + } } @@ -142,6 +205,7 @@ handle_create_port (PinosNode1 *interface, formats = g_bytes_new (arg_possible_formats, strlen (arg_possible_formats) + 1); props = pinos_properties_from_variant (arg_properties); + g_debug ("server-node %p %d: create port", node, G_OBJECT (node)->ref_count); pinos_node_create_port (node, arg_direction, arg_name, @@ -173,7 +237,7 @@ handle_remove (PinosNode1 *interface, { PinosNode *node = user_data; - g_debug ("server-node %p: remove", node); + g_debug ("server-node %p %d: remove", node, G_OBJECT (node)->ref_count); pinos_node_remove (node); g_dbus_method_invocation_return_value (invocation, @@ -264,15 +328,34 @@ node_unregister_object (PinosServerNode *node) pinos_daemon_remove_node (priv->daemon, node); } +static void +on_property_notify (GObject *obj, + GParamSpec *pspec, + gpointer user_data) +{ + PinosNode *node = user_data; + PinosServerNodePrivate *priv = PINOS_SERVER_NODE (node)->priv; + + if (pspec == NULL || strcmp (g_param_spec_get_name (pspec), "name")) { + pinos_node1_set_name (priv->iface, pinos_node_get_name (node)); + } + if (pspec == NULL || strcmp (g_param_spec_get_name (pspec), "properties")) { + PinosProperties *props = pinos_node_get_properties (node); + pinos_node1_set_properties (priv->iface, props ? pinos_properties_to_variant (props) : NULL); + } +} + static void pinos_server_node_constructed (GObject * obj) { PinosServerNode *node = PINOS_SERVER_NODE (obj); g_debug ("server-node %p: constructed", node); - node_register_object (node); + g_signal_connect (node, "notify", (GCallback) on_property_notify, node); G_OBJECT_CLASS (pinos_server_node_parent_class)->constructed (obj); + + node_register_object (node); } static void diff --git a/pinos/server/server-port.c b/pinos/server/server-port.c index 4c021562b..d8c469f71 100644 --- a/pinos/server/server-port.c +++ b/pinos/server/server-port.c @@ -18,6 +18,8 @@ */ #include +#include + #include #include @@ -38,6 +40,7 @@ struct _PinosServerPortPrivate PinosDaemon *daemon; PinosPort1 *iface; gchar *object_path; + gboolean have_sockets; }; G_DEFINE_TYPE (PinosServerPort, pinos_server_port, PINOS_TYPE_PORT); @@ -49,6 +52,17 @@ enum PROP_OBJECT_PATH, }; +const gchar * +pinos_server_port_get_object_path (PinosServerPort *port) +{ + PinosServerPortPrivate *priv; + + g_return_val_if_fail (PINOS_IS_SERVER_PORT (port), NULL); + priv = port->priv; + + return priv->object_path; +} + static gboolean handle_remove (PinosPort1 *interface, GDBusMethodInvocation *invocation, @@ -142,15 +156,51 @@ port_unregister_object (PinosServerPort *port) pinos_daemon_unexport (priv->daemon, priv->object_path); } +static void +on_property_notify (GObject *obj, + GParamSpec *pspec, + gpointer user_data) +{ + PinosPort *port = PINOS_PORT (obj); + PinosServerPortPrivate *priv = PINOS_SERVER_PORT (port)->priv; + + g_debug ("update %s", pspec ? g_param_spec_get_name (pspec) : "NULL"); + + if (pspec == NULL || strcmp (g_param_spec_get_name (pspec), "node") == 0) { + PinosServerNode *node = PINOS_SERVER_NODE (pinos_port_get_node (port)); + pinos_port1_set_node (priv->iface, pinos_server_node_get_object_path (node)); + } + if (pspec == NULL || strcmp (g_param_spec_get_name (pspec), "direction") == 0) { + pinos_port1_set_direction (priv->iface, pinos_port_get_direction (port)); + } + if (pspec == NULL || strcmp (g_param_spec_get_name (pspec), "name") == 0) { + pinos_port1_set_name (priv->iface, pinos_port_get_name (port)); + } + if (pspec == NULL || strcmp (g_param_spec_get_name (pspec), "properties") == 0) { + PinosProperties *props = pinos_port_get_properties (port); + pinos_port1_set_properties (priv->iface, props ? pinos_properties_to_variant (props) : NULL); + } + if (pspec == NULL || strcmp (g_param_spec_get_name (pspec), "possible-formats") == 0) { + GBytes *bytes = pinos_port_get_possible_formats (port); + pinos_port1_set_possible_formats (priv->iface, bytes ? g_bytes_get_data (bytes, NULL) : NULL); + } + if (pspec == NULL || strcmp (g_param_spec_get_name (pspec), "format") == 0) { + GBytes *bytes = pinos_port_get_format (port); + pinos_port1_set_format (priv->iface, bytes ? g_bytes_get_data (bytes, NULL) : NULL); + } +} + static void pinos_server_port_constructed (GObject * object) { PinosServerPort *port = PINOS_SERVER_PORT (object); g_debug ("server-port %p: constructed", port); - port_register_object (port); + g_signal_connect (port, "notify", (GCallback) on_property_notify, port); G_OBJECT_CLASS (pinos_server_port_parent_class)->constructed (object); + + port_register_object (port); } static void @@ -182,6 +232,7 @@ static void pinos_server_port_class_init (PinosServerPortClass * klass) { GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + //PinosPortClass *port_class = PINOS_PORT_CLASS (klass); g_type_class_add_private (klass, sizeof (PinosServerPortPrivate)); @@ -222,14 +273,3 @@ pinos_server_port_init (PinosServerPort * port) port); } - -const gchar * -pinos_server_port_get_object_path (PinosServerPort *port) -{ - PinosServerPortPrivate *priv; - - g_return_val_if_fail (PINOS_IS_SERVER_PORT (port), NULL); - priv = port->priv; - - return priv->object_path; -} diff --git a/pinos/tools/pinos-monitor.c b/pinos/tools/pinos-monitor.c index d8b25166c..90232b45d 100644 --- a/pinos/tools/pinos-monitor.c +++ b/pinos/tools/pinos-monitor.c @@ -163,6 +163,7 @@ dump_port_info (PinosContext *c, const PinosPortInfo *info, gpointer user_data) g_print ("%c\tname: \"%s\"\n", MARK_CHANGE (0), info->name); print_properties (info->properties, MARK_CHANGE (1)); print_formats ("possible formats", info->possible_formats, MARK_CHANGE (2)); + print_formats ("format", info->format, MARK_CHANGE (3)); } }