mirror of
https://github.com/systemd/systemd
synced 2024-10-01 13:55:20 +00:00
test-network: add test for DHCPv4 DNR
This will test that networkd/resolved can understand the V4_DNR DHCP option.
This commit is contained in:
parent
ea72b708ed
commit
c42ed79c3a
|
@ -17,8 +17,10 @@
|
|||
|
||||
import argparse
|
||||
import datetime
|
||||
import enum
|
||||
import errno
|
||||
import itertools
|
||||
import ipaddress
|
||||
import json
|
||||
import os
|
||||
import pathlib
|
||||
|
@ -27,6 +29,7 @@ import re
|
|||
import shutil
|
||||
import signal
|
||||
import socket
|
||||
import struct
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
|
@ -742,6 +745,30 @@ def stop_by_pid_file(pid_file):
|
|||
print(f"Unexpected exception when waiting for {pid} to die: {e.errno}")
|
||||
rm_f(pid_file)
|
||||
|
||||
def dnr_v4_instance_data(adn, addrs=None, prio=1, alpns=("dot",), dohpath=None):
|
||||
b = bytes()
|
||||
pack = lambda c, w=1: struct.pack('>' + "_BH_I"[w], len(c)) + c
|
||||
pyton = lambda n, w=2: struct.pack('>' + "_BH_I"[w], n)
|
||||
ipv4 = ipaddress.IPv4Address
|
||||
class SvcParam(enum.Enum):
|
||||
ALPN = 1
|
||||
DOHPATH = 7
|
||||
|
||||
data = pyton(prio)
|
||||
|
||||
adn = adn.rstrip('.') + '.'
|
||||
data += pack(b.join(pack(label.encode('ascii')) for label in adn.split('.')))
|
||||
|
||||
if not addrs: # adn-only mode
|
||||
return pack(data, 2)
|
||||
|
||||
data += pack(b.join(ipv4(addr).packed for addr in addrs))
|
||||
data += pyton(SvcParam.ALPN.value) + pack(b.join(pack(alpn.encode('ascii')) for alpn in alpns), 2)
|
||||
if dohpath is not None:
|
||||
data += pyton(SvcParam.DOHPATH.value) + pack(dohpath.encode('utf-8'), 2)
|
||||
|
||||
return pack(data, 2)
|
||||
|
||||
def start_dnsmasq(*additional_options, interface='veth-peer', ra_mode=None, ipv4_range='192.168.5.10,192.168.5.200', ipv4_router='192.168.5.1', ipv6_range='2600::10,2600::20'):
|
||||
if ra_mode:
|
||||
ra_mode = f',{ra_mode}'
|
||||
|
@ -6993,6 +7020,48 @@ class NetworkdDHCPClientTests(unittest.TestCase, Utilities):
|
|||
check(self, False, False, True)
|
||||
check(self, False, False, False)
|
||||
|
||||
def test_dhcp_client_use_dnr(self):
|
||||
def check(self, ipv4, ipv6):
|
||||
os.makedirs(os.path.join(network_unit_dir, '25-dhcp-client.network.d'), exist_ok=True)
|
||||
with open(os.path.join(network_unit_dir, '25-dhcp-client.network.d/override.conf'), mode='w', encoding='utf-8') as f:
|
||||
f.write('[DHCPv4]\nUseDNS=')
|
||||
f.write('yes' if ipv4 else 'no')
|
||||
f.write('\n[DHCPv6]\nUseDNS=')
|
||||
f.write('yes' if ipv6 else 'no')
|
||||
f.write('\n[IPv6AcceptRA]\nUseDNS=no')
|
||||
|
||||
networkctl_reload()
|
||||
self.wait_online('veth99:routable')
|
||||
|
||||
# link becomes 'routable' when at least one protocol provide an valid address. Hence, we need to explicitly wait for both addresses.
|
||||
self.wait_address('veth99', r'inet 192.168.5.[0-9]*/24 metric 1024 brd 192.168.5.255 scope global dynamic', ipv='-4')
|
||||
self.wait_address('veth99', r'inet6 2600::[0-9a-f]*/128 scope global (dynamic noprefixroute|noprefixroute dynamic)', ipv='-6')
|
||||
|
||||
# make resolved re-read the link state file
|
||||
resolvectl('revert', 'veth99')
|
||||
|
||||
output = resolvectl('dns', 'veth99')
|
||||
print(output)
|
||||
if ipv4:
|
||||
self.assertIn('8.8.8.8#dns.google', output)
|
||||
self.assertIn('0.7.4.2#homer.simpson', output)
|
||||
else:
|
||||
self.assertNotIn('8.8.8.8#dns.google', output)
|
||||
|
||||
check_json(networkctl_json())
|
||||
|
||||
copy_network_unit('25-veth.netdev', '25-dhcp-server-veth-peer.network', '25-dhcp-client.network', copy_dropins=False)
|
||||
|
||||
start_networkd()
|
||||
self.wait_online('veth-peer:carrier')
|
||||
dnr_v4 = dnr_v4_instance_data(adn = "dns.google", addrs = ["8.8.8.8", "8.8.4.4"])
|
||||
dnr_v4 += dnr_v4_instance_data(adn = "homer.simpson", addrs = ["0.7.4.2"], alpns = ("dot","h2","h3"), dohpath = "/springfield{?dns}")
|
||||
masq = lambda bs: ':'.join(f"{b:02x}" for b in bs)
|
||||
start_dnsmasq(f'--dhcp-option=162,{masq(dnr_v4)}')
|
||||
|
||||
check(self, True, False)
|
||||
check(self, False, False)
|
||||
|
||||
def test_dhcp_client_use_captive_portal(self):
|
||||
def check(self, ipv4, ipv6):
|
||||
os.makedirs(os.path.join(network_unit_dir, '25-dhcp-client.network.d'), exist_ok=True)
|
||||
|
|
Loading…
Reference in a new issue