libnm/tests: extend handling connections in python test service

This commit is contained in:
Thomas Haller 2015-12-23 13:50:19 +01:00
parent 570d24b88c
commit 8859cd0738

View file

@ -10,6 +10,7 @@ import dbus.service
import dbus.mainloop.glib
import random
import collections
import uuid
mainloop = GLib.MainLoop()
@ -918,6 +919,15 @@ class NetworkManager(ExportedObj):
def AutoRemoveNextConnection(self):
settings.auto_remove_next_connection()
@dbus.service.method(dbus_interface=IFACE_TEST, in_signature='a{sa{sv}}b', out_signature='o')
def AddConnection(self, connection, verify_connection):
return settings.add_connection(connection, verify_connection)
@dbus.service.method(dbus_interface=IFACE_TEST, in_signature='sa{sa{sv}}b', out_signature='')
def UpdateConnection(self, path, connection, verify_connection):
return settings.update_connection(connection, path, verify_connection)
###################################################################
IFACE_CONNECTION = 'org.freedesktop.NetworkManager.Settings.Connection'
@ -934,17 +944,15 @@ class MissingSettingException(dbus.DBusException):
_dbus_error_name = IFACE_CONNECTION + '.MissingSetting'
class Connection(dbus.service.Object):
def __init__(self, bus, object_path, settings, remove_func):
dbus.service.Object.__init__(self, bus, object_path)
def __init__(self, bus, object_path, settings, remove_func, verify_connection=True):
if 'connection' not in settings:
raise MissingSettingException('connection: setting is required')
s_con = settings['connection']
if 'type' not in s_con:
raise MissingPropertyException('connection.type: property is required')
type = s_con['type']
if not type in ['802-3-ethernet', '802-11-wireless', 'vlan', 'wimax']:
raise InvalidPropertyException('connection.type: unsupported connection type')
if self.get_uuid(settings) is None:
if 'connection' not in settings:
settings['connection'] = { }
settings['connection']['uuid'] = uuid.uuid4()
self.verify(settings, verify_strict=verify_connection)
dbus.service.Object.__init__(self, bus, object_path)
self.path = object_path
self.settings = settings
@ -953,6 +961,45 @@ class Connection(dbus.service.Object):
self.props = {}
self.props['Unsaved'] = False
def get_uuid(self, settings=None):
if settings is None:
settings = self.settings
if 'connection' in settings:
s_con = settings['connection']
if 'uuid' in s_con:
return s_con['uuid']
return None
def verify(self, settings=None, verify_strict=True):
if settings is None:
settings = self.settings;
if 'connection' not in settings:
raise MissingSettingException('connection: setting is required')
s_con = settings['connection']
if 'type' not in s_con:
raise MissingPropertyException('connection.type: property is required')
if 'uuid' not in s_con:
raise MissingPropertyException('connection.uuid: property is required')
if 'id' not in s_con:
raise MissingPropertyException('connection.id: property is required')
if not verify_strict:
return;
t = s_con['type']
if t not in ['802-3-ethernet', '802-11-wireless', 'vlan', 'wimax']:
raise InvalidPropertyException('connection.type: unsupported connection type "%s"' % (t))
def update_connection(self, settings, verify_connection):
self.verify(settings, verify_strict=verify_connection)
old_uuid = self.get_uuid()
new_uuid = self.get_uuid(settings)
if old_uuid != new_uuid:
raise InvalidPropertyException('connection.uuid: cannot change the uuid from %s to %s' % (old_uuid, new_uuid))
self.settings = settings;
self.Updated()
# Properties interface
@dbus.service.method(dbus_interface=dbus.PROPERTIES_IFACE, in_signature='s', out_signature='a{sv}')
def GetAll(self, iface):
@ -986,6 +1033,10 @@ class Connection(dbus.service.Object):
self.Removed()
self.remove_from_connection()
@dbus.service.method(dbus_interface=IFACE_CONNECTION, in_signature='a{sa{sv}}', out_signature='')
def Update(self, settings):
self.update_connection(settings, TRUE)
@dbus.service.signal(IFACE_CONNECTION, signature='')
def Removed(self):
pass
@ -1024,9 +1075,18 @@ class Settings(dbus.service.Object):
@dbus.service.method(dbus_interface=IFACE_SETTINGS, in_signature='a{sa{sv}}', out_signature='o')
def AddConnection(self, settings):
return self.add_connection(settings)
def add_connection(self, settings, verify_connection=True):
path = "/org/freedesktop/NetworkManager/Settings/Connection/{0}".format(self.counter)
con = Connection(self.bus, path, settings, self.delete_connection, verify_connection)
uuid = con.get_uuid()
if uuid in [c.get_uuid() for c in self.connections.itervalues()]:
raise InvalidSettingException('cannot add duplicate connection with uuid %s' % (uuid))
self.counter = self.counter + 1
self.connections[path] = Connection(self.bus, path, settings, self.delete_connection)
self.connections[path] = con
self.props['Connections'] = dbus.Array(self.connections.keys(), 'o')
self.NewConnection(path)
self.PropertiesChanged({ 'connections': self.props['Connections'] })
@ -1037,6 +1097,14 @@ class Settings(dbus.service.Object):
return path
def update_connection(self, connection, path=None, verify_connection=True):
if path is None:
path = connection.path
if path not in self.connections:
raise UnknownConnectionException('Connection not found')
con = self.connections[path]
con.update_connection(connection, verify_connection)
def delete_connection(self, connection):
del self.connections[connection.path]
self.props['Connections'] = dbus.Array(self.connections.keys(), 'o')