GH-120804: Remove PidfdChildWatcher, ThreadedChildWatcher and AbstractChildWatcher from asyncio APIs (#120893)

This commit is contained in:
Kumar Aditya 2024-06-23 18:38:50 +05:30 committed by GitHub
parent b6fa8fe86a
commit 9d2e1ea386
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 39 additions and 259 deletions

View file

@ -28,9 +28,6 @@
__all__ = (
'SelectorEventLoop',
'AbstractChildWatcher',
'PidfdChildWatcher',
'ThreadedChildWatcher',
'DefaultEventLoopPolicy',
'EventLoop',
)
@ -65,6 +62,10 @@ def __init__(self, selector=None):
super().__init__(selector)
self._signal_handlers = {}
self._unix_server_sockets = {}
if can_use_pidfd():
self._watcher = _PidfdChildWatcher()
else:
self._watcher = _ThreadedChildWatcher()
def close(self):
super().close()
@ -197,33 +198,22 @@ def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
async def _make_subprocess_transport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
with warnings.catch_warnings():
warnings.simplefilter('ignore', DeprecationWarning)
watcher = events.get_event_loop_policy()._watcher
with watcher:
if not watcher.is_active():
# Check early.
# Raising exception before process creation
# prevents subprocess execution if the watcher
# is not ready to handle it.
raise RuntimeError("asyncio.get_child_watcher() is not activated, "
"subprocess support is not installed.")
waiter = self.create_future()
transp = _UnixSubprocessTransport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
waiter=waiter, extra=extra,
**kwargs)
watcher.add_child_handler(transp.get_pid(),
self._child_watcher_callback, transp)
try:
await waiter
except (SystemExit, KeyboardInterrupt):
raise
except BaseException:
transp.close()
await transp._wait()
raise
watcher = self._watcher
waiter = self.create_future()
transp = _UnixSubprocessTransport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
waiter=waiter, extra=extra,
**kwargs)
watcher.add_child_handler(transp.get_pid(),
self._child_watcher_callback, transp)
try:
await waiter
except (SystemExit, KeyboardInterrupt):
raise
except BaseException:
transp.close()
await transp._wait()
raise
return transp
@ -865,93 +855,7 @@ def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
stdin_w.close()
class AbstractChildWatcher:
"""Abstract base class for monitoring child processes.
Objects derived from this class monitor a collection of subprocesses and
report their termination or interruption by a signal.
New callbacks are registered with .add_child_handler(). Starting a new
process must be done within a 'with' block to allow the watcher to suspend
its activity until the new process if fully registered (this is needed to
prevent a race condition in some implementations).
Example:
with watcher:
proc = subprocess.Popen("sleep 1")
watcher.add_child_handler(proc.pid, callback)
Notes:
Implementations of this class must be thread-safe.
Since child watcher objects may catch the SIGCHLD signal and call
waitpid(-1), there should be only one active object per process.
"""
def __init_subclass__(cls) -> None:
if cls.__module__ != __name__:
warnings._deprecated("AbstractChildWatcher",
"{name!r} is deprecated as of Python 3.12 and will be "
"removed in Python {remove}.",
remove=(3, 14))
def add_child_handler(self, pid, callback, *args):
"""Register a new child handler.
Arrange for callback(pid, returncode, *args) to be called when
process 'pid' terminates. Specifying another callback for the same
process replaces the previous handler.
Note: callback() must be thread-safe.
"""
raise NotImplementedError()
def remove_child_handler(self, pid):
"""Removes the handler for process 'pid'.
The function returns True if the handler was successfully removed,
False if there was nothing to remove."""
raise NotImplementedError()
def attach_loop(self, loop):
"""Attach the watcher to an event loop.
If the watcher was previously attached to an event loop, then it is
first detached before attaching to the new loop.
Note: loop may be None.
"""
raise NotImplementedError()
def close(self):
"""Close the watcher.
This must be called to make sure that any underlying resource is freed.
"""
raise NotImplementedError()
def is_active(self):
"""Return ``True`` if the watcher is active and is used by the event loop.
Return True if the watcher is installed and ready to handle process exit
notifications.
"""
raise NotImplementedError()
def __enter__(self):
"""Enter the watcher's context and allow starting new processes
This function must return self"""
raise NotImplementedError()
def __exit__(self, a, b, c):
"""Exit the watcher's context"""
raise NotImplementedError()
class PidfdChildWatcher(AbstractChildWatcher):
class _PidfdChildWatcher:
"""Child watcher implementation using Linux's pid file descriptors.
This child watcher polls process file descriptors (pidfds) to await child
@ -963,21 +867,9 @@ class PidfdChildWatcher(AbstractChildWatcher):
recent (5.3+) kernels.
"""
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
pass
def is_active(self):
return True
def close(self):
pass
def attach_loop(self, loop):
pass
def add_child_handler(self, pid, callback, *args):
loop = events.get_running_loop()
pidfd = os.pidfd_open(pid)
@ -1002,14 +894,7 @@ def _do_wait(self, pid, pidfd, callback, args):
os.close(pidfd)
callback(pid, returncode, *args)
def remove_child_handler(self, pid):
# asyncio never calls remove_child_handler() !!!
# The method is no-op but is implemented because
# abstract base classes require it.
return True
class ThreadedChildWatcher(AbstractChildWatcher):
class _ThreadedChildWatcher:
"""Threaded child watcher implementation.
The watcher uses a thread per process
@ -1029,15 +914,6 @@ def __init__(self):
def is_active(self):
return True
def close(self):
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
def __del__(self, _warn=warnings.warn):
threads = [thread for thread in list(self._threads.values())
if thread.is_alive()]
@ -1055,15 +931,6 @@ def add_child_handler(self, pid, callback, *args):
self._threads[pid] = thread
thread.start()
def remove_child_handler(self, pid):
# asyncio never calls remove_child_handler() !!!
# The method is no-op but is implemented because
# abstract base classes require it.
return True
def attach_loop(self, loop):
pass
def _do_waitpid(self, loop, expected_pid, callback, args):
assert expected_pid > 0
@ -1103,29 +970,9 @@ def can_use_pidfd():
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
"""UNIX event loop policy with a watcher for child processes."""
"""UNIX event loop policy"""
_loop_factory = _UnixSelectorEventLoop
def __init__(self):
super().__init__()
if can_use_pidfd():
self._watcher = PidfdChildWatcher()
else:
self._watcher = ThreadedChildWatcher()
def set_event_loop(self, loop):
"""Set the event loop.
As a side effect, if a child watcher was set before, then calling
.set_event_loop() from the main thread will call .attach_loop(loop) on
the child watcher.
"""
super().set_event_loop(loop)
if (self._watcher is not None and
threading.current_thread() is threading.main_thread()):
self._watcher.attach_loop(loop)
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy

View file

@ -2209,22 +2209,8 @@ def test_remove_fds_after_closing(self):
else:
import selectors
class UnixEventLoopTestsMixin(EventLoopTestsMixin):
def setUp(self):
super().setUp()
watcher = asyncio.ThreadedChildWatcher()
watcher.attach_loop(self.loop)
policy = asyncio.get_event_loop_policy()
policy._watcher = watcher
def tearDown(self):
policy = asyncio.get_event_loop_policy()
policy._watcher = None
super().tearDown()
if hasattr(selectors, 'KqueueSelector'):
class KqueueEventLoopTests(UnixEventLoopTestsMixin,
class KqueueEventLoopTests(EventLoopTestsMixin,
SubprocessTestsMixin,
test_utils.TestCase):
@ -2249,7 +2235,7 @@ def test_write_pty(self):
super().test_write_pty()
if hasattr(selectors, 'EpollSelector'):
class EPollEventLoopTests(UnixEventLoopTestsMixin,
class EPollEventLoopTests(EventLoopTestsMixin,
SubprocessTestsMixin,
test_utils.TestCase):
@ -2257,7 +2243,7 @@ def create_event_loop(self):
return asyncio.SelectorEventLoop(selectors.EpollSelector())
if hasattr(selectors, 'PollSelector'):
class PollEventLoopTests(UnixEventLoopTestsMixin,
class PollEventLoopTests(EventLoopTestsMixin,
SubprocessTestsMixin,
test_utils.TestCase):
@ -2265,7 +2251,7 @@ def create_event_loop(self):
return asyncio.SelectorEventLoop(selectors.PollSelector())
# Should always exist.
class SelectEventLoopTests(UnixEventLoopTestsMixin,
class SelectEventLoopTests(EventLoopTestsMixin,
SubprocessTestsMixin,
test_utils.TestCase):
@ -2830,10 +2816,6 @@ def setUp(self):
def tearDown(self):
try:
if sys.platform != 'win32':
policy = asyncio.get_event_loop_policy()
policy._watcher = None
super().tearDown()
finally:
self.loop.close()

View file

@ -869,31 +869,27 @@ async def main():
# Unix
class SubprocessWatcherMixin(SubprocessMixin):
Watcher = None
def setUp(self):
super().setUp()
policy = asyncio.get_event_loop_policy()
self.loop = policy.new_event_loop()
self.set_event_loop(self.loop)
watcher = self._get_watcher()
watcher.attach_loop(self.loop)
policy._watcher = watcher
def test_watcher_implementation(self):
loop = self.loop
watcher = loop._watcher
if unix_events.can_use_pidfd():
self.assertIsInstance(watcher, unix_events._PidfdChildWatcher)
else:
self.assertIsInstance(watcher, unix_events._ThreadedChildWatcher)
def tearDown(self):
super().tearDown()
policy = asyncio.get_event_loop_policy()
watcher = policy._watcher
policy._watcher = None
watcher.attach_loop(None)
watcher.close()
class SubprocessThreadedWatcherTests(SubprocessWatcherMixin,
test_utils.TestCase):
def _get_watcher(self):
return unix_events.ThreadedChildWatcher()
def setUp(self):
# Force the use of the threaded child watcher
unix_events.can_use_pidfd = mock.Mock(return_value=False)
super().setUp()
@unittest.skipUnless(
unix_events.can_use_pidfd(),
@ -902,9 +898,7 @@ def _get_watcher(self):
class SubprocessPidfdWatcherTests(SubprocessWatcherMixin,
test_utils.TestCase):
def _get_watcher(self):
return unix_events.PidfdChildWatcher()
pass
else:
# Windows

View file

@ -1112,32 +1112,6 @@ def test_write_eof_pending(self):
self.assertFalse(self.protocol.connection_lost.called)
class AbstractChildWatcherTests(unittest.TestCase):
def test_warns_on_subclassing(self):
with self.assertWarns(DeprecationWarning):
class MyWatcher(asyncio.AbstractChildWatcher):
pass
def test_not_implemented(self):
f = mock.Mock()
watcher = asyncio.AbstractChildWatcher()
self.assertRaises(
NotImplementedError, watcher.add_child_handler, f, f)
self.assertRaises(
NotImplementedError, watcher.remove_child_handler, f)
self.assertRaises(
NotImplementedError, watcher.attach_loop, f)
self.assertRaises(
NotImplementedError, watcher.close)
self.assertRaises(
NotImplementedError, watcher.is_active)
self.assertRaises(
NotImplementedError, watcher.__enter__)
self.assertRaises(
NotImplementedError, watcher.__exit__, f, f, f)
class TestFunctional(unittest.TestCase):
def setUp(self):

View file

@ -547,23 +547,6 @@ def close_loop(loop):
loop._default_executor.shutdown(wait=True)
loop.close()
policy = support.maybe_get_event_loop_policy()
if policy is not None:
try:
watcher = policy._watcher
except AttributeError:
# watcher is not implemented by EventLoopPolicy, e.g. Windows
pass
else:
if isinstance(watcher, asyncio.ThreadedChildWatcher):
# Wait for subprocess to finish, but not forever
for thread in list(watcher._threads.values()):
thread.join(timeout=support.SHORT_TIMEOUT)
if thread.is_alive():
raise RuntimeError(f"thread {thread} still alive: "
"subprocess still running")
def set_event_loop(self, loop, *, cleanup=True):
if loop is None:
raise AssertionError('loop is None')