netlink: add support for decoding genl ops/groups in pytest

MFC after:	2 weeks
This commit is contained in:
Alexander V. Chernikov 2023-06-01 10:45:29 +00:00
parent fd7edfcdc3
commit 54b955f4df
4 changed files with 73 additions and 10 deletions

View file

@ -194,11 +194,32 @@ def from_bytes(cls, helper, data):
raise
return self
def parse_child(self, data: bytes, attr_key, attr_map):
attrs, _ = self.parse_attrs(data, attr_map)
return NlAttrNested(attr_key, attrs)
def parse_child_array(self, data: bytes, attr_key, attr_map):
ret = []
off = 0
while len(data) - off >= 4:
nla_len, raw_nla_type = struct.unpack("@HH", data[off : off + 4])
if nla_len + off > len(data):
raise ValueError(
"attr length {} > than the remaining length {}".format(
nla_len, len(data) - off
)
)
nla_type = raw_nla_type & 0x3FFF
val = self.parse_child(data[off + 4 : off + nla_len], nla_type, attr_map)
ret.append(val)
off += align4(nla_len)
return NlAttrNested(attr_key, ret)
def parse_attrs(self, data: bytes, attr_map):
ret = []
off = 0
while len(data) - off >= 4:
nla_len, raw_nla_type = struct.unpack("@HH", data[off:off + 4])
nla_len, raw_nla_type = struct.unpack("@HH", data[off : off + 4])
if nla_len + off > len(data):
raise ValueError(
"attr length {} > than the remaining length {}".format(
@ -208,16 +229,20 @@ def parse_attrs(self, data: bytes, attr_map):
nla_type = raw_nla_type & 0x3FFF
if nla_type in attr_map:
v = attr_map[nla_type]
val = v["ad"].cls.from_bytes(data[off:off + nla_len], v["ad"].val)
val = v["ad"].cls.from_bytes(data[off : off + nla_len], v["ad"].val)
if "child" in v:
# nested
attrs, _ = self.parse_attrs(
data[off + 4:off + nla_len], v["child"]
)
val = NlAttrNested(v["ad"].val, attrs)
child_data = data[off + 4 : off + nla_len]
if v.get("is_array", False):
# Array of nested attributes
val = self.parse_child_array(
child_data, v["ad"].val, v["child"]
)
else:
val = self.parse_child(child_data, v["ad"].val, v["child"])
else:
# unknown attribute
val = NlAttr(raw_nla_type, data[off + 4:off + nla_len])
val = NlAttr(raw_nla_type, data[off + 4 : off + nla_len])
ret.append(val)
off += align4(nla_len)
return ret, off

View file

@ -265,6 +265,9 @@ def set_groups(self, mask: int):
# k = struct.pack("@BBHII", 12, 38, 0, self.pid, mask)
# self.sock_fd.bind(k)
def join_group(self, group_id: int):
self.sock_fd.setsockopt(270, 1, group_id)
def write_message(self, msg, verbose=True):
if verbose:
print("vvvvvvvv OUT vvvvvvvv")

View file

@ -9,6 +9,7 @@
from atf_python.sys.netlink.attrs import NlAttr
from atf_python.sys.netlink.attrs import NlAttrIp4
from atf_python.sys.netlink.attrs import NlAttrIp6
from atf_python.sys.netlink.attrs import NlAttrNested
from atf_python.sys.netlink.attrs import NlAttrS32
from atf_python.sys.netlink.attrs import NlAttrStr
from atf_python.sys.netlink.attrs import NlAttrU16
@ -94,6 +95,16 @@ class GenlCtrlAttrType(Enum):
CTRL_ATTR_OP = 10
class GenlCtrlAttrOpType(Enum):
CTRL_ATTR_OP_ID = 1
CTRL_ATTR_OP_FLAGS = 2
class GenlCtrlAttrMcastGroupsType(Enum):
CTRL_ATTR_MCAST_GRP_NAME = 1
CTRL_ATTR_MCAST_GRP_ID = 2
genl_ctrl_attrs = prepare_attrs_map(
[
AttrDescr(GenlCtrlAttrType.CTRL_ATTR_FAMILY_ID, NlAttrU16),
@ -101,6 +112,28 @@ class GenlCtrlAttrType(Enum):
AttrDescr(GenlCtrlAttrType.CTRL_ATTR_VERSION, NlAttrU32),
AttrDescr(GenlCtrlAttrType.CTRL_ATTR_HDRSIZE, NlAttrU32),
AttrDescr(GenlCtrlAttrType.CTRL_ATTR_MAXATTR, NlAttrU32),
AttrDescr(
GenlCtrlAttrType.CTRL_ATTR_OPS,
NlAttrNested,
[
AttrDescr(GenlCtrlAttrOpType.CTRL_ATTR_OP_ID, NlAttrU32),
AttrDescr(GenlCtrlAttrOpType.CTRL_ATTR_OP_FLAGS, NlAttrU32),
],
True,
),
AttrDescr(
GenlCtrlAttrType.CTRL_ATTR_MCAST_GROUPS,
NlAttrNested,
[
AttrDescr(
GenlCtrlAttrMcastGroupsType.CTRL_ATTR_MCAST_GRP_NAME, NlAttrStr
),
AttrDescr(
GenlCtrlAttrMcastGroupsType.CTRL_ATTR_MCAST_GRP_ID, NlAttrU32
),
],
True,
),
]
)
@ -220,13 +253,13 @@ def _print_attr_value(self):
@staticmethod
def _validate(data):
assert len(data) == NlAttr.HDR_LEN + NlAttrTS.DATA_LEN
nla_len, nla_type = struct.unpack("@HH", data[:NlAttr.HDR_LEN])
nla_len, nla_type = struct.unpack("@HH", data[: NlAttr.HDR_LEN])
assert nla_len == NlAttr.HDR_LEN + NlAttrTS.DATA_LEN
@classmethod
def _parse(cls, data):
nla_len, nla_type = struct.unpack("@HH", data[:NlAttr.HDR_LEN])
val = timespec.from_buffer_copy(data[NlAttr.HDR_LEN:])
nla_len, nla_type = struct.unpack("@HH", data[: NlAttr.HDR_LEN])
val = timespec.from_buffer_copy(data[NlAttr.HDR_LEN :])
return cls(nla_type, val)
def __bytes__(self):

View file

@ -34,6 +34,7 @@ class AttrDescr(NamedTuple):
val: Enum
cls: "NlAttr"
child_map: Any = None
is_array: bool = False
def prepare_attrs_map(attrs: List[AttrDescr]) -> Dict[str, Dict]:
@ -42,6 +43,7 @@ def prepare_attrs_map(attrs: List[AttrDescr]) -> Dict[str, Dict]:
ret[ad.val.value] = {"ad": ad}
if ad.child_map:
ret[ad.val.value]["child"] = prepare_attrs_map(ad.child_map)
ret[ad.val.value]["is_array"] = ad.is_array
return ret