diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index d6fe7d62675..bddcdadfeee 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -21,6 +21,7 @@ import weakref import test.support import test.support.script_helper +from test import support # Skip tests if _multiprocessing wasn't built. @@ -72,6 +73,12 @@ def close_queue(queue): queue.join_thread() +def join_process(process, timeout): + # Since multiprocessing.Process has the same API than threading.Thread + # (join() and is_alive(), the support function can be reused + support.join_thread(process, timeout) + + # # Constants # @@ -477,7 +484,7 @@ def test_many_processes(self): for p in procs: p.start() for p in procs: - p.join(timeout=10) + join_process(p, timeout=10) for p in procs: self.assertEqual(p.exitcode, 0) @@ -489,7 +496,7 @@ def test_many_processes(self): for p in procs: p.terminate() for p in procs: - p.join(timeout=10) + join_process(p, timeout=10) if os.name != 'nt': for p in procs: self.assertEqual(p.exitcode, -signal.SIGTERM) @@ -652,7 +659,7 @@ def test_sys_exit(self): p = self.Process(target=self._test_sys_exit, args=(reason, testfn)) p.daemon = True p.start() - p.join(5) + join_process(p, timeout=5) self.assertEqual(p.exitcode, 1) with open(testfn, 'r') as f: @@ -665,7 +672,7 @@ def test_sys_exit(self): p = self.Process(target=sys.exit, args=(reason,)) p.daemon = True p.start() - p.join(5) + join_process(p, timeout=5) self.assertEqual(p.exitcode, reason) # @@ -1254,8 +1261,7 @@ def test_waitfor(self): state.value += 1 cond.notify() - p.join(5) - self.assertFalse(p.is_alive()) + join_process(p, timeout=5) self.assertEqual(p.exitcode, 0) @classmethod @@ -1291,7 +1297,7 @@ def test_waitfor_timeout(self): state.value += 1 cond.notify() - p.join(5) + join_process(p, timeout=5) self.assertTrue(success.value) @classmethod @@ -4005,7 +4011,7 @@ def test_timeout(self): self.assertEqual(conn.recv(), 456) conn.close() l.close() - p.join(10) + join_process(p, timeout=10) finally: socket.setdefaulttimeout(old_timeout) @@ -4041,7 +4047,7 @@ def child(cls, n, conn): p = multiprocessing.Process(target=cls.child, args=(n-1, conn)) p.start() conn.close() - p.join(timeout=5) + join_process(p, timeout=5) else: conn.send(len(util._afterfork_registry)) conn.close() @@ -4054,7 +4060,7 @@ def test_lock(self): p.start() w.close() new_size = r.recv() - p.join(timeout=5) + join_process(p, timeout=5) self.assertLessEqual(new_size, old_size) # @@ -4109,7 +4115,7 @@ def test_closefd(self): p.start() writer.close() e = reader.recv() - p.join(timeout=5) + join_process(p, timeout=5) finally: self.close(fd) writer.close() diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py index 63f7a910710..b2e45605b9e 100644 --- a/Lib/test/support/__init__.py +++ b/Lib/test/support/__init__.py @@ -2107,6 +2107,16 @@ def wait_threads_exit(timeout=60.0): gc_collect() +def join_thread(thread, timeout=30.0): + """Join a thread. Raise an AssertionError if the thread is still alive + after timeout seconds. + """ + thread.join(timeout) + if thread.is_alive(): + msg = f"failed to join the thread in {timeout:.1f} seconds" + raise AssertionError(msg) + + def reap_children(): """Use this function at the end of test_main() whenever sub-processes are started. This will help ensure that no extra children (zombies) diff --git a/Lib/test/test_asynchat.py b/Lib/test/test_asynchat.py index 2362834b85f..1d147c74196 100644 --- a/Lib/test/test_asynchat.py +++ b/Lib/test/test_asynchat.py @@ -123,9 +123,7 @@ def line_terminator_check(self, term, server_chunk): c.push(b"I'm not dead yet!" + term) c.push(SERVER_QUIT) asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) - s.join(timeout=TIMEOUT) - if s.is_alive(): - self.fail("join() timed out") + support.join_thread(s, timeout=TIMEOUT) self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"]) @@ -156,9 +154,7 @@ def numeric_terminator_check(self, termlen): c.push(data) c.push(SERVER_QUIT) asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) - s.join(timeout=TIMEOUT) - if s.is_alive(): - self.fail("join() timed out") + support.join_thread(s, timeout=TIMEOUT) self.assertEqual(c.contents, [data[:termlen]]) @@ -178,9 +174,7 @@ def test_none_terminator(self): c.push(data) c.push(SERVER_QUIT) asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) - s.join(timeout=TIMEOUT) - if s.is_alive(): - self.fail("join() timed out") + support.join_thread(s, timeout=TIMEOUT) self.assertEqual(c.contents, []) self.assertEqual(c.buffer, data) @@ -192,9 +186,7 @@ def test_simple_producer(self): p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8) c.push_with_producer(p) asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) - s.join(timeout=TIMEOUT) - if s.is_alive(): - self.fail("join() timed out") + support.join_thread(s, timeout=TIMEOUT) self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"]) @@ -204,9 +196,7 @@ def test_string_producer(self): data = b"hello world\nI'm not dead yet!\n" c.push_with_producer(data+SERVER_QUIT) asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) - s.join(timeout=TIMEOUT) - if s.is_alive(): - self.fail("join() timed out") + support.join_thread(s, timeout=TIMEOUT) self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"]) @@ -217,9 +207,7 @@ def test_empty_line(self): c.push(b"hello world\n\nI'm not dead yet!\n") c.push(SERVER_QUIT) asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) - s.join(timeout=TIMEOUT) - if s.is_alive(): - self.fail("join() timed out") + support.join_thread(s, timeout=TIMEOUT) self.assertEqual(c.contents, [b"hello world", b"", b"I'm not dead yet!"]) @@ -238,9 +226,7 @@ def test_close_when_done(self): # where the server echoes all of its data before we can check that it # got any down below. s.start_resend_event.set() - s.join(timeout=TIMEOUT) - if s.is_alive(): - self.fail("join() timed out") + support.join_thread(s, timeout=TIMEOUT) self.assertEqual(c.contents, []) # the server might have been able to send a byte or two back, but this @@ -261,7 +247,7 @@ def test_push(self): self.assertRaises(TypeError, c.push, 'unicode') c.push(SERVER_QUIT) asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) - s.join(timeout=TIMEOUT) + support.join_thread(s, timeout=TIMEOUT) self.assertEqual(c.contents, [b'bytes', b'bytes', b'bytes']) diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 27781a2d91b..33421ce4c37 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -808,7 +808,7 @@ def client(): proto.transport.close() lsock.close() - thread.join(1) + support.join_thread(thread, timeout=1) self.assertFalse(thread.is_alive()) self.assertEqual(proto.state, 'CLOSED') self.assertEqual(proto.nbytes, len(message)) diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py index c8e97276ff6..ee0c3b371f8 100644 --- a/Lib/test/test_asyncore.py +++ b/Lib/test/test_asyncore.py @@ -360,9 +360,7 @@ def test_send(self): self.assertEqual(cap.getvalue(), data*2) finally: - t.join(timeout=TIMEOUT) - if t.is_alive(): - self.fail("join() timed out") + support.join_thread(t, timeout=TIMEOUT) @unittest.skipUnless(hasattr(asyncore, 'file_wrapper'), @@ -794,9 +792,7 @@ def test_quick_connect(self): except OSError: pass finally: - t.join(timeout=TIMEOUT) - if t.is_alive(): - self.fail("join() timed out") + support.join_thread(t, timeout=TIMEOUT) class TestAPI_UseIPv4Sockets(BaseTestAPI): family = socket.AF_INET diff --git a/Lib/test/test_imaplib.py b/Lib/test/test_imaplib.py index 132c58624fe..2b62b05a594 100644 --- a/Lib/test/test_imaplib.py +++ b/Lib/test/test_imaplib.py @@ -220,7 +220,9 @@ def _cleanup(self): # cleanup the server self.server.shutdown() self.server.server_close() - self.thread.join(3.0) + support.join_thread(self.thread, 3.0) + # Explicitly clear the attribute to prevent dangling thread + self.thread = None def test_EOF_without_complete_welcome_message(self): # http://bugs.python.org/issue5949 diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py index 611044d8fa8..d264d786720 100644 --- a/Lib/test/test_logging.py +++ b/Lib/test/test_logging.py @@ -791,13 +791,10 @@ def stop(self, timeout=None): to terminate. """ self.close() - self._thread.join(timeout) + support.join_thread(self._thread, timeout) + self._thread = None asyncore.close_all(map=self._map, ignore_all=True) - alive = self._thread.is_alive() - self._thread = None - if alive: - self.fail("join() timed out") class ControlMixin(object): """ @@ -847,11 +844,8 @@ def stop(self, timeout=None): """ self.shutdown() if self._thread is not None: - self._thread.join(timeout) - alive = self._thread.is_alive() + support.join_thread(self._thread, timeout) self._thread = None - if alive: - self.fail("join() timed out") self.server_close() self.ready.clear() @@ -2892,9 +2886,7 @@ def setup_via_listener(self, text, verify=None): finally: t.ready.wait(2.0) logging.config.stopListening() - t.join(2.0) - if t.is_alive(): - self.fail("join() timed out") + support.join_thread(t, 2.0) def test_listen_config_10_ok(self): with support.captured_stdout() as output: diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index e501669eb6e..35466c1eae3 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -58,10 +58,7 @@ def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args): block_func) return self.result finally: - thread.join(10) # make sure the thread terminates - if thread.is_alive(): - self.fail("trigger function '%r' appeared to not return" % - trigger_func) + support.join_thread(thread, 10) # make sure the thread terminates # Call this instead if block_func is supposed to raise an exception. def do_exceptional_blocking_test(self,block_func, block_args, trigger_func, @@ -77,10 +74,7 @@ def do_exceptional_blocking_test(self,block_func, block_args, trigger_func, self.fail("expected exception of kind %r" % expected_exception_class) finally: - thread.join(10) # make sure the thread terminates - if thread.is_alive(): - self.fail("trigger function '%r' appeared to not return" % - trigger_func) + support.join_thread(thread, 10) # make sure the thread terminates if not thread.startedEvent.is_set(): self.fail("trigger thread ended but event never set") diff --git a/Lib/test/test_sched.py b/Lib/test/test_sched.py index 794c6374c45..3f84af2a4c2 100644 --- a/Lib/test/test_sched.py +++ b/Lib/test/test_sched.py @@ -3,6 +3,7 @@ import threading import time import unittest +from test import support TIMEOUT = 10 @@ -81,8 +82,7 @@ def test_enter_concurrent(self): self.assertEqual(q.get(timeout=TIMEOUT), 5) self.assertTrue(q.empty()) timer.advance(1000) - t.join(timeout=TIMEOUT) - self.assertFalse(t.is_alive()) + support.join_thread(t, timeout=TIMEOUT) self.assertTrue(q.empty()) self.assertEqual(timer.time(), 5) @@ -137,8 +137,7 @@ def test_cancel_concurrent(self): self.assertEqual(q.get(timeout=TIMEOUT), 4) self.assertTrue(q.empty()) timer.advance(1000) - t.join(timeout=TIMEOUT) - self.assertFalse(t.is_alive()) + support.join_thread(t, timeout=TIMEOUT) self.assertTrue(q.empty()) self.assertEqual(timer.time(), 4)