bpo-32604: Recommit "bpo-32604: PEP 554 for use in test suite (GH-19985)" (GH-20611)

* PEP 554 for use in test suite

* 📜🤖 Added by blurb_it.

* Fix space

* Add doc to doc tree

* Move to modules doc tree

* Fix suspicious doc errors

* Fix test__all

* Docs docs docs

* Support isolated and fix wait

* Fix white space

* Remove undefined from __all__

* Fix recv and add exceptions

* Remove unused exceptions, fix pep 8 formatting errors and fix _NOT_SET in recv_nowait()

* Update Lib/test/support/interpreters.py

Co-authored-by: Pablo Galindo <Pablogsal@gmail.com>

* Remove documentation (module is for internal use)

Co-authored-by: nanjekyejoannah <joannah.nanjekye@ibm.com>
Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>
diff --git a/Lib/test/test_interpreters.py b/Lib/test/test_interpreters.py
new file mode 100644
index 0000000..3451a4c
--- /dev/null
+++ b/Lib/test/test_interpreters.py
@@ -0,0 +1,535 @@
+import contextlib
+import os
+import threading
+from textwrap import dedent
+import unittest
+import time
+
+import _xxsubinterpreters as _interpreters
+from test.support import interpreters
+
+
+def _captured_script(script):
+    r, w = os.pipe()
+    indented = script.replace('\n', '\n                ')
+    wrapped = dedent(f"""
+        import contextlib
+        with open({w}, 'w') as spipe:
+            with contextlib.redirect_stdout(spipe):
+                {indented}
+        """)
+    return wrapped, open(r)
+
+
+def clean_up_interpreters():
+    for interp in interpreters.list_all():
+        if interp.id == 0:  # main
+            continue
+        try:
+            interp.close()
+        except RuntimeError:
+            pass  # already destroyed
+
+
+def _run_output(interp, request, shared=None):
+    script, rpipe = _captured_script(request)
+    with rpipe:
+        interp.run(script)
+        return rpipe.read()
+
+
+@contextlib.contextmanager
+def _running(interp):
+    r, w = os.pipe()
+    def run():
+        interp.run(dedent(f"""
+            # wait for "signal"
+            with open({r}) as rpipe:
+                rpipe.read()
+            """))
+
+    t = threading.Thread(target=run)
+    t.start()
+
+    yield
+
+    with open(w, 'w') as spipe:
+        spipe.write('done')
+    t.join()
+
+
+class TestBase(unittest.TestCase):
+
+    def tearDown(self):
+        clean_up_interpreters()
+
+
+class CreateTests(TestBase):
+
+    def test_in_main(self):
+        interp = interpreters.create()
+        lst = interpreters.list_all()
+        self.assertEqual(interp.id, lst[1].id)
+
+    def test_in_thread(self):
+        lock = threading.Lock()
+        id = None
+        interp = interpreters.create()
+        lst = interpreters.list_all()
+        def f():
+            nonlocal id
+            id = interp.id
+            lock.acquire()
+            lock.release()
+
+        t = threading.Thread(target=f)
+        with lock:
+            t.start()
+        t.join()
+        self.assertEqual(interp.id, lst[1].id)
+
+    def test_in_subinterpreter(self):
+        main, = interpreters.list_all()
+        interp = interpreters.create()
+        out = _run_output(interp, dedent("""
+            from test.support import interpreters
+            interp = interpreters.create()
+            print(interp)
+            """))
+        interp2 = out.strip()
+
+        self.assertEqual(len(set(interpreters.list_all())), len({main, interp, interp2}))
+
+    def test_after_destroy_all(self):
+        before = set(interpreters.list_all())
+        # Create 3 subinterpreters.
+        interp_lst = []
+        for _ in range(3):
+            interps = interpreters.create()
+            interp_lst.append(interps)
+        # Now destroy them.
+        for interp in interp_lst:
+            interp.close()
+        # Finally, create another.
+        interp = interpreters.create()
+        self.assertEqual(len(set(interpreters.list_all())), len(before | {interp}))
+
+    def test_after_destroy_some(self):
+        before = set(interpreters.list_all())
+        # Create 3 subinterpreters.
+        interp1 = interpreters.create()
+        interp2 = interpreters.create()
+        interp3 = interpreters.create()
+        # Now destroy 2 of them.
+        interp1.close()
+        interp2.close()
+        # Finally, create another.
+        interp = interpreters.create()
+        self.assertEqual(len(set(interpreters.list_all())), len(before | {interp3, interp}))
+
+
+class GetCurrentTests(TestBase):
+
+    def test_main(self):
+        main_interp_id = _interpreters.get_main()
+        cur_interp_id =  interpreters.get_current().id
+        self.assertEqual(cur_interp_id, main_interp_id)
+
+    def test_subinterpreter(self):
+        main = _interpreters.get_main()
+        interp = interpreters.create()
+        out = _run_output(interp, dedent("""
+            from test.support import interpreters
+            cur = interpreters.get_current()
+            print(cur)
+            """))
+        cur = out.strip()
+        self.assertNotEqual(cur, main)
+
+
+class ListAllTests(TestBase):
+
+    def test_initial(self):
+        interps = interpreters.list_all()
+        self.assertEqual(1, len(interps))
+
+    def test_after_creating(self):
+        main = interpreters.get_current()
+        first = interpreters.create()
+        second = interpreters.create()
+
+        ids = []
+        for interp in interpreters.list_all():
+            ids.append(interp.id)
+
+        self.assertEqual(ids, [main.id, first.id, second.id])
+
+    def test_after_destroying(self):
+        main = interpreters.get_current()
+        first = interpreters.create()
+        second = interpreters.create()
+        first.close()
+
+        ids = []
+        for interp in interpreters.list_all():
+            ids.append(interp.id)
+
+        self.assertEqual(ids, [main.id, second.id])
+
+
+class TestInterpreterId(TestBase):
+
+    def test_in_main(self):
+        main = interpreters.get_current()
+        self.assertEqual(0, main.id)
+
+    def test_with_custom_num(self):
+        interp = interpreters.Interpreter(1)
+        self.assertEqual(1, interp.id)
+
+    def test_for_readonly_property(self):
+        interp = interpreters.Interpreter(1)
+        with self.assertRaises(AttributeError):
+            interp.id = 2
+
+
+class TestInterpreterIsRunning(TestBase):
+
+    def test_main(self):
+        main = interpreters.get_current()
+        self.assertTrue(main.is_running())
+
+    def test_subinterpreter(self):
+        interp = interpreters.create()
+        self.assertFalse(interp.is_running())
+
+        with _running(interp):
+            self.assertTrue(interp.is_running())
+        self.assertFalse(interp.is_running())
+
+    def test_from_subinterpreter(self):
+        interp = interpreters.create()
+        out = _run_output(interp, dedent(f"""
+            import _xxsubinterpreters as _interpreters
+            if _interpreters.is_running({interp.id}):
+                print(True)
+            else:
+                print(False)
+            """))
+        self.assertEqual(out.strip(), 'True')
+
+    def test_already_destroyed(self):
+        interp = interpreters.create()
+        interp.close()
+        with self.assertRaises(RuntimeError):
+            interp.is_running()
+
+
+class TestInterpreterDestroy(TestBase):
+
+    def test_basic(self):
+        interp1 = interpreters.create()
+        interp2 = interpreters.create()
+        interp3 = interpreters.create()
+        self.assertEqual(4, len(interpreters.list_all()))
+        interp2.close()
+        self.assertEqual(3, len(interpreters.list_all()))
+
+    def test_all(self):
+        before = set(interpreters.list_all())
+        interps = set()
+        for _ in range(3):
+            interp = interpreters.create()
+            interps.add(interp)
+        self.assertEqual(len(set(interpreters.list_all())), len(before | interps))
+        for interp in interps:
+            interp.close()
+        self.assertEqual(len(set(interpreters.list_all())), len(before))
+
+    def test_main(self):
+        main, = interpreters.list_all()
+        with self.assertRaises(RuntimeError):
+            main.close()
+
+        def f():
+            with self.assertRaises(RuntimeError):
+                main.close()
+
+        t = threading.Thread(target=f)
+        t.start()
+        t.join()
+
+    def test_already_destroyed(self):
+        interp = interpreters.create()
+        interp.close()
+        with self.assertRaises(RuntimeError):
+            interp.close()
+
+    def test_from_current(self):
+        main, = interpreters.list_all()
+        interp = interpreters.create()
+        script = dedent(f"""
+            from test.support import interpreters
+            try:
+                main = interpreters.get_current()
+                main.close()
+            except RuntimeError:
+                pass
+            """)
+
+        interp.run(script)
+        self.assertEqual(len(set(interpreters.list_all())), len({main, interp}))
+
+    def test_from_sibling(self):
+        main, = interpreters.list_all()
+        interp1 = interpreters.create()
+        script = dedent(f"""
+            from test.support import interpreters
+            interp2 = interpreters.create()
+            interp2.close()
+            """)
+        interp1.run(script)
+
+        self.assertEqual(len(set(interpreters.list_all())), len({main, interp1}))
+
+    def test_from_other_thread(self):
+        interp = interpreters.create()
+        def f():
+            interp.close()
+
+        t = threading.Thread(target=f)
+        t.start()
+        t.join()
+
+    def test_still_running(self):
+        main, = interpreters.list_all()
+        interp = interpreters.create()
+        with _running(interp):
+            with self.assertRaises(RuntimeError):
+                interp.close()
+            self.assertTrue(interp.is_running())
+
+
+class TestInterpreterRun(TestBase):
+
+    SCRIPT = dedent("""
+        with open('{}', 'w') as out:
+            out.write('{}')
+        """)
+    FILENAME = 'spam'
+
+    def setUp(self):
+        super().setUp()
+        self.interp = interpreters.create()
+        self._fs = None
+
+    def tearDown(self):
+        if self._fs is not None:
+            self._fs.close()
+        super().tearDown()
+
+    @property
+    def fs(self):
+        if self._fs is None:
+            self._fs = FSFixture(self)
+        return self._fs
+
+    def test_success(self):
+        script, file = _captured_script('print("it worked!", end="")')
+        with file:
+            self.interp.run(script)
+            out = file.read()
+
+        self.assertEqual(out, 'it worked!')
+
+    def test_in_thread(self):
+        script, file = _captured_script('print("it worked!", end="")')
+        with file:
+            def f():
+                self.interp.run(script)
+
+            t = threading.Thread(target=f)
+            t.start()
+            t.join()
+            out = file.read()
+
+        self.assertEqual(out, 'it worked!')
+
+    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
+    def test_fork(self):
+        import tempfile
+        with tempfile.NamedTemporaryFile('w+') as file:
+            file.write('')
+            file.flush()
+
+            expected = 'spam spam spam spam spam'
+            script = dedent(f"""
+                import os
+                try:
+                    os.fork()
+                except RuntimeError:
+                    with open('{file.name}', 'w') as out:
+                        out.write('{expected}')
+                """)
+            self.interp.run(script)
+
+            file.seek(0)
+            content = file.read()
+            self.assertEqual(content, expected)
+
+    def test_already_running(self):
+        with _running(self.interp):
+            with self.assertRaises(RuntimeError):
+                self.interp.run('print("spam")')
+
+    def test_bad_script(self):
+        with self.assertRaises(TypeError):
+            self.interp.run(10)
+
+    def test_bytes_for_script(self):
+        with self.assertRaises(TypeError):
+            self.interp.run(b'print("spam")')
+
+
+class TestIsShareable(TestBase):
+
+    def test_default_shareables(self):
+        shareables = [
+                # singletons
+                None,
+                # builtin objects
+                b'spam',
+                'spam',
+                10,
+                -10,
+                ]
+        for obj in shareables:
+            with self.subTest(obj):
+                self.assertTrue(
+                    interpreters.is_shareable(obj))
+
+    def test_not_shareable(self):
+        class Cheese:
+            def __init__(self, name):
+                self.name = name
+            def __str__(self):
+                return self.name
+
+        class SubBytes(bytes):
+            """A subclass of a shareable type."""
+
+        not_shareables = [
+                # singletons
+                True,
+                False,
+                NotImplemented,
+                ...,
+                # builtin types and objects
+                type,
+                object,
+                object(),
+                Exception(),
+                100.0,
+                # user-defined types and objects
+                Cheese,
+                Cheese('Wensleydale'),
+                SubBytes(b'spam'),
+                ]
+        for obj in not_shareables:
+            with self.subTest(repr(obj)):
+                self.assertFalse(
+                    interpreters.is_shareable(obj))
+
+
+class TestChannel(TestBase):
+
+    def test_create_cid(self):
+        r, s = interpreters.create_channel()
+        self.assertIsInstance(r, interpreters.RecvChannel)
+        self.assertIsInstance(s, interpreters.SendChannel)
+
+    def test_sequential_ids(self):
+        before = interpreters.list_all_channels()
+        channels1 = interpreters.create_channel()
+        channels2 = interpreters.create_channel()
+        channels3 = interpreters.create_channel()
+        after = interpreters.list_all_channels()
+
+        self.assertEqual(len(set(after) - set(before)),
+                         len({channels1, channels2, channels3}))
+
+
+class TestSendRecv(TestBase):
+
+    def test_send_recv_main(self):
+        r, s = interpreters.create_channel()
+        orig = b'spam'
+        s.send(orig)
+        obj = r.recv()
+
+        self.assertEqual(obj, orig)
+        self.assertIsNot(obj, orig)
+
+    def test_send_recv_same_interpreter(self):
+        interp = interpreters.create()
+        out = _run_output(interp, dedent("""
+            from test.support import interpreters
+            r, s = interpreters.create_channel()
+            orig = b'spam'
+            s.send(orig)
+            obj = r.recv()
+            assert obj is not orig
+            assert obj == orig
+            """))
+
+    def test_send_recv_different_threads(self):
+        r, s = interpreters.create_channel()
+
+        def f():
+            while True:
+                try:
+                    obj = r.recv()
+                    break
+                except interpreters.ChannelEmptyError:
+                    time.sleep(0.1)
+            s.send(obj)
+        t = threading.Thread(target=f)
+        t.start()
+
+        s.send(b'spam')
+        t.join()
+        obj = r.recv()
+
+        self.assertEqual(obj, b'spam')
+
+    def test_send_recv_nowait_main(self):
+        r, s = interpreters.create_channel()
+        orig = b'spam'
+        s.send(orig)
+        obj = r.recv_nowait()
+
+        self.assertEqual(obj, orig)
+        self.assertIsNot(obj, orig)
+
+    def test_send_recv_nowait_same_interpreter(self):
+        interp = interpreters.create()
+        out = _run_output(interp, dedent("""
+            from test.support import interpreters
+            r, s = interpreters.create_channel()
+            orig = b'spam'
+            s.send(orig)
+            obj = r.recv_nowait()
+            assert obj is not orig
+            assert obj == orig
+            """))
+
+        r, s = interpreters.create_channel()
+
+        def f():
+            while True:
+                try:
+                    obj = r.recv_nowait()
+                    break
+                except _interpreters.ChannelEmptyError:
+                    time.sleep(0.1)
+            s.send(obj)