ipfw(8): add ioctl/instruction generation tests

Differential Revision: https://reviews.freebsd.org/D40488
MFC after:	2 weeks
This commit is contained in:
Alexander V. Chernikov 2023-06-11 08:12:04 +00:00
parent fa273fa154
commit 9f44a47fd0
17 changed files with 2007 additions and 6 deletions

View file

@ -450,6 +450,8 @@
..
ifconfig
..
ipfw
..
md5
..
mdconfig

View file

@ -587,6 +587,13 @@ stringnum_cmp(const char *a, const char *b)
return (strcmp(a, b));
}
struct debug_header {
uint16_t cmd_type;
uint16_t spare1;
uint32_t opt_name;
uint32_t total_len;
uint32_t spare2;
};
/*
* conditionally runs the command.
@ -597,8 +604,18 @@ do_cmd(int optname, void *optval, uintptr_t optlen)
{
int i;
if (g_co.debug_only) {
struct debug_header dbg = {
.cmd_type = 1,
.opt_name = optname,
.total_len = optlen + sizeof(struct debug_header),
};
write(1, &dbg, sizeof(dbg));
write(1, optval, optlen);
}
if (g_co.test_only)
return 0;
return (0);
if (ipfw_socket == -1)
ipfw_socket = socket(AF_INET, SOCK_RAW, IPPROTO_RAW);
@ -617,7 +634,7 @@ do_cmd(int optname, void *optval, uintptr_t optlen)
} else {
i = setsockopt(ipfw_socket, IPPROTO_IP, optname, optval, optlen);
}
return i;
return (i);
}
/*
@ -634,6 +651,18 @@ int
do_set3(int optname, ip_fw3_opheader *op3, size_t optlen)
{
op3->opcode = optname;
if (g_co.debug_only) {
struct debug_header dbg = {
.cmd_type = 2,
.opt_name = optname,
.total_len = optlen, sizeof(struct debug_header),
};
write(1, &dbg, sizeof(dbg));
write(1, op3, optlen);
}
if (g_co.test_only)
return (0);
@ -642,7 +671,6 @@ do_set3(int optname, ip_fw3_opheader *op3, size_t optlen)
if (ipfw_socket < 0)
err(EX_UNAVAILABLE, "socket");
op3->opcode = optname;
return (setsockopt(ipfw_socket, IPPROTO_IP, IP_FW3, op3, optlen));
}
@ -663,6 +691,18 @@ do_get3(int optname, ip_fw3_opheader *op3, size_t *optlen)
int error;
socklen_t len;
op3->opcode = optname;
if (g_co.debug_only) {
struct debug_header dbg = {
.cmd_type = 3,
.opt_name = optname,
.total_len = *optlen + sizeof(struct debug_header),
};
write(1, &dbg, sizeof(dbg));
write(1, op3, *optlen);
}
if (g_co.test_only)
return (0);
@ -671,7 +711,6 @@ do_get3(int optname, ip_fw3_opheader *op3, size_t *optlen)
if (ipfw_socket < 0)
err(EX_UNAVAILABLE, "socket");
op3->opcode = optname;
len = *optlen;
error = getsockopt(ipfw_socket, IPPROTO_IP, IP_FW3, op3, &len);

View file

@ -48,6 +48,7 @@ struct cmdline_opts {
int test_only; /* only check syntax */
int comment_only; /* only print action and comment */
int verbose; /* be verbose on some commands */
int debug_only; /* output ioctl i/o on stdout */
/* The options below can have multiple values. */

View file

