test_thread uses support.sleeping_retry() (#93849)

test_thread.test_count() now fails if it takes longer than
LONG_TIMEOUT seconds.
This commit is contained in:
Victor Stinner 2022-06-15 13:52:13 +02:00 committed by GitHub
parent d31834688b
commit bddbd80cff
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -13,7 +13,6 @@
NUMTASKS = 10
NUMTRIPS = 3
POLL_SLEEP = 0.010 # seconds = 10 ms
_print_mutex = thread.allocate_lock()
@ -121,19 +120,24 @@ def task():
with threading_helper.wait_threads_exit():
thread.start_new_thread(task, ())
while not started:
time.sleep(POLL_SLEEP)
for _ in support.sleeping_retry(support.LONG_TIMEOUT):
if started:
break
self.assertEqual(thread._count(), orig + 1)
# Allow the task to finish.
mut.release()
# The only reliable way to be sure that the thread ended from the
# interpreter's point of view is to wait for the function object to be
# destroyed.
# interpreter's point of view is to wait for the function object to
# be destroyed.
done = []
wr = weakref.ref(task, lambda _: done.append(None))
del task
while not done:
time.sleep(POLL_SLEEP)
for _ in support.sleeping_retry(support.LONG_TIMEOUT):
if done:
break
support.gc_collect() # For PyPy or other GCs.
self.assertEqual(thread._count(), orig)