mirror of
https://github.com/python/cpython
synced 2024-10-14 17:40:01 +00:00
asyncio: sync with Tulip
Issue #23347: send_signal(), kill() and terminate() methods of BaseSubprocessTransport now check if the transport was closed and if the process exited. Issue #23347: Refactor creation of subprocess transports. Changes on BaseSubprocessTransport: * Add a wait() method to wait until the child process exit * The constructor now accepts an optional waiter parameter. The _post_init() coroutine must not be called explicitly anymore. It makes subprocess transports closer to other transports, and it gives more freedom if we want later to change completly how subprocess transports are created. * close() now kills the process instead of kindly terminate it: the child process may ignore SIGTERM and continue to run. Call explicitly terminate() and wait() if you want to kindly terminate the child process. * close() now logs a warning in debug mode if the process is still running and needs to be killed * _make_subprocess_transport() is now fully asynchronous again: if the creation of the transport failed, wait asynchronously for the process eixt. Before the wait was synchronous. This change requires close() to *kill*, and not terminate, the child process. * Remove the _kill_wait() method, replaced with a more agressive close() method. It fixes _make_subprocess_transport() on error. BaseSubprocessTransport.close() calls the close() method of pipe transports, whereas _kill_wait() closed directly pipes of the subprocess.Popen object without unregistering file descriptors from the selector (which caused severe bugs). These changes simplifies the code of subprocess.py.
This commit is contained in:
parent
978a9afc6a
commit
47cd10d7a9
|
@ -3,6 +3,7 @@
|
|||
import sys
|
||||
import warnings
|
||||
|
||||
from . import futures
|
||||
from . import protocols
|
||||
from . import transports
|
||||
from .coroutines import coroutine
|
||||
|
@ -13,27 +14,32 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
|
|||
|
||||
def __init__(self, loop, protocol, args, shell,
|
||||
stdin, stdout, stderr, bufsize,
|
||||
extra=None, **kwargs):
|
||||
waiter=None, extra=None, **kwargs):
|
||||
super().__init__(extra)
|
||||
self._closed = False
|
||||
self._protocol = protocol
|
||||
self._loop = loop
|
||||
self._proc = None
|
||||
self._pid = None
|
||||
|
||||
self._returncode = None
|
||||
self._exit_waiters = []
|
||||
self._pending_calls = collections.deque()
|
||||
self._pipes = {}
|
||||
self._finished = False
|
||||
|
||||
if stdin == subprocess.PIPE:
|
||||
self._pipes[0] = None
|
||||
if stdout == subprocess.PIPE:
|
||||
self._pipes[1] = None
|
||||
if stderr == subprocess.PIPE:
|
||||
self._pipes[2] = None
|
||||
self._pending_calls = collections.deque()
|
||||
self._finished = False
|
||||
self._returncode = None
|
||||
|
||||
# Create the child process: set the _proc attribute
|
||||
self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
|
||||
stderr=stderr, bufsize=bufsize, **kwargs)
|
||||
self._pid = self._proc.pid
|
||||
self._extra['subprocess'] = self._proc
|
||||
|
||||
if self._loop.get_debug():
|
||||
if isinstance(args, (bytes, str)):
|
||||
program = args
|
||||
|
@ -42,6 +48,8 @@ def __init__(self, loop, protocol, args, shell,
|
|||
logger.debug('process %r created: pid %s',
|
||||
program, self._pid)
|
||||
|
||||
self._loop.create_task(self._connect_pipes(waiter))
|
||||
|
||||
def __repr__(self):
|
||||
info = [self.__class__.__name__]
|
||||
if self._closed:
|
||||
|
@ -77,12 +85,23 @@ def _make_read_subprocess_pipe_proto(self, fd):
|
|||
|
||||
def close(self):
|
||||
self._closed = True
|
||||
|
||||
for proto in self._pipes.values():
|
||||
if proto is None:
|
||||
continue
|
||||
proto.pipe.close()
|
||||
if self._returncode is None:
|
||||
self.terminate()
|
||||
|
||||
if self._proc is not None and self._returncode is None:
|
||||
if self._loop.get_debug():
|
||||
logger.warning('Close running child process: kill %r', self)
|
||||
|
||||
try:
|
||||
self._proc.kill()
|
||||
except ProcessLookupError:
|
||||
pass
|
||||
|
||||
# Don't clear the _proc reference yet because _post_init() may
|
||||
# still run
|
||||
|
||||
# On Python 3.3 and older, objects with a destructor part of a reference
|
||||
# cycle are never destroyed. It's not more the case on Python 3.4 thanks
|
||||
|
@ -105,59 +124,42 @@ def get_pipe_transport(self, fd):
|
|||
else:
|
||||
return None
|
||||
|
||||
def _check_proc(self):
|
||||
if self._closed:
|
||||
raise ValueError("operation on closed transport")
|
||||
if self._proc is None:
|
||||
raise ProcessLookupError()
|
||||
|
||||
def send_signal(self, signal):
|
||||
self._check_proc()
|
||||
self._proc.send_signal(signal)
|
||||
|
||||
def terminate(self):
|
||||
self._check_proc()
|
||||
self._proc.terminate()
|
||||
|
||||
def kill(self):
|
||||
self._check_proc()
|
||||
self._proc.kill()
|
||||
|
||||
def _kill_wait(self):
|
||||
"""Close pipes, kill the subprocess and read its return status.
|
||||
|
||||
Function called when an exception is raised during the creation
|
||||
of a subprocess.
|
||||
"""
|
||||
self._closed = True
|
||||
if self._loop.get_debug():
|
||||
logger.warning('Exception during subprocess creation, '
|
||||
'kill the subprocess %r',
|
||||
self,
|
||||
exc_info=True)
|
||||
|
||||
proc = self._proc
|
||||
if proc.stdout:
|
||||
proc.stdout.close()
|
||||
if proc.stderr:
|
||||
proc.stderr.close()
|
||||
if proc.stdin:
|
||||
proc.stdin.close()
|
||||
|
||||
try:
|
||||
proc.kill()
|
||||
except ProcessLookupError:
|
||||
pass
|
||||
self._returncode = proc.wait()
|
||||
|
||||
self.close()
|
||||
|
||||
@coroutine
|
||||
def _post_init(self):
|
||||
def _connect_pipes(self, waiter):
|
||||
try:
|
||||
proc = self._proc
|
||||
loop = self._loop
|
||||
|
||||
if proc.stdin is not None:
|
||||
_, pipe = yield from loop.connect_write_pipe(
|
||||
lambda: WriteSubprocessPipeProto(self, 0),
|
||||
proc.stdin)
|
||||
self._pipes[0] = pipe
|
||||
|
||||
if proc.stdout is not None:
|
||||
_, pipe = yield from loop.connect_read_pipe(
|
||||
lambda: ReadSubprocessPipeProto(self, 1),
|
||||
proc.stdout)
|
||||
self._pipes[1] = pipe
|
||||
|
||||
if proc.stderr is not None:
|
||||
_, pipe = yield from loop.connect_read_pipe(
|
||||
lambda: ReadSubprocessPipeProto(self, 2),
|
||||
|
@ -166,13 +168,16 @@ def _post_init(self):
|
|||
|
||||
assert self._pending_calls is not None
|
||||
|
||||
self._loop.call_soon(self._protocol.connection_made, self)
|
||||
loop.call_soon(self._protocol.connection_made, self)
|
||||
for callback, data in self._pending_calls:
|
||||
self._loop.call_soon(callback, *data)
|
||||
loop.call_soon(callback, *data)
|
||||
self._pending_calls = None
|
||||
except:
|
||||
self._kill_wait()
|
||||
raise
|
||||
except Exception as exc:
|
||||
if waiter is not None and not waiter.cancelled():
|
||||
waiter.set_exception(exc)
|
||||
else:
|
||||
if waiter is not None and not waiter.cancelled():
|
||||
waiter.set_result(None)
|
||||
|
||||
def _call(self, cb, *data):
|
||||
if self._pending_calls is not None:
|
||||
|
@ -197,6 +202,23 @@ def _process_exited(self, returncode):
|
|||
self._call(self._protocol.process_exited)
|
||||
self._try_finish()
|
||||
|
||||
# wake up futures waiting for wait()
|
||||
for waiter in self._exit_waiters:
|
||||
if not waiter.cancelled():
|
||||
waiter.set_result(returncode)
|
||||
self._exit_waiters = None
|
||||
|
||||
def wait(self):
|
||||
"""Wait until the process exit and return the process return code.
|
||||
|
||||
This method is a coroutine."""
|
||||
if self._returncode is not None:
|
||||
return self._returncode
|
||||
|
||||
waiter = futures.Future(loop=self._loop)
|
||||
self._exit_waiters.append(waiter)
|
||||
return (yield from waiter)
|
||||
|
||||
def _try_finish(self):
|
||||
assert not self._finished
|
||||
if self._returncode is None:
|
||||
|
@ -210,9 +232,9 @@ def _call_connection_lost(self, exc):
|
|||
try:
|
||||
self._protocol.connection_lost(exc)
|
||||
finally:
|
||||
self._loop = None
|
||||
self._proc = None
|
||||
self._protocol = None
|
||||
self._loop = None
|
||||
|
||||
|
||||
class WriteSubprocessPipeProto(protocols.BaseProtocol):
|
||||
|
|
|
@ -25,8 +25,6 @@ def __init__(self, limit, loop):
|
|||
super().__init__(loop=loop)
|
||||
self._limit = limit
|
||||
self.stdin = self.stdout = self.stderr = None
|
||||
self.waiter = futures.Future(loop=loop)
|
||||
self._waiters = collections.deque()
|
||||
self._transport = None
|
||||
|
||||
def __repr__(self):
|
||||
|
@ -61,9 +59,6 @@ def connection_made(self, transport):
|
|||
reader=None,
|
||||
loop=self._loop)
|
||||
|
||||
if not self.waiter.cancelled():
|
||||
self.waiter.set_result(None)
|
||||
|
||||
def pipe_data_received(self, fd, data):
|
||||
if fd == 1:
|
||||
reader = self.stdout
|
||||
|
@ -94,16 +89,9 @@ def pipe_connection_lost(self, fd, exc):
|
|||
reader.set_exception(exc)
|
||||
|
||||
def process_exited(self):
|
||||
returncode = self._transport.get_returncode()
|
||||
self._transport.close()
|
||||
self._transport = None
|
||||
|
||||
# wake up futures waiting for wait()
|
||||
while self._waiters:
|
||||
waiter = self._waiters.popleft()
|
||||
if not waiter.cancelled():
|
||||
waiter.set_result(returncode)
|
||||
|
||||
|
||||
class Process:
|
||||
def __init__(self, transport, protocol, loop):
|
||||
|
@ -124,30 +112,18 @@ def returncode(self):
|
|||
|
||||
@coroutine
|
||||
def wait(self):
|
||||
"""Wait until the process exit and return the process return code."""
|
||||
returncode = self._transport.get_returncode()
|
||||
if returncode is not None:
|
||||
return returncode
|
||||
"""Wait until the process exit and return the process return code.
|
||||
|
||||
waiter = futures.Future(loop=self._loop)
|
||||
self._protocol._waiters.append(waiter)
|
||||
yield from waiter
|
||||
return waiter.result()
|
||||
|
||||
def _check_alive(self):
|
||||
if self._transport.get_returncode() is not None:
|
||||
raise ProcessLookupError()
|
||||
This method is a coroutine."""
|
||||
return (yield from self._transport.wait())
|
||||
|
||||
def send_signal(self, signal):
|
||||
self._check_alive()
|
||||
self._transport.send_signal(signal)
|
||||
|
||||
def terminate(self):
|
||||
self._check_alive()
|
||||
self._transport.terminate()
|
||||
|
||||
def kill(self):
|
||||
self._check_alive()
|
||||
self._transport.kill()
|
||||
|
||||
@coroutine
|
||||
|
@ -221,11 +197,6 @@ def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
|
|||
protocol_factory,
|
||||
cmd, stdin=stdin, stdout=stdout,
|
||||
stderr=stderr, **kwds)
|
||||
try:
|
||||
yield from protocol.waiter
|
||||
except:
|
||||
transport._kill_wait()
|
||||
raise
|
||||
return Process(transport, protocol, loop)
|
||||
|
||||
@coroutine
|
||||
|
@ -241,9 +212,4 @@ def create_subprocess_exec(program, *args, stdin=None, stdout=None,
|
|||
program, *args,
|
||||
stdin=stdin, stdout=stdout,
|
||||
stderr=stderr, **kwds)
|
||||
try:
|
||||
yield from protocol.waiter
|
||||
except:
|
||||
transport._kill_wait()
|
||||
raise
|
||||
return Process(transport, protocol, loop)
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
from . import constants
|
||||
from . import coroutines
|
||||
from . import events
|
||||
from . import futures
|
||||
from . import selector_events
|
||||
from . import selectors
|
||||
from . import transports
|
||||
|
@ -175,16 +176,20 @@ def _make_subprocess_transport(self, protocol, args, shell,
|
|||
stdin, stdout, stderr, bufsize,
|
||||
extra=None, **kwargs):
|
||||
with events.get_child_watcher() as watcher:
|
||||
waiter = futures.Future(loop=self)
|
||||
transp = _UnixSubprocessTransport(self, protocol, args, shell,
|
||||
stdin, stdout, stderr, bufsize,
|
||||
extra=extra, **kwargs)
|
||||
try:
|
||||
yield from transp._post_init()
|
||||
except:
|
||||
transp.close()
|
||||
raise
|
||||
waiter=waiter, extra=extra,
|
||||
**kwargs)
|
||||
|
||||
watcher.add_child_handler(transp.get_pid(),
|
||||
self._child_watcher_callback, transp)
|
||||
try:
|
||||
yield from waiter
|
||||
except:
|
||||
transp.close()
|
||||
yield from transp.wait()
|
||||
raise
|
||||
|
||||
return transp
|
||||
|
||||
|
@ -774,7 +779,7 @@ def __exit__(self, a, b, c):
|
|||
pass
|
||||
|
||||
def add_child_handler(self, pid, callback, *args):
|
||||
self._callbacks[pid] = callback, args
|
||||
self._callbacks[pid] = (callback, args)
|
||||
|
||||
# Prevent a race condition in case the child is already terminated.
|
||||
self._do_waitpid(pid)
|
||||
|
|
|
@ -366,13 +366,16 @@ def loop_accept_pipe(f=None):
|
|||
def _make_subprocess_transport(self, protocol, args, shell,
|
||||
stdin, stdout, stderr, bufsize,
|
||||
extra=None, **kwargs):
|
||||
waiter = futures.Future(loop=self)
|
||||
transp = _WindowsSubprocessTransport(self, protocol, args, shell,
|
||||
stdin, stdout, stderr, bufsize,
|
||||
extra=extra, **kwargs)
|
||||
waiter=waiter, extra=extra,
|
||||
**kwargs)
|
||||
try:
|
||||
yield from transp._post_init()
|
||||
yield from waiter
|
||||
except:
|
||||
transp.close()
|
||||
yield from transp.wait()
|
||||
raise
|
||||
|
||||
return transp
|
||||
|
|
|
@ -1551,9 +1551,10 @@ def test_subprocess_exec(self):
|
|||
stdin = transp.get_pipe_transport(0)
|
||||
stdin.write(b'Python The Winner')
|
||||
self.loop.run_until_complete(proto.got_data[1].wait())
|
||||
transp.close()
|
||||
with test_utils.disable_logger():
|
||||
transp.close()
|
||||
self.loop.run_until_complete(proto.completed)
|
||||
self.check_terminated(proto.returncode)
|
||||
self.check_killed(proto.returncode)
|
||||
self.assertEqual(b'Python The Winner', proto.data[1])
|
||||
|
||||
def test_subprocess_interactive(self):
|
||||
|
@ -1567,21 +1568,20 @@ def test_subprocess_interactive(self):
|
|||
self.loop.run_until_complete(proto.connected)
|
||||
self.assertEqual('CONNECTED', proto.state)
|
||||
|
||||
try:
|
||||
stdin = transp.get_pipe_transport(0)
|
||||
stdin.write(b'Python ')
|
||||
self.loop.run_until_complete(proto.got_data[1].wait())
|
||||
proto.got_data[1].clear()
|
||||
self.assertEqual(b'Python ', proto.data[1])
|
||||
stdin = transp.get_pipe_transport(0)
|
||||
stdin.write(b'Python ')
|
||||
self.loop.run_until_complete(proto.got_data[1].wait())
|
||||
proto.got_data[1].clear()
|
||||
self.assertEqual(b'Python ', proto.data[1])
|
||||
|
||||
stdin.write(b'The Winner')
|
||||
self.loop.run_until_complete(proto.got_data[1].wait())
|
||||
self.assertEqual(b'Python The Winner', proto.data[1])
|
||||
finally:
|
||||
stdin.write(b'The Winner')
|
||||
self.loop.run_until_complete(proto.got_data[1].wait())
|
||||
self.assertEqual(b'Python The Winner', proto.data[1])
|
||||
|
||||
with test_utils.disable_logger():
|
||||
transp.close()
|
||||
|
||||
self.loop.run_until_complete(proto.completed)
|
||||
self.check_terminated(proto.returncode)
|
||||
self.check_killed(proto.returncode)
|
||||
|
||||
def test_subprocess_shell(self):
|
||||
connect = self.loop.subprocess_shell(
|
||||
|
@ -1739,9 +1739,10 @@ def test_subprocess_close_client_stream(self):
|
|||
# GetLastError()==ERROR_INVALID_NAME on Windows!?! (Using
|
||||
# WriteFile() we get ERROR_BROKEN_PIPE as expected.)
|
||||
self.assertEqual(b'ERR:OSError', proto.data[2])
|
||||
transp.close()
|
||||
with test_utils.disable_logger():
|
||||
transp.close()
|
||||
self.loop.run_until_complete(proto.completed)
|
||||
self.check_terminated(proto.returncode)
|
||||
self.check_killed(proto.returncode)
|
||||
|
||||
def test_subprocess_wait_no_same_group(self):
|
||||
# start the new process in a new session
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
from unittest import mock
|
||||
|
||||
import asyncio
|
||||
from asyncio import base_subprocess
|
||||
from asyncio import subprocess
|
||||
from asyncio import test_utils
|
||||
try:
|
||||
|
@ -23,6 +24,70 @@
|
|||
'data = sys.stdin.buffer.read()',
|
||||
'sys.stdout.buffer.write(data)'))]
|
||||
|
||||
class TestSubprocessTransport(base_subprocess.BaseSubprocessTransport):
|
||||
def _start(self, *args, **kwargs):
|
||||
self._proc = mock.Mock()
|
||||
self._proc.stdin = None
|
||||
self._proc.stdout = None
|
||||
self._proc.stderr = None
|
||||
|
||||
|
||||
class SubprocessTransportTests(test_utils.TestCase):
|
||||
def setUp(self):
|
||||
self.loop = self.new_test_loop()
|
||||
self.set_event_loop(self.loop)
|
||||
|
||||
|
||||
def create_transport(self, waiter=None):
|
||||
protocol = mock.Mock()
|
||||
protocol.connection_made._is_coroutine = False
|
||||
protocol.process_exited._is_coroutine = False
|
||||
transport = TestSubprocessTransport(
|
||||
self.loop, protocol, ['test'], False,
|
||||
None, None, None, 0, waiter=waiter)
|
||||
return (transport, protocol)
|
||||
|
||||
def test_close(self):
|
||||
waiter = asyncio.Future(loop=self.loop)
|
||||
transport, protocol = self.create_transport(waiter)
|
||||
transport._process_exited(0)
|
||||
transport.close()
|
||||
|
||||
# The loop didn't run yet
|
||||
self.assertFalse(protocol.connection_made.called)
|
||||
|
||||
# methods must raise ProcessLookupError if the transport was closed
|
||||
self.assertRaises(ValueError, transport.send_signal, signal.SIGTERM)
|
||||
self.assertRaises(ValueError, transport.terminate)
|
||||
self.assertRaises(ValueError, transport.kill)
|
||||
|
||||
self.loop.run_until_complete(waiter)
|
||||
|
||||
def test_proc_exited(self):
|
||||
waiter = asyncio.Future(loop=self.loop)
|
||||
transport, protocol = self.create_transport(waiter)
|
||||
transport._process_exited(6)
|
||||
self.loop.run_until_complete(waiter)
|
||||
|
||||
self.assertEqual(transport.get_returncode(), 6)
|
||||
|
||||
self.assertTrue(protocol.connection_made.called)
|
||||
self.assertTrue(protocol.process_exited.called)
|
||||
self.assertTrue(protocol.connection_lost.called)
|
||||
self.assertEqual(protocol.connection_lost.call_args[0], (None,))
|
||||
|
||||
self.assertFalse(transport._closed)
|
||||
self.assertIsNone(transport._loop)
|
||||
self.assertIsNone(transport._proc)
|
||||
self.assertIsNone(transport._protocol)
|
||||
|
||||
# methods must raise ProcessLookupError if the process exited
|
||||
self.assertRaises(ProcessLookupError,
|
||||
transport.send_signal, signal.SIGTERM)
|
||||
self.assertRaises(ProcessLookupError, transport.terminate)
|
||||
self.assertRaises(ProcessLookupError, transport.kill)
|
||||
|
||||
|
||||
class SubprocessMixin:
|
||||
|
||||
def test_stdin_stdout(self):
|
||||
|
|
Loading…
Reference in a new issue