@ -277,7 +277,7 @@ ipfw_main(int oldac, char **oldav)
optind = optreset = 1; /* restart getopt() */
if (is_ipfw()) {
while ((ch = getopt(ac, av, "abcdDefhinNp:qs:STtv")) != -1)
while ((ch = getopt(ac, av, "abcdDefhinNp:qs:STtvx")) != -1)
switch (ch) {
case 'a':
do_acct = 1;
@ -354,6 +354,10 @@ ipfw_main(int oldac, char **oldav)
g_co.verbose = 1;
break;
case 'x': /* debug output */
g_co.debug_only = 1;
break;
default:
free(save_av);
return 1;

5
sbin/ipfw/tests/Makefile Normal file
View file

@ -0,0 +1,5 @@
PACKAGE= tests
ATF_TESTS_PYTEST+= test_add_rule.py
.include <bsd.test.mk>

400
sbin/ipfw/tests/test_add_rule.py Executable file
View file

@ -0,0 +1,400 @@
import errno
import json
import os
import socket
import struct
import subprocess
import sys
from ctypes import c_byte
from ctypes import c_char
from ctypes import c_int
from ctypes import c_long
from ctypes import c_uint32
from ctypes import c_uint8
from ctypes import c_ulong
from ctypes import c_ushort
from ctypes import sizeof
from ctypes import Structure
from enum import Enum
from typing import Any
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Union
import pytest
from atf_python.sys.netpfil.ipfw.insns import Icmp6RejectCode
from atf_python.sys.netpfil.ipfw.insns import IcmpRejectCode
from atf_python.sys.netpfil.ipfw.insns import Insn
from atf_python.sys.netpfil.ipfw.insns import InsnComment
from atf_python.sys.netpfil.ipfw.insns import InsnEmpty
from atf_python.sys.netpfil.ipfw.insns import InsnIp
from atf_python.sys.netpfil.ipfw.insns import InsnIp6
from atf_python.sys.netpfil.ipfw.insns import InsnPorts
from atf_python.sys.netpfil.ipfw.insns import InsnProb
from atf_python.sys.netpfil.ipfw.insns import InsnProto
from atf_python.sys.netpfil.ipfw.insns import InsnReject
from atf_python.sys.netpfil.ipfw.insns import InsnTable
from atf_python.sys.netpfil.ipfw.insns import IpFwOpcode
from atf_python.sys.netpfil.ipfw.ioctl import CTlv
from atf_python.sys.netpfil.ipfw.ioctl import CTlvRule
from atf_python.sys.netpfil.ipfw.ioctl import IpFwTlvType
from atf_python.sys.netpfil.ipfw.ioctl import IpFwXRule
from atf_python.sys.netpfil.ipfw.ioctl import NTlv
from atf_python.sys.netpfil.ipfw.ioctl import Op3CmdType
from atf_python.sys.netpfil.ipfw.ioctl import RawRule
from atf_python.sys.netpfil.ipfw.ipfw import DebugIoReader
from atf_python.sys.netpfil.ipfw.utils import enum_from_int
from atf_python.utils import BaseTest
IPFW_PATH = "/sbin/ipfw"
def differ(w_obj, g_obj, w_stack=[], g_stack=[]):
if bytes(w_obj) == bytes(g_obj):
return True
num_objects = 0
for i, w_child in enumerate(w_obj.obj_list):
if i > len(g_obj.obj_list):
print("MISSING object from chain {}".format(" / ".join(w_stack)))
w_child.print_obj()
print("==========================")
return False
g_child = g_obj.obj_list[i]
if bytes(w_child) == bytes(g_child):
num_objects += 1
continue
w_stack.append(w_obj.obj_name)
g_stack.append(g_obj.obj_name)
if not differ(w_child, g_child, w_stack, g_stack):
return False
break
if num_objects == len(w_obj.obj_list) and num_objects < len(g_obj.obj_list):
g_child = g_obj.obj_list[num_objects]
print("EXTRA object from chain {}".format(" / ".join(g_stack)))
g_child.print_obj()
print("==========================")
return False
print("OBJECTS DIFFER")
print("WANTED CHAIN: {}".format(" / ".join(w_stack)))
w_obj.print_obj()
w_obj.print_obj_hex()
print("==========================")
print("GOT CHAIN: {}".format(" / ".join(g_stack)))
g_obj.print_obj()
g_obj.print_obj_hex()
print("==========================")
return False
class TestAddRule(BaseTest):
def compile_rule(self, out):
tlvs = []
if "objs" in out:
tlvs.append(CTlv(IpFwTlvType.IPFW_TLV_TBLNAME_LIST, out["objs"]))
rule = RawRule(rulenum=out.get("rulenum", 0), obj_list=out["insns"])
tlvs.append(CTlvRule(obj_list=[rule]))
return IpFwXRule(Op3CmdType.IP_FW_XADD, tlvs)
def verify_rule(self, in_data: str, out_data):
# Prepare the desired output
expected = self.compile_rule(out_data)
reader = DebugIoReader(IPFW_PATH)
ioctls = reader.get_records(in_data)
assert len(ioctls) == 1 # Only 1 ioctl request expected
got = ioctls[0]
if not differ(expected, got):
print("=> CMD: {}".format(in_data))
print("=> WANTED:")
expected.print_obj()
print("==========================")
print("=> GOT:")
got.print_obj()
print("==========================")
assert bytes(got) == bytes(expected)
@pytest.mark.parametrize(
"rule",
[
pytest.param(
{
"in": "add 200 allow ip from any to any",
"out": {
"insns": [InsnEmpty(IpFwOpcode.O_ACCEPT)],
"rulenum": 200,
},
},
id="test_rulenum",
),
pytest.param(
{
"in": "add allow ip from { 1.2.3.4 or 2.3.4.5 } to any",
"out": {
"insns": [
InsnIp(IpFwOpcode.O_IP_SRC, ip="1.2.3.4", is_or=True),
InsnIp(IpFwOpcode.O_IP_SRC, ip="2.3.4.5"),
InsnEmpty(IpFwOpcode.O_ACCEPT),
],
},
},
id="test_or",
),
pytest.param(
{
"in": "add allow ip from table(AAA) to table(BBB)",
"out": {
"objs": [
NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
],
"insns": [
InsnTable(IpFwOpcode.O_IP_SRC_LOOKUP, arg1=1),
InsnTable(IpFwOpcode.O_IP_DST_LOOKUP, arg1=2),
InsnEmpty(IpFwOpcode.O_ACCEPT),
],
},
},
id="test_tables",
),
pytest.param(
{
"in": "add allow ip from any to 1.2.3.4 // test comment",
"out": {
"insns": [
InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
InsnComment(comment="test comment"),
InsnEmpty(IpFwOpcode.O_ACCEPT),
],
},
},
id="test_comment",
),
],
)
def test_add_rule(self, rule):
"""Tests if the compiled rule is sane and matches the spec"""
self.verify_rule(rule["in"], rule["out"])
@pytest.mark.parametrize(
"action",
[
pytest.param(("allow", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="test_allow"),
pytest.param(
(
"abort",
Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_ABORT),
),
id="abort",
),
pytest.param(
(
"abort6",
Insn(
IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_ABORT
),
),
id="abort6",
),
pytest.param(("accept", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="accept"),
pytest.param(("deny", InsnEmpty(IpFwOpcode.O_DENY)), id="deny"),
pytest.param(
(
"reject",
Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_HOST),
),
id="reject",
),
pytest.param(
(
"reset",
Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_RST),
),
id="reset",
),
pytest.param(
(
"reset6",
Insn(IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_RST),
),
id="reset6",
),
pytest.param(
(
"unreach port",
InsnReject(
IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_PORT
),
),
id="unreach_port",
),
pytest.param(
(
"unreach port",
InsnReject(
IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_PORT
),
),
id="unreach_port",
),
pytest.param(
(
"unreach needfrag",
InsnReject(
IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG
),
),
id="unreach_needfrag",
),
pytest.param(
(
"unreach needfrag 1420",
InsnReject(
IpFwOpcode.O_REJECT,
arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG,
mtu=1420,
),
),
id="unreach_needfrag_mtu",
),
pytest.param(
(
"unreach6 port",
Insn(
IpFwOpcode.O_UNREACH6,
arg1=Icmp6RejectCode.ICMP6_DST_UNREACH_NOPORT,
),
),
id="unreach6_port",
),
pytest.param(("count", InsnEmpty(IpFwOpcode.O_COUNT)), id="count"),
# TOK_NAT
pytest.param(
("queue 42", Insn(IpFwOpcode.O_QUEUE, arg1=42)), id="queue_42"
),
pytest.param(("pipe 42", Insn(IpFwOpcode.O_PIPE, arg1=42)), id="pipe_42"),
pytest.param(
("skipto 42", Insn(IpFwOpcode.O_SKIPTO, arg1=42)), id="skipto_42"
),
pytest.param(
("netgraph 42", Insn(IpFwOpcode.O_NETGRAPH, arg1=42)), id="netgraph_42"
),
pytest.param(
("ngtee 42", Insn(IpFwOpcode.O_NGTEE, arg1=42)), id="ngtee_42"
),
pytest.param(
("divert 42", Insn(IpFwOpcode.O_DIVERT, arg1=42)), id="divert_42"
),
pytest.param(
("divert natd", Insn(IpFwOpcode.O_DIVERT, arg1=8668)), id="divert_natd"
),
pytest.param(("tee 42", Insn(IpFwOpcode.O_TEE, arg1=42)), id="tee_42"),
pytest.param(
("call 420", Insn(IpFwOpcode.O_CALLRETURN, arg1=420)), id="call_420"
),
# TOK_FORWARD
# TOK_COMMENT
pytest.param(
("setfib 1", Insn(IpFwOpcode.O_SETFIB, arg1=1 | 0x8000)),
id="setfib_1",
marks=pytest.mark.skip("needs net.fibs>1"),
),
pytest.param(
("setdscp 42", Insn(IpFwOpcode.O_SETDSCP, arg1=42 | 0x8000)),
id="setdscp_42",
),
pytest.param(("reass", InsnEmpty(IpFwOpcode.O_REASS)), id="reass"),
pytest.param(
("return", InsnEmpty(IpFwOpcode.O_CALLRETURN, is_not=True)), id="return"
),
],
)
def test_add_action(self, action):
"""Tests if the rule action is compiled properly"""
rule_in = "add {} ip from any to any".format(action[0])
rule_out = {"insns": [action[1]]}
self.verify_rule(rule_in, rule_out)
@pytest.mark.parametrize(
"insn",
[
pytest.param(
{
"in": "add prob 0.7 allow ip from any to any",
"out": InsnProb(prob=0.7),
},
id="test_prob",
),
pytest.param(
{
"in": "add allow tcp from any to any",
"out": InsnProto(arg1=6),
},
id="test_proto",
),
pytest.param(
{
"in": "add allow ip from any to any 57",
"out": InsnPorts(IpFwOpcode.O_IP_DSTPORT, port_pairs=[57, 57]),
},
id="test_ports",
),
],
)
def test_add_single_instruction(self, insn):
"""Tests if the compiled rule is sane and matches the spec"""
# Prepare the desired output
out = {
"insns": [insn["out"], InsnEmpty(IpFwOpcode.O_ACCEPT)],
}
self.verify_rule(insn["in"], out)
@pytest.mark.parametrize(
"opcode",
[
pytest.param(IpFwOpcode.O_IP_SRCPORT, id="src"),
pytest.param(IpFwOpcode.O_IP_DSTPORT, id="dst"),
],
)
@pytest.mark.parametrize(
"params",
[
pytest.param(
{
"in": "57",
"out": [(57, 57)],
},
id="test_single",
),
pytest.param(
{
"in": "57-59",
"out": [(57, 59)],
},
id="test_range",
),
pytest.param(
{
"in": "57-59,41",
"out": [(57, 59), (41, 41)],
},
id="test_ranges",
),
],
)
def test_add_ports(self, params, opcode):
if opcode == IpFwOpcode.O_IP_DSTPORT:
txt = "add allow ip from any to any " + params["in"]
else:
txt = "add allow ip from any " + params["in"] + " to any"
out = {
"insns": [
InsnPorts(opcode, port_pairs=params["out"]),
InsnEmpty(IpFwOpcode.O_ACCEPT),
]
}
self.verify_rule(txt, out)

View file

@ -3,7 +3,7 @@
.PATH: ${.CURDIR}
FILES= __init__.py
SUBDIR= net netlink
SUBDIR= net netlink netpfil
.include <bsd.own.mk>
FILESDIR= ${TESTSBASE}/atf_python/sys

View file

@ -0,0 +1,11 @@
.include <src.opts.mk>
.PATH: ${.CURDIR}
FILES= __init__.py
SUBDIR= ipfw
.include <bsd.own.mk>
FILESDIR= ${TESTSBASE}/atf_python/sys/netpfil
.include <bsd.prog.mk>

View file

View file

@ -0,0 +1,12 @@
.include <src.opts.mk>
.PATH: ${.CURDIR}
FILES= __init__.py insns.py insn_headers.py ioctl.py ioctl_headers.py \
ipfw.py utils.py
.include <bsd.own.mk>
FILESDIR= ${TESTSBASE}/atf_python/sys/netpfil/ipfw
.include <bsd.prog.mk>

View file

@ -0,0 +1,198 @@
from enum import Enum
class IpFwOpcode(Enum):
O_NOP = 0
O_IP_SRC = 1
O_IP_SRC_MASK = 2
O_IP_SRC_ME = 3
O_IP_SRC_SET = 4
O_IP_DST = 5
O_IP_DST_MASK = 6
O_IP_DST_ME = 7
O_IP_DST_SET = 8
O_IP_SRCPORT = 9
O_IP_DSTPORT = 10
O_PROTO = 11
O_MACADDR2 = 12
O_MAC_TYPE = 13
O_LAYER2 = 14
O_IN = 15
O_FRAG = 16
O_RECV = 17
O_XMIT = 18
O_VIA = 19
O_IPOPT = 20
O_IPLEN = 21
O_IPID = 22
O_IPTOS = 23
O_IPPRECEDENCE = 24
O_IPTTL = 25
O_IPVER = 26
O_UID = 27
O_GID = 28
O_ESTAB = 29
O_TCPFLAGS = 30
O_TCPWIN = 31
O_TCPSEQ = 32
O_TCPACK = 33
O_ICMPTYPE = 34
O_TCPOPTS = 35
O_VERREVPATH = 36
O_VERSRCREACH = 37
O_PROBE_STATE = 38
O_KEEP_STATE = 39
O_LIMIT = 40
O_LIMIT_PARENT = 41
O_LOG = 42
O_PROB = 43
O_CHECK_STATE = 44
O_ACCEPT = 45
O_DENY = 46
O_REJECT = 47
O_COUNT = 48
O_SKIPTO = 49
O_PIPE = 50
O_QUEUE = 51
O_DIVERT = 52
O_TEE = 53
O_FORWARD_IP = 54
O_FORWARD_MAC = 55
O_NAT = 56
O_REASS = 57
O_IPSEC = 58
O_IP_SRC_LOOKUP = 59
O_IP_DST_LOOKUP = 60
O_ANTISPOOF = 61
O_JAIL = 62
O_ALTQ = 63
O_DIVERTED = 64
O_TCPDATALEN = 65
O_IP6_SRC = 66
O_IP6_SRC_ME = 67
O_IP6_SRC_MASK = 68
O_IP6_DST = 69
O_IP6_DST_ME = 70
O_IP6_DST_MASK = 71
O_FLOW6ID = 72
O_ICMP6TYPE = 73
O_EXT_HDR = 74
O_IP6 = 75
O_NETGRAPH = 76
O_NGTEE = 77
O_IP4 = 78
O_UNREACH6 = 79
O_TAG = 80
O_TAGGED = 81
O_SETFIB = 82
O_FIB = 83
O_SOCKARG = 84
O_CALLRETURN = 85
O_FORWARD_IP6 = 86
O_DSCP = 87
O_SETDSCP = 88
O_IP_FLOW_LOOKUP = 89
O_EXTERNAL_ACTION = 90
O_EXTERNAL_INSTANCE = 91
O_EXTERNAL_DATA = 92
O_SKIP_ACTION = 93
O_TCPMSS = 94
O_MAC_SRC_LOOKUP = 95
O_MAC_DST_LOOKUP = 96
O_SETMARK = 97
O_MARK = 98
O_LAST_OPCODE = 99
class Op3CmdType(Enum):
IP_FW_TABLE_XADD = 86
IP_FW_TABLE_XDEL = 87
IP_FW_TABLE_XGETSIZE = 88
IP_FW_TABLE_XLIST = 89
IP_FW_TABLE_XDESTROY = 90
IP_FW_TABLES_XLIST = 92
IP_FW_TABLE_XINFO = 93
IP_FW_TABLE_XFLUSH = 94
IP_FW_TABLE_XCREATE = 95
IP_FW_TABLE_XMODIFY = 96
IP_FW_XGET = 97
IP_FW_XADD = 98
IP_FW_XDEL = 99
IP_FW_XMOVE = 100
IP_FW_XZERO = 101
IP_FW_XRESETLOG = 102
IP_FW_SET_SWAP = 103
IP_FW_SET_MOVE = 104
IP_FW_SET_ENABLE = 105
IP_FW_TABLE_XFIND = 106
IP_FW_XIFLIST = 107
IP_FW_TABLES_ALIST = 108
IP_FW_TABLE_XSWAP = 109
IP_FW_TABLE_VLIST = 110
IP_FW_NAT44_XCONFIG = 111
IP_FW_NAT44_DESTROY = 112
IP_FW_NAT44_XGETCONFIG = 113
IP_FW_NAT44_LIST_NAT = 114
IP_FW_NAT44_XGETLOG = 115
IP_FW_DUMP_SOPTCODES = 116
IP_FW_DUMP_SRVOBJECTS = 117
IP_FW_NAT64STL_CREATE = 130
IP_FW_NAT64STL_DESTROY = 131
IP_FW_NAT64STL_CONFIG = 132
IP_FW_NAT64STL_LIST = 133
IP_FW_NAT64STL_STATS = 134
IP_FW_NAT64STL_RESET_STATS = 135
IP_FW_NAT64LSN_CREATE = 140
IP_FW_NAT64LSN_DESTROY = 141
IP_FW_NAT64LSN_CONFIG = 142
IP_FW_NAT64LSN_LIST = 143
IP_FW_NAT64LSN_STATS = 144
IP_FW_NAT64LSN_LIST_STATES = 145
IP_FW_NAT64LSN_RESET_STATS = 146
IP_FW_NPTV6_CREATE = 150
IP_FW_NPTV6_DESTROY = 151
IP_FW_NPTV6_CONFIG = 152
IP_FW_NPTV6_LIST = 153
IP_FW_NPTV6_STATS = 154
IP_FW_NPTV6_RESET_STATS = 155
IP_FW_NAT64CLAT_CREATE = 160
IP_FW_NAT64CLAT_DESTROY = 161
IP_FW_NAT64CLAT_CONFIG = 162
IP_FW_NAT64CLAT_LIST = 163
IP_FW_NAT64CLAT_STATS = 164
IP_FW_NAT64CLAT_RESET_STATS = 165
class IcmpRejectCode(Enum):
ICMP_UNREACH_NET = 0
ICMP_UNREACH_HOST = 1
ICMP_UNREACH_PROTOCOL = 2
ICMP_UNREACH_PORT = 3
ICMP_UNREACH_NEEDFRAG = 4
ICMP_UNREACH_SRCFAIL = 5
ICMP_UNREACH_NET_UNKNOWN = 6
ICMP_UNREACH_HOST_UNKNOWN = 7
ICMP_UNREACH_ISOLATED = 8
ICMP_UNREACH_NET_PROHIB = 9
ICMP_UNREACH_HOST_PROHIB = 10
ICMP_UNREACH_TOSNET = 11
ICMP_UNREACH_TOSHOST = 12
ICMP_UNREACH_FILTER_PROHIB = 13
ICMP_UNREACH_HOST_PRECEDENCE = 14
ICMP_UNREACH_PRECEDENCE_CUTOFF = 15
ICMP_REJECT_RST = 256
ICMP_REJECT_ABORT = 257
class Icmp6RejectCode(Enum):
ICMP6_DST_UNREACH_NOROUTE = 0
ICMP6_DST_UNREACH_ADMIN = 1
ICMP6_DST_UNREACH_BEYONDSCOPE = 2
ICMP6_DST_UNREACH_NOTNEIGHBOR = 2
ICMP6_DST_UNREACH_ADDR = 3
ICMP6_DST_UNREACH_NOPORT = 4
ICMP6_DST_UNREACH_POLICY = 5
ICMP6_DST_UNREACH_REJECT = 6
ICMP6_DST_UNREACH_SRCROUTE = 7
ICMP6_UNREACH_RST = 256
ICMP6_UNREACH_ABORT = 257

View file

@ -0,0 +1,555 @@
#!/usr/bin/env python3
import os
import socket
import struct
import subprocess
import sys
from ctypes import c_byte
from ctypes import c_char
from ctypes import c_int
from ctypes import c_long
from ctypes import c_uint32
from ctypes import c_uint8
from ctypes import c_ulong
from ctypes import c_ushort
from ctypes import sizeof
from ctypes import Structure
from enum import Enum
from typing import Any
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Union
from atf_python.sys.netpfil.ipfw.insn_headers import IpFwOpcode
from atf_python.sys.netpfil.ipfw.insn_headers import IcmpRejectCode
from atf_python.sys.netpfil.ipfw.insn_headers import Icmp6RejectCode
from atf_python.sys.netpfil.ipfw.utils import AttrDescr
from atf_python.sys.netpfil.ipfw.utils import enum_or_int
from atf_python.sys.netpfil.ipfw.utils import enum_from_int
from atf_python.sys.netpfil.ipfw.utils import prepare_attrs_map
insn_actions = (
IpFwOpcode.O_CHECK_STATE.value,
IpFwOpcode.O_REJECT.value,
IpFwOpcode.O_UNREACH6.value,
IpFwOpcode.O_ACCEPT.value,
IpFwOpcode.O_DENY.value,
IpFwOpcode.O_COUNT.value,
IpFwOpcode.O_NAT.value,
IpFwOpcode.O_QUEUE.value,
IpFwOpcode.O_PIPE.value,
IpFwOpcode.O_SKIPTO.value,
IpFwOpcode.O_NETGRAPH.value,
IpFwOpcode.O_NGTEE.value,
IpFwOpcode.O_DIVERT.value,
IpFwOpcode.O_TEE.value,
IpFwOpcode.O_CALLRETURN.value,
IpFwOpcode.O_FORWARD_IP.value,
IpFwOpcode.O_FORWARD_IP6.value,
IpFwOpcode.O_SETFIB.value,
IpFwOpcode.O_SETDSCP.value,
IpFwOpcode.O_REASS.value,
IpFwOpcode.O_SETMARK.value,
IpFwOpcode.O_EXTERNAL_ACTION.value,
)
class IpFwInsn(Structure):
_fields_ = [
("opcode", c_uint8),
("length", c_uint8),
("arg1", c_ushort),
]
class BaseInsn(object):
obj_enum_class = IpFwOpcode
def __init__(self, opcode, is_or, is_not, arg1):
if isinstance(opcode, Enum):
self.obj_type = opcode.value
self._enum = opcode
else:
self.obj_type = opcode
self._enum = enum_from_int(self.obj_enum_class, self.obj_type)
self.is_or = is_or
self.is_not = is_not
self.arg1 = arg1
self.is_action = self.obj_type in insn_actions
self.ilen = 1
self.obj_list = []
@property
def obj_name(self):
if self._enum is not None:
return self._enum.name
else:
return "opcode#{}".format(self.obj_type)
@staticmethod
def get_insn_len(data: bytes) -> int:
(opcode_len,) = struct.unpack("@B", data[1:2])
return opcode_len & 0x3F
@classmethod
def _validate_len(cls, data, valid_options=None):
if len(data) < 4:
raise ValueError("opcode too short")
opcode_type, opcode_len = struct.unpack("@BB", data[:2])
if len(data) != ((opcode_len & 0x3F) * 4):
raise ValueError("wrong length")
if valid_options and len(data) not in valid_options:
raise ValueError(
"len {} not in {} for {}".format(
len(data), valid_options,
enum_from_int(cls.obj_enum_class, data[0])
)
)
@classmethod
def _validate(cls, data):
cls._validate_len(data)
@classmethod
def _parse(cls, data):
insn = IpFwInsn.from_buffer_copy(data[:4])
is_or = (insn.length & 0x40) != 0
is_not = (insn.length & 0x80) != 0
return cls(opcode=insn.opcode, is_or=is_or, is_not=is_not, arg1=insn.arg1)
@classmethod
def from_bytes(cls, data, attr_type_enum):
cls._validate(data)
opcode = cls._parse(data)
opcode._enum = attr_type_enum
return opcode
def __bytes__(self):
raise NotImplementedError()
def print_obj(self, prepend=""):
is_or = ""
if self.is_or:
is_or = " [OR]\\"
is_not = ""
if self.is_not:
is_not = "[!] "
print(
"{}{}len={} type={}({}){}{}".format(
prepend,
is_not,
len(bytes(self)),
self.obj_name,
self.obj_type,
self._print_obj_value(),
is_or,
)
)
def _print_obj_value(self):
raise NotImplementedError()
def print_obj_hex(self, prepend=""):
print(prepend)
print()
print(" ".join(["x{:02X}".format(b) for b in bytes(self)]))
@staticmethod
def parse_insns(data, attr_map):
ret = []
off = 0
while off + sizeof(IpFwInsn) <= len(data):
hdr = IpFwInsn.from_buffer_copy(data[off : off + sizeof(IpFwInsn)])
insn_len = (hdr.length & 0x3F) * 4
if off + insn_len > len(data):
raise ValueError("wrng length")
# print("GET insn type {} len {}".format(hdr.opcode, insn_len))
attr = attr_map.get(hdr.opcode, None)
if attr is None:
cls = InsnUnknown
type_enum = enum_from_int(BaseInsn.obj_enum_class, hdr.opcode)
else:
cls = attr["ad"].cls
type_enum = attr["ad"].val
insn = cls.from_bytes(data[off : off + insn_len], type_enum)
ret.append(insn)
off += insn_len
if off != len(data):
raise ValueError("empty space")
return ret
class Insn(BaseInsn):
def __init__(self, opcode, is_or=False, is_not=False, arg1=0):
super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1)
@classmethod
def _validate(cls, data):
cls._validate_len(data, [4])
def __bytes__(self):
length = self.ilen
if self.is_or:
length |= 0x40
if self.is_not:
length | 0x80
insn = IpFwInsn(opcode=self.obj_type, length=length, arg1=enum_or_int(self.arg1))
return bytes(insn)
def _print_obj_value(self):
return " arg1={}".format(self.arg1)
class InsnUnknown(Insn):
@classmethod
def _validate(cls, data):
cls._validate_len(data)
@classmethod
def _parse(cls, data):
self = super()._parse(data)
self._data = data
return self
def __bytes__(self):
return self._data
def _print_obj_value(self):
return " " + " ".join(["x{:02X}".format(b) for b in self._data])
class InsnEmpty(Insn):
@classmethod
def _validate(cls, data):
cls._validate_len(data, [4])
insn = IpFwInsn.from_buffer_copy(data[:4])
if insn.arg1 != 0:
raise ValueError("arg1 should be empty")
def _print_obj_value(self):
return ""
class InsnComment(Insn):
def __init__(self, opcode=IpFwOpcode.O_NOP, is_or=False, is_not=False, arg1=0, comment=""):
super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1)
if comment:
self.comment = comment
else:
self.comment = ""
@classmethod
def _validate(cls, data):
cls._validate_len(data)
if len(data) > 88:
raise ValueError("comment too long")
@classmethod
def _parse(cls, data):
self = super()._parse(data)
# Comment encoding can be anything,
# use utf-8 to ease debugging
max_len = 0
for b in range(4, len(data)):
if data[b] == b"\0":
break
max_len += 1
self.comment = data[4:max_len].decode("utf-8")
return self
def __bytes__(self):
ret = super().__bytes__()
comment_bytes = self.comment.encode("utf-8") + b"\0"
if len(comment_bytes) % 4 > 0:
comment_bytes += b"\0" * (4 - (len(comment_bytes) % 4))
ret += comment_bytes
return ret
def _print_obj_value(self):
return " comment='{}'".format(self.comment)
class InsnProto(Insn):
def __init__(self, opcode=IpFwOpcode.O_PROTO, is_or=False, is_not=False, arg1=0):
super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1)
def _print_obj_value(self):
known_map = {6: "TCP", 17: "UDP", 41: "IPV6"}
proto = self.arg1
if proto in known_map:
return " proto={}".format(known_map[proto])
else:
return " proto=#{}".format(proto)
class InsnU32(Insn):
def __init__(self, opcode, is_or=False, is_not=False, arg1=0, u32=0):
super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1)
self.u32 = u32
self.ilen = 2
@classmethod
def _validate(cls, data):
cls._validate_len(data, [8])
@classmethod
def _parse(cls, data):
self = super()._parse(data[:4])
self.u32 = struct.unpack("@I", data[4:8])[0]
return self
def __bytes__(self):
return super().__bytes__() + struct.pack("@I", self.u32)
def _print_obj_value(self):
return " arg1={} u32={}".format(self.arg1, self.u32)
class InsnProb(InsnU32):
def __init__(
self,
opcode=IpFwOpcode.O_PROB,
is_or=False,
is_not=False,
arg1=0,
u32=0,
prob=0.0,
):
super().__init__(opcode, is_or=is_or, is_not=is_not)
self.prob = prob
@property
def prob(self):
return 1.0 * self.u32 / 0x7FFFFFFF
@prob.setter
def prob(self, prob: float):
self.u32 = int(prob * 0x7FFFFFFF)
def _print_obj_value(self):
return " prob={}".format(round(self.prob, 5))
class InsnIp(InsnU32):
def __init__(self, opcode, is_or=False, is_not=False, arg1=0, u32=0, ip=None):
super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1, u32=u32)
if ip:
self.ip = ip
@property
def ip(self):
return socket.inet_ntop(socket.AF_INET, struct.pack("@I", self.u32))
@ip.setter
def ip(self, ip: str):
ip_bin = socket.inet_pton(socket.AF_INET, ip)
self.u32 = struct.unpack("@I", ip_bin)[0]
def _print_opcode_value(self):
return " ip={}".format(self.ip)
class InsnTable(Insn):
@classmethod
def _validate(cls, data):
cls._validate_len(data, [4, 8])
@classmethod
def _parse(cls, data):
self = super()._parse(data)
if len(data) == 8:
(self.val,) = struct.unpack("@I", data[4:8])
self.ilen = 2
else:
self.val = None
return self
def __bytes__(self):
ret = super().__bytes__()
if getattr(self, "val", None) is not None:
ret += struct.pack("@I", self.val)
return ret
def _print_obj_value(self):
if getattr(self, "val", None) is not None:
return " table={} value={}".format(self.arg1, self.val)
else:
return " table={}".format(self.arg1)
class InsnReject(Insn):
def __init__(self, opcode, is_or=False, is_not=False, arg1=0, mtu=None):
super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1)
self.mtu = mtu
if self.mtu is not None:
self.ilen = 2
@classmethod
def _validate(cls, data):
cls._validate_len(data, [4, 8])
@classmethod
def _parse(cls, data):
self = super()._parse(data)
if len(data) == 8:
(self.mtu,) = struct.unpack("@I", data[4:8])
self.ilen = 2
else:
self.mtu = None
return self
def __bytes__(self):
ret = super().__bytes__()
if getattr(self, "mtu", None) is not None:
ret += struct.pack("@I", self.mtu)
return ret
def _print_obj_value(self):
code = enum_from_int(IcmpRejectCode, self.arg1)
if getattr(self, "mtu", None) is not None:
return " code={} mtu={}".format(code, self.mtu)
else:
return " code={}".format(code)
class InsnPorts(Insn):
def __init__(self, opcode, is_or=False, is_not=False, arg1=0, port_pairs=[]):
super().__init__(opcode, is_or=is_or, is_not=is_not)
self.port_pairs = []
if port_pairs:
self.port_pairs = port_pairs
@classmethod
def _validate(cls, data):
if len(data) < 8:
raise ValueError("no ports specified")
cls._validate_len(data)
@classmethod
def _parse(cls, data):
self = super()._parse(data)
off = 4
port_pairs = []
while off + 4 <= len(data):
low, high = struct.unpack("@HH", data[off : off + 4])
port_pairs.append((low, high))
off += 4
self.port_pairs = port_pairs
return self
def __bytes__(self):
ret = super().__bytes__()
if getattr(self, "val", None) is not None:
ret += struct.pack("@I", self.val)
return ret
def _print_obj_value(self):
ret = []
for p in self.port_pairs:
if p[0] == p[1]:
ret.append(str(p[0]))
else:
ret.append("{}-{}".format(p[0], p[1]))
return " ports={}".format(",".join(ret))
class IpFwInsnIp6(Structure):
_fields_ = [
("o", IpFwInsn),
("addr6", c_byte * 16),
("mask6", c_byte * 16),
]
class InsnIp6(Insn):
def __init__(self, opcode, is_or=False, is_not=False, arg1=0, ip6=None, mask6=None):
super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1)
self.ip6 = ip6
self.mask6 = mask6
if mask6 is not None:
self.ilen = 9
else:
self.ilen = 5
@classmethod
def _validate(cls, data):
cls._validate_len(data, [4 + 16, 4 + 16 * 2])
@classmethod
def _parse(cls, data):
self = super()._parse(data)
self.ip6 = socket.inet_ntop(socket.AF_INET6, data[4:20])
if len(data) == 4 + 16 * 2:
self.mask6 = socket.inet_ntop(socket.AF_INET6, data[20:36])
self.ilen = 9
else:
self.mask6 = None
self.ilen = 5
return self
def __bytes__(self):
ret = super().__bytes__() + socket.inet_pton(socket.AF_INET6, self.ip6)
if self.mask6 is not None:
ret += socket.inet_pton(socket.AF_INET6, self.mask6)
return ret
def _print_obj_value(self):
if self.mask6:
return " ip6={}/{}".format(self.ip6, self.mask6)
else:
return " ip6={}".format(self.ip6)
insn_attrs = prepare_attrs_map(
[
AttrDescr(IpFwOpcode.O_CHECK_STATE, Insn),
AttrDescr(IpFwOpcode.O_ACCEPT, InsnEmpty),
AttrDescr(IpFwOpcode.O_COUNT, InsnEmpty),
AttrDescr(IpFwOpcode.O_REJECT, InsnReject),
AttrDescr(IpFwOpcode.O_UNREACH6, Insn),
AttrDescr(IpFwOpcode.O_DENY, InsnEmpty),
AttrDescr(IpFwOpcode.O_DIVERT, Insn),
AttrDescr(IpFwOpcode.O_COUNT, InsnEmpty),
AttrDescr(IpFwOpcode.O_QUEUE, Insn),
AttrDescr(IpFwOpcode.O_PIPE, Insn),
AttrDescr(IpFwOpcode.O_SKIPTO, Insn),
AttrDescr(IpFwOpcode.O_NETGRAPH, Insn),
AttrDescr(IpFwOpcode.O_NGTEE, Insn),
AttrDescr(IpFwOpcode.O_DIVERT, Insn),
AttrDescr(IpFwOpcode.O_TEE, Insn),
AttrDescr(IpFwOpcode.O_CALLRETURN, Insn),
AttrDescr(IpFwOpcode.O_SETFIB, Insn),
AttrDescr(IpFwOpcode.O_SETDSCP, Insn),
AttrDescr(IpFwOpcode.O_REASS, InsnEmpty),
AttrDescr(IpFwOpcode.O_SETMARK, Insn),
AttrDescr(IpFwOpcode.O_NOP, InsnComment),
AttrDescr(IpFwOpcode.O_PROTO, InsnProto),
AttrDescr(IpFwOpcode.O_PROB, InsnProb),
AttrDescr(IpFwOpcode.O_IP_DST_ME, InsnEmpty),
AttrDescr(IpFwOpcode.O_IP_SRC_ME, InsnEmpty),
AttrDescr(IpFwOpcode.O_IP6_DST_ME, InsnEmpty),
AttrDescr(IpFwOpcode.O_IP6_SRC_ME, InsnEmpty),
AttrDescr(IpFwOpcode.O_IP_SRC, InsnIp),
AttrDescr(IpFwOpcode.O_IP_DST, InsnIp),
AttrDescr(IpFwOpcode.O_IP6_DST, InsnIp6),
AttrDescr(IpFwOpcode.O_IP6_SRC, InsnIp6),
AttrDescr(IpFwOpcode.O_IP_SRC_LOOKUP, InsnTable),
AttrDescr(IpFwOpcode.O_IP_DST_LOOKUP, InsnTable),
AttrDescr(IpFwOpcode.O_IP_SRCPORT, InsnPorts),
AttrDescr(IpFwOpcode.O_IP_DSTPORT, InsnPorts),
AttrDescr(IpFwOpcode.O_PROBE_STATE, Insn),
AttrDescr(IpFwOpcode.O_KEEP_STATE, Insn),
]
)

