freebsd-src/tests/sys/netlink/test_rtnl_iface.py

356 lines
14 KiB
Python

import errno
import socket
import pytest
from atf_python.sys.netlink.netlink_route import IflattrType
from atf_python.sys.netlink.netlink_route import IflinkInfo
from atf_python.sys.netlink.netlink_route import IfLinkInfoDataVlan
from atf_python.sys.netlink.netlink_route import NetlinkIflaMessage
from atf_python.sys.netlink.netlink import NetlinkTestTemplate
from atf_python.sys.netlink.attrs import NlAttrNested
from atf_python.sys.netlink.attrs import NlAttrStr
from atf_python.sys.netlink.attrs import NlAttrStrn
from atf_python.sys.netlink.attrs import NlAttrU16
from atf_python.sys.netlink.attrs import NlAttrU32
from atf_python.sys.netlink.utils import NlConst
from atf_python.sys.netlink.base_headers import NlmBaseFlags
from atf_python.sys.netlink.base_headers import NlmNewFlags
from atf_python.sys.netlink.base_headers import NlMsgType
from atf_python.sys.netlink.netlink_route import NlRtMsgType
from atf_python.sys.netlink.netlink_route import rtnl_ifla_attrs
from atf_python.sys.net.vnet import SingleVnetTestTemplate
from atf_python.sys.net.tools import ToolsHelper
class TestRtNlIface(NetlinkTestTemplate, SingleVnetTestTemplate):
def setup_method(self, method):
super().setup_method(method)
self.setup_netlink(NlConst.NETLINK_ROUTE)
def get_interface_byname(self, ifname):
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
msg.nl_hdr.nlmsg_flags = (
NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
)
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname))
self.write_message(msg)
while True:
rx_msg = self.read_message()
if msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq:
if rx_msg.is_type(NlMsgType.NLMSG_ERROR):
if rx_msg.error_code != 0:
raise ValueError("unable to get interface {}".format(ifname))
elif rx_msg.is_type(NlRtMsgType.RTM_NEWLINK):
return rx_msg
else:
raise ValueError("bad message")
def test_get_iface_byname_error(self):
"""Tests error on fetching non-existing interface name"""
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
msg.nl_hdr.nlmsg_flags = (
NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
)
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
rx_msg = self.get_reply(msg)
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
assert rx_msg.error_code == errno.ENODEV
def test_get_iface_byindex_error(self):
"""Tests error on fetching non-existing interface index"""
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
msg.nl_hdr.nlmsg_flags = (
NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
)
msg.base_hdr.ifi_index = 2147483647
rx_msg = self.get_reply(msg)
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
assert rx_msg.error_code == errno.ENODEV
@pytest.mark.require_user("root")
def test_create_iface_plain(self):
"""Tests loopback creation w/o any parameters"""
flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
msg.nl_hdr.nlmsg_flags = (
flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
)
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
msg.add_nla(
NlAttrNested(
IflattrType.IFLA_LINKINFO,
[
NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
],
)
)
rx_msg = self.get_reply(msg)
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
assert rx_msg.error_code == 0
self.get_interface_byname("lo10")
@pytest.mark.require_user("root")
def test_create_iface_plain_retvals(self):
"""Tests loopback creation w/o any parameters"""
flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
msg.nl_hdr.nlmsg_flags = (
flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
)
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
msg.add_nla(
NlAttrNested(
IflattrType.IFLA_LINKINFO,
[
NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
],
)
)
rx_msg = self.get_reply(msg)
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
assert rx_msg.error_code == 0
assert rx_msg.cookie is not None
nla_list, _ = rx_msg.parse_attrs(bytes(rx_msg.cookie)[4:], rtnl_ifla_attrs)
nla_map = {n.nla_type: n for n in nla_list}
assert IflattrType.IFLA_IFNAME.value in nla_map
assert nla_map[IflattrType.IFLA_IFNAME.value].text == "lo10"
assert IflattrType.IFLA_NEW_IFINDEX.value in nla_map
assert nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32 > 0
lo_msg = self.get_interface_byname("lo10")
assert (
lo_msg.base_hdr.ifi_index == nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32
)
@pytest.mark.require_user("root")
def test_create_iface_attrs(self):
"""Tests interface creation with additional properties"""
flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
msg.nl_hdr.nlmsg_flags = (
flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
)
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
msg.add_nla(
NlAttrNested(
IflattrType.IFLA_LINKINFO,
[
NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
],
)
)
# Custom attributes
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description"))
msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024))
rx_msg = self.get_reply(msg)
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
assert rx_msg.error_code == 0
iface_msg = self.get_interface_byname("lo10")
assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description"
assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024
@pytest.mark.require_user("root")
def test_modify_iface_attrs(self):
"""Tests interface modifications"""
flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
msg.nl_hdr.nlmsg_flags = (
flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
)
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
msg.add_nla(
NlAttrNested(
IflattrType.IFLA_LINKINFO,
[
NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
],
)
)
rx_msg = self.get_reply(msg)
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
assert rx_msg.error_code == 0
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
msg.nl_hdr.nlmsg_flags = (
NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
)
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
# Custom attributes
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description"))
msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024))
rx_msg = self.get_reply(msg)
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
assert rx_msg.error_code == 0
iface_msg = self.get_interface_byname("lo10")
assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description"
assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024
@pytest.mark.require_user("root")
def test_delete_iface(self):
"""Tests interface modifications"""
flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
msg.nl_hdr.nlmsg_flags = (
flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
)
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
msg.add_nla(
NlAttrNested(
IflattrType.IFLA_LINKINFO,
[
NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
],
)
)
rx_msg = self.get_reply(msg)
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
assert rx_msg.error_code == 0
iface_msg = self.get_interface_byname("lo10")
iface_idx = iface_msg.base_hdr.ifi_index
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_DELLINK.value)
msg.nl_hdr.nlmsg_flags = (
NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
)
msg.base_hdr.ifi_index = iface_idx
# msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
rx_msg = self.get_reply(msg)
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
assert rx_msg.error_code == 0
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
msg.nl_hdr.nlmsg_flags = (
NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
)
msg.base_hdr.ifi_index = 2147483647
rx_msg = self.get_reply(msg)
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
assert rx_msg.error_code == errno.ENODEV
@pytest.mark.require_user("root")
def test_dump_ifaces_many(self):
"""Tests if interface dummp is not missing interfaces"""
ifmap = {}
ifmap[socket.if_nametoindex("lo0")] = "lo0"
for i in range(40):
ifname = "lo{}".format(i + 1)
flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
msg.nl_hdr.nlmsg_flags = (
flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
)
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname))
msg.add_nla(
NlAttrNested(
IflattrType.IFLA_LINKINFO,
[
NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
],
)
)
rx_msg = self.get_reply(msg)
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
nla_list, _ = rx_msg.parse_attrs(bytes(rx_msg.cookie)[4:], rtnl_ifla_attrs)
nla_map = {n.nla_type: n for n in nla_list}
assert nla_map[IflattrType.IFLA_IFNAME.value].text == ifname
ifindex = nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32
assert ifindex > 0
assert ifindex not in ifmap
ifmap[ifindex] = ifname
# Dump all interfaces and check if the output matches ifmap
kernel_ifmap = {}
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
msg.nl_hdr.nlmsg_flags = (
NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
)
self.write_message(msg)
while True:
rx_msg = self.read_message()
if msg.nl_hdr.nlmsg_seq != rx_msg.nl_hdr.nlmsg_seq:
raise ValueError(
"unexpected seq {}".format(rx_msg.nl_hdr.nlmsg_seq)
)
if rx_msg.is_type(NlMsgType.NLMSG_ERROR):
raise ValueError("unexpected message {}".format(rx_msg))
if rx_msg.is_type(NlMsgType.NLMSG_DONE):
break
if not rx_msg.is_type(NlRtMsgType.RTM_NEWLINK):
raise ValueError("unexpected message {}".format(rx_msg))
ifindex = rx_msg.base_hdr.ifi_index
assert ifindex == rx_msg.base_hdr.ifi_index
ifname = rx_msg.get_nla(IflattrType.IFLA_IFNAME).text
if ifname.startswith("lo"):
kernel_ifmap[ifindex] = ifname
assert kernel_ifmap == ifmap
#
# *
# * {len=76, type=RTM_NEWLINK, flags=NLM_F_REQUEST|NLM_F_ACK|NLM_F_EXCL|NLM_F_CREATE, seq=1662892737, pid=0},
# * {ifi_family=AF_UNSPEC, ifi_type=ARPHRD_NETROM, ifi_index=0, ifi_flags=0, ifi_change=0},
# * {{nla_len=8, nla_type=IFLA_LINK}, 2},
# * {{nla_len=12, nla_type=IFLA_IFNAME}, "xvlan22"},
# * {{nla_len=24, nla_type=IFLA_LINKINFO},
# * {{nla_len=8, nla_type=IFLA_INFO_KIND}, "vlan"...},
# * {{nla_len=12, nla_type=IFLA_INFO_DATA}, "\x06\x00\x01\x00\x16\x00\x00\x00"}
# */
@pytest.mark.require_user("root")
def test_create_vlan_plain(self):
"""Creates 802.1Q VLAN interface in vlanXX and ifX fashion"""
self.require_module("if_vlan")
os_ifname = self.vnet.iface_alias_map["if1"].name
ifindex = socket.if_nametoindex(os_ifname)
flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
msg.nl_hdr.nlmsg_flags = (
flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
)
msg.base_hdr.ifi_index = ifindex
msg.add_nla(NlAttrU32(IflattrType.IFLA_LINK, ifindex))
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "vlan22"))
msg.add_nla(
NlAttrNested(
IflattrType.IFLA_LINKINFO,
[
NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "vlan"),
NlAttrNested(
IflinkInfo.IFLA_INFO_DATA,
[
NlAttrU16(IfLinkInfoDataVlan.IFLA_VLAN_ID, 22),
],
),
],
)
)
rx_msg = self.get_reply(msg)
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
assert rx_msg.error_code == 0
ToolsHelper.print_net_debug()
self.get_interface_byname("vlan22")
# ToolsHelper.print_net_debug()