diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py index d5f259c246b..3e31bf8402e 100644 --- a/Lib/multiprocessing/semaphore_tracker.py +++ b/Lib/multiprocessing/semaphore_tracker.py @@ -29,6 +29,7 @@ class SemaphoreTracker(object): def __init__(self): self._lock = threading.Lock() self._fd = None + self._pid = None def getfd(self): self.ensure_running() @@ -40,8 +41,20 @@ def ensure_running(self): This can be run from any process. Usually a child process will use the semaphore created by its parent.''' with self._lock: - if self._fd is not None: - return + if self._pid is not None: + # semaphore tracker was launched before, is it still running? + pid, status = os.waitpid(self._pid, os.WNOHANG) + if not pid: + # => still alive + return + # => dead, launch it again + os.close(self._fd) + self._fd = None + self._pid = None + + warnings.warn('semaphore_tracker: process died unexpectedly, ' + 'relaunching. Some semaphores might leak.') + fds_to_pass = [] try: fds_to_pass.append(sys.stderr.fileno()) @@ -55,12 +68,13 @@ def ensure_running(self): exe = spawn.get_executable() args = [exe] + util._args_from_interpreter_flags() args += ['-c', cmd % r] - util.spawnv_passfds(exe, args, fds_to_pass) + pid = util.spawnv_passfds(exe, args, fds_to_pass) except: os.close(w) raise else: self._fd = w + self._pid = pid finally: os.close(r) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 799146d8a3f..d4e8a8a7e1a 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4,6 +4,7 @@ import unittest import queue as pyqueue +import contextlib import time import io import itertools @@ -4344,14 +4345,14 @@ def test_preload_resources(self): self.fail("failed spawning forkserver or grandchild") -# -# Check that killing process does not leak named semaphores -# - @unittest.skipIf(sys.platform == "win32", "test semantics don't make sense on Windows") class TestSemaphoreTracker(unittest.TestCase): + def test_semaphore_tracker(self): + # + # Check that killing process does not leak named semaphores + # import subprocess cmd = '''if 1: import multiprocessing as mp, time, os @@ -4385,6 +4386,40 @@ def test_semaphore_tracker(self): self.assertRegex(err, expected) self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1) + def check_semaphore_tracker_death(self, signum, should_die): + # bpo-31310: if the semaphore tracker process has died, it should + # be restarted implicitly. + from multiprocessing.semaphore_tracker import _semaphore_tracker + _semaphore_tracker.ensure_running() + pid = _semaphore_tracker._pid + os.kill(pid, signum) + time.sleep(1.0) # give it time to die + + ctx = multiprocessing.get_context("spawn") + with contextlib.ExitStack() as stack: + if should_die: + stack.enter_context(self.assertWarnsRegex( + UserWarning, + "semaphore_tracker: process died")) + sem = ctx.Semaphore() + sem.acquire() + sem.release() + wr = weakref.ref(sem) + # ensure `sem` gets collected, which triggers communication with + # the semaphore tracker + del sem + gc.collect() + self.assertIsNone(wr()) + + def test_semaphore_tracker_sigint(self): + # Catchable signal (ignored by semaphore tracker) + self.check_semaphore_tracker_death(signal.SIGINT, False) + + def test_semaphore_tracker_sigkill(self): + # Uncatchable signal. + self.check_semaphore_tracker_death(signal.SIGKILL, True) + + class TestSimpleQueue(unittest.TestCase): @classmethod diff --git a/Misc/NEWS.d/next/Library/2017-08-30-18-23-54.bpo-31310.7D1UNt.rst b/Misc/NEWS.d/next/Library/2017-08-30-18-23-54.bpo-31310.7D1UNt.rst new file mode 100644 index 00000000000..4d340f07364 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2017-08-30-18-23-54.bpo-31310.7D1UNt.rst @@ -0,0 +1 @@ +multiprocessing's semaphore tracker should be launched again if crashed.