mirror of
https://github.com/freebsd/freebsd-src
synced 2024-10-04 15:40:44 +00:00
pf: test rules evaluation in the face of multiple IPv6 fragment headers
Send an ICMPv6 echo request packet with multiple IPv6 fragment headers. Set rules to pass all packets, except for ICMPv6 echo requests. pf ought to drop the echo request, but doesn't because it reassembles the packet, and then doesn't handle the second fragment header. In other words: it fails to detect the ICMPv6 echo header. Reported by: Enrico Bassetti bassetti@di.uniroma1.it (NetSecurityLab @ Sapienza University of Rome) MFC after: instant Sponsored by: Rubicon Communications, LLC ("Netgate")
This commit is contained in:
parent
76afcbb524
commit
b23dbabb7f
|
@ -1,6 +1,7 @@
|
|||
#!/usr/local/bin/python3
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
|
||||
|
||||
class ToolsHelper(object):
|
||||
|
@ -13,6 +14,26 @@ def get_output(cls, cmd: str, verbose=False) -> str:
|
|||
print("run: '{}'".format(cmd))
|
||||
return os.popen(cmd).read()
|
||||
|
||||
@classmethod
|
||||
def pf_rules(cls, rules, verbose=True):
|
||||
pf_conf = ""
|
||||
for r in rules:
|
||||
pf_conf = pf_conf + r + "\n"
|
||||
|
||||
if verbose:
|
||||
print("Set rules:")
|
||||
print(pf_conf)
|
||||
|
||||
ps = subprocess.Popen("/sbin/pfctl -g -f -", shell=True,
|
||||
stdin=subprocess.PIPE)
|
||||
ps.communicate(bytes(pf_conf, 'utf-8'))
|
||||
ret = ps.wait()
|
||||
if ret != 0:
|
||||
raise Exception("Failed to set pf rules %d" % ret)
|
||||
|
||||
if verbose:
|
||||
cls.print_output("/sbin/pfctl -sr")
|
||||
|
||||
@classmethod
|
||||
def print_output(cls, cmd: str, verbose=True):
|
||||
if verbose:
|
||||
|
|
|
@ -40,6 +40,8 @@ ATF_TESTS_SH+= altq \
|
|||
table \
|
||||
tos
|
||||
|
||||
ATF_TESTS_PYTEST+= frag6.py
|
||||
|
||||
# Tests reuse jail names and so cannot run in parallel.
|
||||
TEST_METADATA+= is_exclusive=true
|
||||
|
||||
|
|
60
tests/sys/netpfil/pf/frag6.py
Normal file
60
tests/sys/netpfil/pf/frag6.py
Normal file
|
@ -0,0 +1,60 @@
|
|||
import pytest
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
logging.getLogger("scapy").setLevel(logging.CRITICAL)
|
||||
from atf_python.sys.net.tools import ToolsHelper
|
||||
from atf_python.sys.net.vnet import VnetTestTemplate
|
||||
|
||||
class DelayedSend(threading.Thread):
|
||||
def __init__(self, packet):
|
||||
threading.Thread.__init__(self)
|
||||
self._packet = packet
|
||||
|
||||
self.start()
|
||||
|
||||
def run(self):
|
||||
import scapy.all as sp
|
||||
time.sleep(1)
|
||||
sp.send(self._packet)
|
||||
|
||||
class TestFrag6(VnetTestTemplate):
|
||||
REQUIRED_MODULES = ["pf"]
|
||||
TOPOLOGY = {
|
||||
"vnet1": {"ifaces": ["if1"]},
|
||||
"vnet2": {"ifaces": ["if1"]},
|
||||
"if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
|
||||
}
|
||||
|
||||
def vnet2_handler(self, vnet):
|
||||
ToolsHelper.print_output("/sbin/pfctl -e")
|
||||
ToolsHelper.pf_rules([
|
||||
"scrub fragment reassemble",
|
||||
"pass",
|
||||
"block in inet6 proto icmp6 icmp6-type echoreq",
|
||||
])
|
||||
|
||||
def check_ping_reply(self, packet):
|
||||
print(packet)
|
||||
return False
|
||||
|
||||
@pytest.mark.require_user("root")
|
||||
def test_dup_frag_hdr(self):
|
||||
"Test packets with duplicate fragment headers"
|
||||
srv_vnet = self.vnet_map["vnet2"]
|
||||
|
||||
# Import in the correct vnet, so at to not confuse Scapy
|
||||
import scapy.all as sp
|
||||
|
||||
packet = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \
|
||||
/ sp.IPv6ExtHdrFragment(offset = 0, m = 0) \
|
||||
/ sp.IPv6ExtHdrFragment(offset = 0, m = 0) \
|
||||
/ sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f00f') * 128))
|
||||
|
||||
# Delay the send so the sniffer is running when we transmit.
|
||||
s = DelayedSend(packet)
|
||||
|
||||
packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name,
|
||||
timeout=3)
|
||||
for p in packets:
|
||||
assert not p.getlayer(sp.ICMPv6EchoReply)
|
Loading…
Reference in a new issue