gh-108294: Add error handling for time.sleep audit event (GH-108363)

I've also cleaned the tests up a bit to make this easier to test.
This commit is contained in:
Petr Viktorin 2023-09-05 10:25:08 +02:00 committed by GitHub
parent 24e989211a
commit 230649f538
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 41 additions and 32 deletions

View file

@ -186,7 +186,7 @@ class C(A):
)
def test_open():
def test_open(testfn):
# SSLContext.load_dh_params uses _Py_fopen_obj rather than normal open()
try:
import ssl
@ -199,11 +199,11 @@ def test_open():
# All of them should fail
with TestHook(raise_on_events={"open"}) as hook:
for fn, *args in [
(open, sys.argv[2], "r"),
(open, testfn, "r"),
(open, sys.executable, "rb"),
(open, 3, "wb"),
(open, sys.argv[2], "w", -1, None, None, None, False, lambda *a: 1),
(load_dh_params, sys.argv[2]),
(open, testfn, "w", -1, None, None, None, False, lambda *a: 1),
(load_dh_params, testfn),
]:
if not fn:
continue
@ -216,11 +216,11 @@ def test_open():
[
i
for i in [
(sys.argv[2], "r"),
(testfn, "r"),
(sys.executable, "r"),
(3, "w"),
(sys.argv[2], "w"),
(sys.argv[2], "rb") if load_dh_params else None,
(testfn, "w"),
(testfn, "rb") if load_dh_params else None,
]
if i is not None
],
@ -517,12 +517,15 @@ def test_not_in_gc():
assert hook not in o
def test_time():
def test_time(mode):
import time
def hook(event, args):
if event.startswith("time."):
print(event, *args)
if mode == 'print':
print(event, *args)
elif mode == 'fail':
raise AssertionError('hook failed')
sys.addaudithook(hook)
time.sleep(0)
@ -549,4 +552,4 @@ def hook(event, args):
suppress_msvcrt_asserts()
test = sys.argv[1]
globals()[test]()
globals()[test](*sys.argv[2:])

View file

@ -19,7 +19,7 @@ class AuditTest(unittest.TestCase):
maxDiff = None
@support.requires_subprocess()
def do_test(self, *args):
def run_test_in_subprocess(self, *args):
with subprocess.Popen(
[sys.executable, "-X utf8", AUDIT_TESTS_PY, *args],
encoding="utf-8",
@ -27,27 +27,26 @@ def do_test(self, *args):
stderr=subprocess.PIPE,
) as p:
p.wait()
sys.stdout.writelines(p.stdout)
sys.stderr.writelines(p.stderr)
if p.returncode:
self.fail("".join(p.stderr))
return p, p.stdout.read(), p.stderr.read()
@support.requires_subprocess()
def run_python(self, *args):
def do_test(self, *args):
proc, stdout, stderr = self.run_test_in_subprocess(*args)
sys.stdout.write(stdout)
sys.stderr.write(stderr)
if proc.returncode:
self.fail(stderr)
def run_python(self, *args, expect_stderr=False):
events = []
with subprocess.Popen(
[sys.executable, "-X utf8", AUDIT_TESTS_PY, *args],
encoding="utf-8",
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
) as p:
p.wait()
sys.stderr.writelines(p.stderr)
return (
p.returncode,
[line.strip().partition(" ") for line in p.stdout],
"".join(p.stderr),
)
proc, stdout, stderr = self.run_test_in_subprocess(*args)
if not expect_stderr or support.verbose:
sys.stderr.write(stderr)
return (
proc.returncode,
[line.strip().partition(" ") for line in stdout.splitlines()],
stderr,
)
def test_basic(self):
self.do_test("test_basic")
@ -257,7 +256,7 @@ def test_not_in_gc(self):
self.fail(stderr)
def test_time(self):
returncode, events, stderr = self.run_python("test_time")
returncode, events, stderr = self.run_python("test_time", "print")
if returncode:
self.fail(stderr)
@ -271,6 +270,11 @@ def test_time(self):
self.assertEqual(actual, expected)
def test_time_fail(self):
returncode, events, stderr = self.run_python("test_time", "fail",
expect_stderr=True)
self.assertNotEqual(returncode, 0)
self.assertIn('hook failed', stderr.splitlines()[-1])
def test_sys_monitoring_register_callback(self):
returncode, events, stderr = self.run_python("test_sys_monitoring_register_callback")

View file

@ -413,7 +413,9 @@ Return the clk_id of a thread's CPU time clock.");
static PyObject *
time_sleep(PyObject *self, PyObject *timeout_obj)
{
PySys_Audit("time.sleep", "O", timeout_obj);
if (PySys_Audit("time.sleep", "O", timeout_obj) < 0) {
return NULL;
}
_PyTime_t timeout;
if (_PyTime_FromSecondsObject(&timeout, timeout_obj, _PyTime_ROUND_TIMEOUT))