bpo-32604: PEP 554 for use in test suite (GH-19985)

* PEP 554 for use in test suite

* 📜🤖 Added by blurb_it.

* Fix space

* Add doc to doc tree

* Move to modules doc tree

* Fix suspicious doc errors

* Fix test__all

* Docs docs docs

* Support isolated and fix wait

* Fix white space

* Remove undefined from __all__

* Fix recv and add exceptions

* Remove unused exceptions, fix pep 8 formatting errors and fix _NOT_SET in recv_nowait()

Co-authored-by: nanjekyejoannah <joannah.nanjekye@ibm.com>
Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>
This commit is contained in:
Joannah Nanjekye 2020-05-19 14:20:38 -03:00 committed by GitHub
parent c105f7d895
commit 9d17cbf33d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 865 additions and 0 deletions

View file

@ -0,0 +1,183 @@
"""Subinterpreters High Level Module."""
import _xxsubinterpreters as _interpreters
# aliases:
from _xxsubinterpreters import (
ChannelError, ChannelNotFoundError, ChannelEmptyError,
is_shareable,
)
__all__ = [
'Interpreter', 'get_current', 'get_main', 'create', 'list_all',
'SendChannel', 'RecvChannel',
'create_channel', 'list_all_channels', 'is_shareable',
'ChannelError', 'ChannelNotFoundError',
'ChannelEmptyError',
]
def create(*, isolated=True):
"""
Initialize a new (idle) Python interpreter.
"""
id = _interpreters.create(isolated=isolated)
return Interpreter(id, isolated=isolated)
def list_all():
"""
Get all existing interpreters.
"""
return [Interpreter(id) for id in
_interpreters.list_all()]
def get_current():
"""
Get the currently running interpreter.
"""
id = _interpreters.get_current()
return Interpreter(id)
def get_main():
"""
Get the main interpreter.
"""
id = _interpreters.get_main()
return Interpreter(id)
class Interpreter:
"""
The Interpreter object represents
a single interpreter.
"""
def __init__(self, id, *, isolated=None):
self._id = id
self._isolated = isolated
@property
def id(self):
return self._id
@property
def isolated(self):
if self._isolated is None:
self._isolated = _interpreters.is_isolated(self._id)
return self._isolated
def is_running(self):
"""
Return whether or not the identified
interpreter is running.
"""
return _interpreters.is_running(self._id)
def close(self):
"""
Finalize and destroy the interpreter.
Attempting to destroy the current
interpreter results in a RuntimeError.
"""
return _interpreters.destroy(self._id)
def run(self, src_str, /, *, channels=None):
"""
Run the given source code in the interpreter.
This blocks the current Python thread until done.
"""
_interpreters.run_string(self._id, src_str)
def create_channel():
"""
Create a new channel for passing data between
interpreters.
"""
cid = _interpreters.channel_create()
return (RecvChannel(cid), SendChannel(cid))
def list_all_channels():
"""
Get all open channels.
"""
return [(RecvChannel(cid), SendChannel(cid))
for cid in _interpreters.channel_list_all()]
_NOT_SET = object()
class RecvChannel:
"""
The RecvChannel object represents
a recieving channel.
"""
def __init__(self, id):
self._id = id
def recv(self, *, _delay=10 / 1000): # 10 milliseconds
"""
Get the next object from the channel,
and wait if none have been sent.
Associate the interpreter with the channel.
"""
import time
sentinel = object()
obj = _interpreters.channel_recv(self._id, sentinel)
while obj is sentinel:
time.sleep(_delay)
obj = _interpreters.channel_recv(self._id, sentinel)
return obj
def recv_nowait(self, default=_NOT_SET):
"""
Like recv(), but return the default
instead of waiting.
This function is blocked by a missing low-level
implementation of channel_recv_wait().
"""
if default is _NOT_SET:
return _interpreters.channel_recv(self._id)
else:
return _interpreters.channel_recv(self._id, default)
class SendChannel:
"""
The SendChannel object represents
a sending channel.
"""
def __init__(self, id):
self._id = id
def send(self, obj):
"""
Send the object (i.e. its data) to the receiving
end of the channel and wait. Associate the interpreter
with the channel.
"""
import time
_interpreters.channel_send(self._id, obj)
time.sleep(2)
def send_nowait(self, obj):
"""
Like send(), but return False if not received.
This function is blocked by a missing low-level
implementation of channel_send_wait().
"""
_interpreters.channel_send(self._id, obj)
return False

