netpfil tests: improve sniffer.py

Multiple improvements to sniffer.py:

* Remove ambiguity of configuring recvif, it must be now explicitly specified.
* Don't catch exceptions around creating the sniffer, let it properly
  fail and display the whole stack trace.
* Count correct packets so that duplicates can be found.

MFC after:	1 week
Sponsored by:	InnoGames GmbH
Differential Revision:	https://reviews.freebsd.org/D38120
This commit is contained in:
Kajetan Staszkiewicz 2023-01-20 02:40:34 +01:00 committed by Kristof Provost
parent 9cb6ba29cb
commit a39dedeb31
6 changed files with 20 additions and 26 deletions

View File

@ -61,11 +61,11 @@ def main():
args = parser.parse_args()
sniffer = Sniffer(args, check_pcp, recvif=args.recvif[0], timeout=20)
sniffer = Sniffer(args, check_pcp, args.recvif[0], timeout=20)
sniffer.join()
if sniffer.foundCorrectPacket:
if sniffer.correctPackets:
sys.exit(0)
sys.exit(1)

View File

@ -100,14 +100,14 @@ def main():
args = parser.parse_args()
sniffer = Sniffer(args, check_stp)
sniffer = Sniffer(args, check_stp, args.recvif[0])
invalid_stp(args.sendif[0])
sniffer.join()
# The 'correct' packet is a corrupt STP packet, so it shouldn't turn up.
if sniffer.foundCorrectPacket:
if sniffer.correctPackets:
sys.exit(1)
if __name__ == '__main__':

View File

@ -96,14 +96,14 @@ def main():
args = parser.parse_args()
sniffer = None
if not args.recvif is None:
sniffer = Sniffer(args, check_icmp_too_big)
sniffer = Sniffer(args, check_icmp_too_big, args.recvif[0])
ping(args.sendif[0], args.to[0], args)
if sniffer:
sniffer.join()
if sniffer.foundCorrectPacket:
if sniffer.correctPackets:
sys.exit(0)
else:
sys.exit(1)

View File

@ -317,16 +317,16 @@ def main():
if args.tcpsyn:
checkfn=check_tcpsyn
sniffer = Sniffer(args, checkfn)
sniffer = Sniffer(args, checkfn, args.recvif[0])
replysniffer = None
if not args.replyif is None:
checkfn=check_ping_reply
replysniffer = Sniffer(args, checkfn, recvif=args.replyif[0])
replysniffer = Sniffer(args, checkfn, args.replyif[0])
dupsniffer = None
if args.checkdup is not None:
dupsniffer = Sniffer(args, check_dup, recvif=args.checkdup[0])
dupsniffer = Sniffer(args, check_dup, args.checkdup[0])
if args.tcpsyn:
tcpsyn(args.sendif[0], args.to[0], args)
@ -344,7 +344,7 @@ def main():
if sniffer:
sniffer.join()
if sniffer.foundCorrectPacket:
if sniffer.correctPackets:
sys.exit(0)
else:
sys.exit(1)
@ -352,7 +352,7 @@ def main():
if replysniffer:
replysniffer.join()
if replysniffer.foundCorrectPacket:
if replysniffer.correctPackets:
sys.exit(0)
else:
sys.exit(1)

View File

@ -31,18 +31,15 @@
import sys
class Sniffer(threading.Thread):
def __init__(self, args, check_function, recvif=None, timeout=3):
def __init__(self, args, check_function, recvif, timeout=3):
threading.Thread.__init__(self)
self._sem = threading.Semaphore(0)
self._args = args
self._timeout = timeout
if recvif is not None:
self._recvif = recvif
else:
self._recvif = args.recvif[0]
self._recvif = recvif
self._check_function = check_function
self.foundCorrectPacket = False
self.correctPackets = 0
self.start()
if not self._sem.acquire(timeout=30):
@ -51,7 +48,7 @@ def __init__(self, args, check_function, recvif=None, timeout=3):
def _checkPacket(self, packet):
ret = self._check_function(self._args, packet)
if ret:
self.foundCorrectPacket = True
self.correctPackets += 1
return ret
def _startedCb(self):
@ -59,9 +56,6 @@ def _startedCb(self):
def run(self):
self.packets = []
try:
self.packets = sp.sniff(iface=self._recvif,
stop_filter=self._checkPacket, timeout=self._timeout,
started_callback=self._startedCb)
except Exception as e:
print(e, file=sys.stderr)
self.packets = sp.sniff(iface=self._recvif,
stop_filter=self._checkPacket, timeout=self._timeout,
started_callback=self._startedCb)

View File

@ -72,7 +72,7 @@ def main():
sp.sendp(udp, iface=args.sendif[0], verbose=False)
# Start sniffing on recvif
sniffer = Sniffer(args, check_icmp_error)
sniffer = Sniffer(args, check_icmp_error, args.recvif[0])
# Send the bad error packet
icmp_reachable = sp.Ether() / \
@ -83,7 +83,7 @@ def main():
sp.sendp(icmp_reachable, iface=args.sendif[0], verbose=False)
sniffer.join()
if sniffer.foundCorrectPacket:
if sniffer.correctPackets:
sys.exit(1)
sys.exit(0)