View file

@ -0,0 +1,505 @@
#!/usr/bin/env python3
import os
import socket
import struct
import subprocess
import sys
from ctypes import c_byte
from ctypes import c_char
from ctypes import c_int
from ctypes import c_long
from ctypes import c_uint32
from ctypes import c_uint8
from ctypes import c_ulong
from ctypes import c_ushort
from ctypes import sizeof
from ctypes import Structure
from enum import Enum
from typing import Any
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Union
import pytest
from atf_python.sys.netpfil.ipfw.insns import BaseInsn
from atf_python.sys.netpfil.ipfw.insns import insn_attrs
from atf_python.sys.netpfil.ipfw.ioctl_headers import IpFwTableLookupType
from atf_python.sys.netpfil.ipfw.ioctl_headers import IpFwTlvType
from atf_python.sys.netpfil.ipfw.ioctl_headers import Op3CmdType
from atf_python.sys.netpfil.ipfw.utils import align8
from atf_python.sys.netpfil.ipfw.utils import AttrDescr
from atf_python.sys.netpfil.ipfw.utils import enum_from_int
from atf_python.sys.netpfil.ipfw.utils import prepare_attrs_map
class IpFw3OpHeader(Structure):
_fields_ = [
("opcode", c_ushort),
("version", c_ushort),
("reserved1", c_ushort),
("reserved2", c_ushort),
]
class IpFwObjTlv(Structure):
_fields_ = [
("n_type", c_ushort),
("flags", c_ushort),
("length", c_uint32),
]
class BaseTlv(object):
obj_enum_class = IpFwTlvType
def __init__(self, obj_type):
if isinstance(obj_type, Enum):
self.obj_type = obj_type.value
self._enum = obj_type
else:
self.obj_type = obj_type
self._enum = enum_from_int(self.obj_enum_class, obj_type)
self.obj_list = []
def add_obj(self, obj):
self.obj_list.append(obj)
@property
def len(self):
return len(bytes(self))
@property
def obj_name(self):
if self._enum is not None:
return self._enum.name
else:
return "tlv#{}".format(self.obj_type)
def print_hdr(self, prepend=""):
print(
"{}len={} type={}({}){}".format(
prepend, self.len, self.obj_name, self.obj_type, self._print_obj_value()
)
)
def print_obj(self, prepend=""):
self.print_hdr(prepend)
prepend = " " + prepend
for obj in self.obj_list:
obj.print_obj(prepend)
@classmethod
def _validate(cls, data):
if len(data) < sizeof(IpFwObjTlv):
raise ValueError("TLV too short")
hdr = IpFwObjTlv.from_buffer_copy(data[: sizeof(IpFwObjTlv)])
if len(data) != hdr.length:
raise ValueError("wrong TLV size")
@classmethod
def _parse(cls, data, attr_map):
hdr = IpFwObjTlv.from_buffer_copy(data[: sizeof(IpFwObjTlv)])
return cls(hdr.n_type)
@classmethod
def from_bytes(cls, data, attr_map=None):
cls._validate(data)
obj = cls._parse(data, attr_map)
return obj
def __bytes__(self):
raise NotImplementedError()
def _print_obj_value(self):
return " " + " ".join(
["x{:02X}".format(b) for b in self._data[sizeof(IpFwObjTlv) :]]
)
def as_hexdump(self):
return " ".join(["x{:02X}".format(b) for b in bytes(self)])
class UnknownTlv(BaseTlv):
def __init__(self, obj_type, data):
super().__init__(obj_type)
self._data = data
@classmethod
def _validate(cls, data):
if len(data) < sizeof(IpFwObjNTlv):
raise ValueError("TLV size is too short")
hdr = IpFwObjTlv.from_buffer_copy(data[: sizeof(IpFwObjTlv)])
if len(data) != hdr.length:
raise ValueError("wrong TLV size")
@classmethod
def _parse(cls, data, attr_map):
hdr = IpFwObjTlv.from_buffer_copy(data[: sizeof(IpFwObjTlv)])
self = cls(hdr.n_type, data)
return self
def __bytes__(self):
return self._data
class Tlv(BaseTlv):
@staticmethod
def parse_tlvs(data, attr_map):
# print("PARSING " + " ".join(["x{:02X}".format(b) for b in data]))
off = 0
ret = []
while off + sizeof(IpFwObjTlv) <= len(data):
hdr = IpFwObjTlv.from_buffer_copy(data[off : off + sizeof(IpFwObjTlv)])
if off + hdr.length > len(data):
raise ValueError("TLV size do not match")
obj_data = data[off : off + hdr.length]
obj_descr = attr_map.get(hdr.n_type, None)
if obj_descr is None:
# raise ValueError("unknown child TLV {}".format(hdr.n_type))
cls = UnknownTlv
child_map = {}
else:
cls = obj_descr["ad"].cls
child_map = obj_descr.get("child", {})
# print("FOUND OBJECT type {}".format(cls))
# print()
obj = cls.from_bytes(obj_data, child_map)
ret.append(obj)
off += hdr.length
return ret
class IpFwObjNTlv(Structure):
_fields_ = [
("head", IpFwObjTlv),
("idx", c_ushort),
("n_set", c_uint8),
("n_type", c_uint8),
("spare", c_uint32),
("name", c_char * 64),
]
class NTlv(Tlv):
def __init__(self, obj_type, idx=0, n_set=0, n_type=0, name=None):
super().__init__(obj_type)
self.n_idx = idx
self.n_set = n_set
self.n_type = n_type
self.n_name = name
@classmethod
def _validate(cls, data):
if len(data) != sizeof(IpFwObjNTlv):
raise ValueError("TLV size is not correct")
hdr = IpFwObjTlv.from_buffer_copy(data[: sizeof(IpFwObjTlv)])
if len(data) != hdr.length:
raise ValueError("wrong TLV size")
@classmethod
def _parse(cls, data, attr_map):
hdr = IpFwObjNTlv.from_buffer_copy(data[: sizeof(IpFwObjNTlv)])
name = hdr.name.decode("utf-8")
self = cls(hdr.head.n_type, hdr.idx, hdr.n_set, hdr.n_type, name)
return self
def __bytes__(self):
name_bytes = self.n_name.encode("utf-8")
if len(name_bytes) < 64:
name_bytes += b"\0" * (64 - len(name_bytes))
hdr = IpFwObjNTlv(
head=IpFwObjTlv(n_type=self.obj_type, length=sizeof(IpFwObjNTlv)),
idx=self.n_idx,
n_set=self.n_set,
n_type=self.n_type,
name=name_bytes[:64],
)
return bytes(hdr)
def _print_obj_value(self):
return " " + "type={} set={} idx={} name={}".format(
self.n_type, self.n_set, self.n_idx, self.n_name
)
class IpFwObjCTlv(Structure):
_fields_ = [
("head", IpFwObjTlv),
("count", c_uint32),
("objsize", c_ushort),
("version", c_uint8),
("flags", c_uint8),
]
class CTlv(Tlv):
def __init__(self, obj_type, obj_list=[]):
super().__init__(obj_type)
if obj_list:
self.obj_list.extend(obj_list)
@classmethod
def _validate(cls, data):
if len(data) < sizeof(IpFwObjCTlv):
raise ValueError("TLV too short")
hdr = IpFwObjCTlv.from_buffer_copy(data[: sizeof(IpFwObjCTlv)])
if len(data) != hdr.head.length:
raise ValueError("wrong TLV size")
@classmethod
def _parse(cls, data, attr_map):
hdr = IpFwObjCTlv.from_buffer_copy(data[: sizeof(IpFwObjCTlv)])
tlv_list = cls.parse_tlvs(data[sizeof(IpFwObjCTlv) :], attr_map)
if len(tlv_list) != hdr.count:
raise ValueError("wrong number of objects")
self = cls(hdr.head.n_type, obj_list=tlv_list)
return self
def __bytes__(self):
ret = b""
for obj in self.obj_list:
ret += bytes(obj)
length = len(ret) + sizeof(IpFwObjCTlv)
if self.obj_list:
objsize = len(bytes(self.obj_list[0]))
else:
objsize = 0
hdr = IpFwObjCTlv(
head=IpFwObjTlv(n_type=self.obj_type, length=sizeof(IpFwObjNTlv)),
count=len(self.obj_list),
objsize=objsize,
)
return bytes(hdr) + ret
def _print_obj_value(self):
return ""
class IpFwRule(Structure):
_fields_ = [
("act_ofs", c_ushort),
("cmd_len", c_ushort),
("spare", c_ushort),
("n_set", c_uint8),
("flags", c_uint8),
("rulenum", c_uint32),
("n_id", c_uint32),
]
class RawRule(Tlv):
def __init__(self, obj_type=0, n_set=0, rulenum=0, obj_list=[]):
super().__init__(obj_type)
self.n_set = n_set
self.rulenum = rulenum
if obj_list:
self.obj_list.extend(obj_list)
@classmethod
def _validate(cls, data):
min_size = sizeof(IpFwRule)
if len(data) < min_size:
raise ValueError("rule TLV too short")
rule = IpFwRule.from_buffer_copy(data[:min_size])
if len(data) != min_size + rule.cmd_len * 4:
raise ValueError("rule TLV cmd_len incorrect")
@classmethod
def _parse(cls, data, attr_map):
hdr = IpFwRule.from_buffer_copy(data[: sizeof(IpFwRule)])
self = cls(
n_set=hdr.n_set,
rulenum=hdr.rulenum,
obj_list=BaseInsn.parse_insns(data[sizeof(IpFwRule) :], insn_attrs),
)
return self
def __bytes__(self):
act_ofs = 0
cmd_len = 0
ret = b""
for obj in self.obj_list:
if obj.is_action and act_ofs == 0:
act_ofs = cmd_len
obj_bytes = bytes(obj)
cmd_len += len(obj_bytes) // 4
ret += obj_bytes
hdr = IpFwRule(
act_ofs=act_ofs,
cmd_len=cmd_len,
n_set=self.n_set,
rulenum=self.rulenum,
)
return bytes(hdr) + ret
@property
def obj_name(self):
return "rule#{}".format(self.rulenum)
def _print_obj_value(self):
cmd_len = sum([len(bytes(obj)) for obj in self.obj_list]) // 4
return " set={} cmd_len={}".format(self.n_set, cmd_len)
class CTlvRule(CTlv):
def __init__(self, obj_type=IpFwTlvType.IPFW_TLV_RULE_LIST, obj_list=[]):
super().__init__(obj_type, obj_list)
@classmethod
def _parse(cls, data, attr_map):
chdr = IpFwObjCTlv.from_buffer_copy(data[: sizeof(IpFwObjCTlv)])
rule_list = []
off = sizeof(IpFwObjCTlv)
while off + sizeof(IpFwRule) <= len(data):
hdr = IpFwRule.from_buffer_copy(data[off : off + sizeof(IpFwRule)])
rule_len = sizeof(IpFwRule) + hdr.cmd_len * 4
# print("FOUND RULE len={} cmd_len={}".format(rule_len, hdr.cmd_len))
if off + rule_len > len(data):
raise ValueError("wrong rule size")
rule = RawRule.from_bytes(data[off : off + rule_len])
rule_list.append(rule)
off += align8(rule_len)
if off != len(data):
raise ValueError("rule bytes left: off={} len={}".format(off, len(data)))
return cls(chdr.head.n_type, obj_list=rule_list)
# XXX: _validate
def __bytes__(self):
ret = b""
for rule in self.obj_list:
rule_bytes = bytes(rule)
remainder = len(rule_bytes) % 8
if remainder > 0:
rule_bytes += b"\0" * (8 - remainder)
ret += rule_bytes
hdr = IpFwObjCTlv(
head=IpFwObjTlv(
n_type=self.obj_type, length=len(ret) + sizeof(IpFwObjCTlv)
),
count=len(self.obj_list),
)
return bytes(hdr) + ret
class BaseIpFwMessage(object):
messages = []
def __init__(self, msg_type, obj_list=[]):
if isinstance(msg_type, Enum):
self.obj_type = msg_type.value
self._enum = msg_type
else:
self.obj_type = msg_type
self._enum = enum_from_int(self.messages, self.obj_type)
self.obj_list = []
if obj_list:
self.obj_list.extend(obj_list)
def add_obj(self, obj):
self.obj_list.append(obj)
def get_obj(self, obj_type):
obj_type_raw = enum_or_int(obj_type)
for obj in self.obj_list:
if obj.obj_type == obj_type_raw:
return obj
return None
@staticmethod
def parse_header(data: bytes):
if len(data) < sizeof(IpFw3OpHeader):
raise ValueError("length less than op3 message header")
return IpFw3OpHeader.from_buffer_copy(data), sizeof(IpFw3OpHeader)
def parse_obj_list(self, data: bytes):
off = 0
while off < len(data):
# print("PARSE off={} rem={}".format(off, len(data) - off))
hdr = IpFwObjTlv.from_buffer_copy(data[off : off + sizeof(IpFwObjTlv)])
# print(" tlv len {}".format(hdr.length))
if hdr.length + off > len(data):
raise ValueError("TLV too big")
tlv = Tlv(hdr.n_type, data[off : off + hdr.length])
self.add_obj(tlv)
off += hdr.length
def is_type(self, msg_type):
return enum_or_int(msg_type) == self.msg_type
@property
def obj_name(self):
if self._enum is not None:
return self._enum.name
else:
return "msg#{}".format(self.obj_type)
def print_hdr(self, prepend=""):
print("{}len={}, type={}".format(prepend, len(bytes(self)), self.obj_name))
@classmethod
def from_bytes(cls, data):
try:
hdr, hdrlen = cls.parse_header(data)
self = cls(hdr.opcode)
self._orig_data = data
except ValueError as e:
print("Failed to parse op3 header: {}".format(e))
cls.print_as_bytes(data)
raise
tlv_list = Tlv.parse_tlvs(data[hdrlen:], self.attr_map)
self.obj_list.extend(tlv_list)
return self
def __bytes__(self):
ret = bytes(IpFw3OpHeader(opcode=self.obj_type))
for obj in self.obj_list:
ret += bytes(obj)
return ret
def print_obj(self):
self.print_hdr()
for obj in self.obj_list:
obj.print_obj(" ")
@staticmethod
def print_as_bytes(data: bytes, descr: str):
print("===vv {} (len:{:3d}) vv===".format(descr, len(data)))
off = 0
step = 16
while off < len(data):
for i in range(step):
if off + i < len(data):
print(" {:02X}".format(data[off + i]), end="")
print("")
off += step
print("--------------------")
rule_attrs = prepare_attrs_map(
[
AttrDescr(
IpFwTlvType.IPFW_TLV_TBLNAME_LIST,
CTlv,
[
AttrDescr(IpFwTlvType.IPFW_TLV_TBL_NAME, NTlv),
AttrDescr(IpFwTlvType.IPFW_TLV_STATE_NAME, NTlv),
],
True,
),
AttrDescr(IpFwTlvType.IPFW_TLV_RULE_LIST, CTlvRule),
]
)
class IpFwXRule(BaseIpFwMessage):
messages = [Op3CmdType.IP_FW_XADD]
attr_map = rule_attrs
legacy_classes = []
set3_classes = []
get3_classes = [IpFwXRule]