View file

@ -0,0 +1,145 @@
High-level implementation of Subinterpreters
============================================
**Source code:** :source:`Lib/test/support/_interpreters.py`
--------------
This module provides high-level tools for working with sub-interpreters,
such as creating them, running code in them, or sending data between them.
It is a wrapper around the low-level ``__xxsubinterpreters`` module.
.. versionchanged:: added in 3.9
Interpreter Objects
-------------------
The ``Interpreter`` object represents a single interpreter.
.. class:: Interpreter(id)
The class implementing a subinterpreter object.
.. method:: is_running()
Return ``True`` if the identified interpreter is running.
.. method:: close()
Destroy the interpreter. Attempting to destroy the current
interpreter results in a `RuntimeError`.
.. method:: run(self, src_str, /, *, channels=None):
Run the given source code in the interpreter. This blocks
the current thread until done. ``channels`` should be in
the form : `(RecvChannel, SendChannel)`.
RecvChannel Objects
-------------------
The ``RecvChannel`` object represents a recieving channel.
.. class:: RecvChannel(id)
This class represents the receiving end of a channel.
.. method:: recv()
Get the next object from the channel, and wait if
none have been sent. Associate the interpreter
with the channel.
.. method:: recv_nowait(default=None)
Like ``recv()``, but return the default result
instead of waiting.
SendChannel Objects
--------------------
The ``SendChannel`` object represents a sending channel.
.. class:: SendChannel(id)
This class represents the sending end of a channel.
.. method:: send(obj)
Send the object ``obj`` to the receiving end of the channel
and wait. Associate the interpreter with the channel.
.. method:: send_nowait(obj)
Similar to ``send()``, but returns ``False`` if
*obj* is not immediately received instead of blocking.
This module defines the following global functions:
.. function:: is_shareable(obj)
Return ``True`` if the object's data can be shared between
interpreters.
.. function:: create_channel()
Create a new channel for passing data between interpreters.
.. function:: list_all_channels()
Return all open channels.
.. function:: create(*, isolated=True)
Initialize a new (idle) Python interpreter. Get the currently
running interpreter. This method returns an ``Interpreter`` object.
.. function:: get_current()
Get the currently running interpreter. This method returns
an ``Interpreter`` object.
.. function:: get_main()
Get the main interpreter. This method returns
an ``Interpreter`` object.
.. function:: list_all()
Get all existing interpreters. Returns a list
of ``Interpreter`` objects.
This module also defines the following exceptions.
.. exception:: RunFailedError
This exception, a subclass of :exc:`RuntimeError`, is raised when the
``Interpreter.run()`` results in an uncaught exception.
.. exception:: ChannelError
This exception is a subclass of :exc:`Exception`, and is the base
class for all channel-related exceptions.
.. exception:: ChannelNotFoundError
This exception is a subclass of :exc:`ChannelError`, and is raised
when the the identified channel is not found.
.. exception:: ChannelEmptyError
This exception is a subclass of :exc:`ChannelError`, and is raised when
the channel is unexpectedly empty.
.. exception:: ChannelNotEmptyError
This exception is a subclass of :exc:`ChannelError`, and is raised when
the channel is unexpectedly not empty.
.. exception:: NotReceivedError
This exception is a subclass of :exc:`ChannelError`, and is raised when
nothing was waiting to receive a sent object.

View file

