mirror of
https://github.com/python/cpython
synced 2024-09-30 14:24:27 +00:00
67d140dba7
subprocess's communicate(None) closes stdin of the child process, after sending no (extra) data. Make asyncio variant do the same. This fixes issues with processes that waits for EOF on stdin before continuing.
225 lines
7.3 KiB
Python
225 lines
7.3 KiB
Python
__all__ = 'create_subprocess_exec', 'create_subprocess_shell'
|
|
|
|
import subprocess
|
|
|
|
from . import events
|
|
from . import protocols
|
|
from . import streams
|
|
from . import tasks
|
|
from .log import logger
|
|
|
|
|
|
PIPE = subprocess.PIPE
|
|
STDOUT = subprocess.STDOUT
|
|
DEVNULL = subprocess.DEVNULL
|
|
|
|
|
|
class SubprocessStreamProtocol(streams.FlowControlMixin,
|
|
protocols.SubprocessProtocol):
|
|
"""Like StreamReaderProtocol, but for a subprocess."""
|
|
|
|
def __init__(self, limit, loop):
|
|
super().__init__(loop=loop)
|
|
self._limit = limit
|
|
self.stdin = self.stdout = self.stderr = None
|
|
self._transport = None
|
|
self._process_exited = False
|
|
self._pipe_fds = []
|
|
self._stdin_closed = self._loop.create_future()
|
|
|
|
def __repr__(self):
|
|
info = [self.__class__.__name__]
|
|
if self.stdin is not None:
|
|
info.append(f'stdin={self.stdin!r}')
|
|
if self.stdout is not None:
|
|
info.append(f'stdout={self.stdout!r}')
|
|
if self.stderr is not None:
|
|
info.append(f'stderr={self.stderr!r}')
|
|
return '<{}>'.format(' '.join(info))
|
|
|
|
def connection_made(self, transport):
|
|
self._transport = transport
|
|
|
|
stdout_transport = transport.get_pipe_transport(1)
|
|
if stdout_transport is not None:
|
|
self.stdout = streams.StreamReader(limit=self._limit,
|
|
loop=self._loop)
|
|
self.stdout.set_transport(stdout_transport)
|
|
self._pipe_fds.append(1)
|
|
|
|
stderr_transport = transport.get_pipe_transport(2)
|
|
if stderr_transport is not None:
|
|
self.stderr = streams.StreamReader(limit=self._limit,
|
|
loop=self._loop)
|
|
self.stderr.set_transport(stderr_transport)
|
|
self._pipe_fds.append(2)
|
|
|
|
stdin_transport = transport.get_pipe_transport(0)
|
|
if stdin_transport is not None:
|
|
self.stdin = streams.StreamWriter(stdin_transport,
|
|
protocol=self,
|
|
reader=None,
|
|
loop=self._loop)
|
|
|
|
def pipe_data_received(self, fd, data):
|
|
if fd == 1:
|
|
reader = self.stdout
|
|
elif fd == 2:
|
|
reader = self.stderr
|
|
else:
|
|
reader = None
|
|
if reader is not None:
|
|
reader.feed_data(data)
|
|
|
|
def pipe_connection_lost(self, fd, exc):
|
|
if fd == 0:
|
|
pipe = self.stdin
|
|
if pipe is not None:
|
|
pipe.close()
|
|
self.connection_lost(exc)
|
|
if exc is None:
|
|
self._stdin_closed.set_result(None)
|
|
else:
|
|
self._stdin_closed.set_exception(exc)
|
|
return
|
|
if fd == 1:
|
|
reader = self.stdout
|
|
elif fd == 2:
|
|
reader = self.stderr
|
|
else:
|
|
reader = None
|
|
if reader is not None:
|
|
if exc is None:
|
|
reader.feed_eof()
|
|
else:
|
|
reader.set_exception(exc)
|
|
|
|
if fd in self._pipe_fds:
|
|
self._pipe_fds.remove(fd)
|
|
self._maybe_close_transport()
|
|
|
|
def process_exited(self):
|
|
self._process_exited = True
|
|
self._maybe_close_transport()
|
|
|
|
def _maybe_close_transport(self):
|
|
if len(self._pipe_fds) == 0 and self._process_exited:
|
|
self._transport.close()
|
|
self._transport = None
|
|
|
|
def _get_close_waiter(self, stream):
|
|
if stream is self.stdin:
|
|
return self._stdin_closed
|
|
|
|
|
|
class Process:
|
|
def __init__(self, transport, protocol, loop):
|
|
self._transport = transport
|
|
self._protocol = protocol
|
|
self._loop = loop
|
|
self.stdin = protocol.stdin
|
|
self.stdout = protocol.stdout
|
|
self.stderr = protocol.stderr
|
|
self.pid = transport.get_pid()
|
|
|
|
def __repr__(self):
|
|
return f'<{self.__class__.__name__} {self.pid}>'
|
|
|
|
@property
|
|
def returncode(self):
|
|
return self._transport.get_returncode()
|
|
|
|
async def wait(self):
|
|
"""Wait until the process exit and return the process return code."""
|
|
return await self._transport._wait()
|
|
|
|
def send_signal(self, signal):
|
|
self._transport.send_signal(signal)
|
|
|
|
def terminate(self):
|
|
self._transport.terminate()
|
|
|
|
def kill(self):
|
|
self._transport.kill()
|
|
|
|
async def _feed_stdin(self, input):
|
|
debug = self._loop.get_debug()
|
|
if input is not None:
|
|
self.stdin.write(input)
|
|
if debug:
|
|
logger.debug(
|
|
'%r communicate: feed stdin (%s bytes)', self, len(input))
|
|
try:
|
|
await self.stdin.drain()
|
|
except (BrokenPipeError, ConnectionResetError) as exc:
|
|
# communicate() ignores BrokenPipeError and ConnectionResetError
|
|
if debug:
|
|
logger.debug('%r communicate: stdin got %r', self, exc)
|
|
|
|
if debug:
|
|
logger.debug('%r communicate: close stdin', self)
|
|
self.stdin.close()
|
|
|
|
async def _noop(self):
|
|
return None
|
|
|
|
async def _read_stream(self, fd):
|
|
transport = self._transport.get_pipe_transport(fd)
|
|
if fd == 2:
|
|
stream = self.stderr
|
|
else:
|
|
assert fd == 1
|
|
stream = self.stdout
|
|
if self._loop.get_debug():
|
|
name = 'stdout' if fd == 1 else 'stderr'
|
|
logger.debug('%r communicate: read %s', self, name)
|
|
output = await stream.read()
|
|
if self._loop.get_debug():
|
|
name = 'stdout' if fd == 1 else 'stderr'
|
|
logger.debug('%r communicate: close %s', self, name)
|
|
transport.close()
|
|
return output
|
|
|
|
async def communicate(self, input=None):
|
|
if self.stdin is not None:
|
|
stdin = self._feed_stdin(input)
|
|
else:
|
|
stdin = self._noop()
|
|
if self.stdout is not None:
|
|
stdout = self._read_stream(1)
|
|
else:
|
|
stdout = self._noop()
|
|
if self.stderr is not None:
|
|
stderr = self._read_stream(2)
|
|
else:
|
|
stderr = self._noop()
|
|
stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr)
|
|
await self.wait()
|
|
return (stdout, stderr)
|
|
|
|
|
|
async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
|
|
limit=streams._DEFAULT_LIMIT, **kwds):
|
|
loop = events.get_running_loop()
|
|
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
|
|
loop=loop)
|
|
transport, protocol = await loop.subprocess_shell(
|
|
protocol_factory,
|
|
cmd, stdin=stdin, stdout=stdout,
|
|
stderr=stderr, **kwds)
|
|
return Process(transport, protocol, loop)
|
|
|
|
|
|
async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
|
|
stderr=None, limit=streams._DEFAULT_LIMIT,
|
|
**kwds):
|
|
loop = events.get_running_loop()
|
|
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
|
|
loop=loop)
|
|
transport, protocol = await loop.subprocess_exec(
|
|
protocol_factory,
|
|
program, *args,
|
|
stdin=stdin, stdout=stdout,
|
|
stderr=stderr, **kwds)
|
|
return Process(transport, protocol, loop)
|