View file

@ -0,0 +1,90 @@
from enum import Enum
class Op3CmdType(Enum):
IP_FW_TABLE_XADD = 86
IP_FW_TABLE_XDEL = 87
IP_FW_TABLE_XGETSIZE = 88
IP_FW_TABLE_XLIST = 89
IP_FW_TABLE_XDESTROY = 90
IP_FW_TABLES_XLIST = 92
IP_FW_TABLE_XINFO = 93
IP_FW_TABLE_XFLUSH = 94
IP_FW_TABLE_XCREATE = 95
IP_FW_TABLE_XMODIFY = 96
IP_FW_XGET = 97
IP_FW_XADD = 98
IP_FW_XDEL = 99
IP_FW_XMOVE = 100
IP_FW_XZERO = 101
IP_FW_XRESETLOG = 102
IP_FW_SET_SWAP = 103
IP_FW_SET_MOVE = 104
IP_FW_SET_ENABLE = 105
IP_FW_TABLE_XFIND = 106
IP_FW_XIFLIST = 107
IP_FW_TABLES_ALIST = 108
IP_FW_TABLE_XSWAP = 109
IP_FW_TABLE_VLIST = 110
IP_FW_NAT44_XCONFIG = 111
IP_FW_NAT44_DESTROY = 112
IP_FW_NAT44_XGETCONFIG = 113
IP_FW_NAT44_LIST_NAT = 114
IP_FW_NAT44_XGETLOG = 115
IP_FW_DUMP_SOPTCODES = 116
IP_FW_DUMP_SRVOBJECTS = 117
IP_FW_NAT64STL_CREATE = 130
IP_FW_NAT64STL_DESTROY = 131
IP_FW_NAT64STL_CONFIG = 132
IP_FW_NAT64STL_LIST = 133
IP_FW_NAT64STL_STATS = 134
IP_FW_NAT64STL_RESET_STATS = 135
IP_FW_NAT64LSN_CREATE = 140
IP_FW_NAT64LSN_DESTROY = 141
IP_FW_NAT64LSN_CONFIG = 142
IP_FW_NAT64LSN_LIST = 143
IP_FW_NAT64LSN_STATS = 144
IP_FW_NAT64LSN_LIST_STATES = 145
IP_FW_NAT64LSN_RESET_STATS = 146
IP_FW_NPTV6_CREATE = 150
IP_FW_NPTV6_DESTROY = 151
IP_FW_NPTV6_CONFIG = 152
IP_FW_NPTV6_LIST = 153
IP_FW_NPTV6_STATS = 154
IP_FW_NPTV6_RESET_STATS = 155
IP_FW_NAT64CLAT_CREATE = 160
IP_FW_NAT64CLAT_DESTROY = 161
IP_FW_NAT64CLAT_CONFIG = 162
IP_FW_NAT64CLAT_LIST = 163
IP_FW_NAT64CLAT_STATS = 164
IP_FW_NAT64CLAT_RESET_STATS = 165
class IpFwTableLookupType(Enum):
LOOKUP_DST_IP = 0
LOOKUP_SRC_IP = 1
LOOKUP_DST_PORT = 2
LOOKUP_SRC_PORT = 3
LOOKUP_UID = 4
LOOKUP_JAIL = 5
LOOKUP_DSCP = 6
LOOKUP_DST_MAC = 7
LOOKUP_SRC_MAC = 8
LOOKUP_MARK = 9
class IpFwTlvType(Enum):
IPFW_TLV_TBL_NAME = 1
IPFW_TLV_TBLNAME_LIST = 2
IPFW_TLV_RULE_LIST = 3
IPFW_TLV_DYNSTATE_LIST = 4
IPFW_TLV_TBL_ENT = 5
IPFW_TLV_DYN_ENT = 6
IPFW_TLV_RULE_ENT = 7
IPFW_TLV_TBLENT_LIST = 8
IPFW_TLV_RANGE = 9
IPFW_TLV_EACTION = 10
IPFW_TLV_COUNTERS = 11
IPFW_TLV_OBJDATA = 12
IPFW_TLV_STATE_NAME = 14
IPFW_TLV_EACTION_BASE = 1000

