mirror of
https://github.com/freebsd/freebsd-src
synced 2024-10-02 22:54:52 +00:00
356 lines
14 KiB
Python
356 lines
14 KiB
Python
import errno
|
|
import socket
|
|
|
|
import pytest
|
|
from atf_python.sys.netlink.netlink_route import IflattrType
|
|
from atf_python.sys.netlink.netlink_route import IflinkInfo
|
|
from atf_python.sys.netlink.netlink_route import IfLinkInfoDataVlan
|
|
from atf_python.sys.netlink.netlink_route import NetlinkIflaMessage
|
|
from atf_python.sys.netlink.netlink import NetlinkTestTemplate
|
|
from atf_python.sys.netlink.attrs import NlAttrNested
|
|
from atf_python.sys.netlink.attrs import NlAttrStr
|
|
from atf_python.sys.netlink.attrs import NlAttrStrn
|
|
from atf_python.sys.netlink.attrs import NlAttrU16
|
|
from atf_python.sys.netlink.attrs import NlAttrU32
|
|
from atf_python.sys.netlink.utils import NlConst
|
|
from atf_python.sys.netlink.base_headers import NlmBaseFlags
|
|
from atf_python.sys.netlink.base_headers import NlmNewFlags
|
|
from atf_python.sys.netlink.base_headers import NlMsgType
|
|
from atf_python.sys.netlink.netlink_route import NlRtMsgType
|
|
from atf_python.sys.netlink.netlink_route import rtnl_ifla_attrs
|
|
from atf_python.sys.net.vnet import SingleVnetTestTemplate
|
|
from atf_python.sys.net.tools import ToolsHelper
|
|
|
|
|
|
class TestRtNlIface(NetlinkTestTemplate, SingleVnetTestTemplate):
|
|
def setup_method(self, method):
|
|
super().setup_method(method)
|
|
self.setup_netlink(NlConst.NETLINK_ROUTE)
|
|
|
|
def get_interface_byname(self, ifname):
|
|
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
|
|
msg.nl_hdr.nlmsg_flags = (
|
|
NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
|
|
)
|
|
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname))
|
|
self.write_message(msg)
|
|
while True:
|
|
rx_msg = self.read_message()
|
|
if msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq:
|
|
if rx_msg.is_type(NlMsgType.NLMSG_ERROR):
|
|
if rx_msg.error_code != 0:
|
|
raise ValueError("unable to get interface {}".format(ifname))
|
|
elif rx_msg.is_type(NlRtMsgType.RTM_NEWLINK):
|
|
return rx_msg
|
|
else:
|
|
raise ValueError("bad message")
|
|
|
|
def test_get_iface_byname_error(self):
|
|
"""Tests error on fetching non-existing interface name"""
|
|
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
|
|
msg.nl_hdr.nlmsg_flags = (
|
|
NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
|
|
)
|
|
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
|
|
|
|
rx_msg = self.get_reply(msg)
|
|
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
|
|
assert rx_msg.error_code == errno.ENODEV
|
|
|
|
def test_get_iface_byindex_error(self):
|
|
"""Tests error on fetching non-existing interface index"""
|
|
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
|
|
msg.nl_hdr.nlmsg_flags = (
|
|
NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
|
|
)
|
|
msg.base_hdr.ifi_index = 2147483647
|
|
|
|
rx_msg = self.get_reply(msg)
|
|
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
|
|
assert rx_msg.error_code == errno.ENODEV
|
|
|
|
@pytest.mark.require_user("root")
|
|
def test_create_iface_plain(self):
|
|
"""Tests loopback creation w/o any parameters"""
|
|
flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
|
|
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
|
|
msg.nl_hdr.nlmsg_flags = (
|
|
flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
|
|
)
|
|
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
|
|
msg.add_nla(
|
|
NlAttrNested(
|
|
IflattrType.IFLA_LINKINFO,
|
|
[
|
|
NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
|
|
],
|
|
)
|
|
)
|
|
|
|
rx_msg = self.get_reply(msg)
|
|
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
|
|
assert rx_msg.error_code == 0
|
|
|
|
self.get_interface_byname("lo10")
|
|
|
|
@pytest.mark.require_user("root")
|
|
def test_create_iface_plain_retvals(self):
|
|
"""Tests loopback creation w/o any parameters"""
|
|
flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
|
|
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
|
|
msg.nl_hdr.nlmsg_flags = (
|
|
flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
|
|
)
|
|
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
|
|
msg.add_nla(
|
|
NlAttrNested(
|
|
IflattrType.IFLA_LINKINFO,
|
|
[
|
|
NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
|
|
],
|
|
)
|
|
)
|
|
|
|
rx_msg = self.get_reply(msg)
|
|
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
|
|
assert rx_msg.error_code == 0
|
|
assert rx_msg.cookie is not None
|
|
nla_list, _ = rx_msg.parse_attrs(bytes(rx_msg.cookie)[4:], rtnl_ifla_attrs)
|
|
nla_map = {n.nla_type: n for n in nla_list}
|
|
assert IflattrType.IFLA_IFNAME.value in nla_map
|
|
assert nla_map[IflattrType.IFLA_IFNAME.value].text == "lo10"
|
|
assert IflattrType.IFLA_NEW_IFINDEX.value in nla_map
|
|
assert nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32 > 0
|
|
|
|
lo_msg = self.get_interface_byname("lo10")
|
|
assert (
|
|
lo_msg.base_hdr.ifi_index == nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32
|
|
)
|
|
|
|
@pytest.mark.require_user("root")
|
|
def test_create_iface_attrs(self):
|
|
"""Tests interface creation with additional properties"""
|
|
flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
|
|
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
|
|
msg.nl_hdr.nlmsg_flags = (
|
|
flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
|
|
)
|
|
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
|
|
msg.add_nla(
|
|
NlAttrNested(
|
|
IflattrType.IFLA_LINKINFO,
|
|
[
|
|
NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
|
|
],
|
|
)
|
|
)
|
|
|
|
# Custom attributes
|
|
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description"))
|
|
msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024))
|
|
|
|
rx_msg = self.get_reply(msg)
|
|
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
|
|
assert rx_msg.error_code == 0
|
|
|
|
iface_msg = self.get_interface_byname("lo10")
|
|
assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description"
|
|
assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024
|
|
|
|
@pytest.mark.require_user("root")
|
|
def test_modify_iface_attrs(self):
|
|
"""Tests interface modifications"""
|
|
flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
|
|
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
|
|
msg.nl_hdr.nlmsg_flags = (
|
|
flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
|
|
)
|
|
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
|
|
msg.add_nla(
|
|
NlAttrNested(
|
|
IflattrType.IFLA_LINKINFO,
|
|
[
|
|
NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
|
|
],
|
|
)
|
|
)
|
|
|
|
rx_msg = self.get_reply(msg)
|
|
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
|
|
assert rx_msg.error_code == 0
|
|
|
|
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
|
|
msg.nl_hdr.nlmsg_flags = (
|
|
NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
|
|
)
|
|
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
|
|
|
|
# Custom attributes
|
|
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description"))
|
|
msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024))
|
|
|
|
rx_msg = self.get_reply(msg)
|
|
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
|
|
assert rx_msg.error_code == 0
|
|
|
|
iface_msg = self.get_interface_byname("lo10")
|
|
assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description"
|
|
assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024
|
|
|
|
@pytest.mark.require_user("root")
|
|
def test_delete_iface(self):
|
|
"""Tests interface modifications"""
|
|
flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
|
|
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
|
|
msg.nl_hdr.nlmsg_flags = (
|
|
flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
|
|
)
|
|
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
|
|
msg.add_nla(
|
|
NlAttrNested(
|
|
IflattrType.IFLA_LINKINFO,
|
|
[
|
|
NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
|
|
],
|
|
)
|
|
)
|
|
|
|
rx_msg = self.get_reply(msg)
|
|
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
|
|
assert rx_msg.error_code == 0
|
|
|
|
iface_msg = self.get_interface_byname("lo10")
|
|
iface_idx = iface_msg.base_hdr.ifi_index
|
|
|
|
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_DELLINK.value)
|
|
msg.nl_hdr.nlmsg_flags = (
|
|
NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
|
|
)
|
|
msg.base_hdr.ifi_index = iface_idx
|
|
# msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
|
|
|
|
rx_msg = self.get_reply(msg)
|
|
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
|
|
assert rx_msg.error_code == 0
|
|
|
|
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
|
|
msg.nl_hdr.nlmsg_flags = (
|
|
NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
|
|
)
|
|
msg.base_hdr.ifi_index = 2147483647
|
|
|
|
rx_msg = self.get_reply(msg)
|
|
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
|
|
assert rx_msg.error_code == errno.ENODEV
|
|
|
|
@pytest.mark.require_user("root")
|
|
def test_dump_ifaces_many(self):
|
|
"""Tests if interface dummp is not missing interfaces"""
|
|
|
|
ifmap = {}
|
|
ifmap[socket.if_nametoindex("lo0")] = "lo0"
|
|
|
|
for i in range(40):
|
|
ifname = "lo{}".format(i + 1)
|
|
flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
|
|
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
|
|
msg.nl_hdr.nlmsg_flags = (
|
|
flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
|
|
)
|
|
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname))
|
|
msg.add_nla(
|
|
NlAttrNested(
|
|
IflattrType.IFLA_LINKINFO,
|
|
[
|
|
NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
|
|
],
|
|
)
|
|
)
|
|
|
|
rx_msg = self.get_reply(msg)
|
|
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
|
|
nla_list, _ = rx_msg.parse_attrs(bytes(rx_msg.cookie)[4:], rtnl_ifla_attrs)
|
|
nla_map = {n.nla_type: n for n in nla_list}
|
|
assert nla_map[IflattrType.IFLA_IFNAME.value].text == ifname
|
|
ifindex = nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32
|
|
assert ifindex > 0
|
|
assert ifindex not in ifmap
|
|
ifmap[ifindex] = ifname
|
|
|
|
# Dump all interfaces and check if the output matches ifmap
|
|
kernel_ifmap = {}
|
|
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
|
|
msg.nl_hdr.nlmsg_flags = (
|
|
NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
|
|
)
|
|
self.write_message(msg)
|
|
while True:
|
|
rx_msg = self.read_message()
|
|
if msg.nl_hdr.nlmsg_seq != rx_msg.nl_hdr.nlmsg_seq:
|
|
raise ValueError(
|
|
"unexpected seq {}".format(rx_msg.nl_hdr.nlmsg_seq)
|
|
)
|
|
if rx_msg.is_type(NlMsgType.NLMSG_ERROR):
|
|
raise ValueError("unexpected message {}".format(rx_msg))
|
|
if rx_msg.is_type(NlMsgType.NLMSG_DONE):
|
|
break
|
|
if not rx_msg.is_type(NlRtMsgType.RTM_NEWLINK):
|
|
raise ValueError("unexpected message {}".format(rx_msg))
|
|
|
|
ifindex = rx_msg.base_hdr.ifi_index
|
|
assert ifindex == rx_msg.base_hdr.ifi_index
|
|
ifname = rx_msg.get_nla(IflattrType.IFLA_IFNAME).text
|
|
if ifname.startswith("lo"):
|
|
kernel_ifmap[ifindex] = ifname
|
|
assert kernel_ifmap == ifmap
|
|
|
|
#
|
|
# *
|
|
# * {len=76, type=RTM_NEWLINK, flags=NLM_F_REQUEST|NLM_F_ACK|NLM_F_EXCL|NLM_F_CREATE, seq=1662892737, pid=0},
|
|
# * {ifi_family=AF_UNSPEC, ifi_type=ARPHRD_NETROM, ifi_index=0, ifi_flags=0, ifi_change=0},
|
|
# * {{nla_len=8, nla_type=IFLA_LINK}, 2},
|
|
# * {{nla_len=12, nla_type=IFLA_IFNAME}, "xvlan22"},
|
|
# * {{nla_len=24, nla_type=IFLA_LINKINFO},
|
|
# * {{nla_len=8, nla_type=IFLA_INFO_KIND}, "vlan"...},
|
|
# * {{nla_len=12, nla_type=IFLA_INFO_DATA}, "\x06\x00\x01\x00\x16\x00\x00\x00"}
|
|
# */
|
|
@pytest.mark.require_user("root")
|
|
def test_create_vlan_plain(self):
|
|
"""Creates 802.1Q VLAN interface in vlanXX and ifX fashion"""
|
|
self.require_module("if_vlan")
|
|
os_ifname = self.vnet.iface_alias_map["if1"].name
|
|
ifindex = socket.if_nametoindex(os_ifname)
|
|
|
|
flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
|
|
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
|
|
msg.nl_hdr.nlmsg_flags = (
|
|
flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
|
|
)
|
|
msg.base_hdr.ifi_index = ifindex
|
|
|
|
msg.add_nla(NlAttrU32(IflattrType.IFLA_LINK, ifindex))
|
|
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "vlan22"))
|
|
|
|
msg.add_nla(
|
|
NlAttrNested(
|
|
IflattrType.IFLA_LINKINFO,
|
|
[
|
|
NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "vlan"),
|
|
NlAttrNested(
|
|
IflinkInfo.IFLA_INFO_DATA,
|
|
[
|
|
NlAttrU16(IfLinkInfoDataVlan.IFLA_VLAN_ID, 22),
|
|
],
|
|
),
|
|
],
|
|
)
|
|
)
|
|
|
|
rx_msg = self.get_reply(msg)
|
|
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
|
|
assert rx_msg.error_code == 0
|
|
|
|
ToolsHelper.print_net_debug()
|
|
self.get_interface_byname("vlan22")
|
|
# ToolsHelper.print_net_debug()
|