diff --git a/doc/design.txt b/doc/design.txt index 838705c7b..14c5f02a2 100644 --- a/doc/design.txt +++ b/doc/design.txt @@ -35,6 +35,8 @@ Node1: a processing node, this can be a source, sink or transform /org/pinos/node* Port1: a port on a Node1, ports can be input or output ports /org/pinos/node*/port* +Channel1: a communication channel between a client and port + /org/pinos/client/channel* Link1: a link between 2 ports /org/pinos/link* diff --git a/pinos/Makefile.am b/pinos/Makefile.am index 4f9793e66..b1f1200e2 100644 --- a/pinos/Makefile.am +++ b/pinos/Makefile.am @@ -180,6 +180,7 @@ lib_LTLIBRARIES = \ # Public interface libpinos_@PINOS_MAJORMINOR@_la_SOURCES = \ client/buffer.h client/buffer.c \ + client/io.c \ client/context.h client/context.c \ client/enumtypes.h client/enumtypes.c \ client/introspect.h client/introspect.c \ @@ -204,10 +205,13 @@ lib_LTLIBRARIES += libpinoscore-@PINOS_MAJORMINOR@.la # Pure core stuff libpinoscore_@PINOS_MAJORMINOR@_la_SOURCES = \ + server/channel.c server/channel.h \ + server/client.c server/client.h \ server/daemon.c server/daemon.h \ server/node.c server/node.h \ server/port.c server/port.h \ server/node-factory.c server/node-factory.h \ + server/utils.c server/utils.h \ modules/gst/gst-manager.c modules/gst/gst-manager.h \ modules/gst/gst-source.c modules/gst/gst-source.h \ modules/gst/gst-sink.c modules/gst/gst-sink.h \ diff --git a/pinos/client/buffer.c b/pinos/client/buffer.c index 8a7f08280..63f54f79d 100644 --- a/pinos/client/buffer.c +++ b/pinos/client/buffer.c @@ -459,8 +459,8 @@ pinos_buffer_builder_init_full (PinosBufferBuilder *builder, sb->buf.max_size = sizeof (PinosStackHeader) + 128; sb->buf.data = g_malloc (sb->buf.max_size); sb->buf.free_data = sb->buf.data; - g_warning ("builder %p: realloc buffer memory %"G_GSIZE_FORMAT" -> %"G_GSIZE_FORMAT, - builder, max_data, sb->buf.max_size); +// g_warning ("builder %p: realloc buffer memory %"G_GSIZE_FORMAT" -> %"G_GSIZE_FORMAT, +// builder, max_data, sb->buf.max_size); } else { sb->buf.max_size = max_data; sb->buf.data = data; @@ -581,8 +581,8 @@ pinos_buffer_builder_add_fd (PinosBufferBuilder *builder, if (sb->buf.n_fds >= sb->buf.max_fds) { gint new_size = sb->buf.max_fds + 8; - g_warning ("builder %p: realloc buffer fds %d -> %d", - builder, sb->buf.max_fds, new_size); +// g_warning ("builder %p: realloc buffer fds %d -> %d", +// builder, sb->buf.max_fds, new_size); sb->buf.max_fds = new_size; sb->buf.free_fds = g_realloc (sb->buf.free_fds, new_size * sizeof (int)); sb->buf.fds = sb->buf.free_fds; @@ -599,8 +599,8 @@ builder_ensure_size (struct stack_builder *sb, gsize size) { if (sb->buf.size + size > sb->buf.max_size) { gsize new_size = sb->buf.size + MAX (size, 1024); - g_warning ("builder %p: realloc buffer memory %"G_GSIZE_FORMAT" -> %"G_GSIZE_FORMAT, - sb, sb->buf.max_size, new_size); +// g_warning ("builder %p: realloc buffer memory %"G_GSIZE_FORMAT" -> %"G_GSIZE_FORMAT, +// sb, sb->buf.max_size, new_size); sb->buf.max_size = new_size; sb->buf.free_data = g_realloc (sb->buf.free_data, new_size); sb->sh = sb->buf.data = sb->buf.free_data; diff --git a/pinos/client/context.c b/pinos/client/context.c index f1a0b9abf..b33f6e46f 100644 --- a/pinos/client/context.c +++ b/pinos/client/context.c @@ -414,6 +414,13 @@ subscription_cb (PinosSubscribe *subscribe, else if (event == PINOS_SUBSCRIPTION_EVENT_REMOVE) priv->ports = g_list_remove (priv->ports, object); break; + + case PINOS_SUBSCRIPTION_FLAG_CHANNEL: + if (event == PINOS_SUBSCRIPTION_EVENT_NEW) + priv->channels = g_list_prepend (priv->channels, object); + else if (event == PINOS_SUBSCRIPTION_EVENT_REMOVE) + priv->channels = g_list_remove (priv->channels, object); + break; } if (flags & priv->subscription_mask) diff --git a/pinos/client/introspect.c b/pinos/client/introspect.c index ea6bb480f..e6ca096cb 100644 --- a/pinos/client/introspect.c +++ b/pinos/client/introspect.c @@ -462,6 +462,143 @@ pinos_port_state_as_string (PinosPortState state) return val == NULL ? "invalid-state" : val->value_nick; } +/** + * pinos_channel_state_as_string: + * @state: a #PinosChannelState + * + * Return the string representation of @state. + * + * Returns: the string representation of @state. + */ +const gchar * +pinos_channel_state_as_string (PinosChannelState state) +{ + GEnumValue *val; + + val = g_enum_get_value (G_ENUM_CLASS (g_type_class_ref (PINOS_TYPE_CHANNEL_STATE)), + state); + + return val == NULL ? "invalid-state" : val->value_nick; +} + +static void +channel_fill_info (PinosChannelInfo *info, GDBusProxy *proxy) +{ + GHashTable *changed = g_object_get_data (G_OBJECT (proxy), "pinos-changed-properties"); + + info->id = proxy; + info->channel_path = g_dbus_proxy_get_object_path (proxy); + SET_UINT32 ("Direction", direction, 2, PINOS_DIRECTION_INVALID); + SET_STRING ("Client", client_path, 0); + + info->change_mask = 0; + SET_STRING ("Port", port_path, 0); + SET_PROPERTIES ("Properties", properties, 1); + SET_UINT32 ("State", state, 2, PINOS_CHANNEL_STATE_ERROR); + SET_BYTES ("PossibleFormats", possible_formats, 3); + SET_BYTES ("Format", format, 4); + + if (changed) + g_hash_table_remove_all (changed); +} + +static void +channel_clear_info (PinosChannelInfo *info) +{ + if (info->possible_formats) + g_bytes_unref (info->possible_formats); + if (info->format) + g_bytes_unref (info->format); + if (info->properties) + pinos_properties_free (info->properties); +} + + +/** + * pinos_context_list_channel_info: + * @context: a connected #PinosContext + * @flags: extra #PinosChannelInfoFlags + * @cb: a #PinosChannelInfoCallback + * @cancelable: a #GCancellable + * @callback: a #GAsyncReadyCallback to call when the operation is finished + * @user_data: user data passed to @cb + * + * Call @cb for each channel. + */ +void +pinos_context_list_channel_info (PinosContext *context, + PinosChannelInfoFlags flags, + PinosChannelInfoCallback cb, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + PinosContextPrivate *priv; + GList *walk; + GTask *task; + + g_return_if_fail (PINOS_IS_CONTEXT (context)); + g_return_if_fail (cb != NULL); + + task = g_task_new (context, cancellable, callback, user_data); + + priv = context->priv; + + for (walk = priv->channels; walk; walk = g_list_next (walk)) { + GDBusProxy *proxy = walk->data; + PinosChannelInfo info; + + channel_fill_info (&info, proxy); + cb (context, &info, user_data); + channel_clear_info (&info); + } + + g_task_return_boolean (task, TRUE); + g_object_unref (task); +} + +/** + * pinos_context_get_channel_info_by_id: + * @context: a connected #PinosContext + * @id: a channel id + * @flags: extra #PinosChannelInfoFlags + * @cb: a #PinosChannelInfoCallback + * @cancelable: a #GCancellable + * @callback: a #GAsyncReadyCallback to call when the operation is finished + * @user_data: user data passed to @cb + * + * Call @cb for the channel with @id. + */ +void +pinos_context_get_channel_info_by_id (PinosContext *context, + gpointer id, + PinosChannelInfoFlags flags, + PinosChannelInfoCallback cb, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + PinosChannelInfo info; + GDBusProxy *proxy; + GTask *task; + + g_return_if_fail (PINOS_IS_CONTEXT (context)); + g_return_if_fail (id != NULL); + g_return_if_fail (cb != NULL); + + task = g_task_new (context, cancellable, callback, user_data); + + proxy = G_DBUS_PROXY (id); + + channel_fill_info (&info, proxy); + cb (context, &info, user_data); + channel_clear_info (&info); + + g_task_return_boolean (task, TRUE); + g_object_unref (task); +} + + static void connection_fill_info (PinosConnectionInfo *info, GDBusProxy *proxy) { diff --git a/pinos/client/introspect.h b/pinos/client/introspect.h index 43a2cf472..c2aea5742 100644 --- a/pinos/client/introspect.h +++ b/pinos/client/introspect.h @@ -270,6 +270,97 @@ void pinos_context_get_port_info_by_id (PinosContext *context, GCancellable *cancellable, GAsyncReadyCallback callback, gpointer user_data); + + +/** + * PinosChannelState: + * @PINOS_CHANNEL_STATE_ERROR: the channel is in error + * @PINOS_CHANNEL_STATE_STOPPED: the channel is stopped + * @PINOS_CHANNEL_STATE_STARTING: the channel is starting + * @PINOS_CHANNEL_STATE_STREAMING: the channel is streaming + * + * The different channel states + */ +typedef enum { + PINOS_CHANNEL_STATE_ERROR = -1, + PINOS_CHANNEL_STATE_STOPPED = 0, + PINOS_CHANNEL_STATE_STARTING = 1, + PINOS_CHANNEL_STATE_STREAMING = 2, +} PinosChannelState; + +const gchar * pinos_channel_state_as_string (PinosChannelState state); + +/** + * PinosChannelInfo: + * @id: generic id of the channel_ + * @channel_path: the unique path of the channel + * @direction: the channel direction + * @client_path: the owner client + * @change_mask: bitfield of changed fields since last call + * @port_path: the owner port + * @properties: the properties of the channel + * @state: the state + * @possible_formats: the possible formats + * @format: when streaming, the current format + * + * The channel information. Extra information can be added in later + * versions. + */ +typedef struct { + gpointer id; + const char *channel_path; + PinosDirection direction; + const char *client_path; + guint64 change_mask; + const char *port_path; + PinosProperties *properties; + PinosChannelState state; + GBytes *possible_formats; + GBytes *format; +} PinosChannelInfo; + +/** + * PinosChannelInfoFlags: + * @PINOS_CHANNEL_INFO_FLAGS_NONE: no flags + * @PINOS_CHANNEL_INFO_FLAGS_NO_INPUT: don't list input channels + * @PINOS_CHANNEL_INFO_FLAGS_NO_OUTPUT: don't list output channels + * + * Extra flags to pass to pinos_context_list_channel_info() and + * pinos_context_get_channel_info_by_id(). + */ +typedef enum { + PINOS_CHANNEL_INFO_FLAGS_NONE = 0, + PINOS_CHANNEL_INFO_FLAGS_NO_INPUT = (1 << 0), + PINOS_CHANNEL_INFO_FLAGS_NO_OUTPUT = (1 << 1), +} PinosChannelInfoFlags; + + +/** + * PinosChannelInfoCallback: + * @c: a #PinosContext + * @info: a #PinosChannelInfo + * @user_data: user data + * + * Callback with information about the Pinos channel in @info. + */ +typedef void (*PinosChannelInfoCallback) (PinosContext *c, + const PinosChannelInfo *info, + gpointer user_data); + +void pinos_context_list_channel_info (PinosContext *context, + PinosChannelInfoFlags flags, + PinosChannelInfoCallback cb, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); +void pinos_context_get_channel_info_by_id (PinosContext *context, + gpointer id, + PinosChannelInfoFlags flags, + PinosChannelInfoCallback cb, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); + /** * PinosConnectionInfo: * @id: generic id of the connection diff --git a/pinos/client/io.c b/pinos/client/io.c new file mode 100644 index 000000000..e4564618a --- /dev/null +++ b/pinos/client/io.c @@ -0,0 +1,179 @@ +/* Pinos + * Copyright (C) 2015 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include +#include +#include + +#include + +#include "pinos/client/pinos.h" +#include "pinos/client/private.h" + +gboolean +pinos_io_read_buffer (int fd, + PinosBuffer *buffer, + void *data, + size_t max_data, + int *fds, + size_t max_fds, + GError **error) +{ + gssize len; + PinosStackHeader *hdr; + PinosStackBuffer *sb = (PinosStackBuffer *) buffer; + gsize need; + struct cmsghdr *cmsg; + struct msghdr msg = {0}; + struct iovec iov[1]; + char cmsgbuf[CMSG_SPACE (max_fds * sizeof (int))]; + + g_assert (sb->refcount == 0); + + sb->data = data; + sb->max_size = max_data; + sb->size = 0; + sb->free_data = NULL; + sb->fds = fds; + sb->max_fds = max_fds; + sb->n_fds = 0; + sb->free_fds = NULL; + + hdr = sb->data; + + /* read header and control messages first */ + iov[0].iov_base = hdr; + iov[0].iov_len = sizeof (PinosStackHeader);; + msg.msg_iov = iov; + msg.msg_iovlen = 1; + msg.msg_control = cmsgbuf; + msg.msg_controllen = sizeof (cmsgbuf); + msg.msg_flags = MSG_CMSG_CLOEXEC; + + while (TRUE) { + len = recvmsg (fd, &msg, msg.msg_flags); + if (len < 0) { + if (errno == EINTR) + continue; + else + goto recv_error; + } + break; + } + g_assert (len == sizeof (PinosStackHeader)); + + /* now we know the total length */ + need = sizeof (PinosStackHeader) + hdr->length; + + if (sb->max_size < need) { + g_warning ("io: realloc receive memory %" G_GSIZE_FORMAT" -> %" G_GSIZE_FORMAT, sb->max_size, need); + sb->max_size = need; + hdr = sb->data = sb->free_data = g_realloc (sb->free_data, need); + } + sb->size = need; + + if (hdr->length > 0) { + /* read data */ + while (TRUE) { + len = recv (fd, (gchar *)sb->data + sizeof (PinosStackHeader), hdr->length, 0); + if (len < 0) { + if (errno == EINTR) + continue; + else + goto recv_error; + } + break; + } + g_assert (len == hdr->length); + } + + /* handle control messages */ + for (cmsg = CMSG_FIRSTHDR (&msg); cmsg != NULL; cmsg = CMSG_NXTHDR (&msg, cmsg)) { + if (cmsg->cmsg_level != SOL_SOCKET || cmsg->cmsg_type != SCM_RIGHTS) + continue; + + sb->n_fds = (cmsg->cmsg_len - ((char *)CMSG_DATA (cmsg) - (char *)cmsg)) / sizeof (int); + memcpy (sb->fds, CMSG_DATA (cmsg), sb->n_fds * sizeof (int)); + } + sb->refcount = 1; + sb->magic = PSB_MAGIC; + + return TRUE; + + /* ERRORS */ +recv_error: + { + g_set_error (error, + G_IO_ERROR, + g_io_error_from_errno (errno), + "could not recvmsg: %s", strerror (errno)); + return FALSE; + } +} + +gboolean +pinos_io_write_buffer (int fd, + PinosBuffer *buffer, + GError **error) +{ + PinosStackBuffer *sb = (PinosStackBuffer *) buffer; + gssize len; + struct msghdr msg = {0}; + struct iovec iov[1]; + struct cmsghdr *cmsg; + char cmsgbuf[CMSG_SPACE (sb->n_fds * sizeof (int))]; + gint fds_len = sb->n_fds * sizeof (int); + + iov[0].iov_base = sb->data; + iov[0].iov_len = sb->size; + msg.msg_iov = iov; + msg.msg_iovlen = 1; + msg.msg_control = cmsgbuf; + msg.msg_controllen = CMSG_SPACE (fds_len); + cmsg = CMSG_FIRSTHDR(&msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = CMSG_LEN (fds_len); + memcpy(CMSG_DATA(cmsg), sb->fds, fds_len); + msg.msg_controllen = cmsg->cmsg_len; + + while (TRUE) { + len = sendmsg (fd, &msg, 0); + if (len < 0) { + if (errno == EINTR) + continue; + else + goto send_error; + } + break; + } + g_assert (len == (gssize) sb->size); + + return TRUE; + + /* ERRORS */ +send_error: + { + g_set_error (error, + G_IO_ERROR, + g_io_error_from_errno (errno), + "could not sendmsg: %s", strerror (errno)); + return FALSE; + } +} diff --git a/pinos/client/private.h b/pinos/client/private.h index f3c2a6209..7332b37dd 100644 --- a/pinos/client/private.h +++ b/pinos/client/private.h @@ -41,6 +41,7 @@ struct _PinosContextPrivate GList *nodes; GList *ports; GList *connections; + GList *channels; }; void pinos_subscribe_get_proxy (PinosSubscribe *subscribe, @@ -79,3 +80,14 @@ typedef struct { #define PSB_MAGIC ((gsize) 5493683301u) #define is_valid_buffer(b) (b != NULL && \ PSB(b)->magic == PSB_MAGIC) + +gboolean pinos_io_read_buffer (int fd, + PinosBuffer *sb, + void *data, + size_t max_data, + int *fds, + size_t max_fds, + GError **error); +gboolean pinos_io_write_buffer (int fd, + PinosBuffer *buffer, + GError **error); diff --git a/pinos/client/stream.c b/pinos/client/stream.c index 1db124e7a..3031a827e 100644 --- a/pinos/client/stream.c +++ b/pinos/client/stream.c @@ -20,6 +20,7 @@ #include #include +#include #include #include @@ -31,6 +32,9 @@ #include "pinos/client/private.h" +#define MAX_BUFFER_SIZE 1024 +#define MAX_FDS 16 + struct _PinosStreamPrivate { PinosContext *context; @@ -49,11 +53,18 @@ struct _PinosStreamPrivate GBytes *format; - PinosNode *node; - PinosPort *port; + GDBusProxy *channel; gboolean disconnecting; PinosStreamMode mode; + GSocket *socket; + GSource *socket_source; + int fd; + + PinosBuffer *buffer; + PinosBuffer recv_buffer; + guint8 recv_data[MAX_BUFFER_SIZE]; + int recv_fds[MAX_FDS]; }; #define PINOS_STREAM_GET_PRIVATE(obj) \ @@ -184,100 +195,6 @@ stream_set_state (PinosStream *stream, } } -static GDBusProxy * -get_proxy (PinosStream *stream, GList *list, const gchar *path) -{ - GList *walk; - - for (walk = list; walk; walk = g_list_next (walk)) { - GDBusProxy *proxy = walk->data; - - if (!g_strcmp0 (g_dbus_proxy_get_object_path (proxy), path)) { - return proxy; - } - } - return NULL; -} - -static GDBusProxy * -get_port_proxy (PinosStream *stream, const gchar *path) -{ - return get_proxy (stream, stream->priv->context->priv->ports, path); -} - -static GDBusProxy * -get_node_proxy (PinosStream *stream, const gchar *path) -{ - return get_proxy (stream, stream->priv->context->priv->nodes, path); -} - -static GDBusProxy * -get_peer_port_proxy (PinosStream *stream) -{ - PinosStreamPrivate *priv = stream->priv; - GDBusProxy *port_proxy; - GDBusProxy *peer_proxy = NULL; - - if (priv->port == NULL) - return NULL; - - g_object_get (priv->port, "proxy", &port_proxy, NULL); - if (port_proxy) { - GVariant *v; - - v = g_dbus_proxy_get_cached_property (port_proxy, "Peers"); - if (v) { - GVariantIter *iter; - gchar *peer_path; - - g_variant_get (v, "ao", &iter); - if (g_variant_iter_next (iter, "&o", &peer_path, NULL)) { - peer_proxy = get_port_proxy (stream, peer_path); - } - g_variant_iter_free (iter); - } - } - - return peer_proxy; -} - -static GDBusProxy * -get_peer_node_proxy (PinosStream *stream) -{ - GDBusProxy *peer_port; - GVariant *v; - GDBusProxy *node = NULL; - - peer_port = get_peer_port_proxy (stream); - if (peer_port) { - v = g_dbus_proxy_get_cached_property (peer_port, "Node"); - if (v) { - const gchar *node_path = g_variant_get_string (v, NULL); - node = get_node_proxy (stream, node_path); - g_variant_unref (v); - } - } - return node; -} - -static void -merge_peer_properties (PinosStream *stream) -{ - PinosStreamPrivate *priv = stream->priv; - GDBusProxy *peer_node; - - peer_node = get_peer_node_proxy (stream); - if (peer_node) { - GVariant *v = g_dbus_proxy_get_cached_property (peer_node, "Properties"); - if (v) { - PinosProperties *props = pinos_properties_from_variant (v); - priv->properties = pinos_properties_merge (priv->properties, props); - pinos_properties_free (props); - g_variant_unref (v); - } - } -} - static void subscription_cb (PinosSubscribe *subscribe, PinosSubscriptionEvent event, @@ -289,28 +206,14 @@ subscription_cb (PinosSubscribe *subscribe, PinosStreamPrivate *priv = stream->priv; switch (flags) { - case PINOS_SUBSCRIPTION_FLAG_NODE: + case PINOS_SUBSCRIPTION_FLAG_CHANNEL: if (event == PINOS_SUBSCRIPTION_EVENT_REMOVE) { - if (object == priv->node && !priv->disconnecting) { + if (object == priv->channel && !priv->disconnecting) { stream_set_state (stream, PINOS_STREAM_STATE_ERROR, g_error_new_literal (G_IO_ERROR, G_IO_ERROR_CLOSED, - "Node disappeared")); - } - } else if (event == PINOS_SUBSCRIPTION_EVENT_NEW || - event == PINOS_SUBSCRIPTION_EVENT_CHANGE) { - if (object == get_peer_node_proxy (stream)) { - merge_peer_properties (stream); - } - } - break; - - case PINOS_SUBSCRIPTION_FLAG_PORT: - if (event == PINOS_SUBSCRIPTION_EVENT_NEW || - event == PINOS_SUBSCRIPTION_EVENT_CHANGE) { - if (object == get_peer_port_proxy (stream)) { - merge_peer_properties (stream); + "Channel disappeared")); } } break; @@ -342,7 +245,7 @@ pinos_stream_finalize (GObject * object) g_debug ("free stream %p", stream); - g_clear_object (&priv->node); + g_clear_object (&priv->channel); if (priv->possible_formats) g_bytes_unref (priv->possible_formats); @@ -577,12 +480,259 @@ pinos_stream_get_error (PinosStream *stream) return stream->priv->error; } + +static void +on_channel_proxy (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + PinosStream *stream = user_data; + PinosStreamPrivate *priv = stream->priv; + PinosContext *context = priv->context; + GVariant *v; + gchar *str; + GError *error = NULL; + + priv->channel = pinos_subscribe_get_proxy_finish (context->priv->subscribe, + res, + &error); + if (priv->channel == NULL) + goto channel_failed; + + /* get the port we are connected to */ + v = g_dbus_proxy_get_cached_property (priv->channel, "Port"); + if (v) { + gsize len; + str = g_variant_dup_string (v, &len); + g_variant_unref (v); + + g_free (priv->path); + priv->path = str; + } + + v = g_dbus_proxy_get_cached_property (priv->channel, "PossibleFormats"); + if (v) { + gsize len; + str = g_variant_dup_string (v, &len); + g_variant_unref (v); + + if (priv->possible_formats) + g_bytes_unref (priv->possible_formats); + priv->possible_formats = g_bytes_new_take (str, len + 1); + + g_object_notify (G_OBJECT (stream), "possible-formats"); + } + v = g_dbus_proxy_get_cached_property (priv->channel, "Properties"); + if (v) { + if (priv->properties) + pinos_properties_free (priv->properties); + priv->properties = pinos_properties_from_variant (v); + g_variant_unref (v); + + g_object_notify (G_OBJECT (stream), "properties"); + } + + stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL); + g_object_unref (stream); + + return; + +channel_failed: + { + g_warning ("failed to get channel proxy: %s", error->message); + stream_set_state (stream, PINOS_STREAM_STATE_ERROR, error); + g_object_unref (stream); + return; + } +} + +static gboolean +on_socket_condition (GSocket *socket, + GIOCondition condition, + gpointer user_data) +{ + PinosStream *stream = user_data; + PinosStreamPrivate *priv = stream->priv; + + switch (condition) { + case G_IO_IN: + { + PinosBuffer *buffer = &priv->recv_buffer; + GError *error = NULL; + + if (!pinos_io_read_buffer (priv->fd, + buffer, + priv->recv_data, + MAX_BUFFER_SIZE, + priv->recv_fds, + MAX_FDS, + &error)) { + g_warning ("stream %p: failed to read buffer: %s", stream, error->message); + g_clear_error (&error); + return TRUE; + } + + priv->buffer = buffer; + g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, NULL); + priv->buffer = NULL; + + g_assert (pinos_buffer_unref (buffer) == FALSE); + break; + } + + case G_IO_OUT: + g_warning ("can do IO\n"); + break; + + default: + break; + } + return TRUE; +} + +static void +handle_socket (PinosStream *stream, gint fd) +{ + PinosStreamPrivate *priv = stream->priv; + GError *error = NULL; + + priv->socket = g_socket_new_from_fd (fd, &error); + if (priv->socket == NULL) + goto socket_failed; + + switch (priv->mode) { + case PINOS_STREAM_MODE_SOCKET: + g_object_notify (G_OBJECT (stream), "socket"); + break; + + case PINOS_STREAM_MODE_BUFFER: + { + priv->fd = g_socket_get_fd (priv->socket); + priv->socket_source = g_socket_create_source (priv->socket, G_IO_IN, NULL); + g_source_set_callback (priv->socket_source, (GSourceFunc) on_socket_condition, stream, NULL); + g_source_attach (priv->socket_source, priv->context->priv->context); + break; + } + + default: + break; + } + return; + + /* ERRORS */ +socket_failed: + { + g_warning ("failed to create socket: %s", error->message); + stream_set_state (stream, PINOS_STREAM_STATE_ERROR, error); + return; + } +} + +static void +unhandle_socket (PinosStream *stream) +{ + PinosStreamPrivate *priv = stream->priv; + + switch (priv->mode) { + case PINOS_STREAM_MODE_SOCKET: + g_clear_object (&priv->socket); + g_object_notify (G_OBJECT (stream), "socket"); + break; + + case PINOS_STREAM_MODE_BUFFER: + if (priv->socket_source) { + g_source_destroy (priv->socket_source); + g_clear_pointer (&priv->socket_source, g_source_unref); + } + break; + + default: + break; + } +} + + +static void +on_channel_created (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + PinosStream *stream = user_data; + PinosStreamPrivate *priv = stream->priv; + PinosContext *context = priv->context; + GVariant *ret; + GError *error = NULL; + const gchar *channel_path; + GUnixFDList *fd_list; + gint fd_idx, fd; + + g_assert (context->priv->daemon == G_DBUS_PROXY (source_object)); + + ret = g_dbus_proxy_call_with_unix_fd_list_finish (context->priv->daemon, + &fd_list, + res, &error); + if (ret == NULL) + goto create_failed; + + g_variant_get (ret, "(&oh)", &channel_path, &fd_idx); + g_variant_unref (ret); + + if ((fd = g_unix_fd_list_get (fd_list, fd_idx, &error)) < 0) + goto fd_failed; + + priv->fd = fd; + + g_object_unref (fd_list); + + pinos_subscribe_get_proxy (context->priv->subscribe, + PINOS_DBUS_SERVICE, + channel_path, + "org.pinos.Channel1", + NULL, + on_channel_proxy, + stream); + + return; + + /* ERRORS */ +create_failed: + { + g_warning ("failed to connect: %s", error->message); + goto exit_error; + } +fd_failed: + { + g_warning ("failed to get FD: %s", error->message); + g_object_unref (fd_list); + goto exit_error; + } +exit_error: + { + stream_set_state (stream, PINOS_STREAM_STATE_ERROR, error); + g_object_unref (stream); + return; + } +} + + static gboolean do_connect (PinosStream *stream) { PinosStreamPrivate *priv = stream->priv; PinosContext *context = priv->context; + g_dbus_proxy_call (context->priv->daemon, + "CreateChannel", + g_variant_new ("(sus@a{sv})", + (priv->path ? priv->path : ""), + priv->direction, + g_bytes_get_data (priv->possible_formats, NULL), + pinos_properties_to_variant (priv->properties)), + G_DBUS_CALL_FLAGS_NONE, + -1, + NULL, /* GCancellable *cancellable */ + on_channel_created, + stream); return FALSE; } @@ -633,11 +783,76 @@ pinos_stream_connect (PinosStream *stream, return TRUE; } +static void +on_stream_started (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + PinosStream *stream = user_data; + PinosStreamPrivate *priv = stream->priv; + gchar *format; + GError *error = NULL; + GVariant *result, *properties; + + result = g_dbus_proxy_call_finish (priv->channel, + res, + &error); + if (result == NULL) + goto start_failed; + + g_variant_get (result, + "(s@a{sv})", + &format, + &properties); + + g_variant_unref (result); + + if (priv->format) + g_bytes_unref (priv->format); + priv->format = g_bytes_new_take (format, strlen (format) + 1); + g_object_notify (G_OBJECT (stream), "format"); + + if (priv->properties) + pinos_properties_free (priv->properties); + priv->properties = pinos_properties_from_variant (properties); + g_variant_unref (properties); + g_object_notify (G_OBJECT (stream), "properties"); + + stream_set_state (stream, PINOS_STREAM_STATE_STREAMING, NULL); + g_object_unref (stream); + + return; + + /* ERRORS */ +start_failed: + { + g_warning ("failed to start: %s", error->message); + goto exit_error; + } +exit_error: + { + stream_set_state (stream, PINOS_STREAM_STATE_ERROR, error); + g_object_unref (stream); + return; + } +} + static gboolean do_start (PinosStream *stream) { - stream_set_state (stream, PINOS_STREAM_STATE_STREAMING, NULL); - g_object_unref (stream); + PinosStreamPrivate *priv = stream->priv; + + handle_socket (stream, priv->fd); + + g_dbus_proxy_call (priv->channel, + "Start", + g_variant_new ("(s)", + priv->format ? g_bytes_get_data (priv->format, NULL) : "ANY"), + G_DBUS_CALL_FLAGS_NONE, + -1, + NULL, /* GCancellable *cancellable */ + on_stream_started, + stream); return FALSE; } @@ -683,11 +898,54 @@ pinos_stream_start (PinosStream *stream, return TRUE; } +static void +on_stream_stopped (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + PinosStream *stream = user_data; + PinosStreamPrivate *priv = stream->priv; + GVariant *ret; + GError *error = NULL; + + ret = g_dbus_proxy_call_finish (priv->channel, res, &error); + if (ret == NULL) + goto call_failed; + + g_variant_unref (ret); + + unhandle_socket (stream); + g_clear_pointer (&priv->format, g_bytes_unref); + g_object_notify (G_OBJECT (stream), "format"); + + stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL); + g_object_unref (stream); + + return; + + /* ERRORS */ +call_failed: + { + g_warning ("failed to stop: %s", error->message); + stream_set_state (stream, PINOS_STREAM_STATE_ERROR, error); + g_object_unref (stream); + return; + } +} + static gboolean do_stop (PinosStream *stream) { - stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL); - g_object_unref (stream); + PinosStreamPrivate *priv = stream->priv; + + g_dbus_proxy_call (priv->channel, + "Stop", + g_variant_new ("()"), + G_DBUS_CALL_FLAGS_NONE, + -1, + NULL, /* GCancellable *cancellable */ + on_stream_stopped, + stream); return FALSE; } @@ -717,11 +975,56 @@ pinos_stream_stop (PinosStream *stream) return TRUE; } +static void +on_channel_removed (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + PinosStream *stream = user_data; + PinosStreamPrivate *priv = stream->priv; + GVariant *ret; + GError *error = NULL; + + g_assert (priv->channel == G_DBUS_PROXY (source_object)); + + priv->disconnecting = FALSE; + g_clear_object (&priv->channel); + + ret = g_dbus_proxy_call_finish (G_DBUS_PROXY (source_object), res, &error); + if (ret == NULL) + goto proxy_failed; + + g_variant_unref (ret); + + stream_set_state (stream, PINOS_STREAM_STATE_UNCONNECTED, NULL); + g_object_unref (stream); + return; + + /* ERRORS */ +proxy_failed: + { + g_warning ("failed to disconnect: %s", error->message); + stream_set_state (stream, PINOS_STREAM_STATE_ERROR, error); + g_object_unref (stream); + return; + } +} + + static gboolean do_disconnect (PinosStream *stream) { PinosStreamPrivate *priv = stream->priv; + g_dbus_proxy_call (priv->channel, + "Remove", + g_variant_new ("()"), + G_DBUS_CALL_FLAGS_NONE, + -1, + NULL, /* GCancellable *cancellable */ + on_channel_removed, + stream); + return FALSE; } @@ -742,7 +1045,7 @@ pinos_stream_disconnect (PinosStream *stream) g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE); priv = stream->priv; g_return_val_if_fail (priv->state >= PINOS_STREAM_STATE_READY, FALSE); - g_return_val_if_fail (priv->node != NULL, FALSE); + g_return_val_if_fail (priv->channel != NULL, FALSE); context = priv->context; g_return_val_if_fail (pinos_context_get_state (context) >= PINOS_CONTEXT_STATE_CONNECTED, FALSE); g_return_val_if_fail (!priv->disconnecting, FALSE); @@ -770,12 +1073,10 @@ pinos_stream_peek_buffer (PinosStream *stream) { PinosStreamPrivate *priv; - g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE); - + g_return_val_if_fail (PINOS_IS_STREAM (stream), NULL); priv = stream->priv; - //g_return_val_if_fail (priv->state == PINOS_STREAM_STATE_STREAMING, FALSE); - return NULL; + return priv->buffer; } /** @@ -790,11 +1091,9 @@ pinos_stream_peek_buffer (PinosStream *stream) void pinos_stream_buffer_builder_init (PinosStream *stream, PinosBufferBuilder *builder) { - PinosStreamPrivate *priv; - g_return_if_fail (PINOS_IS_STREAM (stream)); - priv = stream->priv; + pinos_buffer_builder_init (builder); } /** @@ -825,5 +1124,10 @@ pinos_stream_send_buffer (PinosStream *stream, priv = stream->priv; g_return_val_if_fail (priv->state == PINOS_STREAM_STATE_STREAMING, FALSE); + if (!pinos_io_write_buffer (priv->fd, buffer, &error)) { + g_warning ("stream %p: failed to read buffer: %s", stream, error->message); + g_clear_error (&error); + return FALSE; + } return TRUE; } diff --git a/pinos/client/stream.h b/pinos/client/stream.h index b33cc1e05..b71a4f76c 100644 --- a/pinos/client/stream.h +++ b/pinos/client/stream.h @@ -59,6 +59,7 @@ typedef enum { typedef enum { PINOS_STREAM_MODE_SOCKET = 0, PINOS_STREAM_MODE_BUFFER = 1, + PINOS_STREAM_MODE_RINGBUFFER = 2, } PinosStreamMode; /** diff --git a/pinos/client/subscribe.c b/pinos/client/subscribe.c index 089d6abb7..2f7237f45 100644 --- a/pinos/client/subscribe.c +++ b/pinos/client/subscribe.c @@ -108,6 +108,9 @@ notify_event (PinosSubscribe *subscribe, else if (g_strcmp0 (interface_name, "org.pinos.Port1") == 0) { flags = PINOS_SUBSCRIPTION_FLAG_PORT; } + else if (g_strcmp0 (interface_name, "org.pinos.Channel1") == 0) { + flags = PINOS_SUBSCRIPTION_FLAG_CHANNEL; + } g_signal_emit (subscribe, signals[SIGNAL_SUBSCRIPTION_EVENT], 0, event, flags, data->proxy); } diff --git a/pinos/client/subscribe.h b/pinos/client/subscribe.h index eb14a2b1a..820385969 100644 --- a/pinos/client/subscribe.h +++ b/pinos/client/subscribe.h @@ -47,10 +47,11 @@ typedef enum { typedef enum { PINOS_SUBSCRIPTION_FLAG_DAEMON = (1 << 0), PINOS_SUBSCRIPTION_FLAG_NODE = (1 << 1), - PINOS_SUBSCRIPTION_FLAG_PORT = (1 << 2) + PINOS_SUBSCRIPTION_FLAG_PORT = (1 << 2), + PINOS_SUBSCRIPTION_FLAG_CHANNEL = (1 << 3) } PinosSubscriptionFlags; -#define PINOS_SUBSCRIPTION_FLAGS_ALL 0x07 +#define PINOS_SUBSCRIPTION_FLAGS_ALL 0x0f typedef enum { PINOS_SUBSCRIPTION_EVENT_NEW = 0, diff --git a/pinos/dbus/org.pinos.xml b/pinos/dbus/org.pinos.xml index 888e44a8f..b1ab9b9d5 100644 --- a/pinos/dbus/org.pinos.xml +++ b/pinos/dbus/org.pinos.xml @@ -69,6 +69,12 @@ + + + + + +