Avoid more autogenerated code

Avoid using the autogenerated object manager because it does not
emit signals in the right context in all cases. Use our own proxy
and make our own proxy objects when they appear.

Make it possible to get a previously created proxy object or wait when
it is still being constructed.
This commit is contained in:
Wim Taymans 2015-05-04 10:38:26 +02:00
parent c185755b3f
commit 0a9f79b675
8 changed files with 565 additions and 561 deletions

View file

@ -23,8 +23,6 @@
#include "client/pv-enumtypes.h"
#include "client/pv-subscribe.h"
#include "dbus/org-pulsevideo.h"
#include "client/pv-private.h"
#define PV_CONTEXT_GET_PRIVATE(obj) \
@ -300,27 +298,6 @@ context_set_state (PvContext *context, PvContextState state)
}
}
static void
on_client_proxy (GObject *source_object,
GAsyncResult *res,
gpointer user_data)
{
PvContext *context = user_data;
PvContextPrivate *priv = context->priv;
GError *error = NULL;
g_assert (g_main_context_get_thread_default () == priv->context);
priv->client = pv_client1_proxy_new_finish (res, &error);
if (priv->client == NULL) {
priv->error = error;
context_set_state (context, PV_CONTEXT_STATE_ERROR);
g_error ("failed to get client proxy: %s", error->message);
return;
}
context_set_state (context, PV_CONTEXT_STATE_READY);
}
static void
on_client_connected (GObject *source_object,
GAsyncResult *res,
@ -328,26 +305,23 @@ on_client_connected (GObject *source_object,
{
PvContext *context = user_data;
PvContextPrivate *priv = context->priv;
GVariant *ret;
GError *error = NULL;
gchar *client_path;
g_assert (g_main_context_get_thread_default () == priv->context);
if (!pv_daemon1_call_connect_client_finish (priv->daemon, &client_path, res, &error)) {
ret = g_dbus_proxy_call_finish (priv->daemon, res, &error);
if (ret == NULL) {
g_error ("failed to connect client: %s", error->message);
priv->error = error;
context_set_state (context, PV_CONTEXT_STATE_ERROR);
g_error ("failed to connect client: %s", error->message);
return;
}
pv_client1_proxy_new (priv->connection,
G_DBUS_PROXY_FLAGS_NONE,
PV_DBUS_SERVICE,
client_path,
NULL,
on_client_proxy,
context);
g_free (client_path);
g_variant_get (ret, "(o)", &priv->client_path);
g_print ("got client %s\n", priv->client_path);
g_variant_unref (ret);
}
static void
@ -357,24 +331,23 @@ on_daemon_connected (GObject *source_object,
{
PvContext *context = user_data;
PvContextPrivate *priv = context->priv;
GVariantBuilder builder;
g_assert (g_main_context_get_thread_default () == priv->context);
context_set_state (context, PV_CONTEXT_STATE_REGISTERING);
{
GVariantBuilder builder;
g_variant_builder_init (&builder, G_VARIANT_TYPE ("a{sv}"));
g_variant_builder_add (&builder, "{sv}", "name", g_variant_new_string ("hello"));
g_variant_builder_init (&builder, G_VARIANT_TYPE ("a{sv}"));
g_variant_builder_add (&builder, "{sv}", "name", g_variant_new_string ("hello"));
pv_daemon1_call_connect_client (priv->daemon,
g_variant_builder_end (&builder), /* GVariant *arg_properties */
NULL, /* GCancellable *cancellable */
on_client_connected,
context);
}
g_dbus_proxy_call (priv->daemon,
"ConnectClient",
g_variant_new ("(@a{sv})", g_variant_builder_end (&builder)),
G_DBUS_CALL_FLAGS_NONE,
-1,
NULL,
on_client_connected,
context);
}
static void
@ -391,13 +364,16 @@ subscription_cb (PvSubscribe *subscribe,
g_assert (g_main_context_get_thread_default () == priv->context);
switch (flags) {
case PV_SUBSCRIPTION_FLAGS_DAEMON:
priv->daemon = PV_DAEMON1 (object);
priv->daemon = object;
break;
case PV_SUBSCRIPTION_FLAGS_CLIENT:
if (g_strcmp0 (g_dbus_proxy_get_object_path (object), priv->client_path) == 0) {
priv->client = object;
context_set_state (context, PV_CONTEXT_STATE_READY);
}
break;
case PV_SUBSCRIPTION_FLAGS_SOURCE:
@ -547,16 +523,37 @@ on_client_disconnected (GObject *source_object,
PvContext *context = user_data;
PvContextPrivate *priv = context->priv;
GError *error = NULL;
GVariant *ret;
if (!pv_client1_call_disconnect_finish (priv->client, res, &error)) {
ret = g_dbus_proxy_call_finish (priv->client, res, &error);
if (ret == NULL) {
g_error ("failed to disconnect client: %s", error->message);
priv->error = error;
context_set_state (context, PV_CONTEXT_STATE_ERROR);
g_error ("failed to disconnect client: %s", error->message);
return;
}
g_variant_unref (ret);
context_set_state (context, PV_CONTEXT_STATE_UNCONNECTED);
}
static gboolean
do_disconnect (PvContext *context)
{
PvContextPrivate *priv = context->priv;
g_dbus_proxy_call (priv->client,
"Disconnect",
g_variant_new ("()"),
G_DBUS_CALL_FLAGS_NONE,
-1,
NULL,
on_client_disconnected,
context);
return FALSE;
}
/**
* pv_context_disconnect:
* @context: a #PvContext
@ -575,10 +572,8 @@ pv_context_disconnect (PvContext *context)
priv = context->priv;
g_return_val_if_fail (priv->client != NULL, FALSE);
pv_client1_call_disconnect (priv->client,
NULL, /* GCancellable *cancellable */
on_client_disconnected,
context);
g_main_context_invoke (priv->context, (GSourceFunc) do_disconnect, context);
return TRUE;
}

View file

@ -23,8 +23,6 @@
#include "client/pv-enumtypes.h"
#include "client/pv-subscribe.h"
#include "dbus/org-pulsevideo.h"
#include "client/pv-private.h"
/**
@ -51,9 +49,7 @@ pv_context_list_source_info (PvContext *context,
GDBusProxy *proxy = walk->data;
PvSourceInfo info;
info.name = pv_source1_get_name (PV_SOURCE1 (proxy));
info.properties = pv_source1_get_properties (PV_SOURCE1 (proxy));
info.state = pv_source1_get_state (PV_SOURCE1 (proxy));
info.name = "v4l2";
cb (context, &info, user_data);
}

View file

@ -28,11 +28,14 @@ struct _PvContextPrivate
GDBusConnection *connection;
PvContextFlags flags;
PvContextState state;
GError *error;
PvDaemon1 *daemon;
GDBusProxy *daemon;
PvClient1 *client;
gchar *client_path;
GDBusProxy *client;
PvSubscriptionFlags subscription_mask;
PvSubscribe *subscribe;
@ -40,8 +43,6 @@ struct _PvContextPrivate
GList *sources;
GDBusObjectManagerServer *server_manager;
GError *error;
};
GDBusProxy * pv_context_find_source (PvContext *context, const gchar *name, GVariant *props);

View file

@ -20,12 +20,11 @@
#include <gio/gunixfdlist.h>
#include "server/pv-daemon.h"
#include "client/pulsevideo.h"
#include "client/pv-context.h"
#include "client/pv-stream.h"
#include "client/pv-enumtypes.h"
#include "dbus/org-pulsevideo.h"
#include "client/pv-private.h"
struct _PvStreamPrivate
@ -38,9 +37,9 @@ struct _PvStreamPrivate
GError *error;
gchar *source_output_path;
PvSource1 *source;
GDBusProxy *source;
GVariant *spec;
PvSourceOutput1 *source_output;
GDBusProxy *source_output;
GSocket *socket;
PvStreamMode mode;
@ -325,33 +324,36 @@ pv_stream_get_error (PvStream *stream)
static void
on_request_reconfigure (PvSourceOutput1 *interface,
GVariant *props,
gpointer user_data)
on_source_output_signal (GDBusProxy *proxy,
gchar *sender_name,
gchar *signal_name,
GVariant *parameters,
gpointer user_data)
{
g_print ("on request reconfigure\n");
g_print ("on source output signal\n");
}
static void
on_source_output1_proxy (GObject *source_object,
GAsyncResult *res,
gpointer user_data)
on_source_output_proxy (GObject *source_object,
GAsyncResult *res,
gpointer user_data)
{
PvStream *stream = user_data;
PvStreamPrivate *priv = stream->priv;
GError *error = NULL;
g_assert (g_main_context_get_thread_default () == priv->context->priv->context);
priv->source_output = pv_source_output1_proxy_new_finish (res, &error);
if (priv->source_output == NULL)
goto source_output_failed;
g_signal_connect (priv->source_output, "request-reconfigure", (GCallback) on_request_reconfigure, stream);
g_signal_connect (priv->source_output,
"g-signal",
(GCallback) on_source_output_signal,
stream);
stream_set_state (stream, PV_STREAM_STATE_READY);
return;
/* ERRORS */
source_output_failed:
{
priv->error = error;
@ -369,23 +371,27 @@ on_source_output_created (GObject *source_object,
PvStream *stream = user_data;
PvStreamPrivate *priv = stream->priv;
PvContext *context = priv->context;
GVariant *ret;
GError *error = NULL;
g_assert (g_main_context_get_thread_default () == priv->context->priv->context);
if (!pv_source1_call_create_source_output_finish (priv->source,
&priv->source_output_path, res, &error))
ret = g_dbus_proxy_call_finish (priv->source, res, &error);
if (ret == NULL)
goto create_failed;
g_variant_get (ret, "(o)", &priv->source_output_path);
g_print ("got source-output %s\n", priv->source_output_path);
g_variant_unref (ret);
priv->source_output = pv_subscribe_get_proxy (context->priv->subscribe,
PV_DBUS_SERVICE,
priv->source_output_path,
"org.pulsevideo.SourceOutput1",
NULL,
on_source_ouput_proxy,
stream);
pv_source_output1_proxy_new (context->priv->connection,
G_DBUS_PROXY_FLAGS_NONE,
g_dbus_proxy_get_name (G_DBUS_PROXY (priv->source)),
priv->source_output_path,
NULL,
on_source_output1_proxy,
stream);
return;
/* ERRORS */
@ -398,42 +404,6 @@ create_failed:
}
}
static void
on_source_output_removed (GObject *source_object,
GAsyncResult *res,
gpointer user_data)
{
PvStream *stream = user_data;
PvStreamPrivate *priv = stream->priv;
GError *error = NULL;
g_assert (g_main_context_get_thread_default () == priv->context->priv->context);
if (!pv_source_output1_call_remove_finish (priv->source_output,
res, &error)) {
priv->error = error;
stream_set_state (stream, PV_STREAM_STATE_ERROR);
g_print ("failed to disconnect: %s", error->message);
return;
}
g_clear_pointer (&priv->source_output_path, g_free);
g_clear_object (&priv->source_output);
}
static gboolean
remove_source_output (PvStream *stream)
{
PvStreamPrivate *priv = stream->priv;
g_assert (g_main_context_get_thread_default () == priv->context->priv->context);
pv_source_output1_call_remove (priv->source_output,
NULL, /* GCancellable *cancellable */
on_source_output_removed,
stream);
return TRUE;
}
static gboolean
do_connect_capture (PvStream *stream)
{
@ -441,11 +411,15 @@ do_connect_capture (PvStream *stream)
g_assert (g_main_context_get_thread_default () == priv->context->priv->context);
pv_source1_call_create_source_output (priv->source,
priv->spec, /* GVariant *arg_props */
NULL, /* GCancellable *cancellable */
on_source_output_created,
stream);
g_dbus_proxy_call (priv->source,
"CreateSourceOutput",
g_variant_new ("(@a{sv})", priv->spec),
G_DBUS_CALL_FLAGS_NONE,
-1,
NULL, /* GCancellable *cancellable */
on_source_output_created,
stream);
return FALSE;
}
@ -477,7 +451,7 @@ pv_stream_connect_capture (PvStream *stream,
priv->target = g_strdup (source);
priv->source = PV_SOURCE1 (pv_context_find_source (context, priv->target, NULL));
priv->source = pv_context_find_source (context, priv->target, NULL);
if (priv->source == NULL) {
g_warning ("can't find source");
return FALSE;
@ -491,6 +465,50 @@ pv_stream_connect_capture (PvStream *stream,
return TRUE;
}
static void
on_source_output_removed (GObject *source_object,
GAsyncResult *res,
gpointer user_data)
{
PvStream *stream = user_data;
PvStreamPrivate *priv = stream->priv;
GVariant *ret;
GError *error = NULL;
g_assert (g_main_context_get_thread_default () == priv->context->priv->context);
ret = g_dbus_proxy_call_finish (priv->source, res, &error);
if (ret == NULL) {
priv->error = error;
stream_set_state (stream, PV_STREAM_STATE_ERROR);
g_print ("failed to disconnect: %s", error->message);
return;
}
g_clear_pointer (&priv->source_output_path, g_free);
g_clear_object (&priv->source_output);
stream_set_state (stream, PV_STREAM_STATE_UNCONNECTED);
}
static gboolean
do_disconnect (PvStream *stream)
{
PvStreamPrivate *priv = stream->priv;
g_assert (g_main_context_get_thread_default () == priv->context->priv->context);
g_dbus_proxy_call (priv->source_output,
"Remove",
g_variant_new ("()"),
G_DBUS_CALL_FLAGS_NONE,
-1,
NULL, /* GCancellable *cancellable */
on_source_output_removed,
stream);
return FALSE;
}
/**
* pv_stream_disconnect:
* @stream: a #PvStream
@ -512,9 +530,8 @@ pv_stream_disconnect (PvStream *stream)
context = priv->context;
g_return_val_if_fail (pv_context_get_state (context) == PV_CONTEXT_STATE_READY, FALSE);
remove_source_output (stream);
g_main_context_invoke (context->priv->context, (GSourceFunc) do_disconnect, stream);
stream_set_state (stream, PV_STREAM_STATE_UNCONNECTED);
return TRUE;
}
@ -648,7 +665,7 @@ on_stream_started (GObject *source_object,
GError *error = NULL;
GVariant *result;
result = g_dbus_proxy_call_with_unix_fd_list_finish (G_DBUS_PROXY (priv->source_output),
result = g_dbus_proxy_call_with_unix_fd_list_finish (priv->source_output,
&out_fd_list,
res,
&error);
@ -700,11 +717,15 @@ do_start (PvStream *stream)
g_variant_builder_init (&builder, G_VARIANT_TYPE ("a{sv}"));
g_variant_builder_add (&builder, "{sv}", "name", g_variant_new_string ("hello"));
pv_source_output1_call_start (priv->source_output,
g_variant_builder_end (&builder), /* GVariant *arg_properties */
NULL, /* GCancellable *cancellable */
on_stream_started,
stream);
g_dbus_proxy_call (priv->source_output,
"Start",
g_variant_new ("(@a{sv})", g_variant_builder_end (&builder)),
G_DBUS_CALL_FLAGS_NONE,
-1,
NULL, /* GCancellable *cancellable */
on_stream_started,
stream);
return FALSE;
}
@ -750,12 +771,15 @@ on_stream_stopped (GObject *source_object,
{
PvStream *stream = user_data;
PvStreamPrivate *priv = stream->priv;
GVariant *ret;
GError *error = NULL;
if (!pv_source_output1_call_stop_finish (priv->source_output,
res, &error))
ret = g_dbus_proxy_call_finish (priv->source_output, res, &error);
if (ret == NULL)
goto call_failed;
g_variant_unref (ret);
unhandle_socket (stream);
stream_set_state (stream, PV_STREAM_STATE_READY);
@ -777,10 +801,15 @@ do_stop (PvStream *stream)
{
PvStreamPrivate *priv = stream->priv;
pv_source_output1_call_stop (priv->source_output,
NULL, /* GCancellable *cancellable */
on_stream_stopped,
stream);
g_dbus_proxy_call (priv->source_output,
"Stop",
g_variant_new ("()"),
G_DBUS_CALL_FLAGS_NONE,
-1,
NULL, /* GCancellable *cancellable */
on_stream_stopped,
stream);
return FALSE;
}

View file

@ -22,25 +22,34 @@
#include "client/pulsevideo.h"
#include "client/pv-enumtypes.h"
#include "dbus/org-pulsevideo.h"
struct _PvSubscribePrivate
{
PvSubscriptionState state;
GDBusConnection *connection;
gchar *service;
GCancellable *cancellable;
PvSubscriptionFlags subscription_mask;
GDBusObjectManager *client_manager;
guint pending_subscribes;
GDBusConnection *connection;
GCancellable *cancellable;
GHashTable *senders;
GDBusProxy *manager_proxy;
guint pending_proxies;
GList *objects;
PvSubscriptionState state;
GError *error;
};
typedef struct
{
PvSubscribe *subscribe;
gchar *sender_name;
gchar *object_path;
gchar *interface_name;
gboolean pending;
GDBusProxy *proxy;
GList *tasks;
} PvObjectData;
#define PV_SUBSCRIBE_GET_PRIVATE(obj) \
(G_TYPE_INSTANCE_GET_PRIVATE ((obj), PV_TYPE_SUBSCRIBE, PvSubscribePrivate))
@ -64,43 +73,6 @@ enum
static guint signals[LAST_SIGNAL] = { 0 };
typedef struct {
PvSubscribe *subscribe;
gchar *sender;
guint id;
PvSubscribe *sender_subscribe;
GList *clients;
gulong signal_event;
gulong signal_state;
} SenderData;
static void
notify_subscription (PvSubscribe *subscribe,
GDBusObject *object,
GDBusInterface *interface,
PvSubscriptionEvent event);
static void
on_sender_subscription_event (PvSubscribe *sender_subscribe,
PvSubscriptionEvent event,
PvSubscriptionFlags flags,
GDBusProxy *object,
gpointer user_data)
{
SenderData *data = user_data;
PvSubscribe *subscribe = data->subscribe;
g_print ("on sender subscription def: %p\n", g_main_context_get_thread_default ());
g_signal_emit (subscribe,
signals[SIGNAL_SUBSCRIPTION_EVENT],
0,
event,
flags,
object);
}
static void
subscription_set_state (PvSubscribe *subscribe, PvSubscriptionState state)
{
@ -113,309 +85,231 @@ subscription_set_state (PvSubscribe *subscribe, PvSubscriptionState state)
}
static void
on_sender_subscription_state (GObject *object,
GParamSpec *pspec,
gpointer user_data)
notify_event (PvSubscribe *subscribe,
PvObjectData *data,
PvSubscriptionEvent event)
{
SenderData *data = user_data;
const gchar *interface_name;
PvSubscriptionFlags flags = 0;
interface_name = g_dbus_proxy_get_interface_name (data->proxy);
if (g_strcmp0 (interface_name, "org.pulsevideo.Daemon1") == 0) {
flags = PV_SUBSCRIPTION_FLAGS_DAEMON;
}
else if (g_strcmp0 (interface_name, "org.pulsevideo.Client1") == 0) {
flags = PV_SUBSCRIPTION_FLAGS_CLIENT;
}
else if (g_strcmp0 (interface_name, "org.pulsevideo.Source1") == 0) {
flags = PV_SUBSCRIPTION_FLAGS_SOURCE;
}
else if (g_strcmp0 (interface_name, "org.pulsevideo.SourceOutput1") == 0) {
flags = PV_SUBSCRIPTION_FLAGS_SOURCE_OUTPUT;
}
g_signal_emit (subscribe, signals[SIGNAL_SUBSCRIPTION_EVENT], 0,
event, flags, data->proxy);
}
static void
on_proxy_properties_changed (GDBusProxy *proxy,
GVariant *changed_properties,
GStrv invalidated_properties,
gpointer user_data)
{
ProxyData *data = user_data;
notify_event (subscribe, data->proxy, PV_SUBSCRIPTION_EVENT_CHANGE);
}
static void
on_proxy_created (GObject *source_object,
GAsyncResult *res,
gpointer user_data)
{
ProxyData *data = user_data;
PvSubscribe *subscribe = data->subscribe;
PvSubscribePrivate *priv = subscribe->priv;
PvSubscriptionState state;
GError *error = NULL;
GList *walk;
g_object_get (object, "state", &state, NULL);
switch (state) {
case PV_SUBSCRIPTION_STATE_READY:
if (--priv->pending_subscribes == 0)
subscription_set_state (subscribe, state);
break;
case PV_SUBSCRIPTION_STATE_ERROR:
subscription_set_state (subscribe, state);
break;
default:
break;
}
}
static void
client_name_appeared_handler (GDBusConnection *connection,
const gchar *name,
const gchar *name_owner,
gpointer user_data)
{
SenderData *data = user_data;
g_print ("client name appeared def: %p\n", g_main_context_get_thread_default ());
if (!g_strcmp0 (name, g_dbus_connection_get_unique_name (connection)))
data->proxy = g_dbus_proxy_new_finish (res, &error);
if (data->proxy == NULL) {
priv->objects = g_list_remove (priv->objects, data);
g_warning ("could not create proxy: %s", error->message);
subscription_set_state (subscribe, PV_SUBSCRIPTION_STATE_ERROR);
priv->error = error;
return;
}
g_print ("appeared client %s %p\n", name, data);
/* subscribe to Source events. We want to be notified when this new
* sender add/change/remove sources and outputs */
data->sender_subscribe = pv_subscribe_new ();
g_object_set (data->sender_subscribe, "service", data->sender,
"subscription-mask", PV_SUBSCRIPTION_FLAGS_ALL,
"connection", connection,
NULL);
data->signal_event = g_signal_connect (data->sender_subscribe,
"subscription-event",
(GCallback) on_sender_subscription_event,
g_signal_connect (data->proxy,
"g-properties-changed",
(GCallback) on_proxy_properties_changed,
data);
data->signal_state = g_signal_connect (data->sender_subscribe,
"notify::state",
(GCallback) on_sender_subscription_state,
data);
}
static void
remove_client (PvClient1 *client, SenderData *data)
{
g_signal_emit (data->subscribe,
signals[SIGNAL_SUBSCRIPTION_EVENT],
0,
PV_SUBSCRIPTION_EVENT_REMOVE,
PV_SUBSCRIPTION_FLAGS_CLIENT,
client);
}
notify_event (subscribe, data, PV_SUBSCRIPTION_EVENT_NEW);
static void
client_name_vanished_handler (GDBusConnection *connection,
const gchar *name,
gpointer user_data)
{
SenderData *data = user_data;
g_print ("vanished client %s %p\n", name, data);
g_bus_unwatch_name (data->id);
}
static void
data_free (SenderData *data)
{
g_print ("free client %s %p\n", data->sender, data);
g_list_foreach (data->clients, (GFunc) remove_client, data);
g_hash_table_remove (data->subscribe->priv->senders, data->sender);
if (data->sender_subscribe) {
g_signal_handler_disconnect (data->sender_subscribe, data->signal_event);
g_signal_handler_disconnect (data->sender_subscribe, data->signal_state);
g_object_unref (data->sender_subscribe);
for (walk = data->tasks; walk; walk = g_list_next (walk)) {
GTask *task = walk->data;
g_task_return_pointer (task, g_object_ref (data->proxy), g_object_unref);
g_object_unref (task);
}
g_list_free (data->tasks);
data->tasks = NULL;
g_free (data->sender);
g_free (data);
}
static SenderData *
sender_data_new (PvSubscribe *subscribe, const gchar *sender)
{
PvSubscribePrivate *priv = subscribe->priv;
SenderData *data;
data = g_new0 (SenderData, 1);
data->subscribe = subscribe;
data->sender = g_strdup (sender);
g_print ("watch name def: %p\n", g_main_context_get_thread_default ());
g_print ("watch name %s %p\n", sender, data);
data->id = g_bus_watch_name_on_connection (priv->connection,
sender,
G_BUS_NAME_WATCHER_FLAGS_NONE,
client_name_appeared_handler,
client_name_vanished_handler,
data,
(GDestroyNotify) data_free);
g_hash_table_insert (priv->senders, data->sender, data);
priv->pending_subscribes++;
return data;
}
static void
notify_subscription (PvSubscribe *subscribe,
GDBusObject *object,
GDBusInterface *interface,
PvSubscriptionEvent event)
{
PvSubscribePrivate *priv = subscribe->priv;
g_print ("notify subscription def: %p\n", g_main_context_get_thread_default ());
//g_assert (g_main_context_get_thread_default ());
if (priv->subscription_mask & PV_SUBSCRIPTION_FLAGS_DAEMON) {
PvDaemon1 *daemon;
if (interface == NULL)
daemon = pv_object_peek_daemon1 (PV_OBJECT (object));
else if (PV_IS_DAEMON1_PROXY (interface))
daemon = PV_DAEMON1 (interface);
else
daemon = NULL;
if (daemon) {
g_signal_emit (subscribe, signals[SIGNAL_SUBSCRIPTION_EVENT], 0, event,
PV_SUBSCRIPTION_FLAGS_DAEMON, daemon);
}
}
if (priv->subscription_mask & PV_SUBSCRIPTION_FLAGS_CLIENT) {
PvClient1 *client;
if (interface == NULL)
client = pv_object_peek_client1 (PV_OBJECT (object));
else if (PV_IS_CLIENT1_PROXY (interface))
client = PV_CLIENT1 (interface);
else
client = NULL;
if (client) {
const gchar *sender;
SenderData *data;
sender = pv_client1_get_name (client);
data = g_hash_table_lookup (priv->senders, sender);
if (data == NULL && event != PV_SUBSCRIPTION_EVENT_REMOVE) {
data = sender_data_new (subscribe, sender);
}
if (data) {
if (event == PV_SUBSCRIPTION_EVENT_NEW)
data->clients = g_list_prepend (data->clients, client);
else if (event == PV_SUBSCRIPTION_EVENT_REMOVE)
data->clients = g_list_remove (data->clients, client);
g_signal_emit (subscribe, signals[SIGNAL_SUBSCRIPTION_EVENT], 0, event,
PV_SUBSCRIPTION_FLAGS_CLIENT, client);
}
}
}
if (priv->subscription_mask & PV_SUBSCRIPTION_FLAGS_SOURCE) {
PvSource1 *source;
if (interface == NULL)
source = pv_object_peek_source1 (PV_OBJECT (object));
else if (PV_IS_SOURCE1_PROXY (interface))
source = PV_SOURCE1 (interface);
else
source = NULL;
if (source) {
g_signal_emit (subscribe, signals[SIGNAL_SUBSCRIPTION_EVENT], 0, event,
PV_SUBSCRIPTION_FLAGS_SOURCE, source);
}
}
if (priv->subscription_mask & PV_SUBSCRIPTION_FLAGS_SOURCE_OUTPUT) {
PvSourceOutput1 *output;
if (interface == NULL)
output = pv_object_peek_source_output1 (PV_OBJECT (object));
else if PV_IS_SOURCE_OUTPUT1_PROXY (interface)
output = PV_SOURCE_OUTPUT1 (interface);
else
output = NULL;
if (output) {
g_signal_emit (subscribe, signals[SIGNAL_SUBSCRIPTION_EVENT], 0, event,
PV_SUBSCRIPTION_FLAGS_SOURCE_OUTPUT, output);
}
}
}
static void
on_client_manager_interface_added (GDBusObjectManager *manager,
GDBusObject *object,
GDBusInterface *interface,
gpointer user_data)
{
PvSubscribe *subscribe = user_data;
notify_subscription (subscribe, object, interface, PV_SUBSCRIPTION_EVENT_NEW);
}
static void
on_client_manager_interface_removed (GDBusObjectManager *manager,
GDBusObject *object,
GDBusInterface *interface,
gpointer user_data)
{
PvSubscribe *subscribe = user_data;
notify_subscription (subscribe, object, interface, PV_SUBSCRIPTION_EVENT_REMOVE);
}
static void
on_client_manager_object_added (GDBusObjectManager *manager,
GDBusObject *object,
gpointer user_data)
{
PvSubscribe *subscribe = user_data;
notify_subscription (subscribe, object, NULL, PV_SUBSCRIPTION_EVENT_NEW);
}
static void
on_client_manager_object_removed (GDBusObjectManager *manager,
GDBusObject *object,
gpointer user_data)
{
PvSubscribe *subscribe = user_data;
notify_subscription (subscribe, object, NULL, PV_SUBSCRIPTION_EVENT_REMOVE);
}
static void
on_client_manager_properties_changed (GDBusObjectManagerClient *manager,
GDBusObjectProxy *object_proxy,
GDBusProxy *interface_proxy,
GVariant *changed_properties,
GStrv invalidated_properties,
gpointer user_data)
{
g_print ("properties changed\n");
}
static void
on_client_manager_signal (GDBusObjectManagerClient *manager,
GDBusObjectProxy *object_proxy,
GDBusProxy *interface_proxy,
gchar *sender_name,
gchar *signal_name,
GVariant *parameters,
gpointer user_data)
{
g_print ("proxy signal %s\n", signal_name);
}
static void
client_manager_appeared (PvSubscribe *subscribe)
{
PvSubscribePrivate *priv = subscribe->priv;
GList *objects, *walk;
g_print ("client manager appeared def: %p\n", g_main_context_get_thread_default ());
objects = g_dbus_object_manager_get_objects (G_DBUS_OBJECT_MANAGER (priv->client_manager));
for (walk = objects; walk ; walk = g_list_next (walk)) {
on_client_manager_object_added (G_DBUS_OBJECT_MANAGER (priv->client_manager),
walk->data,
subscribe);
}
if (--priv->pending_subscribes == 0)
if (--priv->pending_proxies == 0)
subscription_set_state (subscribe, PV_SUBSCRIPTION_STATE_READY);
}
static void
client_manager_disappeared (PvSubscribe *subscribe)
add_interface (PvSubscribe *subscribe,
const gchar *object_path,
const gchar *interface_name,
GVariant *properties)
{
PvSubscribePrivate *priv = subscribe->priv;
ProxyData *data;
data = g_new0 (ProxyData, 1);
data->subscribe = subscribe;
data->sender_name = g_strdup (priv->service);
data->object_path = g_strdup (object_path);
data->interface_name = g_strdup (interface_name);
priv->objects = g_list_prepend (priv->objects, data);
priv->pending_proxies++;
g_dbus_proxy_new (priv->connection,
G_DBUS_PROXY_FLAGS_NONE,
NULL, /* GDBusInterfaceInfo* */
priv->service,
object_path,
interface_name,
priv->cancellable,
on_proxy_created,
data);
}
static void
add_ifaces_and_properties (PvSubscribe *subscribe,
const gchar *object_path,
GVariant *ifaces_and_properties)
{
GVariantIter iter;
const gchar *interface_name;
GVariant *properties;
g_variant_iter_init (&iter, ifaces_and_properties);
while (g_variant_iter_next (&iter,
"{&s@a{sv}}",
&interface_name,
&properties)) {
add_interface (subscribe, object_path, interface_name, properties);
g_variant_unref (properties);
}
}
static void
on_manager_proxy_signal (GDBusProxy *proxy,
const gchar *sender_name,
const gchar *signal_name,
GVariant *parameters,
gpointer user_data)
{
PvSubscribe *subscribe = user_data;
const gchar *object_path;
g_print ("proxy signal %s %p\n", signal_name, g_main_context_get_thread_default ());
if (g_strcmp0 (signal_name, "InterfacesAdded") == 0) {
GVariant *ifaces_and_properties;
g_variant_get (parameters,
"(&o@a{sa{sv}})",
&object_path,
&ifaces_and_properties);
add_ifaces_and_properties (subscribe, object_path, ifaces_and_properties);
g_variant_unref (ifaces_and_properties);
} else if (g_strcmp0 (signal_name, "InterfacesRemoved") == 0) {
const gchar **ifaces;
g_variant_get (parameters,
"(&o^a&s)",
&object_path,
&ifaces);
g_free (ifaces);
}
}
static void
on_managed_objects_ready (GObject *source_object,
GAsyncResult *res,
gpointer user_data)
{
PvSubscribe *subscribe = user_data;
PvSubscribePrivate *priv = subscribe->priv;
GError *error = NULL;
GVariant *objects;
GVariant *arg0;
const gchar *object_path;
GVariant *ifaces_and_properties;
GVariantIter object_iter;
objects = g_dbus_proxy_call_finish (priv->manager_proxy, res, &error);
if (objects == NULL) {
g_warning ("could not get objects: %s", error->message);
subscription_set_state (subscribe, PV_SUBSCRIPTION_STATE_ERROR);
priv->error = error;
return;
}
arg0 = g_variant_get_child_value (objects, 0);
g_variant_iter_init (&object_iter, arg0);
while (g_variant_iter_next (&object_iter,
"{&o@a{sa{sv}}}",
&object_path,
&ifaces_and_properties)) {
add_ifaces_and_properties (subscribe, object_path, ifaces_and_properties);
g_variant_unref (ifaces_and_properties);
}
g_variant_unref (arg0);
g_variant_unref (objects);
if (priv->pending_proxies == 0)
subscription_set_state (subscribe, PV_SUBSCRIPTION_STATE_READY);
}
static void
manager_proxy_appeared (PvSubscribe *subscribe)
{
PvSubscribePrivate *priv = subscribe->priv;
g_print ("client manager appeared def: %p\n", g_main_context_get_thread_default ());
g_dbus_proxy_call (priv->manager_proxy,
"GetManagedObjects",
NULL, /* parameters */
G_DBUS_CALL_FLAGS_NONE,
-1,
priv->cancellable,
on_managed_objects_ready,
subscribe);
}
static void
manager_proxy_disappeared (PvSubscribe *subscribe)
{
}
static void
on_client_manager_name_owner (GObject *object,
GParamSpec *pspec,
gpointer user_data)
on_manager_proxy_name_owner (GObject *object,
GParamSpec *pspec,
gpointer user_data)
{
PvSubscribe *subscribe = user_data;
PvSubscribePrivate *priv = subscribe->priv;
@ -423,16 +317,16 @@ on_client_manager_name_owner (GObject *object,
g_print ("client manager owner def: %p\n", g_main_context_get_thread_default ());
g_object_get (priv->client_manager, "name-owner", &name_owner, NULL);
g_object_get (priv->manager_proxy, "g-name-owner", &name_owner, NULL);
g_print ("client manager %s %s\n",
g_dbus_object_manager_client_get_name (G_DBUS_OBJECT_MANAGER_CLIENT (priv->client_manager)),
g_dbus_proxy_get_name (G_DBUS_PROXY (priv->manager_proxy)),
name_owner);
if (name_owner) {
client_manager_appeared (subscribe);
manager_proxy_appeared (subscribe);
g_free (name_owner);
} else {
client_manager_disappeared (subscribe);
manager_proxy_disappeared (subscribe);
}
}
@ -444,40 +338,31 @@ connect_client_signals (PvSubscribe *subscribe)
g_print ("add signals def: %p\n", g_main_context_get_thread_default ());
g_signal_connect (priv->client_manager, "notify::name-owner",
(GCallback) on_client_manager_name_owner, subscribe);
g_signal_connect (priv->client_manager, "interface-added",
(GCallback) on_client_manager_interface_added, subscribe);
g_signal_connect (priv->client_manager, "interface-removed",
(GCallback) on_client_manager_interface_removed, subscribe);
g_signal_connect (priv->client_manager, "object-added",
(GCallback) on_client_manager_object_added, subscribe);
g_signal_connect (priv->client_manager, "object-removed",
(GCallback) on_client_manager_object_removed, subscribe);
g_signal_connect (priv->client_manager, "interface-proxy-signal",
(GCallback) on_client_manager_signal, subscribe);
g_signal_connect (priv->client_manager, "interface-proxy-properties-changed",
(GCallback) on_client_manager_properties_changed, subscribe);
g_signal_connect (priv->manager_proxy, "notify::g-name-owner",
(GCallback) on_manager_proxy_name_owner, subscribe);
g_signal_connect (priv->manager_proxy, "g-signal",
(GCallback) on_manager_proxy_signal, subscribe);
}
static void
on_client_manager_ready (GObject *source_object,
GAsyncResult *res,
gpointer user_data)
on_manager_proxy_ready (GObject *source_object,
GAsyncResult *res,
gpointer user_data)
{
PvSubscribe *subscribe = user_data;
PvSubscribePrivate *priv = subscribe->priv;
GError *error = NULL;
g_print ("client manager ready def: %p\n", g_main_context_get_thread_default ());
g_print ("manager proyx ready def: %p\n", g_main_context_get_thread_default ());
priv->client_manager = pv_object_manager_client_new_finish (res, &error);
if (priv->client_manager == NULL)
priv->manager_proxy = g_dbus_proxy_new_finish (res, &error);
if (priv->manager_proxy == NULL)
goto manager_error;
connect_client_signals (subscribe);
on_client_manager_name_owner (G_OBJECT (priv->client_manager), NULL, subscribe);
on_manager_proxy_name_owner (G_OBJECT (priv->manager_proxy), NULL, subscribe);
g_object_unref (subscribe);
return;
@ -503,14 +388,15 @@ install_subscription (PvSubscribe *subscribe)
g_print ("new client manager def: %p\n", g_main_context_get_thread_default ());
g_print ("new client manager for %s\n", priv->service);
pv_object_manager_client_new (priv->connection,
G_DBUS_OBJECT_MANAGER_CLIENT_FLAGS_NONE,
priv->service,
PV_DBUS_OBJECT_PREFIX,
priv->cancellable,
on_client_manager_ready,
g_object_ref (subscribe));
priv->pending_subscribes++;
g_dbus_proxy_new (priv->connection,
G_DBUS_PROXY_FLAGS_NONE,
NULL, /* GDBusInterfaceInfo* */
priv->service,
PV_DBUS_OBJECT_PREFIX,
"org.freedesktop.DBus.ObjectManager",
priv->cancellable,
on_manager_proxy_ready,
g_object_ref (subscribe));
}
static void
@ -518,7 +404,7 @@ uninstall_subscription (PvSubscribe *subscribe)
{
PvSubscribePrivate *priv = subscribe->priv;
g_clear_object (&priv->client_manager);
g_clear_object (&priv->manager_proxy);
g_clear_error (&priv->error);
subscription_set_state (subscribe, PV_SUBSCRIPTION_STATE_UNCONNECTED);
}
@ -599,9 +485,8 @@ pv_subscribe_finalize (GObject * object)
g_print ("cancel\n");
g_cancellable_cancel (priv->cancellable);
g_hash_table_unref (priv->senders);
if (priv->client_manager)
g_object_unref (priv->client_manager);
if (priv->manager_proxy)
g_object_unref (priv->manager_proxy);
g_object_unref (priv->cancellable);
g_free (priv->service);
@ -703,7 +588,6 @@ pv_subscribe_init (PvSubscribe * subscribe)
PvSubscribePrivate *priv = subscribe->priv = PV_SUBSCRIBE_GET_PRIVATE (subscribe);
priv->service = g_strdup (PV_DBUS_SERVICE);
priv->senders = g_hash_table_new (g_str_hash, g_str_equal);
priv->state = PV_SUBSCRIPTION_STATE_UNCONNECTED;
priv->cancellable = g_cancellable_new ();
}
@ -745,3 +629,61 @@ pv_subscribe_get_error (PvSubscribe *subscribe)
return priv->error;
}
static gint
compare_data (PvObjectData *data, const gchar *name,
const gchar *object_path,
const gchar *interface_name)
{
gint res;
if ((res = g_strcmp0 (data->sender_name, name)) != 0)
return res;
if ((res = g_strcmp0 (data->object_path, object_path)) != 0)
return res;
return g_strcmp0 (data->interface_name, interface_name);
}
void
pv_subscribe_get_proxy (PvSubscribe *subscribe,
GDBusProxyFlags flags,
GDBusInterfaceInfo *info,
const gchar *name,
const gchar *object_path,
const gchar *interface_name,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
PvSubscribePrivate *priv;
GDBusProxy *res = NULL;
GList *walk;
g_return_val_if_fail (PV_IS_SUBSCRIBE (subscribe), NULL);
priv = subscribe->priv;
for (walk = priv->objects; walk; walk = g_list_next (walk)) {
PvObjectData *data = walk->data;
if (compare_data (data, name, object_path, interface_name) == 0) {
GTask *task;
task = g_task_new (subscribe,
cancellable,
callback,
user_data);
if (data->pending) {
g_task_set_task_data (task, data, NULL);
data->tasks = g_list_prepend (data->tasks, task);
} else if (data->proxy) {
g_task_return_pointer (task, g_object_ref (data->proxy), g_object_unref);
} else {
g_task_return_error (task, NULL);
}
break;
}
}
}

View file

@ -87,6 +87,13 @@ PvSubscribe * pv_subscribe_new (void);
PvSubscriptionState pv_subscribe_get_state (PvSubscribe *subscribe);
GError * pv_subscribe_get_error (PvSubscribe *subscribe);
GDBusProxy * pv_subscribe_get_proxy (PvSubscribe *subscribe,
const gchar *name,
const gchar *object_path,
const gchar *interface_name,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data);
G_END_DECLS

View file

@ -23,9 +23,11 @@
#include "server/pv-daemon.h"
#include "server/pv-client.h"
#include "modules/v4l2/pv-v4l2-source.h"
#include "dbus/org-pulsevideo.h"
#include "modules/v4l2/pv-v4l2-source.h"
#define PV_DAEMON_GET_PRIVATE(obj) \
(G_TYPE_INSTANCE_GET_PRIVATE ((obj), PV_TYPE_DAEMON, PvDaemonPrivate))
@ -36,10 +38,16 @@ struct _PvDaemonPrivate
GDBusObjectManagerServer *server_manager;
PvSubscribe *subscribe;
GHashTable *clients;
GList *sources;
GHashTable *senders;
};
typedef struct {
guint id;
gchar *sender;
PvDaemon *daemon;
GList *clients;
} SenderData;
static void
on_server_subscription_event (PvSubscribe *subscribe,
PvSubscriptionEvent event,
@ -47,8 +55,6 @@ on_server_subscription_event (PvSubscribe *subscribe,
GDBusProxy *object,
gpointer user_data)
{
PvDaemon *daemon = user_data;
PvDaemonPrivate *priv = daemon->priv;
const gchar *name, *object_path;
name = g_dbus_proxy_get_name (object);
@ -58,27 +64,81 @@ on_server_subscription_event (PvSubscribe *subscribe,
switch (event) {
case PV_SUBSCRIPTION_EVENT_NEW:
{
if (PV_IS_SOURCE1 (object)) {
priv->sources = g_list_prepend (priv->sources, object);
}
break;
}
case PV_SUBSCRIPTION_EVENT_CHANGE:
break;
case PV_SUBSCRIPTION_EVENT_REMOVE:
{
if (PV_IS_SOURCE1 (object)) {
priv->sources = g_list_remove (priv->sources, object);
} else if (PV_IS_CLIENT1 (object)) {
g_hash_table_remove (priv->clients, object_path);
}
break;
}
}
}
static void
client_name_appeared_handler (GDBusConnection *connection,
const gchar *name,
const gchar *name_owner,
gpointer user_data)
{
SenderData *data = user_data;
g_print ("client name appeared def: %p\n", g_main_context_get_thread_default ());
if (!g_strcmp0 (name, g_dbus_connection_get_unique_name (connection)))
return;
g_print ("appeared client %s %p\n", name, data);
}
static void
client_name_vanished_handler (GDBusConnection *connection,
const gchar *name,
gpointer user_data)
{
SenderData *data = user_data;
g_print ("vanished client %s %p\n", name, data);
g_bus_unwatch_name (data->id);
}
static void
data_free (SenderData *data)
{
g_print ("free client %s %p\n", data->sender, data);
g_list_free_full (data->clients, g_object_unref);
g_hash_table_remove (data->daemon->priv->senders, data->sender);
g_free (data->sender);
g_free (data);
}
static SenderData *
sender_data_new (PvDaemon *daemon, const gchar *sender)
{
PvDaemonPrivate *priv = daemon->priv;
SenderData *data;
data = g_new0 (SenderData, 1);
data->daemon = daemon;
data->sender = g_strdup (sender);
g_print ("watch name def: %p\n", g_main_context_get_thread_default ());
g_print ("watch name %s %p\n", sender, data);
data->id = g_bus_watch_name_on_connection (priv->connection,
sender,
G_BUS_NAME_WATCHER_FLAGS_NONE,
client_name_appeared_handler,
client_name_vanished_handler,
data,
(GDestroyNotify) data_free);
g_hash_table_insert (priv->senders, data->sender, data);
return data;
}
static gboolean
handle_connect_client (PvDaemon1 *interface,
@ -90,15 +150,19 @@ handle_connect_client (PvDaemon1 *interface,
PvDaemonPrivate *priv = daemon->priv;
PvClient *client;
const gchar *sender, *object_path;
SenderData *data;
sender = g_dbus_method_invocation_get_sender (invocation);
g_print ("connect client %s\n", sender);
data = g_hash_table_lookup (priv->senders, sender);
if (data == NULL)
data = sender_data_new (daemon, sender);
client = pv_client_new (daemon, sender, PV_DBUS_OBJECT_PREFIX);
object_path = pv_client_get_object_path (client);
g_hash_table_insert (priv->clients, g_strdup (object_path), client);
data->clients = g_list_prepend (data->clients, client);
pv_daemon1_complete_connect_client (interface, invocation, object_path);
@ -301,32 +365,6 @@ pv_daemon_remove_source (PvDaemon *daemon, PvSource *source)
pv_source_set_manager (source, NULL);
}
/**
* pv_daemon_get_source:
* @daemon: a #PvDaemon
* @name: a name
*
* Find a #PvSource1 for @name in @daemon
*
* Returns: a #PvSource1
*/
PvSource1 *
pv_daemon_get_source (PvDaemon *daemon, const gchar *name)
{
PvDaemonPrivate *priv;
PvSource1 *source;
g_return_val_if_fail (PV_IS_DAEMON (daemon), NULL);
priv = daemon->priv;
if (priv->sources == NULL)
return NULL;
source = priv->sources->data;
return source;
}
G_DEFINE_TYPE (PvDaemon, pv_daemon, G_TYPE_OBJECT);
static void
@ -357,7 +395,7 @@ pv_daemon_init (PvDaemon * daemon)
PvDaemonPrivate *priv = daemon->priv = PV_DAEMON_GET_PRIVATE (daemon);
priv->server_manager = g_dbus_object_manager_server_new (PV_DBUS_OBJECT_PREFIX);
priv->clients = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_object_unref);
priv->senders = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_object_unref);
priv->subscribe = pv_subscribe_new ();
g_signal_connect (priv->subscribe,

View file

@ -23,8 +23,6 @@
#include <glib-object.h>
#include <gio/gio.h>
#include "dbus/org-pulsevideo.h"
G_BEGIN_DECLS
#define PV_TYPE_DAEMON (pv_daemon_get_type ())
@ -76,8 +74,6 @@ void pv_daemon_unexport (PvDaemon *daemon, const gchar *nam
void pv_daemon_add_source (PvDaemon *daemon, PvSource *source);
void pv_daemon_remove_source (PvDaemon *daemon, PvSource *source);
PvSource1 * pv_daemon_get_source (PvDaemon *daemon, const gchar *name);
G_END_DECLS
#endif /* __PV_DAEMON_H__ */