mirror of
https://gitlab.freedesktop.org/NetworkManager/NetworkManager
synced 2024-09-06 09:04:55 +00:00
5672757ac4
Previously, the mock server did not behave as expected when given an aliased property and the test results were not matching that of actual NM daemon behavior.
2996 lines
106 KiB
Python
Executable file
2996 lines
106 KiB
Python
Executable file
#!/usr/bin/env python
|
|
|
|
from __future__ import print_function
|
|
|
|
import sys
|
|
|
|
import gi
|
|
from gi.repository import GLib
|
|
|
|
try:
|
|
gi.require_version("NM", "1.0")
|
|
from gi.repository import NM
|
|
except Exception as e:
|
|
print("Cannot load gi.NM: %s" % (str(e)))
|
|
sys.exit(77)
|
|
|
|
import os
|
|
import dbus
|
|
import dbus.service
|
|
import dbus.mainloop.glib
|
|
import random
|
|
import uuid
|
|
import hashlib
|
|
import socket
|
|
import collections
|
|
|
|
###############################################################################
|
|
|
|
_DEFAULT_ARG = object()
|
|
|
|
###############################################################################
|
|
|
|
|
|
class AliasedProperty:
|
|
def __init__(self, old_name, new_name):
|
|
self.old_name = old_name
|
|
self.new_name = new_name
|
|
|
|
|
|
ALIASED_PROPERTIES = {
|
|
NM.SETTING_CONNECTION_SETTING_NAME: [
|
|
AliasedProperty(
|
|
NM.SETTING_CONNECTION_MASTER,
|
|
"controller", # NM.SETTING_CONNECTION_CONTROLLER
|
|
),
|
|
AliasedProperty(
|
|
NM.SETTING_CONNECTION_SLAVE_TYPE,
|
|
"port-type", # NM.SETTING_CONNECTION_PORT_TYPE
|
|
),
|
|
AliasedProperty(NM.SETTING_CONNECTION_AUTOCONNECT_SLAVES, "autoconnect-ports"),
|
|
],
|
|
NM.SETTING_WIRELESS_SETTING_NAME: [
|
|
AliasedProperty(
|
|
NM.SETTING_WIRELESS_MAC_ADDRESS_BLACKLIST, "mac-address-denylist"
|
|
)
|
|
],
|
|
}
|
|
|
|
###############################################################################
|
|
|
|
|
|
class Global:
|
|
pass
|
|
|
|
|
|
gl = None
|
|
|
|
###############################################################################
|
|
|
|
|
|
class TestError(AssertionError):
|
|
def __init__(self, message="Unspecified error", errors=None):
|
|
AssertionError.__init__(self, message)
|
|
self.errors = errors
|
|
|
|
|
|
###############################################################################
|
|
|
|
|
|
class Util:
|
|
PY3 = sys.version_info[0] == 3
|
|
|
|
@staticmethod
|
|
def g_source_remove(source_id):
|
|
if source_id is not None:
|
|
GLib.source_remove(source_id)
|
|
|
|
@staticmethod
|
|
def addr_family_check(family, allow_af_unspec=False):
|
|
if family == socket.AF_INET:
|
|
return
|
|
if family == socket.AF_INET6:
|
|
return
|
|
if allow_af_unspec and family == socket.AF_UNSPEC:
|
|
return
|
|
raise TestError("invalid address family %s" % (family))
|
|
|
|
@staticmethod
|
|
def ip_addr_pton(addr, family=None):
|
|
if addr is None:
|
|
return (None, None)
|
|
if family is not None and family is not socket.AF_UNSPEC:
|
|
Util.addr_family_check(family)
|
|
a = socket.inet_pton(family, addr)
|
|
else:
|
|
a = None
|
|
family = None
|
|
try:
|
|
a = socket.inet_pton(socket.AF_INET, addr)
|
|
family = socket.AF_INET
|
|
except:
|
|
a = socket.inet_pton(socket.AF_INET6, addr)
|
|
family = socket.AF_INET6
|
|
if Util.PY3:
|
|
a = tuple([int(c) for c in a])
|
|
else:
|
|
a = tuple([ord(c) for c in a])
|
|
return (a, family)
|
|
|
|
@staticmethod
|
|
def ip_addr_ntop(addr, family=None):
|
|
if Util.PY3:
|
|
a = bytes(addr)
|
|
else:
|
|
a = "".join([chr(c) for c in addr])
|
|
if len(a) == 4:
|
|
f = socket.AF_INET
|
|
elif len(a) == 16:
|
|
f = socket.AF_INET6
|
|
else:
|
|
raise TestError("Invalid binary IP address '%s'" % (repr(addr)))
|
|
if family is not None and f != family:
|
|
raise TestError(
|
|
"Unexpected address family. Expected %s but ip address was %s"
|
|
% (family, repr(addr))
|
|
)
|
|
return socket.inet_ntop(f, a)
|
|
|
|
@staticmethod
|
|
def ip_addr_norm(addr, family=None):
|
|
a, family = Util.ip_addr_pton(addr, family)
|
|
return (Util.ip_addr_ntop(a, family), family)
|
|
|
|
@staticmethod
|
|
def ip4_addr_be32(addr):
|
|
# return the IPv4 address as 32 bit integer in network byte order
|
|
# (big endian).
|
|
a, family = Util.ip_addr_pton(addr, socket.AF_INET)
|
|
n = 0
|
|
for i in range(4):
|
|
n = (n << 8) + a[i]
|
|
return socket.htonl(n)
|
|
|
|
@staticmethod
|
|
def ip6_addr_ay(addr):
|
|
return Util.ip_addr_pton(addr, socket.AF_INET6)[0]
|
|
|
|
@staticmethod
|
|
def ip_net_parse(net, family=None):
|
|
parts = net.split("/")
|
|
if len(parts) != 2:
|
|
raise TestError(
|
|
"Invalid IP network '%s' has not '/' for the prefix length" % (net)
|
|
)
|
|
prefix = int(parts[1])
|
|
addr, family = Util.ip_addr_norm(parts[0], family)
|
|
if family == socket.AF_INET:
|
|
if prefix < 0 or prefix > 32:
|
|
raise TestError("Invalid prefix length for IPv4 address '%s'" % (net))
|
|
else:
|
|
if prefix < 0 or prefix > 128:
|
|
raise TestError("Invalid prefix length for IPv4 address '%s'" % (net))
|
|
return (addr, prefix, family)
|
|
|
|
class RandomSeed:
|
|
def __init__(self, seed):
|
|
self.cnt = 0
|
|
self.seed = str(seed)
|
|
|
|
def _next(self):
|
|
c = self.cnt
|
|
self.cnt += 1
|
|
return self.seed + "-" + str(c)
|
|
|
|
@staticmethod
|
|
def wrap(seed):
|
|
if seed is None:
|
|
return None
|
|
if isinstance(seed, Util.RandomSeed):
|
|
return seed
|
|
return Util.RandomSeed(seed)
|
|
|
|
@staticmethod
|
|
def get(seed, extra_seed=None):
|
|
if seed is None:
|
|
return None
|
|
if isinstance(seed, Util.RandomSeed):
|
|
seed = seed._next()
|
|
else:
|
|
seed = str(seed)
|
|
if extra_seed is None:
|
|
try:
|
|
extra_seed = Util.RandomSeed._extra_seed
|
|
except:
|
|
extra_seed = os.environ.get(
|
|
"NM_TEST_NETWORKMANAGER_SERVICE_SEED", ""
|
|
)
|
|
Util.RandomSeed._extra_seed = extra_seed
|
|
return extra_seed + seed
|
|
|
|
@staticmethod
|
|
def random_stream(seed, length=None):
|
|
seed = Util.RandomSeed.wrap(seed)
|
|
# generates a stream of integers, in the range [0..255]
|
|
if seed is None:
|
|
# without a seed, we generate new random numbers.
|
|
while length is None or length > 0:
|
|
yield random.randint(0, 255)
|
|
if length is not None:
|
|
length -= 1
|
|
return
|
|
v = None
|
|
while length is None or length > 0:
|
|
if not v:
|
|
s = Util.RandomSeed.get(seed)
|
|
s = s.encode("utf8")
|
|
v = hashlib.sha256(s).hexdigest()
|
|
yield int(v[0:2], 16)
|
|
v = v[2:]
|
|
if length is not None:
|
|
length -= 1
|
|
|
|
@staticmethod
|
|
def random_int(seed, v_start=_DEFAULT_ARG, v_end=_DEFAULT_ARG):
|
|
# - if neither start not end is give, return a number in the range
|
|
# u32 range [0, 0xFFFFFFFF]
|
|
# - if only start is given (the first argument), interpret it as
|
|
# the range of the interval. That is, return random number in
|
|
# range [0, start-1]
|
|
# - if end and start is given, return a random number with this
|
|
# range (inclusive!): [start, end]
|
|
if v_end is _DEFAULT_ARG:
|
|
# if only one edge is provided (no v_end), then the range
|
|
# is [0, v_start[. That is, random_int(seed, 5), returns
|
|
# values from 0 to 4.
|
|
if v_start is _DEFAULT_ARG:
|
|
# by default, return a 32u integer.
|
|
v_end = 0x100000000
|
|
else:
|
|
v_end = v_start
|
|
v_start = 0
|
|
else:
|
|
if v_start is _DEFAULT_ARG:
|
|
raise TestError("Cannot specify end without start")
|
|
# if a full range is provided, v_end is included.
|
|
# random_int(seed, 0, 4) returns values from 0 to 4.
|
|
v_end += 1
|
|
n = 0
|
|
span = v_end - v_start
|
|
assert span > 0
|
|
for r in Util.random_stream(seed):
|
|
n = n * 256 + r
|
|
if n > span:
|
|
break
|
|
return v_start + (n % span)
|
|
|
|
@staticmethod
|
|
def random_bool(seed):
|
|
return Util.random_int(seed, 0, 1) == 1
|
|
|
|
@staticmethod
|
|
def random_subset(seed, all_set):
|
|
all_set = list(all_set)
|
|
result = []
|
|
seed = Util.RandomSeed.wrap(seed)
|
|
for i in list(
|
|
range(Util.random_int(Util.RandomSeed.get(seed), len(all_set) + 1))
|
|
):
|
|
idx = Util.random_int(Util.RandomSeed.get(seed), len(all_set))
|
|
result.append(all_set[idx])
|
|
del all_set[idx]
|
|
return result
|
|
|
|
@staticmethod
|
|
def random_mac(seed):
|
|
return "%02X:%02X:%02X:%02X:%02X:%02X" % tuple(Util.random_stream(seed, 6))
|
|
|
|
@staticmethod
|
|
def random_ip(seed, net=None, family=None):
|
|
if net is not None:
|
|
mask, prefix, family = Util.ip_net_parse(net, family)
|
|
a_mask, unused = Util.ip_addr_pton(mask, family)
|
|
else:
|
|
prefix = None
|
|
Util.addr_family_check(family)
|
|
if family == socket.AF_INET:
|
|
l = 4
|
|
else:
|
|
l = 16
|
|
a = tuple(Util.random_stream(seed, l))
|
|
if prefix is not None:
|
|
a2 = []
|
|
for i in range(l):
|
|
if prefix == 0:
|
|
c = a[i]
|
|
elif prefix >= 8:
|
|
c = a_mask[i]
|
|
prefix -= 8
|
|
else:
|
|
c = 0xFF & (0xFF << (8 - prefix))
|
|
c = (a[i] & ~c) | (a_mask[i] & c)
|
|
prefix = 0
|
|
a2.append(c)
|
|
a = tuple(a2)
|
|
return (Util.ip_addr_ntop(a, family), family)
|
|
|
|
@staticmethod
|
|
def log(message):
|
|
if gl.log_file:
|
|
try:
|
|
gl.log_file.write(message + "\n")
|
|
gl.log_file.flush()
|
|
except Exception:
|
|
pass
|
|
else:
|
|
print(message, file=sys.stderr)
|
|
|
|
@staticmethod
|
|
def variant_from_dbus(val):
|
|
if isinstance(val, (dbus.String, str)):
|
|
return GLib.Variant("s", str(val))
|
|
if isinstance(val, dbus.UInt32):
|
|
return GLib.Variant("u", int(val))
|
|
if isinstance(val, dbus.UInt64):
|
|
return GLib.Variant("t", int(val))
|
|
if isinstance(val, dbus.Int32):
|
|
return GLib.Variant("i", int(val))
|
|
if isinstance(val, dbus.Boolean):
|
|
return GLib.Variant("b", bool(val))
|
|
if isinstance(val, dbus.Byte):
|
|
return GLib.Variant("y", int(val))
|
|
if isinstance(val, dbus.Array):
|
|
try:
|
|
if val.signature == "s":
|
|
return GLib.Variant("as", [str(x) for x in val])
|
|
if val.signature == "b":
|
|
return GLib.Variant("ab", [bool(x) for x in val])
|
|
if val.signature == "y":
|
|
return GLib.Variant("ay", [int(x) for x in val])
|
|
if val.signature == "u":
|
|
return GLib.Variant("au", [int(x) for x in val])
|
|
if val.signature == "ay":
|
|
return GLib.Variant("aay", [Util.variant_from_dbus(x) for x in val])
|
|
if val.signature == "au":
|
|
return GLib.Variant("aau", [Util.variant_from_dbus(x) for x in val])
|
|
if val.signature == "a{sv}":
|
|
return GLib.Variant(
|
|
"aa{sv}",
|
|
[
|
|
collections.OrderedDict(
|
|
[
|
|
(str(k), Util.variant_from_dbus(v))
|
|
for k, v in addr.items()
|
|
]
|
|
)
|
|
for addr in val
|
|
],
|
|
)
|
|
if val.signature == "(ayuay)":
|
|
return GLib.Variant(
|
|
"a(ayuay)", [Util.variant_from_dbus(x) for x in val]
|
|
)
|
|
if val.signature == "(ayuayu)":
|
|
return GLib.Variant(
|
|
"a(ayuayu)", [Util.variant_from_dbus(x) for x in val]
|
|
)
|
|
except Exception as e:
|
|
raise Exception(
|
|
"Cannot convert array element to type '%s': %s" % (val.signature, e)
|
|
)
|
|
if isinstance(val, dbus.Dictionary):
|
|
if val.signature == "ss":
|
|
return GLib.Variant(
|
|
"a{ss}",
|
|
collections.OrderedDict([(str(k), str(v)) for k, v in val.items()]),
|
|
)
|
|
if val.signature == "sv":
|
|
return GLib.Variant(
|
|
"a{sv}",
|
|
collections.OrderedDict(
|
|
[(str(k), Util.variant_from_dbus(v)) for k, v in val.items()]
|
|
),
|
|
)
|
|
if val.signature == "sa{sv}" or val.signature == "sa{ss}":
|
|
c = collections.OrderedDict(
|
|
[
|
|
(
|
|
str(key1),
|
|
collections.OrderedDict(
|
|
[
|
|
(str(key2), Util.variant_from_dbus(arr2))
|
|
for key2, arr2 in arr1.items()
|
|
]
|
|
),
|
|
)
|
|
for key1, arr1 in val.items()
|
|
]
|
|
)
|
|
return GLib.Variant("a{sa{sv}}", c)
|
|
|
|
raise Exception("Unsupported type for value '%s'" % (repr(val)))
|
|
|
|
|
|
###############################################################################
|
|
|
|
IFACE_DBUS = "org.freedesktop.DBus"
|
|
IFACE_OBJECT_MANAGER = "org.freedesktop.DBus.ObjectManager"
|
|
IFACE_CONNECTION = "org.freedesktop.NetworkManager.Settings.Connection"
|
|
IFACE_DEVICE = "org.freedesktop.NetworkManager.Device"
|
|
IFACE_WIFI = "org.freedesktop.NetworkManager.Device.Wireless"
|
|
IFACE_TEST = "org.freedesktop.NetworkManager.LibnmGlibTest"
|
|
IFACE_NM = "org.freedesktop.NetworkManager"
|
|
IFACE_SETTINGS = "org.freedesktop.NetworkManager.Settings"
|
|
IFACE_AGENT_MANAGER = "org.freedesktop.NetworkManager.AgentManager"
|
|
IFACE_AGENT = "org.freedesktop.NetworkManager.SecretAgent"
|
|
IFACE_WIRED = "org.freedesktop.NetworkManager.Device.Wired"
|
|
IFACE_MODEM = "org.freedesktop.NetworkManager.Device.Modem"
|
|
IFACE_VLAN = "org.freedesktop.NetworkManager.Device.Vlan"
|
|
IFACE_WIFI_AP = "org.freedesktop.NetworkManager.AccessPoint"
|
|
IFACE_ACTIVE_CONNECTION = "org.freedesktop.NetworkManager.Connection.Active"
|
|
IFACE_VPN_CONNECTION = "org.freedesktop.NetworkManager.VPN.Connection"
|
|
IFACE_DNS_MANAGER = "org.freedesktop.NetworkManager.DnsManager"
|
|
IFACE_IP4_CONFIG = "org.freedesktop.NetworkManager.IP4Config"
|
|
IFACE_IP6_CONFIG = "org.freedesktop.NetworkManager.IP6Config"
|
|
IFACE_DHCP4_CONFIG = "org.freedesktop.NetworkManager.DHCP4Config"
|
|
IFACE_DHCP6_CONFIG = "org.freedesktop.NetworkManager.DHCP6Config"
|
|
|
|
###############################################################################
|
|
|
|
|
|
class BusErr:
|
|
class UnknownInterfaceException(dbus.DBusException):
|
|
def __init__(self, *args, **kwargs):
|
|
self._dbus_error_name = "{}.UnknownInterface".format(IFACE_DBUS)
|
|
dbus.DBusException.__init__(self, *args, **kwargs)
|
|
|
|
class UnknownPropertyException(dbus.DBusException):
|
|
def __init__(self, *args, **kwargs):
|
|
self._dbus_error_name = "{}.UnknownProperty".format(IFACE_DBUS)
|
|
dbus.DBusException.__init__(self, *args, **kwargs)
|
|
|
|
class InvalidPropertyException(dbus.DBusException):
|
|
def __init__(self, *args, **kwargs):
|
|
self._dbus_error_name = "{}.InvalidProperty".format(IFACE_CONNECTION)
|
|
dbus.DBusException.__init__(self, *args, **kwargs)
|
|
|
|
class MissingPropertyException(dbus.DBusException):
|
|
def __init__(self, *args, **kwargs):
|
|
self._dbus_error_name = "{}.MissingProperty".format(IFACE_CONNECTION)
|
|
dbus.DBusException.__init__(self, *args, **kwargs)
|
|
|
|
class InvalidSettingException(dbus.DBusException):
|
|
def __init__(self, *args, **kwargs):
|
|
self._dbus_error_name = "{}.InvalidSetting".format(IFACE_CONNECTION)
|
|
dbus.DBusException.__init__(self, *args, **kwargs)
|
|
|
|
class MissingSettingException(dbus.DBusException):
|
|
def __init__(self, *args, **kwargs):
|
|
self._dbus_error_name = "{}.MissingSetting".format(IFACE_CONNECTION)
|
|
dbus.DBusException.__init__(self, *args, **kwargs)
|
|
|
|
class NotSoftwareException(dbus.DBusException):
|
|
def __init__(self, *args, **kwargs):
|
|
self._dbus_error_name = "{}.NotSoftware".format(IFACE_DEVICE)
|
|
dbus.DBusException.__init__(self, *args, **kwargs)
|
|
|
|
class ApNotFoundException(dbus.DBusException):
|
|
def __init__(self, *args, **kwargs):
|
|
self._dbus_error_name = "{}.AccessPointNotFound".format(IFACE_WIFI)
|
|
dbus.DBusException.__init__(self, *args, **kwargs)
|
|
|
|
class PermissionDeniedException(dbus.DBusException):
|
|
def __init__(self, *args, **kwargs):
|
|
self._dbus_error_name = "{}.PermissionDenied".format(IFACE_NM)
|
|
dbus.DBusException.__init__(self, *args, **kwargs)
|
|
|
|
class UnknownDeviceException(dbus.DBusException):
|
|
def __init__(self, *args, **kwargs):
|
|
self._dbus_error_name = "{}.UnknownDevice".format(IFACE_NM)
|
|
dbus.DBusException.__init__(self, *args, **kwargs)
|
|
|
|
class UnknownConnectionException(dbus.DBusException):
|
|
def __init__(self, *args, **kwargs):
|
|
self._dbus_error_name = "{}.UnknownConnection".format(IFACE_NM)
|
|
dbus.DBusException.__init__(self, *args, **kwargs)
|
|
|
|
class InvalidHostnameException(dbus.DBusException):
|
|
def __init__(self, *args, **kwargs):
|
|
self._dbus_error_name = "{}.InvalidHostname".format(IFACE_SETTINGS)
|
|
dbus.DBusException.__init__(self, *args, **kwargs)
|
|
|
|
class NoSecretsException(dbus.DBusException):
|
|
def __init__(self, *args, **kwargs):
|
|
self._dbus_error_name = "{}.NoSecrets".format(IFACE_AGENT_MANAGER)
|
|
dbus.DBusException.__init__(self, *args, **kwargs)
|
|
|
|
class UserCanceledException(dbus.DBusException):
|
|
def __init__(self, *args, **kwargs):
|
|
self._dbus_error_name = "{}.UserCanceled".format(IFACE_AGENT_MANAGER)
|
|
dbus.DBusException.__init__(self, *args, **kwargs)
|
|
|
|
@staticmethod
|
|
def from_nmerror(e):
|
|
try:
|
|
domain, code = (e.domain, e.code)
|
|
except:
|
|
return None
|
|
if domain == GLib.quark_to_string(NM.ConnectionError.quark()):
|
|
if code == NM.ConnectionError.MISSINGSETTING:
|
|
return BusErr.MissingSettingException(e.message)
|
|
if code == NM.ConnectionError.INVALIDPROPERTY:
|
|
return BusErr.InvalidPropertyException(e.message)
|
|
return None
|
|
|
|
@staticmethod
|
|
def raise_nmerror(e):
|
|
e2 = BusErr.from_nmerror(e)
|
|
if e2 is not None:
|
|
raise e2
|
|
raise e
|
|
|
|
|
|
###############################################################################
|
|
|
|
|
|
class NmUtil:
|
|
@staticmethod
|
|
def con_hash_to_connection(con_hash, do_verify=False, do_normalize=False):
|
|
x_con = []
|
|
for v_setting_name, v_setting in list(con_hash.items()):
|
|
if isinstance(v_setting_name, (dbus.String, str)):
|
|
v_setting_name = str(v_setting_name)
|
|
else:
|
|
raise Exception(
|
|
"Expected string dict, but got '%s' key" % (v_setting_name)
|
|
)
|
|
x_setting = []
|
|
for v_property_name, v_value in list(v_setting.items()):
|
|
if isinstance(v_property_name, (dbus.String, str)):
|
|
v_property_name = str(v_property_name)
|
|
else:
|
|
raise Exception(
|
|
"Expected string dict, but got '%s' subkey under %s (%s)"
|
|
% (v_property_name, v_setting_name, repr(con_hash))
|
|
)
|
|
try:
|
|
v = Util.variant_from_dbus(v_value)
|
|
except Exception as e:
|
|
raise Exception(
|
|
"Unsupported value %s.%s = %s (%s)"
|
|
% (v_setting_name, v_property_name, v_value, str(e))
|
|
)
|
|
x_setting.append((v_property_name, v))
|
|
|
|
x_con.append((v_setting_name, collections.OrderedDict(x_setting)))
|
|
|
|
x_con = GLib.Variant("a{sa{sv}}", collections.OrderedDict(x_con))
|
|
|
|
assert GLib.Variant.equal(x_con, Util.variant_from_dbus(con_hash))
|
|
|
|
try:
|
|
con = NM.SimpleConnection.new_from_dbus(x_con)
|
|
except:
|
|
if do_verify:
|
|
raise
|
|
return None
|
|
|
|
if do_normalize:
|
|
try:
|
|
con.normalize()
|
|
except:
|
|
if do_verify:
|
|
raise
|
|
|
|
if do_verify:
|
|
con.verify()
|
|
|
|
return con
|
|
|
|
@staticmethod
|
|
def con_hash_verify(con_hash, do_verify_strict=True):
|
|
if NM.SETTING_CONNECTION_SETTING_NAME not in con_hash:
|
|
raise BusErr.MissingSettingException("connection: setting is required")
|
|
s_con = con_hash[NM.SETTING_CONNECTION_SETTING_NAME]
|
|
if NM.SETTING_CONNECTION_TYPE not in s_con:
|
|
raise BusErr.MissingPropertyException(
|
|
"connection.type: property is required"
|
|
)
|
|
if NM.SETTING_CONNECTION_UUID not in s_con:
|
|
raise BusErr.MissingPropertyException(
|
|
"connection.uuid: property is required"
|
|
)
|
|
if NM.SETTING_CONNECTION_ID not in s_con:
|
|
raise BusErr.MissingPropertyException("connection.id: property is required")
|
|
|
|
if not do_verify_strict:
|
|
return
|
|
t = s_con[NM.SETTING_CONNECTION_TYPE]
|
|
if t not in [
|
|
NM.SETTING_GSM_SETTING_NAME,
|
|
NM.SETTING_VLAN_SETTING_NAME,
|
|
NM.SETTING_VPN_SETTING_NAME,
|
|
NM.SETTING_WIMAX_SETTING_NAME,
|
|
NM.SETTING_WIRED_SETTING_NAME,
|
|
NM.SETTING_WIRELESS_SETTING_NAME,
|
|
]:
|
|
raise BusErr.InvalidPropertyException(
|
|
'connection.type: unsupported connection type "%s"' % (t)
|
|
)
|
|
|
|
try:
|
|
con_nm = NmUtil.con_hash_to_connection(
|
|
con_hash, do_verify=True, do_normalize=True
|
|
)
|
|
except Exception as e:
|
|
BusErr.raise_nmerror(e)
|
|
|
|
@staticmethod
|
|
def con_hash_get_id(con_hash):
|
|
if NM.SETTING_CONNECTION_SETTING_NAME in con_hash:
|
|
s_con = con_hash[NM.SETTING_CONNECTION_SETTING_NAME]
|
|
if NM.SETTING_CONNECTION_ID in s_con:
|
|
return s_con[NM.SETTING_CONNECTION_ID]
|
|
return None
|
|
|
|
@staticmethod
|
|
def con_hash_get_uuid(con_hash):
|
|
if NM.SETTING_CONNECTION_SETTING_NAME in con_hash:
|
|
s_con = con_hash[NM.SETTING_CONNECTION_SETTING_NAME]
|
|
if NM.SETTING_CONNECTION_UUID in s_con:
|
|
return s_con[NM.SETTING_CONNECTION_UUID]
|
|
return None
|
|
|
|
@staticmethod
|
|
def con_hash_get_type(con_hash):
|
|
if NM.SETTING_CONNECTION_SETTING_NAME in con_hash:
|
|
s_con = con_hash[NM.SETTING_CONNECTION_SETTING_NAME]
|
|
if NM.SETTING_CONNECTION_TYPE in s_con:
|
|
return s_con[NM.SETTING_CONNECTION_TYPE]
|
|
return None
|
|
|
|
|
|
###############################################################################
|
|
|
|
|
|
class ExportedObj(dbus.service.Object):
|
|
DBusInterface = collections.namedtuple("DBusInterface", ["dbus_iface", "props"])
|
|
|
|
@staticmethod
|
|
def create_path(klass, path_prefix=None):
|
|
if path_prefix is None:
|
|
path_prefix = klass.path_prefix
|
|
path = path_prefix + str(klass.path_counter_next)
|
|
klass.path_counter_next += 1
|
|
return path
|
|
|
|
@staticmethod
|
|
def to_path_array(src):
|
|
return dbus.Array(
|
|
[ExportedObj.to_path(o) for o in src] if src else [],
|
|
signature=dbus.Signature("o"),
|
|
)
|
|
|
|
@staticmethod
|
|
def to_path(src):
|
|
if src:
|
|
return dbus.ObjectPath(src.path)
|
|
return dbus.ObjectPath("/")
|
|
|
|
def __init__(self, object_path, ident=None):
|
|
dbus.service.Object.__init__(self)
|
|
|
|
self._dbus_ifaces = {}
|
|
self.path = object_path
|
|
|
|
# ident is an optional (unique) identifier for the instance.
|
|
# The test driver may set it to reference to the object by
|
|
# this identifier. For NetworkManager, the real ID of an
|
|
# object on D-Bus is the object_path. But that is generated
|
|
# by the stub server only after the test user created the
|
|
# object. The ident parameter may be specified by the user
|
|
# and thus can be hard-coded in the test.
|
|
if ident is None:
|
|
ident = object_path
|
|
self.ident = ident
|
|
|
|
def export(self):
|
|
self.add_to_connection(gl.bus, self.path)
|
|
gl.object_manager.add_object(self)
|
|
|
|
def unexport(self):
|
|
gl.object_manager.remove_object(self)
|
|
self.remove_from_connection()
|
|
|
|
def dbus_interface_add(self, dbus_iface, props):
|
|
self._dbus_ifaces[dbus_iface] = ExportedObj.DBusInterface(dbus_iface, props)
|
|
|
|
def _dbus_interface_get(self, dbus_iface):
|
|
if dbus_iface not in self._dbus_ifaces:
|
|
raise BusErr.UnknownInterfaceException()
|
|
return self._dbus_ifaces[dbus_iface]
|
|
|
|
def _dbus_interface_get_property(self, dbus_interface, propname=None):
|
|
props = dbus_interface.props
|
|
if propname is None:
|
|
return props
|
|
if propname not in props:
|
|
raise BusErr.UnknownPropertyException()
|
|
return props[propname]
|
|
|
|
def _dbus_property_get(self, dbus_iface, propname=None):
|
|
return self._dbus_interface_get_property(
|
|
self._dbus_interface_get(dbus_iface), propname
|
|
)
|
|
|
|
def _dbus_property_set(
|
|
self,
|
|
dbus_iface,
|
|
propname,
|
|
value,
|
|
allow_detect_dbus_iface=False,
|
|
dry_run=False,
|
|
force_update=False,
|
|
):
|
|
if allow_detect_dbus_iface and not dbus_iface:
|
|
props = None
|
|
for p, dbus_interface in self._dbus_ifaces.items():
|
|
if propname in dbus_interface.props:
|
|
if props is not None:
|
|
raise TestError(
|
|
"Cannot uniquely find the property '%s' on object '%s'"
|
|
% (propname, self.path)
|
|
)
|
|
props = dbus_interface.props
|
|
dbus_iface = p
|
|
if props is None:
|
|
raise TestError(
|
|
"Cannot find the property '%s' on object '%s'"
|
|
% (propname, self.path)
|
|
)
|
|
else:
|
|
try:
|
|
dbus_interface = self._dbus_interface_get(dbus_iface)
|
|
props = self._dbus_interface_get_property(dbus_interface)
|
|
except:
|
|
if dry_run:
|
|
raise TestError(
|
|
"No interface '%s' on '%s'" % (dbus_iface, self.path)
|
|
)
|
|
raise
|
|
|
|
if dry_run:
|
|
if propname not in props:
|
|
raise TestError(
|
|
"No property '%s' on '%s' on '%s'"
|
|
% (propname, dbus_iface, self.path)
|
|
)
|
|
|
|
permission_granted = False
|
|
|
|
if isinstance(self, ActiveConnection):
|
|
if dbus_iface == IFACE_ACTIVE_CONNECTION:
|
|
if propname == PRP_ACTIVE_CONNECTION_STATE:
|
|
permission_granted = True
|
|
elif dbus_iface == IFACE_VPN_CONNECTION:
|
|
if propname == PRP_VPN_CONNECTION_VPN_STATE:
|
|
permission_granted = True
|
|
|
|
if not permission_granted:
|
|
raise TestError(
|
|
"Cannot set property '%s' on '%s' on '%s' via D-Bus"
|
|
% (propname, dbus_iface, self.path)
|
|
)
|
|
|
|
return
|
|
|
|
assert propname in props
|
|
|
|
if not force_update:
|
|
if props[propname] == value:
|
|
return
|
|
|
|
props[propname] = value
|
|
self._dbus_property_notify(dbus_iface, propname)
|
|
|
|
def _dbus_property_notify(self, dbus_iface, propname):
|
|
dbus_interface = self._dbus_interface_get(dbus_iface)
|
|
prop = self._dbus_interface_get_property(dbus_interface, propname)
|
|
if propname is not None:
|
|
prop = {propname: prop}
|
|
ExportedObj.PropertiesChanged(self, dbus_iface, prop, [])
|
|
|
|
@dbus.service.signal(dbus.PROPERTIES_IFACE, signature="sa{sv}as")
|
|
def PropertiesChanged(self, iface, changed, invalidated):
|
|
pass
|
|
|
|
@dbus.service.method(
|
|
dbus_interface=dbus.PROPERTIES_IFACE, in_signature="s", out_signature="a{sv}"
|
|
)
|
|
def GetAll(self, dbus_iface):
|
|
return self._dbus_property_get(dbus_iface)
|
|
|
|
@dbus.service.method(
|
|
dbus_interface=dbus.PROPERTIES_IFACE, in_signature="ss", out_signature="v"
|
|
)
|
|
def Get(self, dbus_iface, name):
|
|
return self._dbus_property_get(dbus_iface, name)
|
|
|
|
def get_managed_ifaces(self):
|
|
my_ifaces = {}
|
|
for iface in self._dbus_ifaces:
|
|
my_ifaces[iface] = self._dbus_ifaces[iface].props
|
|
return my_ifaces
|
|
|
|
|
|
###############################################################################
|
|
|
|
PRP_DEVICE_UDI = "Udi"
|
|
PRP_DEVICE_IFACE = "Interface"
|
|
PRP_DEVICE_IPIFACE = "IpInterface"
|
|
PRP_DEVICE_DRIVER = "Driver"
|
|
PRP_DEVICE_STATE = "State"
|
|
PRP_DEVICE_STATE_REASON = "StateReason"
|
|
PRP_DEVICE_ACTIVE_CONNECTION = "ActiveConnection"
|
|
PRP_DEVICE_IP4_CONFIG = "Ip4Config"
|
|
PRP_DEVICE_IP6_CONFIG = "Ip6Config"
|
|
PRP_DEVICE_DHCP4_CONFIG = "Dhcp4Config"
|
|
PRP_DEVICE_DHCP6_CONFIG = "Dhcp6Config"
|
|
PRP_DEVICE_MANAGED = "Managed"
|
|
PRP_DEVICE_AUTOCONNECT = "Autoconnect"
|
|
PRP_DEVICE_DEVICE_TYPE = "DeviceType"
|
|
PRP_DEVICE_AVAILABLE_CONNECTIONS = "AvailableConnections"
|
|
PRP_DEVICE_LLDP_NEIGHBORS = "LldpNeighbors"
|
|
PRP_DEVICE_INTERFACE_FLAGS = "InterfaceFlags"
|
|
|
|
|
|
class Device(ExportedObj):
|
|
path_counter_next = 1
|
|
path_prefix = "/org/freedesktop/NetworkManager/Devices/"
|
|
|
|
def __init__(self, iface, devtype, ident=None):
|
|
if ident is None:
|
|
ident = iface
|
|
|
|
ExportedObj.__init__(self, ExportedObj.create_path(Device), ident)
|
|
|
|
self.applied_con = {}
|
|
self.ip4_config = None
|
|
self.ip6_config = None
|
|
self.dhcp4_config = None
|
|
self.dhcp6_config = None
|
|
self.activation_state_change_delay_ms = 50
|
|
|
|
self.prp_state = NM.DeviceState.UNAVAILABLE
|
|
|
|
if devtype == NM.DeviceType.MODEM:
|
|
udi = "/org/freedesktop/ModemManager1/Modem/0"
|
|
else:
|
|
udi = "/sys/devices/virtual/%s" % iface
|
|
|
|
props = {
|
|
PRP_DEVICE_UDI: udi,
|
|
PRP_DEVICE_IFACE: iface,
|
|
PRP_DEVICE_IPIFACE: iface,
|
|
PRP_DEVICE_DRIVER: "virtual",
|
|
PRP_DEVICE_STATE: dbus.UInt32(self.prp_state),
|
|
PRP_DEVICE_STATE_REASON: dbus.Struct(
|
|
(dbus.UInt32(self.prp_state), dbus.UInt32(NM.DeviceStateReason.NONE))
|
|
),
|
|
PRP_DEVICE_ACTIVE_CONNECTION: ExportedObj.to_path(None),
|
|
PRP_DEVICE_IP4_CONFIG: ExportedObj.to_path(self.ip4_config),
|
|
PRP_DEVICE_IP6_CONFIG: ExportedObj.to_path(self.ip6_config),
|
|
PRP_DEVICE_DHCP4_CONFIG: ExportedObj.to_path(self.dhcp4_config),
|
|
PRP_DEVICE_DHCP6_CONFIG: ExportedObj.to_path(self.dhcp6_config),
|
|
PRP_DEVICE_MANAGED: True,
|
|
PRP_DEVICE_AUTOCONNECT: True,
|
|
PRP_DEVICE_DEVICE_TYPE: dbus.UInt32(devtype),
|
|
PRP_DEVICE_AVAILABLE_CONNECTIONS: ExportedObj.to_path_array([]),
|
|
PRP_DEVICE_INTERFACE_FLAGS: dbus.UInt32(3), # up,lower-up
|
|
PRP_DEVICE_LLDP_NEIGHBORS: dbus.Array(
|
|
[
|
|
dbus.Dictionary(
|
|
{
|
|
"chassis-id-type": dbus.UInt32(6),
|
|
"chassis-id": dbus.String("00:11:22:33:44:00"),
|
|
"port-id-type": dbus.UInt32(7),
|
|
"port-id": dbus.String("Uplink port"),
|
|
"port-description": dbus.String("GigabitEthernet #1"),
|
|
"system-name": dbus.String("test1.example.com"),
|
|
"system-description": dbus.String("Test system #1"),
|
|
"system-capabilities": dbus.UInt32(20),
|
|
"destination": dbus.String("nearest-bridge"),
|
|
}
|
|
),
|
|
dbus.Dictionary(
|
|
{
|
|
"chassis-id-type": dbus.UInt32(2),
|
|
"chassis-id": dbus.String("chassis1"),
|
|
"port-id-type": dbus.UInt32(3),
|
|
"port-id": dbus.String("44:44:44:44:44:44"),
|
|
"port-description": dbus.String("GigabitEthernet #2"),
|
|
"system-name": dbus.String("test2.example.com"),
|
|
"system-description": dbus.String("Test system #2"),
|
|
"system-capabilities": dbus.UInt32(2047),
|
|
"destination": dbus.String("nearest-non-tpmr-bridge"),
|
|
"ieee-802-1-vlans": dbus.Array(
|
|
[
|
|
dbus.Dictionary(
|
|
{
|
|
"vid": dbus.UInt32(80),
|
|
"name": dbus.String("vlan80"),
|
|
},
|
|
signature="sv",
|
|
),
|
|
dbus.Dictionary(
|
|
{
|
|
"vid": dbus.UInt32(4000),
|
|
"name": dbus.String("My VLAN"),
|
|
},
|
|
signature="sv",
|
|
),
|
|
]
|
|
),
|
|
"ieee-802-1-ppvids": dbus.Array(
|
|
[
|
|
dbus.Dictionary(
|
|
{
|
|
"ppvid": dbus.UInt32(4),
|
|
"flags": dbus.UInt32(0x12),
|
|
},
|
|
signature="sv",
|
|
),
|
|
dbus.Dictionary(
|
|
{
|
|
"ppvid": dbus.UInt32(10),
|
|
"flags": dbus.UInt32(0x31),
|
|
},
|
|
signature="sv",
|
|
),
|
|
]
|
|
),
|
|
}
|
|
),
|
|
dbus.Dictionary(
|
|
{
|
|
"chassis-id-type": dbus.UInt32(6),
|
|
"chassis-id": dbus.String("00:11:22:33:44:22"),
|
|
"port-id-type": dbus.UInt32(1),
|
|
"port-id": dbus.String("port1"),
|
|
"port-description": dbus.String("GigabitEthernet #3"),
|
|
"system-name": dbus.String("test3.example.com"),
|
|
"system-description": dbus.String("Test system #3"),
|
|
"system-capabilities": dbus.UInt32(40),
|
|
"destination": dbus.String("nearest-customer-bridge"),
|
|
"management-addresses": dbus.Array(
|
|
[
|
|
dbus.Dictionary(
|
|
{
|
|
"address-subtype": dbus.UInt32(1),
|
|
"address": dbus.ByteArray(
|
|
b"\xc0\xa8\x01\x01"
|
|
),
|
|
"interface-number": dbus.UInt32(4),
|
|
"interface-number-subtype": dbus.UInt32(3),
|
|
"object-id": dbus.ByteArray(
|
|
b"\x01\x02\x03\x04"
|
|
),
|
|
},
|
|
signature="sv",
|
|
),
|
|
dbus.Dictionary(
|
|
{
|
|
"address-subtype": dbus.UInt32(2),
|
|
"address": dbus.ByteArray(
|
|
b"\xfd\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x12\x34\x56\x78"
|
|
),
|
|
"interface-number": dbus.UInt32(1),
|
|
"interface-number-subtype": dbus.UInt32(2),
|
|
},
|
|
signature="sv",
|
|
),
|
|
]
|
|
),
|
|
"ieee-802-3-mac-phy-conf": dbus.Dictionary(
|
|
{
|
|
"autoneg": dbus.UInt32(3),
|
|
"pmd-autoneg-cap": dbus.UInt32(0xFE),
|
|
"operational-mau-type": dbus.UInt32(5),
|
|
},
|
|
signature="sv",
|
|
),
|
|
"ieee-802-3-power-via-mdi": dbus.Dictionary(
|
|
{
|
|
"mdi-power-support": dbus.UInt32(7),
|
|
"pse-power-pair": dbus.UInt32(6),
|
|
"power-class": dbus.UInt32(1),
|
|
},
|
|
signature="sv",
|
|
),
|
|
}
|
|
),
|
|
],
|
|
"a{sv}",
|
|
),
|
|
}
|
|
|
|
self.dbus_interface_add(IFACE_DEVICE, props)
|
|
|
|
def start(self):
|
|
self.ip4_config = IP4Config()
|
|
self._dbus_property_set(
|
|
IFACE_DEVICE, PRP_DEVICE_IP4_CONFIG, ExportedObj.to_path(self.ip4_config)
|
|
)
|
|
self.ip6_config = IP6Config()
|
|
self._dbus_property_set(
|
|
IFACE_DEVICE, PRP_DEVICE_IP6_CONFIG, ExportedObj.to_path(self.ip6_config)
|
|
)
|
|
self.dhcp4_config = Dhcp4Config()
|
|
self._dbus_property_set(
|
|
IFACE_DEVICE,
|
|
PRP_DEVICE_DHCP4_CONFIG,
|
|
ExportedObj.to_path(self.dhcp4_config),
|
|
)
|
|
self.dhcp6_config = Dhcp6Config()
|
|
self._dbus_property_set(
|
|
IFACE_DEVICE,
|
|
PRP_DEVICE_DHCP6_CONFIG,
|
|
ExportedObj.to_path(self.dhcp6_config),
|
|
)
|
|
|
|
def stop(self):
|
|
self._dbus_property_set(
|
|
IFACE_DEVICE, PRP_DEVICE_IP4_CONFIG, ExportedObj.to_path(None)
|
|
)
|
|
if self.ip4_config is not None:
|
|
self.ip4_config.unexport()
|
|
self.ip4_config = None
|
|
self._dbus_property_set(
|
|
IFACE_DEVICE, PRP_DEVICE_IP6_CONFIG, ExportedObj.to_path(None)
|
|
)
|
|
if self.ip6_config is not None:
|
|
self.ip6_config.unexport()
|
|
self.ip6_config = None
|
|
self._dbus_property_set(
|
|
IFACE_DEVICE, PRP_DEVICE_DHCP4_CONFIG, ExportedObj.to_path(None)
|
|
)
|
|
if self.dhcp4_config is not None:
|
|
self.dhcp4_config.unexport()
|
|
self.dhcp4_config = None
|
|
self._dbus_property_set(
|
|
IFACE_DEVICE, PRP_DEVICE_DHCP6_CONFIG, ExportedObj.to_path(None)
|
|
)
|
|
if self.dhcp6_config is not None:
|
|
self.dhcp6_config.unexport()
|
|
self.dhcp6_config = None
|
|
|
|
@dbus.service.method(dbus_interface=IFACE_DEVICE, in_signature="", out_signature="")
|
|
def Disconnect(self):
|
|
pass
|
|
|
|
@dbus.service.method(
|
|
dbus_interface=IFACE_DEVICE, in_signature="u", out_signature="a{sa{sv}}t"
|
|
)
|
|
def GetAppliedConnection(self, flags):
|
|
ac = self._dbus_property_get(IFACE_DEVICE, PRP_DEVICE_ACTIVE_CONNECTION)
|
|
return (self.applied_con, 0)
|
|
|
|
@dbus.service.method(
|
|
dbus_interface=IFACE_DEVICE, in_signature="a{sa{sv}}tu", out_signature=""
|
|
)
|
|
def Reapply(self, connection, version_id, flags):
|
|
self.applied_con = connection
|
|
pass
|
|
|
|
@dbus.service.method(dbus_interface=IFACE_DEVICE, in_signature="", out_signature="")
|
|
def Delete(self):
|
|
# We don't currently support any software device types, so...
|
|
raise BusErr.NotSoftwareException()
|
|
pass
|
|
|
|
@dbus.service.signal(IFACE_DEVICE, signature="uuu")
|
|
def StateChanged(self, new_state, old_state, reason):
|
|
pass
|
|
|
|
def set_state(self, state, reason):
|
|
# libnm is plugged on notify::state-reason and not on state-changed dbus signal
|
|
# so we must simulate the change of property to emit a state-changed signal on libnm
|
|
self._dbus_property_set(IFACE_DEVICE, PRP_NM_STATE, dbus.UInt32(state))
|
|
self._dbus_property_set(
|
|
IFACE_DEVICE,
|
|
PRP_DEVICE_STATE_REASON,
|
|
(dbus.UInt32(state), dbus.UInt32(reason)),
|
|
)
|
|
old_state = self.prp_state
|
|
self.prp_state = state
|
|
self.StateChanged(
|
|
dbus.UInt32(self.prp_state), dbus.UInt32(old_state), dbus.UInt32(reason)
|
|
)
|
|
|
|
def set_carrier_status(self, carrier_status):
|
|
self._dbus_property_set(IFACE_WIRED, PRP_WIRED_CARRIER, carrier_status)
|
|
|
|
def set_active_connection(self, ac):
|
|
self._dbus_property_set(IFACE_DEVICE, PRP_DEVICE_ACTIVE_CONNECTION, ac)
|
|
if ac is None:
|
|
self.applied_con = {}
|
|
else:
|
|
self.applied_con = ac.con_inst.con_hash
|
|
|
|
def connection_is_available(self, con_inst):
|
|
if con_inst.is_vpn():
|
|
return False
|
|
if isinstance(self, WiredDevice):
|
|
if con_inst.get_type() == NM.SETTING_WIRED_SETTING_NAME:
|
|
return True
|
|
elif isinstance(self, WifiDevice):
|
|
if con_inst.get_type() == NM.SETTING_WIRELESS_SETTING_NAME:
|
|
return True
|
|
elif isinstance(self, VlanDevice):
|
|
if con_inst.get_type() == NM.SETTING_VLAN_SETTING_NAME:
|
|
return True
|
|
return False
|
|
|
|
def available_connections_get(self):
|
|
return [
|
|
c for c in gl.settings.get_connections() if self.connection_is_available(c)
|
|
]
|
|
|
|
def available_connections_update(self):
|
|
self._dbus_property_set(
|
|
IFACE_DEVICE,
|
|
PRP_DEVICE_AVAILABLE_CONNECTIONS,
|
|
ExportedObj.to_path_array(self.available_connections_get()),
|
|
)
|
|
|
|
@dbus.service.method(IFACE_TEST, in_signature="", out_signature="")
|
|
def Start(self):
|
|
self.start()
|
|
|
|
@dbus.service.method(IFACE_TEST, in_signature="", out_signature="")
|
|
def Stop(self):
|
|
self.stop()
|
|
|
|
|
|
###############################################################################
|
|
|
|
PRP_WIRED_HW_ADDRESS = "HwAddress"
|
|
PRP_WIRED_PERM_HW_ADDRESS = "PermHwAddress"
|
|
PRP_WIRED_SPEED = "Speed"
|
|
PRP_WIRED_CARRIER = "Carrier"
|
|
PRP_WIRED_S390_SUBCHANNELS = "S390Subchannels"
|
|
|
|
|
|
class WiredDevice(Device):
|
|
def __init__(self, iface, mac=None, subchannels=None, ident=None):
|
|
Device.__init__(self, iface, NM.DeviceType.ETHERNET, ident)
|
|
|
|
if mac is None:
|
|
mac = Util.random_mac(self.ident)
|
|
if subchannels is None:
|
|
subchannels = dbus.Array(signature="s")
|
|
|
|
props = {
|
|
PRP_WIRED_HW_ADDRESS: mac,
|
|
PRP_WIRED_PERM_HW_ADDRESS: mac,
|
|
PRP_WIRED_SPEED: dbus.UInt32(100),
|
|
PRP_WIRED_CARRIER: True,
|
|
PRP_WIRED_S390_SUBCHANNELS: subchannels,
|
|
}
|
|
|
|
self.dbus_interface_add(IFACE_WIRED, props)
|
|
|
|
|
|
###############################################################################
|
|
PM_CURRENT_CAPABILITIES = "CurrentCapabilities"
|
|
PM_MODEM_CAPABILITIES = "ModemCapabilities"
|
|
|
|
# capability to make device seen compatible with GSM connection
|
|
NM_DEVICE_MODEM_CAPABILITY_GSM_UMTS = 0x00000004
|
|
|
|
|
|
class ModemDevice(Device):
|
|
def __init__(self, iface):
|
|
Device.__init__(self, iface, NM.DeviceType.MODEM)
|
|
|
|
props = {
|
|
PM_CURRENT_CAPABILITIES: dbus.UInt32(NM_DEVICE_MODEM_CAPABILITY_GSM_UMTS),
|
|
PM_MODEM_CAPABILITIES: dbus.UInt32(0),
|
|
}
|
|
|
|
self.dbus_interface_add(IFACE_MODEM, props)
|
|
|
|
|
|
###############################################################################
|
|
|
|
PRP_VLAN_HW_ADDRESS = "HwAddress"
|
|
PRP_VLAN_CARRIER = "Carrier"
|
|
PRP_VLAN_VLAN_ID = "VlanId"
|
|
|
|
|
|
class VlanDevice(Device):
|
|
def __init__(self, iface, ident=None):
|
|
Device.__init__(self, iface, NM.DeviceType.VLAN, ident)
|
|
|
|
props = {
|
|
PRP_VLAN_HW_ADDRESS: Util.random_mac(self.ident),
|
|
PRP_VLAN_CARRIER: False,
|
|
PRP_VLAN_VLAN_ID: dbus.UInt32(1),
|
|
}
|
|
|
|
self.dbus_interface_add(IFACE_VLAN, props)
|
|
|
|
|
|
###############################################################################
|
|
|
|
PRP_WIFI_AP_FLAGS = "Flags"
|
|
PRP_WIFI_AP_WPA_FLAGS = "WpaFlags"
|
|
PRP_WIFI_AP_RSN_FLAGS = "RsnFlags"
|
|
PRP_WIFI_AP_SSID = "Ssid"
|
|
PRP_WIFI_AP_FREQUENCY = "Frequency"
|
|
PRP_WIFI_AP_HW_ADDRESS = "HwAddress"
|
|
PRP_WIFI_AP_MODE = "Mode"
|
|
PRP_WIFI_AP_MAX_BITRATE = "MaxBitrate"
|
|
PRP_WIFI_AP_STRENGTH = "Strength"
|
|
PRP_WIFI_AP_LAST_SEEN = "LastSeen"
|
|
PRP_WIFI_AP_BANDWIDTH = "Bandwidth"
|
|
|
|
|
|
class WifiAp(ExportedObj):
|
|
path_counter_next = 1
|
|
path_prefix = "/org/freedesktop/NetworkManager/AccessPoint/"
|
|
|
|
def __init__(
|
|
self,
|
|
ssid,
|
|
bssid=None,
|
|
flags=None,
|
|
wpaf=None,
|
|
rsnf=None,
|
|
freq=None,
|
|
strength=None,
|
|
ident=None,
|
|
):
|
|
ExportedObj.__init__(self, ExportedObj.create_path(WifiAp), ident)
|
|
|
|
NM_AP_FLAGS = getattr(NM, "80211ApSecurityFlags")
|
|
if flags is None:
|
|
flags = 0x1
|
|
if wpaf is None:
|
|
wpaf = 0x0
|
|
wpaf = wpaf | NM_AP_FLAGS.PAIR_TKIP
|
|
wpaf = wpaf | NM_AP_FLAGS.PAIR_CCMP
|
|
wpaf = wpaf | NM_AP_FLAGS.GROUP_TKIP
|
|
wpaf = wpaf | NM_AP_FLAGS.GROUP_CCMP
|
|
wpaf = wpaf | NM_AP_FLAGS.KEY_MGMT_PSK
|
|
if rsnf is None:
|
|
rsnf = 0x0
|
|
rsnf = rsnf | NM_AP_FLAGS.PAIR_TKIP
|
|
rsnf = rsnf | NM_AP_FLAGS.PAIR_CCMP
|
|
rsnf = rsnf | NM_AP_FLAGS.GROUP_TKIP
|
|
rsnf = rsnf | NM_AP_FLAGS.GROUP_CCMP
|
|
rsnf = rsnf | NM_AP_FLAGS.KEY_MGMT_PSK
|
|
if freq is None:
|
|
freq = 2412
|
|
if bssid is None:
|
|
bssid = Util.random_mac(self.path)
|
|
if strength is None:
|
|
strength = Util.random_int(self.path, 100)
|
|
|
|
self.ssid = ssid
|
|
|
|
props = {
|
|
PRP_WIFI_AP_FLAGS: dbus.UInt32(flags),
|
|
PRP_WIFI_AP_WPA_FLAGS: dbus.UInt32(wpaf),
|
|
PRP_WIFI_AP_RSN_FLAGS: dbus.UInt32(rsnf),
|
|
PRP_WIFI_AP_SSID: dbus.ByteArray(self.ssid.encode("utf-8")),
|
|
PRP_WIFI_AP_FREQUENCY: dbus.UInt32(freq),
|
|
PRP_WIFI_AP_HW_ADDRESS: bssid,
|
|
PRP_WIFI_AP_MODE: dbus.UInt32(getattr(NM, "80211Mode").INFRA),
|
|
PRP_WIFI_AP_MAX_BITRATE: dbus.UInt32(54000),
|
|
PRP_WIFI_AP_STRENGTH: dbus.Byte(strength),
|
|
PRP_WIFI_AP_LAST_SEEN: dbus.Int32(NM.utils_get_timestamp_msec() / 1000),
|
|
PRP_WIFI_AP_BANDWIDTH: dbus.UInt32(40),
|
|
}
|
|
|
|
self.dbus_interface_add(IFACE_WIFI_AP, props)
|
|
|
|
|
|
###############################################################################
|
|
|
|
PRP_WIFI_HW_ADDRESS = "HwAddress"
|
|
PRP_WIFI_PERM_HW_ADDRESS = "PermHwAddress"
|
|
PRP_WIFI_MODE = "Mode"
|
|
PRP_WIFI_BITRATE = "Bitrate"
|
|
PRP_WIFI_ACCESS_POINTS = "AccessPoints"
|
|
PRP_WIFI_ACTIVE_ACCESS_POINT = "ActiveAccessPoint"
|
|
PRP_WIFI_WIRELESS_CAPABILITIES = "WirelessCapabilities"
|
|
PRP_WIFI_LAST_SCAN = "LastScan"
|
|
|
|
|
|
class WifiDevice(Device):
|
|
def __init__(self, iface, mac=None, ident=None):
|
|
Device.__init__(self, iface, NM.DeviceType.WIFI, ident)
|
|
|
|
if mac is None:
|
|
mac = Util.random_mac(self.ident)
|
|
|
|
self.aps = []
|
|
self.scan_cb_id = None
|
|
|
|
# Use a randomly older timestamp to trigger RequestScan() from the client
|
|
ts = max(
|
|
0, NM.utils_get_timestamp_msec() - Util.random_int(self.path, 20000, 40000)
|
|
)
|
|
|
|
props = {
|
|
PRP_WIFI_HW_ADDRESS: mac,
|
|
PRP_WIFI_PERM_HW_ADDRESS: mac,
|
|
PRP_WIFI_MODE: dbus.UInt32(getattr(NM, "80211Mode").INFRA),
|
|
PRP_WIFI_BITRATE: dbus.UInt32(21000),
|
|
PRP_WIFI_WIRELESS_CAPABILITIES: dbus.UInt32(0xFF),
|
|
PRP_WIFI_ACCESS_POINTS: ExportedObj.to_path_array(self.aps),
|
|
PRP_WIFI_ACTIVE_ACCESS_POINT: ExportedObj.to_path(None),
|
|
PRP_WIFI_LAST_SCAN: dbus.Int64(ts),
|
|
}
|
|
|
|
self.dbus_interface_add(IFACE_WIFI, props)
|
|
|
|
@dbus.service.method(dbus_interface=IFACE_WIFI, in_signature="", out_signature="ao")
|
|
def GetAccessPoints(self):
|
|
# only include non-hidden APs
|
|
return ExportedObj.to_path_array([a for a in self.aps if a.ssid])
|
|
|
|
@dbus.service.method(dbus_interface=IFACE_WIFI, in_signature="", out_signature="ao")
|
|
def GetAllAccessPoints(self):
|
|
# include all APs including hidden ones
|
|
return ExportedObj.to_path_array(self.aps)
|
|
|
|
@dbus.service.method(
|
|
dbus_interface=IFACE_WIFI, in_signature="a{sv}", out_signature=""
|
|
)
|
|
def RequestScan(self, props):
|
|
self.scan_cb_id = Util.g_source_remove(self.scan_cb_id)
|
|
|
|
def cb():
|
|
ts = NM.utils_get_timestamp_msec()
|
|
for ap in self.aps:
|
|
ap._dbus_property_set(
|
|
IFACE_WIFI_AP, PRP_WIFI_AP_LAST_SEEN, dbus.Int32(ts / 1000)
|
|
)
|
|
self._dbus_property_set(IFACE_WIFI, PRP_WIFI_LAST_SCAN, dbus.Int64(ts))
|
|
self.scan_cb_id = None
|
|
return False
|
|
|
|
self.scan_cb_id = GLib.idle_add(cb)
|
|
pass
|
|
|
|
@dbus.service.signal(IFACE_WIFI, signature="o")
|
|
def AccessPointAdded(self, ap_path):
|
|
pass
|
|
|
|
def add_ap(self, ap):
|
|
ap.export()
|
|
self.aps.append(ap)
|
|
self._dbus_property_set(
|
|
IFACE_WIFI, PRP_WIFI_ACCESS_POINTS, ExportedObj.to_path_array(self.aps)
|
|
)
|
|
self.AccessPointAdded(ExportedObj.to_path(ap))
|
|
return ap
|
|
|
|
def remove_ap(self, ap):
|
|
self.aps.remove(ap)
|
|
self._dbus_property_set(
|
|
IFACE_WIFI, PRP_WIFI_ACCESS_POINTS, ExportedObj.to_path_array(self.aps)
|
|
)
|
|
self.AccessPointRemoved(ExportedObj.to_path(ap))
|
|
ap.unexport()
|
|
|
|
def stop(self):
|
|
self.scan_cb_id = Util.g_source_remove(self.scan_cb_id)
|
|
super(WifiDevice, self).stop()
|
|
|
|
@dbus.service.signal(IFACE_WIFI, signature="o")
|
|
def AccessPointRemoved(self, ap_path):
|
|
pass
|
|
|
|
def remove_ap_by_path(self, path):
|
|
for ap in self.aps:
|
|
if ap.path == path:
|
|
self.remove_ap(ap)
|
|
return
|
|
raise BusErr.ApNotFoundException("AP %s not found" % path)
|
|
|
|
|
|
###############################################################################
|
|
|
|
PRP_ACTIVE_CONNECTION_CONNECTION = "Connection"
|
|
PRP_ACTIVE_CONNECTION_SPECIFIC_OBJECT = "SpecificObject"
|
|
PRP_ACTIVE_CONNECTION_ID = "Id"
|
|
PRP_ACTIVE_CONNECTION_UUID = "Uuid"
|
|
PRP_ACTIVE_CONNECTION_TYPE = "Type"
|
|
PRP_ACTIVE_CONNECTION_DEVICES = "Devices"
|
|
PRP_ACTIVE_CONNECTION_STATE = "State"
|
|
PRP_ACTIVE_CONNECTION_DEFAULT = "Default"
|
|
PRP_ACTIVE_CONNECTION_IP4CONFIG = "Ip4Config"
|
|
PRP_ACTIVE_CONNECTION_DHCP4CONFIG = "Dhcp4Config"
|
|
PRP_ACTIVE_CONNECTION_DEFAULT6 = "Default6"
|
|
PRP_ACTIVE_CONNECTION_IP6CONFIG = "Ip6Config"
|
|
PRP_ACTIVE_CONNECTION_DHCP6CONFIG = "Dhcp6Config"
|
|
PRP_ACTIVE_CONNECTION_VPN = "Vpn"
|
|
PRP_ACTIVE_CONNECTION_MASTER = "Master"
|
|
|
|
PRP_VPN_CONNECTION_VPN_STATE = "VpnState"
|
|
PRP_VPN_CONNECTION_BANNER = "Banner"
|
|
|
|
|
|
class ActiveConnection(ExportedObj):
|
|
path_counter_next = 1
|
|
path_prefix = "/org/freedesktop/NetworkManager/ActiveConnection/"
|
|
|
|
def __init__(self, device, con_inst, specific_object):
|
|
ExportedObj.__init__(self, ExportedObj.create_path(ActiveConnection))
|
|
|
|
self.device = device
|
|
self.con_inst = con_inst
|
|
self.is_vpn = con_inst.is_vpn()
|
|
|
|
self.activation_state_change_delay_ms = device.activation_state_change_delay_ms
|
|
self._activation_id = None
|
|
self._deactivation_id = None
|
|
|
|
s_con = con_inst.con_hash[NM.SETTING_CONNECTION_SETTING_NAME]
|
|
|
|
props = {
|
|
PRP_ACTIVE_CONNECTION_CONNECTION: ExportedObj.to_path(con_inst),
|
|
PRP_ACTIVE_CONNECTION_SPECIFIC_OBJECT: ExportedObj.to_path(specific_object),
|
|
PRP_ACTIVE_CONNECTION_ID: s_con[NM.SETTING_CONNECTION_ID],
|
|
PRP_ACTIVE_CONNECTION_UUID: s_con[NM.SETTING_CONNECTION_UUID],
|
|
PRP_ACTIVE_CONNECTION_TYPE: s_con[NM.SETTING_CONNECTION_TYPE],
|
|
PRP_ACTIVE_CONNECTION_DEVICES: ExportedObj.to_path_array([self.device]),
|
|
PRP_ACTIVE_CONNECTION_STATE: dbus.UInt32(NM.ActiveConnectionState.UNKNOWN),
|
|
PRP_ACTIVE_CONNECTION_DEFAULT: False,
|
|
PRP_ACTIVE_CONNECTION_IP4CONFIG: ExportedObj.to_path(None),
|
|
PRP_ACTIVE_CONNECTION_DHCP4CONFIG: ExportedObj.to_path(None),
|
|
PRP_ACTIVE_CONNECTION_DEFAULT6: False,
|
|
PRP_ACTIVE_CONNECTION_IP6CONFIG: ExportedObj.to_path(None),
|
|
PRP_ACTIVE_CONNECTION_DHCP6CONFIG: ExportedObj.to_path(None),
|
|
PRP_ACTIVE_CONNECTION_VPN: self.is_vpn,
|
|
PRP_ACTIVE_CONNECTION_MASTER: ExportedObj.to_path(None),
|
|
}
|
|
|
|
self.dbus_interface_add(IFACE_ACTIVE_CONNECTION, props)
|
|
|
|
if self.is_vpn:
|
|
props = {
|
|
PRP_VPN_CONNECTION_VPN_STATE: dbus.UInt32(
|
|
NM.VpnConnectionState.UNKNOWN
|
|
),
|
|
PRP_VPN_CONNECTION_BANNER: "*** VPN connection %s ***"
|
|
% (con_inst.get_id()),
|
|
}
|
|
|
|
self.dbus_interface_add(IFACE_VPN_CONNECTION, props)
|
|
|
|
def _set_state(self, state, reason):
|
|
state = dbus.UInt32(state)
|
|
self._dbus_property_set(
|
|
IFACE_ACTIVE_CONNECTION, PRP_ACTIVE_CONNECTION_STATE, state
|
|
)
|
|
self.StateChanged(state, dbus.UInt32(reason))
|
|
|
|
def activation_cancel(self):
|
|
self._activation_id = Util.g_source_remove(self._activation_id)
|
|
|
|
def _activation_step2(self):
|
|
assert self._activation_id is not None
|
|
self._activation_id = None
|
|
|
|
s_con = self.con_inst.con_hash[NM.SETTING_CONNECTION_SETTING_NAME]
|
|
conn_id = s_con[NM.SETTING_CONNECTION_ID]
|
|
|
|
if gl.force_activation_failure.get(conn_id, False):
|
|
self._set_state(
|
|
NM.ActiveConnectionState.DEACTIVATED,
|
|
NM.ActiveConnectionStateReason.UNKNOWN,
|
|
)
|
|
self.device.set_state(NM.DeviceState.FAILED, NM.DeviceStateReason.UNKNOWN)
|
|
else:
|
|
self._set_state(
|
|
NM.ActiveConnectionState.ACTIVATED,
|
|
NM.ActiveConnectionStateReason.UNKNOWN,
|
|
)
|
|
self.device.set_state(NM.DeviceState.ACTIVATED, NM.DeviceStateReason.NONE)
|
|
return False
|
|
|
|
def _activation_step1(self):
|
|
assert self._activation_id is not None
|
|
self._activation_id = GLib.timeout_add(
|
|
self.activation_state_change_delay_ms, self._activation_step2
|
|
)
|
|
self.device.set_active_connection(self)
|
|
self.device.set_state(NM.DeviceState.PREPARE, NM.DeviceStateReason.NONE)
|
|
self._set_state(
|
|
NM.ActiveConnectionState.ACTIVATING, NM.ActiveConnectionStateReason.UNKNOWN
|
|
)
|
|
return False
|
|
|
|
def _deactivation_step1(self):
|
|
assert self._deactivation_id is not None
|
|
self._deactivation_id = None
|
|
self.device.set_state(
|
|
NM.DeviceState.DISCONNECTED, NM.DeviceStateReason.USER_REQUESTED
|
|
)
|
|
self._set_state(
|
|
NM.ActiveConnectionState.DEACTIVATED,
|
|
NM.ActiveConnectionStateReason.USER_DISCONNECTED,
|
|
)
|
|
|
|
return False
|
|
|
|
def set_state(self, state, reason):
|
|
self._set_state(state, reason)
|
|
|
|
def start_activation(self):
|
|
assert self._activation_id is None
|
|
if self.activation_state_change_delay_ms == 0:
|
|
self.device.set_active_connection(self)
|
|
self._set_state(
|
|
NM.ActiveConnectionState.ACTIVATED,
|
|
NM.ActiveConnectionStateReason.UNKNOWN,
|
|
)
|
|
self.device.set_state(NM.DeviceState.ACTIVATED, NM.DeviceStateReason.NONE)
|
|
else:
|
|
self._activation_id = GLib.timeout_add(
|
|
self.activation_state_change_delay_ms, self._activation_step1
|
|
)
|
|
|
|
def start_deactivation(self):
|
|
assert self._deactivation_id is None
|
|
self.device.set_state(
|
|
NM.DeviceState.DEACTIVATING, NM.DeviceStateReason.USER_REQUESTED
|
|
)
|
|
self._set_state(
|
|
NM.ActiveConnectionState.DEACTIVATING,
|
|
NM.ActiveConnectionStateReason.USER_DISCONNECTED,
|
|
)
|
|
self._deactivation_id = GLib.timeout_add(50, self._deactivation_step1)
|
|
|
|
@dbus.service.signal(IFACE_ACTIVE_CONNECTION, signature="uu")
|
|
def StateChanged(self, state, reason):
|
|
pass
|
|
|
|
@dbus.service.signal(IFACE_VPN_CONNECTION, signature="uu")
|
|
def VpnStateChanged(self, state, reason):
|
|
pass
|
|
|
|
|
|
###############################################################################
|
|
|
|
PRP_NM_DEVICES = "Devices"
|
|
PRP_NM_ALL_DEVICES = "AllDevices"
|
|
PRP_NM_NETWORKING_ENABLED = "NetworkingEnabled"
|
|
PRP_NM_WWAN_ENABLED = "WwanEnabled"
|
|
PRP_NM_WWAN_HARDWARE_ENABLED = "WwanHardwareEnabled"
|
|
PRP_NM_WIRELESS_ENABLED = "WirelessEnabled"
|
|
PRP_NM_WIRELESS_HARDWARE_ENABLED = "WirelessHardwareEnabled"
|
|
PRP_NM_WIMAX_ENABLED = "WimaxEnabled"
|
|
PRP_NM_WIMAX_HARDWARE_ENABLED = "WimaxHardwareEnabled"
|
|
PRP_NM_ACTIVE_CONNECTIONS = "ActiveConnections"
|
|
PRP_NM_PRIMARY_CONNECTION = "PrimaryConnection"
|
|
PRP_NM_ACTIVATING_CONNECTION = "ActivatingConnection"
|
|
PRP_NM_STARTUP = "Startup"
|
|
PRP_NM_STATE = "State"
|
|
PRP_NM_VERSION = "Version"
|
|
PRP_NM_CONNECTIVITY = "Connectivity"
|
|
|
|
|
|
class NetworkManager(ExportedObj):
|
|
def __init__(self):
|
|
ExportedObj.__init__(self, "/org/freedesktop/NetworkManager")
|
|
self.devices = []
|
|
self.active_connections = []
|
|
|
|
props = {
|
|
PRP_NM_DEVICES: ExportedObj.to_path_array(self.devices),
|
|
PRP_NM_ALL_DEVICES: ExportedObj.to_path_array(self.devices),
|
|
PRP_NM_NETWORKING_ENABLED: True,
|
|
PRP_NM_WWAN_ENABLED: True,
|
|
PRP_NM_WWAN_HARDWARE_ENABLED: True,
|
|
PRP_NM_WIRELESS_ENABLED: True,
|
|
PRP_NM_WIRELESS_HARDWARE_ENABLED: True,
|
|
PRP_NM_WIMAX_ENABLED: True,
|
|
PRP_NM_WIMAX_HARDWARE_ENABLED: True,
|
|
PRP_NM_ACTIVE_CONNECTIONS: ExportedObj.to_path_array(
|
|
self.active_connections
|
|
),
|
|
PRP_NM_PRIMARY_CONNECTION: ExportedObj.to_path(None),
|
|
PRP_NM_ACTIVATING_CONNECTION: ExportedObj.to_path(None),
|
|
PRP_NM_STARTUP: False,
|
|
PRP_NM_STATE: dbus.UInt32(NM.State.DISCONNECTED),
|
|
PRP_NM_VERSION: os.environ.get(
|
|
"NM_TEST_NETWORKMANAGER_SERVICE_VERSION", "0.9.9.0"
|
|
),
|
|
PRP_NM_CONNECTIVITY: dbus.UInt32(NM.ConnectivityState.NONE),
|
|
}
|
|
|
|
self.dbus_interface_add(IFACE_NM, props)
|
|
self.export()
|
|
|
|
@dbus.service.signal(IFACE_NM, signature="u")
|
|
def StateChanged(self, new_state):
|
|
pass
|
|
|
|
def set_state(self, new_state):
|
|
self._dbus_property_set(IFACE_NM, PRP_NM_STATE, new_state)
|
|
self.StateChanged(dbus.UInt32(new_state))
|
|
|
|
@dbus.service.method(dbus_interface=IFACE_NM, in_signature="", out_signature="ao")
|
|
def GetDevices(self):
|
|
return ExportedObj.to_path_array(self.devices)
|
|
|
|
@dbus.service.method(dbus_interface=IFACE_NM, in_signature="", out_signature="ao")
|
|
def GetAllDevices(self):
|
|
return ExportedObj.to_path_array(self.devices)
|
|
|
|
@dbus.service.method(dbus_interface=IFACE_NM, in_signature="s", out_signature="o")
|
|
def GetDeviceByIpIface(self, ip_iface):
|
|
d = self.find_device_first(
|
|
ip_iface=ip_iface, require=BusErr.UnknownDeviceException
|
|
)
|
|
return ExportedObj.to_path(d)
|
|
|
|
@dbus.service.method(dbus_interface=IFACE_NM, in_signature="ooo", out_signature="o")
|
|
def ActivateConnection(self, conpath, devpath, specific_object):
|
|
try:
|
|
con_inst = gl.settings.get_connection(conpath)
|
|
except Exception as e:
|
|
raise BusErr.UnknownConnectionException("Connection not found")
|
|
|
|
con_hash = con_inst.con_hash
|
|
con_type = NmUtil.con_hash_get_type(con_hash)
|
|
|
|
device = self.find_device_first(path=devpath)
|
|
if not device:
|
|
if con_type == NM.SETTING_WIRED_SETTING_NAME:
|
|
device = self.find_device_first(dev_type=WiredDevice)
|
|
elif con_type == NM.SETTING_WIRELESS_SETTING_NAME:
|
|
device = self.find_device_first(dev_type=WifiDevice)
|
|
elif con_type == NM.SETTING_VLAN_SETTING_NAME:
|
|
ifname = con_hash[NM.SETTING_CONNECTION_SETTING_NAME]["interface-name"]
|
|
device = VlanDevice(ifname)
|
|
self.add_device(device)
|
|
elif con_type == NM.SETTING_VPN_SETTING_NAME:
|
|
for ac in self.active_connections:
|
|
if ac.is_vpn:
|
|
continue
|
|
if ac.device:
|
|
device = ac.device
|
|
break
|
|
|
|
if not device:
|
|
raise BusErr.UnknownDeviceException(
|
|
"No device found for the requested iface."
|
|
)
|
|
|
|
# See if we need secrets. For the moment, we only support WPA
|
|
if "802-11-wireless-security" in con_hash:
|
|
s_wsec = con_hash["802-11-wireless-security"]
|
|
if s_wsec["key-mgmt"] == "wpa-psk" and "psk" not in s_wsec:
|
|
secrets = gl.agent_manager.get_secrets(
|
|
con_hash, conpath, "802-11-wireless-security"
|
|
)
|
|
if secrets is None:
|
|
raise BusErr.NoSecretsException("No secret agent available")
|
|
if "802-11-wireless-security" not in secrets:
|
|
raise BusErr.NoSecretsException("No secrets provided")
|
|
s_wsec = secrets["802-11-wireless-security"]
|
|
if "psk" not in s_wsec:
|
|
raise BusErr.NoSecretsException("No secrets provided")
|
|
|
|
ac = ActiveConnection(device, con_inst, None)
|
|
self.active_connection_add(ac)
|
|
|
|
gl.manager.devices_available_connections_update()
|
|
|
|
return ExportedObj.to_path(ac)
|
|
|
|
def active_connection_add(self, ac):
|
|
ac.export()
|
|
self.active_connections.append(ac)
|
|
self._dbus_property_set(
|
|
IFACE_NM,
|
|
PRP_NM_ACTIVE_CONNECTIONS,
|
|
ExportedObj.to_path_array(self.active_connections),
|
|
)
|
|
ac.start_activation()
|
|
|
|
def active_connection_remove(self, ac):
|
|
ac.activation_cancel()
|
|
self.active_connections.remove(ac)
|
|
self._dbus_property_set(
|
|
IFACE_NM,
|
|
PRP_NM_ACTIVE_CONNECTIONS,
|
|
ExportedObj.to_path_array(self.active_connections),
|
|
)
|
|
ac.unexport()
|
|
|
|
@dbus.service.method(
|
|
dbus_interface=IFACE_NM, in_signature="a{sa{sv}}oo", out_signature="oo"
|
|
)
|
|
def AddAndActivateConnection(self, con_hash, devpath, specific_object):
|
|
conpath, acpath, result = self.AddAndActivateConnection2(
|
|
con_hash, devpath, specific_object, dict()
|
|
)
|
|
return (conpath, acpath)
|
|
|
|
@dbus.service.method(
|
|
dbus_interface=IFACE_NM,
|
|
in_signature="a{sa{sv}}ooa{sv}",
|
|
out_signature="ooa{sv}",
|
|
)
|
|
def AddAndActivateConnection2(self, con_hash, devpath, specific_object, options):
|
|
device = self.find_device_first(
|
|
path=devpath, require=BusErr.UnknownDeviceException
|
|
)
|
|
conpath = gl.settings.AddConnection(con_hash)
|
|
return (conpath, self.ActivateConnection(conpath, devpath, specific_object), [])
|
|
|
|
@dbus.service.method(dbus_interface=IFACE_NM, in_signature="o", out_signature="")
|
|
def DeactivateConnection(self, active_connection):
|
|
# Look for an active connection with the same object path
|
|
for ac in self.active_connections:
|
|
if ac.path == str(active_connection):
|
|
ac.activation_cancel()
|
|
ac.start_deactivation()
|
|
return
|
|
|
|
raise BusErr.UnknownConnectionException(
|
|
"Connection not found: %s" % str(active_connection)
|
|
)
|
|
|
|
@dbus.service.method(dbus_interface=IFACE_NM, in_signature="b", out_signature="")
|
|
def Sleep(self, do_sleep):
|
|
if do_sleep:
|
|
state = NM.State.ASLEEP
|
|
else:
|
|
state = NM.State.DISCONNECTED
|
|
self.set_state(state)
|
|
|
|
@dbus.service.method(dbus_interface=IFACE_NM, in_signature="b", out_signature="")
|
|
def Enable(self, do_enable):
|
|
pass
|
|
|
|
@dbus.service.method(
|
|
dbus_interface=IFACE_NM, in_signature="", out_signature="a{ss}"
|
|
)
|
|
def GetPermissions(self):
|
|
return {
|
|
"org.freedesktop.NetworkManager.enable-disable-network": "yes",
|
|
"org.freedesktop.NetworkManager.sleep-wake": "no",
|
|
"org.freedesktop.NetworkManager.enable-disable-wifi": "yes",
|
|
"org.freedesktop.NetworkManager.enable-disable-wwan": "yes",
|
|
"org.freedesktop.NetworkManager.enable-disable-wimax": "yes",
|
|
"org.freedesktop.NetworkManager.network-control": "yes",
|
|
"org.freedesktop.NetworkManager.wifi.share.protected": "yes",
|
|
"org.freedesktop.NetworkManager.wifi.share.open": "yes",
|
|
"org.freedesktop.NetworkManager.settings.modify.own": "yes",
|
|
"org.freedesktop.NetworkManager.settings.modify.system": "yes",
|
|
"org.freedesktop.NetworkManager.settings.modify.hostname": "yes",
|
|
"org.freedesktop.NetworkManager.settings.modify.global-dns": "no",
|
|
"org.freedesktop.NetworkManager.reload": "no",
|
|
}
|
|
|
|
@dbus.service.method(dbus_interface=IFACE_NM, in_signature="ss", out_signature="")
|
|
def SetLogging(self, level, domains):
|
|
pass
|
|
|
|
@dbus.service.method(dbus_interface=IFACE_NM, in_signature="", out_signature="ss")
|
|
def GetLogging(self):
|
|
return ("info", "HW,RFKILL,CORE,DEVICE,WIFI,ETHER")
|
|
|
|
@dbus.service.method(dbus_interface=IFACE_NM, in_signature="", out_signature="u")
|
|
def CheckConnectivity(self):
|
|
raise BusErr.PermissionDeniedException("You fail")
|
|
|
|
@dbus.service.signal(IFACE_NM, signature="o")
|
|
def DeviceAdded(self, devpath):
|
|
pass
|
|
|
|
def find_devices(
|
|
self,
|
|
ident=_DEFAULT_ARG,
|
|
path=_DEFAULT_ARG,
|
|
iface=_DEFAULT_ARG,
|
|
ip_iface=_DEFAULT_ARG,
|
|
dev_type=_DEFAULT_ARG,
|
|
):
|
|
r = None
|
|
for d in self.devices:
|
|
if ident is not _DEFAULT_ARG:
|
|
if d.ident != ident:
|
|
continue
|
|
if path is not _DEFAULT_ARG:
|
|
if d.path != path:
|
|
continue
|
|
if iface is not _DEFAULT_ARG:
|
|
if d.iface != iface:
|
|
continue
|
|
if ip_iface is not _DEFAULT_ARG:
|
|
# ignore iface/ip_iface distinction for now
|
|
if d.iface != ip_iface:
|
|
continue
|
|
if dev_type is not _DEFAULT_ARG:
|
|
if not isinstance(d, dev_type):
|
|
continue
|
|
yield d
|
|
|
|
def find_device_first(
|
|
self,
|
|
ident=_DEFAULT_ARG,
|
|
path=_DEFAULT_ARG,
|
|
iface=_DEFAULT_ARG,
|
|
ip_iface=_DEFAULT_ARG,
|
|
dev_type=_DEFAULT_ARG,
|
|
require=None,
|
|
):
|
|
r = None
|
|
for d in self.find_devices(
|
|
ident=ident, path=path, iface=iface, ip_iface=ip_iface, dev_type=dev_type
|
|
):
|
|
r = d
|
|
break
|
|
if r is None and require:
|
|
if require is TestError:
|
|
raise TestError("Device not found")
|
|
raise BusErr.UnknownDeviceException("Device not found")
|
|
return r
|
|
|
|
def add_device(self, device):
|
|
if self.find_device_first(ident=device.ident, path=device.path) is not None:
|
|
raise TestError(
|
|
"Duplicate device ident=%s / path=%s" % (device.ident, device.path)
|
|
)
|
|
device.export()
|
|
self.devices.append(device)
|
|
self._dbus_property_set(
|
|
IFACE_NM, PRP_NM_DEVICES, ExportedObj.to_path_array(self.devices)
|
|
)
|
|
self._dbus_property_set(
|
|
IFACE_NM, PRP_NM_ALL_DEVICES, ExportedObj.to_path_array(self.devices)
|
|
)
|
|
self.DeviceAdded(ExportedObj.to_path(device))
|
|
device.start()
|
|
return device
|
|
|
|
def remove_device(self, device):
|
|
device.stop()
|
|
self.devices.remove(device)
|
|
self._dbus_property_set(
|
|
IFACE_NM, PRP_NM_DEVICES, ExportedObj.to_path_array(self.devices)
|
|
)
|
|
self._dbus_property_set(
|
|
IFACE_NM, PRP_NM_ALL_DEVICES, ExportedObj.to_path_array(self.devices)
|
|
)
|
|
self.DeviceRemoved(ExportedObj.to_path(device))
|
|
device.unexport()
|
|
|
|
def devices_available_connections_update(self):
|
|
for d in self.devices:
|
|
d.available_connections_update()
|
|
|
|
@dbus.service.signal(IFACE_NM, signature="o")
|
|
def DeviceRemoved(self, devpath):
|
|
pass
|
|
|
|
@dbus.service.method(IFACE_TEST, in_signature="", out_signature="")
|
|
def Quit(self):
|
|
gl.mainloop.quit()
|
|
|
|
@dbus.service.method(IFACE_TEST, in_signature="a{ss}", out_signature="a(sss)")
|
|
def FindConnections(self, selector_args):
|
|
return [
|
|
(c.path, c.get_uuid(), c.get_id())
|
|
for c in gl.settings.find_connections(**selector_args)
|
|
]
|
|
|
|
@dbus.service.method(IFACE_TEST, in_signature="a(oa(sa(sv)))", out_signature="")
|
|
def SetProperties(self, all_args):
|
|
for i in [0, 1]:
|
|
for path, iface_args in all_args:
|
|
o = gl.object_manager.find_object(path)
|
|
if o is None:
|
|
raise TestError("Object %s does not exist" % (path))
|
|
for iface_name, args in iface_args:
|
|
for propname, value in args:
|
|
o._dbus_property_set(
|
|
iface_name,
|
|
propname,
|
|
value,
|
|
allow_detect_dbus_iface=True,
|
|
dry_run=(i == 0),
|
|
)
|
|
|
|
@dbus.service.method(IFACE_TEST, in_signature="sa{sv}", out_signature="o")
|
|
def AddObj(self, class_name, args):
|
|
if class_name in ["WiredDevice", "WifiDevice"]:
|
|
py_class = globals()[class_name]
|
|
d = py_class(**args)
|
|
return ExportedObj.to_path(self.add_device(d))
|
|
elif class_name in ["WifiAp"]:
|
|
if "device" not in args:
|
|
raise TestError('missing "device" paramter')
|
|
d = self.find_device_first(ident=args["device"], require=TestError)
|
|
del args["device"]
|
|
if "ssid" not in args:
|
|
args["ssid"] = d.ident + "-ap-" + str(WifiAp.path_counter_next)
|
|
ap = WifiAp(**args)
|
|
return ExportedObj.to_path(d.add_ap(ap))
|
|
raise TestError('Invalid python type "%s"' % (class_name))
|
|
|
|
@dbus.service.method(IFACE_TEST, in_signature="ssas", out_signature="o")
|
|
def AddWiredDevice(self, ifname, mac, subchannels):
|
|
dev = WiredDevice(ifname, mac, subchannels)
|
|
return ExportedObj.to_path(self.add_device(dev))
|
|
|
|
@dbus.service.method(IFACE_TEST, in_signature="s", out_signature="o")
|
|
def AddModemDevice(self, ifname):
|
|
dev = ModemDevice(ifname)
|
|
return ExportedObj.to_path(self.add_device(dev))
|
|
|
|
@dbus.service.method(IFACE_TEST, in_signature="s", out_signature="o")
|
|
def AddWifiDevice(self, ifname):
|
|
dev = WifiDevice(ifname)
|
|
return ExportedObj.to_path(self.add_device(dev))
|
|
|
|
@dbus.service.method(IFACE_TEST, in_signature="o", out_signature="")
|
|
def RemoveDevice(self, path):
|
|
d = self.find_device_first(path=path, require=TestError)
|
|
self.remove_device(d)
|
|
|
|
@dbus.service.method(IFACE_TEST, in_signature="sss", out_signature="o")
|
|
def AddWifiAp(self, ident, ssid, bssid):
|
|
d = self.find_device_first(ident=ident, require=TestError)
|
|
ap = WifiAp(ssid, bssid)
|
|
return ExportedObj.to_path(d.add_ap(ap))
|
|
|
|
@dbus.service.method(IFACE_TEST, in_signature="so", out_signature="")
|
|
def RemoveWifiAp(self, ident, ap_path):
|
|
d = self.find_device_first(ident=ident, require=TestError)
|
|
d.remove_ap_by_path(ap_path)
|
|
|
|
@dbus.service.method(IFACE_TEST, in_signature="", out_signature="")
|
|
def AutoRemoveNextConnection(self):
|
|
gl.settings.auto_remove_next_connection()
|
|
|
|
@dbus.service.method(
|
|
dbus_interface=IFACE_TEST, in_signature="a{sa{sv}}b", out_signature="o"
|
|
)
|
|
def AddConnection(self, con_hash, do_verify_strict):
|
|
return gl.settings.add_connection(con_hash, do_verify_strict)
|
|
|
|
@dbus.service.method(dbus_interface=IFACE_TEST, in_signature="sb", out_signature="")
|
|
def SetActiveConnectionFailure(self, connection_id, failure):
|
|
gl.force_activation_failure[connection_id] = failure
|
|
|
|
@dbus.service.method(dbus_interface=IFACE_TEST, in_signature="ou", out_signature="")
|
|
def SetActiveConnectionStateChangedDelay(self, devpath, delay_ms):
|
|
for d in self.devices:
|
|
if d.path == devpath:
|
|
d.activation_state_change_delay_ms = delay_ms
|
|
return
|
|
raise BusErr.UnknownDeviceException(
|
|
"Device with iface '%s' not found" % devpath
|
|
)
|
|
|
|
@dbus.service.method(
|
|
dbus_interface=IFACE_TEST, in_signature="ouu", out_signature=""
|
|
)
|
|
def SetActiveConnectionState(self, devpath, state, reason):
|
|
for ac in reversed(self.active_connections):
|
|
if ac.device.path == devpath:
|
|
ac.set_state(state, reason)
|
|
return
|
|
raise BusErr.UnknownDeviceException(
|
|
"Device with iface '%s' not found" % devpath
|
|
)
|
|
|
|
@dbus.service.method(
|
|
dbus_interface=IFACE_TEST, in_signature="ouu", out_signature=""
|
|
)
|
|
def SetDeviceState(self, devpath, state, reason):
|
|
try:
|
|
nmstate = NM.DeviceState(state)
|
|
except ValueError as e:
|
|
raise BusErr.InvalidPropertyException("Invalid device state: " % e)
|
|
|
|
try:
|
|
nmreason = NM.DeviceStateReason(reason)
|
|
except ValueError as e:
|
|
raise BusErr.InvalidPropertyException("Invalid device state reason: " % e)
|
|
|
|
for d in self.devices:
|
|
if d.path == devpath:
|
|
d.set_state(nmstate, nmreason)
|
|
return
|
|
raise BusErr.UnknownDeviceException(
|
|
"Device with iface '%s' not found" % devpath
|
|
)
|
|
|
|
@dbus.service.method(dbus_interface=IFACE_TEST, in_signature="ob", out_signature="")
|
|
def SetCarrierStatus(self, devpath, status):
|
|
for d in self.devices:
|
|
if d.path == devpath:
|
|
d.set_carrier_status(status)
|
|
return
|
|
raise BusErr.UnknownDeviceException(
|
|
"Device with iface '%s' not found" % devpath
|
|
)
|
|
|
|
@dbus.service.method(
|
|
dbus_interface=IFACE_TEST, in_signature="sa{sa{sv}}b", out_signature=""
|
|
)
|
|
def UpdateConnection(self, path, con_hash, do_verify_strict):
|
|
return gl.settings.update_connection(con_hash, path, do_verify_strict)
|
|
|
|
@dbus.service.method(
|
|
dbus_interface=IFACE_TEST, in_signature="ba{ss}", out_signature=""
|
|
)
|
|
def ConnectionSetVisible(self, vis, selector_args):
|
|
cons = list(gl.settings.find_connections(**selector_args))
|
|
assert len(cons) == 1
|
|
cons[0].SetVisible(vis)
|
|
|
|
@dbus.service.method(dbus_interface=IFACE_TEST, in_signature="", out_signature="")
|
|
def Restart(self):
|
|
gl.bus.release_name("org.freedesktop.NetworkManager")
|
|
gl.bus.request_name("org.freedesktop.NetworkManager")
|
|
|
|
|
|
###############################################################################
|
|
|
|
PRP_CONNECTION_UNSAVED = "Unsaved"
|
|
PRP_CONNECTION_FILENAME = "Filename"
|
|
|
|
|
|
class Connection(ExportedObj):
|
|
def __init__(self, path_counter, con_hash, do_verify_strict=True):
|
|
path = "/org/freedesktop/NetworkManager/Settings/Connection/%s" % (path_counter)
|
|
|
|
ExportedObj.__init__(self, path)
|
|
|
|
s_con = con_hash.get(NM.SETTING_CONNECTION_SETTING_NAME)
|
|
if s_con is None:
|
|
s_con = {}
|
|
con_hash[NM.SETTING_CONNECTION_SETTING_NAME] = s_con
|
|
if NmUtil.con_hash_get_id(con_hash) is None:
|
|
s_con[NM.SETTING_CONNECTION_ID] = "connection-%s" % (path_counter)
|
|
if NmUtil.con_hash_get_uuid(con_hash) is None:
|
|
s_con[NM.SETTING_CONNECTION_UUID] = str(
|
|
uuid.uuid3(uuid.NAMESPACE_URL, path)
|
|
)
|
|
|
|
NmUtil.con_hash_verify(con_hash, do_verify_strict=do_verify_strict)
|
|
|
|
self.path = path
|
|
self.con_hash = con_hash
|
|
self.visible = True
|
|
|
|
props = {
|
|
PRP_CONNECTION_UNSAVED: False,
|
|
PRP_CONNECTION_FILENAME: "/etc/NetworkManager/system-connections/"
|
|
+ self.get_id(),
|
|
}
|
|
|
|
self.dbus_interface_add(IFACE_CONNECTION, props)
|
|
|
|
def get_id(self):
|
|
return NmUtil.con_hash_get_id(self.con_hash)
|
|
|
|
def get_uuid(self):
|
|
return NmUtil.con_hash_get_uuid(self.con_hash)
|
|
|
|
def get_type(self):
|
|
return NmUtil.con_hash_get_type(self.con_hash)
|
|
|
|
def is_vpn(self):
|
|
return self.get_type() == NM.SETTING_VPN_SETTING_NAME
|
|
|
|
def update_connection(self, con_hash, do_verify_strict):
|
|
NmUtil.con_hash_verify(con_hash, do_verify_strict=do_verify_strict)
|
|
|
|
old_uuid = self.get_uuid()
|
|
new_uuid = NmUtil.con_hash_get_uuid(con_hash)
|
|
if old_uuid != new_uuid:
|
|
raise BusErr.InvalidPropertyException(
|
|
"connection.uuid: cannot change the uuid from %s to %s"
|
|
% (old_uuid, new_uuid)
|
|
)
|
|
|
|
self.con_hash = con_hash
|
|
self.Updated()
|
|
|
|
@dbus.service.method(
|
|
dbus_interface=IFACE_CONNECTION, in_signature="", out_signature="a{sa{sv}}"
|
|
)
|
|
def GetSettings(self):
|
|
if hasattr(self, "_remove_next_connection_cb"):
|
|
self._remove_next_connection_cb()
|
|
raise BusErr.UnknownConnectionException("Connection not found")
|
|
if not self.visible:
|
|
raise BusErr.PermissionDeniedException()
|
|
return self.con_hash
|
|
|
|
@dbus.service.method(
|
|
dbus_interface=IFACE_CONNECTION, in_signature="b", out_signature=""
|
|
)
|
|
def SetVisible(self, vis):
|
|
self.visible = vis
|
|
self.Updated()
|
|
|
|
@dbus.service.method(
|
|
dbus_interface=IFACE_CONNECTION, in_signature="", out_signature=""
|
|
)
|
|
def Delete(self):
|
|
gl.settings.delete_connection(self)
|
|
|
|
@dbus.service.method(
|
|
dbus_interface=IFACE_CONNECTION, in_signature="a{sa{sv}}", out_signature=""
|
|
)
|
|
def Update(self, con_hash):
|
|
self.update_connection(con_hash, True)
|
|
|
|
@dbus.service.method(
|
|
dbus_interface=IFACE_CONNECTION,
|
|
in_signature="a{sa{sv}}ua{sv}",
|
|
out_signature="a{sv}",
|
|
)
|
|
def Update2(self, con_hash, flags, args):
|
|
for setting_name in ALIASED_PROPERTIES.keys():
|
|
if setting_name in con_hash:
|
|
setting = con_hash[setting_name]
|
|
for pty in ALIASED_PROPERTIES[setting_name]:
|
|
if pty.new_name in setting and pty.old_name in setting:
|
|
del setting[pty.new_name]
|
|
|
|
self.update_connection(con_hash, True)
|
|
return []
|
|
|
|
@dbus.service.signal(IFACE_CONNECTION, signature="")
|
|
def Removed(self):
|
|
pass
|
|
|
|
@dbus.service.signal(IFACE_CONNECTION, signature="")
|
|
def Updated(self):
|
|
pass
|
|
|
|
|
|
###############################################################################
|
|
|
|
PRP_SETTINGS_HOSTNAME = "Hostname"
|
|
PRP_SETTINGS_CAN_MODIFY = "CanModify"
|
|
PRP_SETTINGS_CONNECTIONS = "Connections"
|
|
|
|
|
|
class Settings(ExportedObj):
|
|
def __init__(self):
|
|
ExportedObj.__init__(self, "/org/freedesktop/NetworkManager/Settings")
|
|
|
|
self.connections = {}
|
|
self.c_counter = 0
|
|
self.remove_next_connection = False
|
|
|
|
props = {
|
|
PRP_SETTINGS_HOSTNAME: "foobar.baz",
|
|
PRP_SETTINGS_CAN_MODIFY: True,
|
|
PRP_SETTINGS_CONNECTIONS: dbus.Array([], "o"),
|
|
}
|
|
|
|
self.dbus_interface_add(IFACE_SETTINGS, props)
|
|
self.export()
|
|
|
|
def auto_remove_next_connection(self):
|
|
self.remove_next_connection = True
|
|
|
|
def get_connection(self, path):
|
|
return self.connections[path]
|
|
|
|
def get_connections(self, stable_order=True):
|
|
cons = list(self.connections.values())
|
|
if stable_order:
|
|
cons.sort(
|
|
key=lambda c: (Util.random_int(c.get_id()), Util.random_int(c.path))
|
|
)
|
|
return cons
|
|
|
|
def get_connection_paths(self, stable_order=True):
|
|
return [c.path for c in self.get_connections(stable_order=stable_order)]
|
|
|
|
def find_connections(self, path=None, con_id=None, con_uuid=None):
|
|
for c in self.get_connections():
|
|
if path is not None:
|
|
if c.path != path:
|
|
continue
|
|
if con_id is not None:
|
|
if c.get_id() != con_id:
|
|
continue
|
|
if con_uuid is not None:
|
|
if c.get_uuid() != con_uuid:
|
|
continue
|
|
yield c
|
|
|
|
@dbus.service.method(
|
|
dbus_interface=IFACE_SETTINGS, in_signature="", out_signature="ao"
|
|
)
|
|
def ListConnections(self):
|
|
return self.get_connection_paths()
|
|
|
|
@dbus.service.method(
|
|
dbus_interface=IFACE_SETTINGS, in_signature="a{sa{sv}}", out_signature="o"
|
|
)
|
|
def AddConnection(self, con_hash):
|
|
return self.add_connection(con_hash)
|
|
|
|
@dbus.service.method(
|
|
dbus_interface=IFACE_SETTINGS, in_signature="", out_signature="b"
|
|
)
|
|
def ReloadConnections(self):
|
|
return True
|
|
|
|
def add_connection(self, con_hash, do_verify_strict=True):
|
|
self.c_counter += 1
|
|
con_inst = Connection(self.c_counter, con_hash, do_verify_strict)
|
|
|
|
uuid = con_inst.get_uuid()
|
|
if uuid in [c.get_uuid() for c in self.get_connections(stable_order=False)]:
|
|
raise BusErr.InvalidSettingException(
|
|
"cannot add duplicate connection with uuid %s" % (uuid)
|
|
)
|
|
|
|
con_inst.export()
|
|
self.connections[con_inst.path] = con_inst
|
|
self.NewConnection(con_inst.path)
|
|
self._dbus_property_set(
|
|
IFACE_SETTINGS,
|
|
PRP_SETTINGS_CONNECTIONS,
|
|
dbus.Array(self.get_connection_paths(), "o"),
|
|
)
|
|
|
|
gl.manager.devices_available_connections_update()
|
|
|
|
if self.remove_next_connection:
|
|
self.remove_next_connection = False
|
|
|
|
def cb():
|
|
if hasattr(con_inst, "_remove_next_connection_cb"):
|
|
del con_inst._remove_next_connection_cb
|
|
self.delete_connection(con_inst)
|
|
return False
|
|
|
|
# We will delete the connection right away on an idle handler. However,
|
|
# the test races with initializing the connection (calling GetSettings()).
|
|
# To avoid the race, we will check in GetSettings() whether the profile
|
|
# is about to be deleted, and delete it first.
|
|
con_inst._remove_next_connection_cb = cb
|
|
GLib.idle_add(cb)
|
|
|
|
return con_inst.path
|
|
|
|
def update_connection(self, con_hash, path=None, do_verify_strict=True):
|
|
if path not in self.connections:
|
|
raise BusErr.UnknownConnectionException("Connection not found")
|
|
self.connections[path].update_connection(con_hash, do_verify_strict)
|
|
|
|
def delete_connection(self, con_inst):
|
|
del self.connections[con_inst.path]
|
|
self._dbus_property_set(
|
|
IFACE_SETTINGS,
|
|
PRP_SETTINGS_CONNECTIONS,
|
|
dbus.Array(self.get_connection_paths(), "o"),
|
|
)
|
|
con_inst.Removed()
|
|
con_inst.unexport()
|
|
|
|
gl.manager.devices_available_connections_update()
|
|
|
|
@dbus.service.method(
|
|
dbus_interface=IFACE_SETTINGS, in_signature="s", out_signature=""
|
|
)
|
|
def SaveHostname(self, hostname):
|
|
# Arbitrary requirement to test error handling
|
|
if hostname.find(".") == -1:
|
|
raise BusErr.InvalidHostnameException()
|
|
self._dbus_property_set(IFACE_SETTINGS, PRP_SETTINGS_HOSTNAME, hostname)
|
|
|
|
@dbus.service.signal(IFACE_SETTINGS, signature="o")
|
|
def NewConnection(self, path):
|
|
pass
|
|
|
|
@dbus.service.method(IFACE_SETTINGS, in_signature="", out_signature="")
|
|
def Quit(self):
|
|
gl.mainloop.quit()
|
|
|
|
|
|
###############################################################################
|
|
|
|
PRP_IP4_CONFIG_ADDRESSES = "Addresses"
|
|
PRP_IP4_CONFIG_ADDRESSDATA = "AddressData"
|
|
PRP_IP4_CONFIG_GATEWAY = "Gateway"
|
|
PRP_IP4_CONFIG_ROUTES = "Routes"
|
|
PRP_IP4_CONFIG_ROUTEDATA = "RouteData"
|
|
PRP_IP4_CONFIG_NAMESERVERS = "Nameservers"
|
|
PRP_IP4_CONFIG_DOMAINS = "Domains"
|
|
PRP_IP4_CONFIG_SEARCHES = "Searches"
|
|
PRP_IP4_CONFIG_DNSOPTIONS = "DnsOptions"
|
|
PRP_IP4_CONFIG_DNSPRIORITY = "DnsPriority"
|
|
PRP_IP4_CONFIG_WINSSERVERS = "WinsServers"
|
|
|
|
|
|
class IP4Config(ExportedObj):
|
|
path_counter_next = 1
|
|
path_prefix = "/org/freedesktop/NetworkManager/IP4Config/"
|
|
|
|
def __init__(self, generate_seed=_DEFAULT_ARG):
|
|
ExportedObj.__init__(self, ExportedObj.create_path(IP4Config))
|
|
|
|
if generate_seed is _DEFAULT_ARG:
|
|
generate_seed = self.path
|
|
|
|
props = self._props_generate(generate_seed)
|
|
self.dbus_interface_add(IFACE_IP4_CONFIG, props)
|
|
self.export()
|
|
|
|
def _props_generate(self, generate_seed):
|
|
seed = Util.RandomSeed.wrap(generate_seed)
|
|
|
|
gateway = None
|
|
if seed:
|
|
if Util.random_bool(seed):
|
|
gateway = Util.random_ip(seed, net="192.168.0.0/16")[0]
|
|
|
|
addrs = []
|
|
if seed:
|
|
for n in range(0, Util.random_int(seed, 4)):
|
|
a = {
|
|
"addr": Util.random_ip(seed, net="192.168.0.0/16")[0],
|
|
"prefix": Util.random_int(seed, 17, 32),
|
|
"gateway": gateway if n == 0 else None,
|
|
}
|
|
addrs.append(a)
|
|
|
|
routes = []
|
|
if seed:
|
|
for n in range(0, Util.random_int(seed, 4)):
|
|
a = {
|
|
"dest": Util.random_ip(seed, net="192.168.0.0/16")[0],
|
|
"prefix": Util.random_int(seed, 17, 32),
|
|
"next-hop": (
|
|
None
|
|
if (Util.random_int(seed) % 3 == 0)
|
|
else Util.random_ip(seed, net="192.168.0.0/16")[0]
|
|
),
|
|
"metric": (
|
|
-1
|
|
if (Util.random_int(seed) % 3 == 0)
|
|
else Util.random_int(seed, 0, 0xFFFFFFFF)
|
|
),
|
|
}
|
|
routes.append(a)
|
|
|
|
nameservers = []
|
|
if seed:
|
|
nameservers = list(
|
|
[
|
|
Util.random_ip(seed, net="192.168.0.0/16")[0]
|
|
for x in range(Util.random_int(seed, 4))
|
|
]
|
|
)
|
|
|
|
names_selection = [
|
|
"foo1.bar",
|
|
"foo2.bar",
|
|
"foo3.bar",
|
|
"foo4.bar",
|
|
"fo.o.bar",
|
|
"fo.x.y",
|
|
]
|
|
|
|
domains = []
|
|
if seed:
|
|
domains = Util.random_subset(seed, ["dom4." + s for s in names_selection])
|
|
|
|
searches = []
|
|
if seed:
|
|
domains = Util.random_subset(seed, ["sear4." + s for s in names_selection])
|
|
|
|
dnsoptions = []
|
|
if seed:
|
|
dnsoptions = Util.random_subset(
|
|
seed, ["dns4-opt1", "dns4-opt2", "dns4-opt3", "dns4-opt4"]
|
|
)
|
|
|
|
dnspriority = 0
|
|
if seed:
|
|
dnspriority = Util.random_int(seed, -10000, 10000)
|
|
|
|
winsservers = []
|
|
if seed:
|
|
winsservers = list(
|
|
[
|
|
Util.random_ip(seed, net="192.168.0.0/16")[0]
|
|
for x in range(Util.random_int(seed, 4))
|
|
]
|
|
)
|
|
|
|
return {
|
|
PRP_IP4_CONFIG_ADDRESSES: dbus.Array(
|
|
[
|
|
[
|
|
Util.ip4_addr_be32(a["addr"]),
|
|
a["prefix"],
|
|
Util.ip4_addr_be32(a["gateway"]) if a["gateway"] else 0,
|
|
]
|
|
for a in addrs
|
|
],
|
|
"au",
|
|
),
|
|
PRP_IP4_CONFIG_ADDRESSDATA: dbus.Array(
|
|
[
|
|
dbus.Dictionary(
|
|
collections.OrderedDict(
|
|
[
|
|
("address", dbus.String(a["addr"])),
|
|
("prefix", dbus.UInt32(a["prefix"])),
|
|
]
|
|
+ (
|
|
[("gateway", dbus.String(a["gateway"]))]
|
|
if a["gateway"]
|
|
else []
|
|
)
|
|
),
|
|
"sv",
|
|
)
|
|
for a in addrs
|
|
],
|
|
"a{sv}",
|
|
),
|
|
PRP_IP4_CONFIG_GATEWAY: dbus.String(gateway) if gateway else "",
|
|
PRP_IP4_CONFIG_ROUTES: dbus.Array(
|
|
[
|
|
[
|
|
Util.ip4_addr_be32(a["dest"]),
|
|
a["prefix"],
|
|
Util.ip4_addr_be32(a["next-hop"] or "0.0.0.0"),
|
|
max(a["metric"], 0),
|
|
]
|
|
for a in routes
|
|
],
|
|
"au",
|
|
),
|
|
PRP_IP4_CONFIG_ROUTEDATA: dbus.Array(
|
|
[
|
|
dbus.Dictionary(
|
|
collections.OrderedDict(
|
|
[
|
|
("dest", dbus.String(a["dest"])),
|
|
("prefix", dbus.UInt32(a["prefix"])),
|
|
]
|
|
+ (
|
|
[("next-hop", dbus.String(a["next-hop"]))]
|
|
if a["next-hop"]
|
|
else []
|
|
)
|
|
+ (
|
|
[("metric", dbus.UInt32(a["metric"]))]
|
|
if a["metric"] != -1
|
|
else []
|
|
)
|
|
),
|
|
"sv",
|
|
)
|
|
for a in routes
|
|
],
|
|
"a{sv}",
|
|
),
|
|
PRP_IP4_CONFIG_NAMESERVERS: dbus.Array(
|
|
[dbus.UInt32(Util.ip4_addr_be32(n)) for n in nameservers], "u"
|
|
),
|
|
PRP_IP4_CONFIG_DOMAINS: dbus.Array(domains, "s"),
|
|
PRP_IP4_CONFIG_SEARCHES: dbus.Array(searches, "s"),
|
|
PRP_IP4_CONFIG_DNSOPTIONS: dbus.Array(dnsoptions, "s"),
|
|
PRP_IP4_CONFIG_DNSPRIORITY: dbus.Int32(dnspriority),
|
|
PRP_IP4_CONFIG_WINSSERVERS: dbus.Array(
|
|
[dbus.UInt32(Util.ip4_addr_be32(n)) for n in winsservers], "u"
|
|
),
|
|
}
|
|
|
|
def props_regenerate(self, generate_seed):
|
|
props = self.generate_props(generate_seed)
|
|
for k, v in props.items():
|
|
self._dbus_property_set(IFACE_IP4_CONFIG, k, v)
|
|
|
|
@dbus.service.method(IFACE_TEST, in_signature="s", out_signature="")
|
|
def SetGateway(self, gateway):
|
|
self._dbus_property_set(IFACE_IP4_CONFIG, PRP_IP4_CONFIG_GATEWAY, gateway)
|
|
|
|
|
|
###############################################################################
|
|
|
|
PRP_IP6_CONFIG_ADDRESSES = "Addresses"
|
|
PRP_IP6_CONFIG_ADDRESSDATA = "AddressData"
|
|
PRP_IP6_CONFIG_GATEWAY = "Gateway"
|
|
PRP_IP6_CONFIG_ROUTES = "Routes"
|
|
PRP_IP6_CONFIG_ROUTEDATA = "RouteData"
|
|
PRP_IP6_CONFIG_NAMESERVERS = "Nameservers"
|
|
PRP_IP6_CONFIG_DOMAINS = "Domains"
|
|
PRP_IP6_CONFIG_SEARCHES = "Searches"
|
|
PRP_IP6_CONFIG_DNSOPTIONS = "DnsOptions"
|
|
PRP_IP6_CONFIG_DNSPRIORITY = "DnsPriority"
|
|
|
|
|
|
class IP6Config(ExportedObj):
|
|
path_counter_next = 1
|
|
path_prefix = "/org/freedesktop/NetworkManager/IP6Config/"
|
|
|
|
def __init__(self, generate_seed=_DEFAULT_ARG):
|
|
ExportedObj.__init__(self, ExportedObj.create_path(IP6Config))
|
|
|
|
if generate_seed is _DEFAULT_ARG:
|
|
generate_seed = self.path
|
|
|
|
props = self._props_generate(generate_seed)
|
|
self.dbus_interface_add(IFACE_IP6_CONFIG, props)
|
|
self.export()
|
|
|
|
def _props_generate(self, generate_seed):
|
|
seed = Util.RandomSeed.wrap(generate_seed)
|
|
|
|
gateway = None
|
|
if seed:
|
|
if Util.random_bool(seed):
|
|
gateway = Util.random_ip(seed, net="2001:a::/64")[0]
|
|
|
|
addrs = []
|
|
if seed:
|
|
for n in range(0, Util.random_int(seed, 4)):
|
|
a = {
|
|
"addr": Util.random_ip(seed, net="2001:a::/64")[0],
|
|
"prefix": Util.random_int(seed, 65, 128),
|
|
"gateway": gateway if n == 0 else None,
|
|
}
|
|
addrs.append(a)
|
|
|
|
routes = []
|
|
if seed:
|
|
for n in range(0, Util.random_int(seed, 4)):
|
|
a = {
|
|
"dest": Util.random_ip(seed, net="2001:a::/64")[0],
|
|
"prefix": Util.random_int(seed, 65, 128),
|
|
"next-hop": (
|
|
None
|
|
if (Util.random_int(seed) % 3 == 0)
|
|
else Util.random_ip(seed, net="2001:a::/64")[0]
|
|
),
|
|
"metric": (
|
|
-1
|
|
if (Util.random_int(seed) % 3 == 0)
|
|
else Util.random_int(seed, 0, 0xFFFFFFFF)
|
|
),
|
|
}
|
|
routes.append(a)
|
|
|
|
nameservers = []
|
|
if seed:
|
|
nameservers = list(
|
|
[
|
|
Util.random_ip(seed, net="2001:a::/64")[0]
|
|
for x in range(Util.random_int(seed, 4))
|
|
]
|
|
)
|
|
|
|
names_selection = [
|
|
"foo1.bar",
|
|
"foo2.bar",
|
|
"foo3.bar",
|
|
"foo4.bar",
|
|
"fo.o.bar",
|
|
"fo.x.y",
|
|
]
|
|
|
|
domains = []
|
|
if seed:
|
|
domains = Util.random_subset(seed, ["dom6." + s for s in names_selection])
|
|
|
|
searches = []
|
|
if seed:
|
|
domains = Util.random_subset(seed, ["sear6." + s for s in names_selection])
|
|
|
|
dnsoptions = []
|
|
if seed:
|
|
dnsoptions = Util.random_subset(
|
|
seed, ["dns6-opt1", "dns6-opt2", "dns6-opt3", "dns6-opt4"]
|
|
)
|
|
|
|
dnspriority = 0
|
|
if seed:
|
|
dnspriority = Util.random_int(seed, -10000, 10000)
|
|
|
|
return {
|
|
PRP_IP6_CONFIG_ADDRESSES: dbus.Array(
|
|
[
|
|
[
|
|
Util.ip6_addr_ay(a["addr"]),
|
|
a["prefix"],
|
|
Util.ip6_addr_ay(a["gateway"] or "::"),
|
|
]
|
|
for a in addrs
|
|
],
|
|
"(ayuay)",
|
|
),
|
|
PRP_IP6_CONFIG_ADDRESSDATA: dbus.Array(
|
|
[
|
|
dbus.Dictionary(
|
|
collections.OrderedDict(
|
|
[
|
|
("address", dbus.String(a["addr"])),
|
|
("prefix", dbus.UInt32(a["prefix"])),
|
|
]
|
|
+ (
|
|
[("gateway", dbus.String(a["gateway"]))]
|
|
if a["gateway"]
|
|
else []
|
|
)
|
|
),
|
|
"sv",
|
|
)
|
|
for a in addrs
|
|
],
|
|
"a{sv}",
|
|
),
|
|
PRP_IP6_CONFIG_GATEWAY: dbus.String(gateway) if gateway else "",
|
|
PRP_IP6_CONFIG_ROUTES: dbus.Array(
|
|
[
|
|
[
|
|
Util.ip6_addr_ay(a["dest"]),
|
|
a["prefix"],
|
|
Util.ip6_addr_ay(a["next-hop"] or "::"),
|
|
max(a["metric"], 0),
|
|
]
|
|
for a in routes
|
|
],
|
|
"(ayuayu)",
|
|
),
|
|
PRP_IP6_CONFIG_ROUTEDATA: dbus.Array(
|
|
[
|
|
dbus.Dictionary(
|
|
collections.OrderedDict(
|
|
[
|
|
("dest", dbus.String(a["dest"])),
|
|
("prefix", dbus.UInt32(a["prefix"])),
|
|
]
|
|
+ (
|
|
[("next-hop", dbus.String(a["next-hop"]))]
|
|
if a["next-hop"]
|
|
else []
|
|
)
|
|
+ (
|
|
[("metric", dbus.UInt32(a["metric"]))]
|
|
if a["metric"] != -1
|
|
else []
|
|
)
|
|
),
|
|
"sv",
|
|
)
|
|
for a in routes
|
|
],
|
|
"a{sv}",
|
|
),
|
|
PRP_IP6_CONFIG_NAMESERVERS: dbus.Array(
|
|
[Util.ip6_addr_ay(n) for n in nameservers], "ay"
|
|
),
|
|
PRP_IP6_CONFIG_DOMAINS: dbus.Array(domains, "s"),
|
|
PRP_IP6_CONFIG_SEARCHES: dbus.Array(searches, "s"),
|
|
PRP_IP6_CONFIG_DNSOPTIONS: dbus.Array(dnsoptions, "s"),
|
|
PRP_IP6_CONFIG_DNSPRIORITY: dbus.Int32(dnspriority),
|
|
}
|
|
|
|
def props_regenerate(self, generate_seed):
|
|
props = self.generate_props(generate_seed)
|
|
for k, v in props.items():
|
|
self._dbus_property_set(IFACE_IP6_CONFIG, k, v)
|
|
|
|
|
|
###############################################################################
|
|
|
|
PRP_DHCP4_CONFIG_OPTIONS = "Options"
|
|
|
|
|
|
class Dhcp4Config(ExportedObj):
|
|
path_counter_next = 1
|
|
path_prefix = "/org/freedesktop/NetworkManager/DHCP4Config/"
|
|
|
|
def __init__(self, generate_seed=_DEFAULT_ARG):
|
|
ExportedObj.__init__(self, ExportedObj.create_path(Dhcp4Config))
|
|
|
|
if generate_seed is _DEFAULT_ARG:
|
|
generate_seed = self.path
|
|
|
|
props = self._props_generate(generate_seed)
|
|
self.dbus_interface_add(IFACE_DHCP4_CONFIG, props)
|
|
self.export()
|
|
|
|
def _props_generate(self, generate_seed):
|
|
seed = Util.RandomSeed.wrap(generate_seed)
|
|
|
|
options = []
|
|
if seed:
|
|
options = Util.random_subset(
|
|
seed, [("dhcp-4-opt-" + str(i), "val-" + str(i)) for i in range(10)]
|
|
)
|
|
|
|
return {
|
|
PRP_DHCP4_CONFIG_OPTIONS: dbus.Dictionary(
|
|
collections.OrderedDict(options), "sv"
|
|
)
|
|
}
|
|
|
|
def props_regenerate(self, generate_seed):
|
|
props = self.generate_props(generate_seed)
|
|
for k, v in props.items():
|
|
self._dbus_property_set(IFACE_DHCP4_CONFIG, k, v)
|
|
|
|
|
|
###############################################################################
|
|
|
|
PRP_DHCP6_CONFIG_OPTIONS = "Options"
|
|
|
|
|
|
class Dhcp6Config(ExportedObj):
|
|
path_counter_next = 1
|
|
path_prefix = "/org/freedesktop/NetworkManager/DHCP6Config/"
|
|
|
|
def __init__(self, generate_seed=_DEFAULT_ARG):
|
|
ExportedObj.__init__(self, ExportedObj.create_path(Dhcp6Config))
|
|
|
|
if generate_seed is _DEFAULT_ARG:
|
|
generate_seed = self.path
|
|
|
|
props = self._props_generate(generate_seed)
|
|
self.dbus_interface_add(IFACE_DHCP6_CONFIG, props)
|
|
self.export()
|
|
|
|
def _props_generate(self, generate_seed):
|
|
seed = Util.RandomSeed.wrap(generate_seed)
|
|
|
|
options = []
|
|
if seed:
|
|
options = Util.random_subset(
|
|
seed, [("dhcp-6-opt-" + str(i), "val-" + str(i)) for i in range(10)]
|
|
)
|
|
|
|
return {
|
|
PRP_DHCP4_CONFIG_OPTIONS: dbus.Dictionary(
|
|
collections.OrderedDict(options), "sv"
|
|
)
|
|
}
|
|
|
|
def props_regenerate(self, generate_seed):
|
|
props = self.generate_props(generate_seed)
|
|
for k, v in props.items():
|
|
self._dbus_property_set(IFACE_DHCP6_CONFIG, k, v)
|
|
|
|
|
|
###############################################################################
|
|
|
|
PRP_DNS_MANAGER_MODE = "Mode"
|
|
PRP_DNS_MANAGER_RC_MANAGER = "RcManager"
|
|
PRP_DNS_MANAGER_CONFIGURATION = "Configuration"
|
|
|
|
|
|
class DnsManager(ExportedObj):
|
|
def __init__(self):
|
|
ExportedObj.__init__(self, "/org/freedesktop/NetworkManager/DnsManager")
|
|
|
|
props = {
|
|
PRP_DNS_MANAGER_MODE: "dnsmasq",
|
|
PRP_DNS_MANAGER_RC_MANAGER: "symlink",
|
|
PRP_DNS_MANAGER_CONFIGURATION: dbus.Array(
|
|
[
|
|
dbus.Dictionary(
|
|
{
|
|
"nameservers": dbus.Array(["1.2.3.4", "5.6.7.8"], "s"),
|
|
"priority": dbus.Int32(100),
|
|
},
|
|
"sv",
|
|
)
|
|
],
|
|
"a{sv}",
|
|
),
|
|
}
|
|
|
|
self.dbus_interface_add(IFACE_DNS_MANAGER, props)
|
|
self.export()
|
|
|
|
|
|
###############################################################################
|
|
|
|
PATH_SECRET_AGENT = "/org/freedesktop/NetworkManager/SecretAgent"
|
|
|
|
FLAG_ALLOW_INTERACTION = 0x1
|
|
FLAG_REQUEST_NEW = 0x2
|
|
FLAG_USER_REQUESTED = 0x4
|
|
|
|
|
|
class AgentManager(dbus.service.Object):
|
|
def __init__(self):
|
|
dbus.service.Object.__init__(
|
|
self, gl.bus, "/org/freedesktop/NetworkManager/AgentManager"
|
|
)
|
|
self.agents = {}
|
|
|
|
@dbus.service.method(
|
|
dbus_interface=IFACE_AGENT_MANAGER,
|
|
in_signature="s",
|
|
out_signature="",
|
|
sender_keyword="sender",
|
|
)
|
|
def Register(self, name, sender=None):
|
|
self.RegisterWithCapabilities(name, 0, sender)
|
|
|
|
@dbus.service.method(
|
|
dbus_interface=IFACE_AGENT_MANAGER,
|
|
in_signature="su",
|
|
out_signature="",
|
|
sender_keyword="sender",
|
|
)
|
|
def RegisterWithCapabilities(self, name, caps, sender=None):
|
|
self.agents[sender] = gl.bus.get_object(sender, PATH_SECRET_AGENT)
|
|
|
|
@dbus.service.method(
|
|
dbus_interface=IFACE_AGENT_MANAGER,
|
|
in_signature="",
|
|
out_signature="",
|
|
sender_keyword="sender",
|
|
)
|
|
def Unregister(self, sender=None):
|
|
del self.agents[sender]
|
|
|
|
def get_secrets(self, con_hash, path, setting_name):
|
|
if len(self.agents) == 0:
|
|
return None
|
|
|
|
secrets = {}
|
|
for sender in self.agents:
|
|
agent = self.agents[sender]
|
|
try:
|
|
secrets = agent.GetSecrets(
|
|
con_hash,
|
|
path,
|
|
setting_name,
|
|
dbus.Array([], "s"),
|
|
FLAG_ALLOW_INTERACTION | FLAG_USER_REQUESTED,
|
|
dbus_interface=IFACE_AGENT,
|
|
)
|
|
break
|
|
except dbus.DBusException as e:
|
|
if e.get_dbus_name() == IFACE_AGENT + ".UserCanceled":
|
|
raise BusErr.UserCanceledException("User canceled")
|
|
continue
|
|
return secrets
|
|
|
|
|
|
###############################################################################
|
|
|
|
|
|
class ObjectManager(dbus.service.Object):
|
|
def __init__(self, object_path):
|
|
dbus.service.Object.__init__(self, gl.bus, object_path)
|
|
self.objs = []
|
|
|
|
def find_object(self, path):
|
|
for o in self.objs:
|
|
if path == o.path:
|
|
return o
|
|
return None
|
|
|
|
def add_object(self, obj):
|
|
self.objs.append(obj)
|
|
self.InterfacesAdded(obj.path, obj.get_managed_ifaces())
|
|
|
|
def remove_object(self, obj):
|
|
self.objs.remove(obj)
|
|
self.InterfacesRemoved(obj.path, obj.get_managed_ifaces().keys())
|
|
|
|
@dbus.service.signal(IFACE_OBJECT_MANAGER, signature="oa{sa{sv}}")
|
|
def InterfacesAdded(self, name, ifaces):
|
|
pass
|
|
|
|
@dbus.service.signal(IFACE_OBJECT_MANAGER, signature="oas")
|
|
def InterfacesRemoved(self, name, ifaces):
|
|
pass
|
|
|
|
@dbus.service.method(
|
|
dbus_interface=IFACE_OBJECT_MANAGER,
|
|
in_signature="",
|
|
out_signature="a{oa{sa{sv}}}",
|
|
sender_keyword="sender",
|
|
)
|
|
def GetManagedObjects(self, sender=None):
|
|
managed_objects = {}
|
|
for obj in self.objs:
|
|
managed_objects[obj.path] = obj.get_managed_ifaces()
|
|
return managed_objects
|
|
|
|
|
|
def setup_log_file():
|
|
"""
|
|
Get environment variable for the log file , if any
|
|
We accept a %p placeholder to replace with the pid of the current
|
|
process
|
|
"""
|
|
try:
|
|
log_file_name = os.environ["NM_TEST_NETWORKMANAGER_SERVICE_LOGFILE"]
|
|
except KeyError:
|
|
return None
|
|
if "%p" in log_file_name:
|
|
log_file_name = log_file_name.replace("%p", str(os.getpid()))
|
|
try:
|
|
log_file = open(log_file_name, "w")
|
|
except Exception:
|
|
log_file = None
|
|
|
|
return log_file
|
|
|
|
|
|
###############################################################################
|
|
|
|
|
|
def main():
|
|
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
|
|
|
|
random.seed()
|
|
|
|
global gl
|
|
gl = Global()
|
|
|
|
gl.log_file = setup_log_file()
|
|
gl.mainloop = GLib.MainLoop()
|
|
gl.bus = dbus.SessionBus()
|
|
gl.force_activation_failure = {}
|
|
|
|
gl.object_manager = ObjectManager("/org/freedesktop")
|
|
gl.manager = NetworkManager()
|
|
gl.settings = Settings()
|
|
gl.dns_manager = DnsManager()
|
|
gl.agent_manager = AgentManager()
|
|
|
|
if not gl.bus.request_name("org.freedesktop.NetworkManager"):
|
|
raise AssertionError(
|
|
"Failure to request D-Bus name org.freedesktop.NetworkManager"
|
|
)
|
|
|
|
# Watch stdin; if it closes, assume our parent has crashed, and exit
|
|
id1 = GLib.io_add_watch(
|
|
GLib.IOChannel.unix_new(0),
|
|
GLib.PRIORITY_DEFAULT,
|
|
GLib.IO_HUP,
|
|
lambda io, condition: gl.mainloop.quit() or True,
|
|
)
|
|
|
|
gl.mainloop.run()
|
|
|
|
GLib.source_remove(id1)
|
|
|
|
gl.agent_manager.remove_from_connection()
|
|
gl.dns_manager.unexport()
|
|
gl.settings.unexport()
|
|
gl.manager.unexport()
|
|
gl.object_manager.remove_from_connection()
|
|
if gl.log_file:
|
|
gl.log_file.close()
|
|
sys.exit(0)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|