cpython/Lib/test/test_audit.py
2022-10-07 20:17:08 +03:00

228 lines
7.2 KiB
Python

"""Tests for sys.audit and sys.addaudithook
"""
import subprocess
import sys
import unittest
from test import support
from test.support import import_helper
from test.support import os_helper
if not hasattr(sys, "addaudithook") or not hasattr(sys, "audit"):
raise unittest.SkipTest("test only relevant when sys.audit is available")
AUDIT_TESTS_PY = support.findfile("audit-tests.py")
class AuditTest(unittest.TestCase):
maxDiff = None
@support.requires_subprocess()
def do_test(self, *args):
with subprocess.Popen(
[sys.executable, "-X utf8", AUDIT_TESTS_PY, *args],
encoding="utf-8",
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
) as p:
p.wait()
sys.stdout.writelines(p.stdout)
sys.stderr.writelines(p.stderr)
if p.returncode:
self.fail("".join(p.stderr))
@support.requires_subprocess()
def run_python(self, *args):
events = []
with subprocess.Popen(
[sys.executable, "-X utf8", AUDIT_TESTS_PY, *args],
encoding="utf-8",
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
) as p:
p.wait()
sys.stderr.writelines(p.stderr)
return (
p.returncode,
[line.strip().partition(" ") for line in p.stdout],
"".join(p.stderr),
)
def test_basic(self):
self.do_test("test_basic")
def test_block_add_hook(self):
self.do_test("test_block_add_hook")
def test_block_add_hook_baseexception(self):
self.do_test("test_block_add_hook_baseexception")
def test_marshal(self):
import_helper.import_module("marshal")
self.do_test("test_marshal")
def test_pickle(self):
import_helper.import_module("pickle")
self.do_test("test_pickle")
def test_monkeypatch(self):
self.do_test("test_monkeypatch")
def test_open(self):
self.do_test("test_open", os_helper.TESTFN)
def test_cantrace(self):
self.do_test("test_cantrace")
def test_mmap(self):
self.do_test("test_mmap")
def test_excepthook(self):
returncode, events, stderr = self.run_python("test_excepthook")
if not returncode:
self.fail(f"Expected fatal exception\n{stderr}")
self.assertSequenceEqual(
[("sys.excepthook", " ", "RuntimeError('fatal-error')")], events
)
def test_unraisablehook(self):
returncode, events, stderr = self.run_python("test_unraisablehook")
if returncode:
self.fail(stderr)
self.assertEqual(events[0][0], "sys.unraisablehook")
self.assertEqual(
events[0][2],
"RuntimeError('nonfatal-error') Exception ignored for audit hook test",
)
def test_winreg(self):
import_helper.import_module("winreg")
returncode, events, stderr = self.run_python("test_winreg")
if returncode:
self.fail(stderr)
self.assertEqual(events[0][0], "winreg.OpenKey")
self.assertEqual(events[1][0], "winreg.OpenKey/result")
expected = events[1][2]
self.assertTrue(expected)
self.assertSequenceEqual(["winreg.EnumKey", " ", f"{expected} 0"], events[2])
self.assertSequenceEqual(["winreg.EnumKey", " ", f"{expected} 10000"], events[3])
self.assertSequenceEqual(["winreg.PyHKEY.Detach", " ", expected], events[4])
def test_socket(self):
import_helper.import_module("socket")
returncode, events, stderr = self.run_python("test_socket")
if returncode:
self.fail(stderr)
if support.verbose:
print(*events, sep='\n')
self.assertEqual(events[0][0], "socket.gethostname")
self.assertEqual(events[1][0], "socket.__new__")
self.assertEqual(events[2][0], "socket.bind")
self.assertTrue(events[2][2].endswith("('127.0.0.1', 8080)"))
def test_gc(self):
returncode, events, stderr = self.run_python("test_gc")
if returncode:
self.fail(stderr)
if support.verbose:
print(*events, sep='\n')
self.assertEqual(
[event[0] for event in events],
["gc.get_objects", "gc.get_referrers", "gc.get_referents"]
)
def test_http(self):
import_helper.import_module("http.client")
returncode, events, stderr = self.run_python("test_http_client")
if returncode:
self.fail(stderr)
if support.verbose:
print(*events, sep='\n')
self.assertEqual(events[0][0], "http.client.connect")
self.assertEqual(events[0][2], "www.python.org 80")
self.assertEqual(events[1][0], "http.client.send")
if events[1][2] != '[cannot send]':
self.assertIn('HTTP', events[1][2])
def test_sqlite3(self):
sqlite3 = import_helper.import_module("sqlite3")
returncode, events, stderr = self.run_python("test_sqlite3")
if returncode:
self.fail(stderr)
if support.verbose:
print(*events, sep='\n')
actual = [ev[0] for ev in events]
expected = ["sqlite3.connect", "sqlite3.connect/handle"] * 2
if hasattr(sqlite3.Connection, "enable_load_extension"):
expected += [
"sqlite3.enable_load_extension",
"sqlite3.load_extension",
]
self.assertEqual(actual, expected)
def test_sys_getframe(self):
returncode, events, stderr = self.run_python("test_sys_getframe")
if returncode:
self.fail(stderr)
if support.verbose:
print(*events, sep='\n')
actual = [(ev[0], ev[2]) for ev in events]
expected = [("sys._getframe", "test_sys_getframe")]
self.assertEqual(actual, expected)
def test_wmi_exec_query(self):
import_helper.import_module("_wmi")
returncode, events, stderr = self.run_python("test_wmi_exec_query")
if returncode:
self.fail(stderr)
if support.verbose:
print(*events, sep='\n')
actual = [(ev[0], ev[2]) for ev in events]
expected = [("_wmi.exec_query", "SELECT * FROM Win32_OperatingSystem")]
self.assertEqual(actual, expected)
def test_syslog(self):
syslog = import_helper.import_module("syslog")
returncode, events, stderr = self.run_python("test_syslog")
if returncode:
self.fail(stderr)
if support.verbose:
print('Events:', *events, sep='\n ')
self.assertSequenceEqual(
events,
[('syslog.openlog', ' ', f'python 0 {syslog.LOG_USER}'),
('syslog.syslog', ' ', f'{syslog.LOG_INFO} test'),
('syslog.setlogmask', ' ', f'{syslog.LOG_DEBUG}'),
('syslog.closelog', '', ''),
('syslog.syslog', ' ', f'{syslog.LOG_INFO} test2'),
('syslog.openlog', ' ', f'audit-tests.py 0 {syslog.LOG_USER}'),
('syslog.openlog', ' ', f'audit-tests.py {syslog.LOG_NDELAY} {syslog.LOG_LOCAL0}'),
('syslog.openlog', ' ', f'None 0 {syslog.LOG_USER}'),
('syslog.closelog', '', '')]
)
if __name__ == "__main__":
unittest.main()