@ -0,0 +1,535 @@
import contextlib
import os
import threading
from textwrap import dedent
import unittest
import time
import _xxsubinterpreters as _interpreters
from test.support import interpreters
def _captured_script(script):
r, w = os.pipe()
indented = script.replace('\n', '\n ')
wrapped = dedent(f"""
import contextlib
with open({w}, 'w') as spipe:
with contextlib.redirect_stdout(spipe):
{indented}
""")
return wrapped, open(r)
def clean_up_interpreters():
for interp in interpreters.list_all():
if interp.id == 0: # main
continue
try:
interp.close()
except RuntimeError:
pass # already destroyed
def _run_output(interp, request, shared=None):
script, rpipe = _captured_script(request)
with rpipe:
interp.run(script)
return rpipe.read()
@contextlib.contextmanager
def _running(interp):
r, w = os.pipe()
def run():
interp.run(dedent(f"""
# wait for "signal"
with open({r}) as rpipe:
rpipe.read()
"""))
t = threading.Thread(target=run)
t.start()
yield
with open(w, 'w') as spipe:
spipe.write('done')
t.join()
class TestBase(unittest.TestCase):
def tearDown(self):
clean_up_interpreters()
class CreateTests(TestBase):
def test_in_main(self):
interp = interpreters.create()
lst = interpreters.list_all()
self.assertEqual(interp.id, lst[1].id)
def test_in_thread(self):
lock = threading.Lock()
id = None
interp = interpreters.create()
lst = interpreters.list_all()
def f():
nonlocal id
id = interp.id
lock.acquire()
lock.release()
t = threading.Thread(target=f)
with lock:
t.start()
t.join()
self.assertEqual(interp.id, lst[1].id)
def test_in_subinterpreter(self):
main, = interpreters.list_all()
interp = interpreters.create()
out = _run_output(interp, dedent("""
from test.support import interpreters
interp = interpreters.create()
print(interp)
"""))
interp2 = out.strip()
self.assertEqual(len(set(interpreters.list_all())), len({main, interp, interp2}))
def test_after_destroy_all(self):
before = set(interpreters.list_all())
# Create 3 subinterpreters.
interp_lst = []
for _ in range(3):
interps = interpreters.create()
interp_lst.append(interps)
# Now destroy them.
for interp in interp_lst:
interp.close()
# Finally, create another.
interp = interpreters.create()
self.assertEqual(len(set(interpreters.list_all())), len(before | {interp}))
def test_after_destroy_some(self):
before = set(interpreters.list_all())
# Create 3 subinterpreters.
interp1 = interpreters.create()
interp2 = interpreters.create()
interp3 = interpreters.create()
# Now destroy 2 of them.
interp1.close()
interp2.close()
# Finally, create another.
interp = interpreters.create()
self.assertEqual(len(set(interpreters.list_all())), len(before | {interp3, interp}))
class GetCurrentTests(TestBase):
def test_main(self):
main_interp_id = _interpreters.get_main()
cur_interp_id = interpreters.get_current().id
self.assertEqual(cur_interp_id, main_interp_id)
def test_subinterpreter(self):
main = _interpreters.get_main()
interp = interpreters.create()
out = _run_output(interp, dedent("""
from test.support import interpreters
cur = interpreters.get_current()
print(cur)
"""))
cur = out.strip()
self.assertNotEqual(cur, main)
class ListAllTests(TestBase):
def test_initial(self):
interps = interpreters.list_all()
self.assertEqual(1, len(interps))
def test_after_creating(self):
main = interpreters.get_current()
first = interpreters.create()
second = interpreters.create()
ids = []
for interp in interpreters.list_all():
ids.append(interp.id)
self.assertEqual(ids, [main.id, first.id, second.id])
def test_after_destroying(self):
main = interpreters.get_current()
first = interpreters.create()
second = interpreters.create()
first.close()
ids = []
for interp in interpreters.list_all():
ids.append(interp.id)
self.assertEqual(ids, [main.id, second.id])
class TestInterpreterId(TestBase):
def test_in_main(self):
main = interpreters.get_current()
self.assertEqual(0, main.id)
def test_with_custom_num(self):
interp = interpreters.Interpreter(1)
self.assertEqual(1, interp.id)
def test_for_readonly_property(self):
interp = interpreters.Interpreter(1)
with self.assertRaises(AttributeError):
interp.id = 2
class TestInterpreterIsRunning(TestBase):
def test_main(self):
main = interpreters.get_current()
self.assertTrue(main.is_running())
def test_subinterpreter(self):
interp = interpreters.create()
self.assertFalse(interp.is_running())
with _running(interp):
self.assertTrue(interp.is_running())
self.assertFalse(interp.is_running())
def test_from_subinterpreter(self):
interp = interpreters.create()
out = _run_output(interp, dedent(f"""
import _xxsubinterpreters as _interpreters
if _interpreters.is_running({interp.id}):
print(True)
else:
print(False)
"""))
self.assertEqual(out.strip(), 'True')
def test_already_destroyed(self):
interp = interpreters.create()
interp.close()
with self.assertRaises(RuntimeError):
interp.is_running()
class TestInterpreterDestroy(TestBase):
def test_basic(self):
interp1 = interpreters.create()
interp2 = interpreters.create()
interp3 = interpreters.create()
self.assertEqual(4, len(interpreters.list_all()))
interp2.close()
self.assertEqual(3, len(interpreters.list_all()))
def test_all(self):
before = set(interpreters.list_all())
interps = set()
for _ in range(3):
interp = interpreters.create()
interps.add(interp)
self.assertEqual(len(set(interpreters.list_all())), len(before | interps))
for interp in interps:
interp.close()
self.assertEqual(len(set(interpreters.list_all())), len(before))
def test_main(self):
main, = interpreters.list_all()
with self.assertRaises(RuntimeError):
main.close()
def f():
with self.assertRaises(RuntimeError):
main.close()
t = threading.Thread(target=f)
t.start()
t.join()
def test_already_destroyed(self):
interp = interpreters.create()
interp.close()
with self.assertRaises(RuntimeError):
interp.close()
def test_from_current(self):
main, = interpreters.list_all()
interp = interpreters.create()
script = dedent(f"""
from test.support import interpreters
try:
main = interpreters.get_current()
main.close()
except RuntimeError:
pass
""")
interp.run(script)
self.assertEqual(len(set(interpreters.list_all())), len({main, interp}))
def test_from_sibling(self):
main, = interpreters.list_all()
interp1 = interpreters.create()
script = dedent(f"""
from test.support import interpreters
interp2 = interpreters.create()
interp2.close()
""")
interp1.run(script)
self.assertEqual(len(set(interpreters.list_all())), len({main, interp1}))
def test_from_other_thread(self):
interp = interpreters.create()
def f():
interp.close()
t = threading.Thread(target=f)
t.start()
t.join()
def test_still_running(self):
main, = interpreters.list_all()
interp = interpreters.create()
with _running(interp):
with self.assertRaises(RuntimeError):
interp.close()
self.assertTrue(interp.is_running())
class TestInterpreterRun(TestBase):
SCRIPT = dedent("""
with open('{}', 'w') as out:
out.write('{}')
""")
FILENAME = 'spam'
def setUp(self):
super().setUp()
self.interp = interpreters.create()
self._fs = None
def tearDown(self):
if self._fs is not None:
self._fs.close()
super().tearDown()
@property
def fs(self):
if self._fs is None:
self._fs = FSFixture(self)
return self._fs
def test_success(self):
script, file = _captured_script('print("it worked!", end="")')
with file:
self.interp.run(script)
out = file.read()
self.assertEqual(out, 'it worked!')
def test_in_thread(self):
script, file = _captured_script('print("it worked!", end="")')
with file:
def f():
self.interp.run(script)
t = threading.Thread(target=f)
t.start()
t.join()
out = file.read()
self.assertEqual(out, 'it worked!')
@unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
def test_fork(self):
import tempfile
with tempfile.NamedTemporaryFile('w+') as file:
file.write('')
file.flush()
expected = 'spam spam spam spam spam'
script = dedent(f"""
import os
try:
os.fork()
except RuntimeError:
with open('{file.name}', 'w') as out:
out.write('{expected}')
""")
self.interp.run(script)
file.seek(0)
content = file.read()
self.assertEqual(content, expected)
def test_already_running(self):
with _running(self.interp):
with self.assertRaises(RuntimeError):
self.interp.run('print("spam")')
def test_bad_script(self):
with self.assertRaises(TypeError):
self.interp.run(10)
def test_bytes_for_script(self):
with self.assertRaises(TypeError):
self.interp.run(b'print("spam")')
class TestIsShareable(TestBase):
def test_default_shareables(self):
shareables = [
# singletons
None,
# builtin objects
b'spam',
'spam',
10,
-10,
]
for obj in shareables:
with self.subTest(obj):
self.assertTrue(
interpreters.is_shareable(obj))
def test_not_shareable(self):
class Cheese:
def __init__(self, name):
self.name = name
def __str__(self):
return self.name
class SubBytes(bytes):
"""A subclass of a shareable type."""
not_shareables = [
# singletons
True,
False,
NotImplemented,
...,
# builtin types and objects
type,
object,
object(),
Exception(),
100.0,
# user-defined types and objects
Cheese,
Cheese('Wensleydale'),
SubBytes(b'spam'),
]
for obj in not_shareables:
with self.subTest(repr(obj)):
self.assertFalse(
interpreters.is_shareable(obj))
class TestChannel(TestBase):
def test_create_cid(self):
r, s = interpreters.create_channel()
self.assertIsInstance(r, interpreters.RecvChannel)
self.assertIsInstance(s, interpreters.SendChannel)
def test_sequential_ids(self):
before = interpreters.list_all_channels()
channels1 = interpreters.create_channel()
channels2 = interpreters.create_channel()
channels3 = interpreters.create_channel()
after = interpreters.list_all_channels()
self.assertEqual(len(set(after) - set(before)),
len({channels1, channels2, channels3}))
class TestSendRecv(TestBase):
def test_send_recv_main(self):
r, s = interpreters.create_channel()
orig = b'spam'
s.send(orig)
obj = r.recv()
self.assertEqual(obj, orig)
self.assertIsNot(obj, orig)
def test_send_recv_same_interpreter(self):
interp = interpreters.create()
out = _run_output(interp, dedent("""
from test.support import interpreters
r, s = interpreters.create_channel()
orig = b'spam'
s.send(orig)
obj = r.recv()
assert obj is not orig
assert obj == orig
"""))
def test_send_recv_different_threads(self):
r, s = interpreters.create_channel()
def f():
while True:
try:
obj = r.recv()
break
except interpreters.ChannelEmptyError:
time.sleep(0.1)
s.send(obj)
t = threading.Thread(target=f)
t.start()
s.send(b'spam')
t.join()
obj = r.recv()
self.assertEqual(obj, b'spam')
def test_send_recv_nowait_main(self):
r, s = interpreters.create_channel()
orig = b'spam'
s.send(orig)
obj = r.recv_nowait()
self.assertEqual(obj, orig)
self.assertIsNot(obj, orig)
def test_send_recv_nowait_same_interpreter(self):
interp = interpreters.create()
out = _run_output(interp, dedent("""
from test.support import interpreters
r, s = interpreters.create_channel()
orig = b'spam'
s.send(orig)
obj = r.recv_nowait()
assert obj is not orig
assert obj == orig
"""))
r, s = interpreters.create_channel()
def f():
while True:
try:
obj = r.recv_nowait()
break
except _interpreters.ChannelEmptyError:
time.sleep(0.1)
s.send(obj)

View file

@ -0,0 +1,2 @@
PEP 554 for use in the test suite.
(Patch By Joannah Nanjekye)