View file

@ -0,0 +1,118 @@
#!/usr/bin/env python3
import os
import socket
import struct
import subprocess
import sys
from ctypes import c_byte
from ctypes import c_char
from ctypes import c_int
from ctypes import c_long
from ctypes import c_uint32
from ctypes import c_uint8
from ctypes import c_ulong
from ctypes import c_ushort
from ctypes import sizeof
from ctypes import Structure
from enum import Enum
from typing import Any
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Union
from atf_python.sys.netpfil.ipfw.ioctl import get3_classes
from atf_python.sys.netpfil.ipfw.ioctl import legacy_classes
from atf_python.sys.netpfil.ipfw.ioctl import set3_classes
from atf_python.sys.netpfil.ipfw.utils import AttrDescr
from atf_python.sys.netpfil.ipfw.utils import enum_from_int
from atf_python.sys.netpfil.ipfw.utils import enum_or_int
from atf_python.sys.netpfil.ipfw.utils import prepare_attrs_map
class DebugHeader(Structure):
_fields_ = [
("cmd_type", c_ushort),
("spare1", c_ushort),
("opt_name", c_uint32),
("total_len", c_uint32),
("spare2", c_uint32),
]
class DebugType(Enum):
DO_CMD = 1
DO_SET3 = 2
DO_GET3 = 3
class DebugIoReader(object):
HANDLER_CLASSES = {
DebugType.DO_CMD: legacy_classes,
DebugType.DO_SET3: set3_classes,
DebugType.DO_GET3: get3_classes,
}
def __init__(self, ipfw_path):
self._msgmap = self.build_msgmap()
self.ipfw_path = ipfw_path
def build_msgmap(self):
xmap = {}
for debug_type, handler_classes in self.HANDLER_CLASSES.items():
debug_type = enum_or_int(debug_type)
if debug_type not in xmap:
xmap[debug_type] = {}
for handler_class in handler_classes:
for msg in handler_class.messages:
xmap[debug_type][enum_or_int(msg)] = handler_class
return xmap
def print_obj_header(self, hdr):
debug_type = "#{}".format(hdr.cmd_type)
for _type in self.HANDLER_CLASSES.keys():
if _type.value == hdr.cmd_type:
debug_type = _type.name.lower()
break
print(
"@@ record for {} len={} optname={}".format(
debug_type, hdr.total_len, hdr.opt_name
)
)
def parse_record(self, data):
hdr = DebugHeader.from_buffer_copy(data[: sizeof(DebugHeader)])
data = data[sizeof(DebugHeader) :]
cls = self._msgmap[hdr.cmd_type].get(hdr.opt_name)
if cls is not None:
return cls.from_bytes(data)
raise ValueError(
"unsupported cmd_type={} opt_name={}".format(hdr.cmd_type, hdr.opt_name)
)
def get_record_from_stdin(self):
data = sys.stdin.buffer.peek(sizeof(DebugHeader))
if len(data) == 0:
return None
hdr = DebugHeader.from_buffer_copy(data)
data = sys.stdin.buffer.read(hdr.total_len)
return self.parse_record(data)
def get_records_from_buffer(self, data):
off = 0
ret = []
while off + sizeof(DebugHeader) <= len(data):
hdr = DebugHeader.from_buffer_copy(data[off : off + sizeof(DebugHeader)])
ret.append(self.parse_record(data[off : off + hdr.total_len]))
off += hdr.total_len
return ret
def run_ipfw(self, cmd: str) -> bytes:
args = [self.ipfw_path, "-xqn"] + cmd.split()
r = subprocess.run(args, capture_output=True)
return r.stdout
def get_records(self, cmd: str):
return self.get_records_from_buffer(self.run_ipfw(cmd))

View file

@ -0,0 +1,61 @@
#!/usr/bin/env python3
import os
import socket
import struct
import subprocess
import sys
from enum import Enum
from typing import Dict
from typing import List
from typing import Optional
from typing import Union
from typing import Any
from typing import NamedTuple
import pytest
def roundup2(val: int, num: int) -> int:
if val % num:
return (val | (num - 1)) + 1
else:
return val
def align8(val: int) -> int:
return roundup2(val, 8)
def enum_or_int(val) -> int:
if isinstance(val, Enum):
return val.value
return val
def enum_from_int(enum_class: Enum, val) -> Enum:
if isinstance(val, Enum):
return val
for item in enum_class:
if val == item.value:
return item
return None
class AttrDescr(NamedTuple):
val: Enum
cls: Any
child_map: Any = None
is_array: bool = False
def prepare_attrs_map(attrs: List[AttrDescr]) -> Dict[str, Dict]:
ret = {}
for ad in attrs:
ret[ad.val.value] = {"ad": ad}
if ad.child_map:
ret[ad.val.value]["child"] = prepare_attrs_map(ad.child_map)
ret[ad.val.value]["is_array"] = ad.is_array
return ret