| # Run the tests in Programs/_testembed.c (tests for the CPython embedding APIs) |
| from test import support |
| import unittest |
| |
| from collections import namedtuple |
| import os |
| import re |
| import subprocess |
| import sys |
| |
| |
| class EmbeddingTests(unittest.TestCase): |
| def setUp(self): |
| here = os.path.abspath(__file__) |
| basepath = os.path.dirname(os.path.dirname(os.path.dirname(here))) |
| exename = "_testembed" |
| if sys.platform.startswith("win"): |
| ext = ("_d" if "_d" in sys.executable else "") + ".exe" |
| exename += ext |
| exepath = os.path.dirname(sys.executable) |
| else: |
| exepath = os.path.join(basepath, "Programs") |
| self.test_exe = exe = os.path.join(exepath, exename) |
| if not os.path.exists(exe): |
| self.skipTest("%r doesn't exist" % exe) |
| # This is needed otherwise we get a fatal error: |
| # "Py_Initialize: Unable to get the locale encoding |
| # LookupError: no codec search functions registered: can't find encoding" |
| self.oldcwd = os.getcwd() |
| os.chdir(basepath) |
| |
| def tearDown(self): |
| os.chdir(self.oldcwd) |
| |
| def run_embedded_interpreter(self, *args, env=None): |
| """Runs a test in the embedded interpreter""" |
| cmd = [self.test_exe] |
| cmd.extend(args) |
| if env is not None and sys.platform == 'win32': |
| # Windows requires at least the SYSTEMROOT environment variable to |
| # start Python. |
| env = env.copy() |
| env['SYSTEMROOT'] = os.environ['SYSTEMROOT'] |
| |
| p = subprocess.Popen(cmd, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| universal_newlines=True, |
| env=env) |
| (out, err) = p.communicate() |
| if p.returncode != 0 and support.verbose: |
| print(f"--- {cmd} failed ---") |
| print(f"stdout:\n{out}") |
| print(f"stderr:\n{out}") |
| print(f"------") |
| |
| self.assertEqual(p.returncode, 0, |
| "bad returncode %d, stderr is %r" % |
| (p.returncode, err)) |
| return out, err |
| |
| def run_repeated_init_and_subinterpreters(self): |
| out, err = self.run_embedded_interpreter("repeated_init_and_subinterpreters") |
| self.assertEqual(err, "") |
| |
| # The output from _testembed looks like this: |
| # --- Pass 0 --- |
| # interp 0 <0x1cf9330>, thread state <0x1cf9700>: id(modules) = 139650431942728 |
| # interp 1 <0x1d4f690>, thread state <0x1d35350>: id(modules) = 139650431165784 |
| # interp 2 <0x1d5a690>, thread state <0x1d99ed0>: id(modules) = 139650413140368 |
| # interp 3 <0x1d4f690>, thread state <0x1dc3340>: id(modules) = 139650412862200 |
| # interp 0 <0x1cf9330>, thread state <0x1cf9700>: id(modules) = 139650431942728 |
| # --- Pass 1 --- |
| # ... |
| |
| interp_pat = (r"^interp (\d+) <(0x[\dA-F]+)>, " |
| r"thread state <(0x[\dA-F]+)>: " |
| r"id\(modules\) = ([\d]+)$") |
| Interp = namedtuple("Interp", "id interp tstate modules") |
| |
| numloops = 0 |
| current_run = [] |
| for line in out.splitlines(): |
| if line == "--- Pass {} ---".format(numloops): |
| self.assertEqual(len(current_run), 0) |
| if support.verbose: |
| print(line) |
| numloops += 1 |
| continue |
| |
| self.assertLess(len(current_run), 5) |
| match = re.match(interp_pat, line) |
| if match is None: |
| self.assertRegex(line, interp_pat) |
| |
| # Parse the line from the loop. The first line is the main |
| # interpreter and the 3 afterward are subinterpreters. |
| interp = Interp(*match.groups()) |
| if support.verbose: |
| print(interp) |
| self.assertTrue(interp.interp) |
| self.assertTrue(interp.tstate) |
| self.assertTrue(interp.modules) |
| current_run.append(interp) |
| |
| # The last line in the loop should be the same as the first. |
| if len(current_run) == 5: |
| main = current_run[0] |
| self.assertEqual(interp, main) |
| yield current_run |
| current_run = [] |
| |
| def test_subinterps_main(self): |
| for run in self.run_repeated_init_and_subinterpreters(): |
| main = run[0] |
| |
| self.assertEqual(main.id, '0') |
| |
| def test_subinterps_different_ids(self): |
| for run in self.run_repeated_init_and_subinterpreters(): |
| main, *subs, _ = run |
| |
| mainid = int(main.id) |
| for i, sub in enumerate(subs): |
| self.assertEqual(sub.id, str(mainid + i + 1)) |
| |
| def test_subinterps_distinct_state(self): |
| for run in self.run_repeated_init_and_subinterpreters(): |
| main, *subs, _ = run |
| |
| if '0x0' in main: |
| # XXX Fix on Windows (and other platforms): something |
| # is going on with the pointers in Programs/_testembed.c. |
| # interp.interp is 0x0 and interp.modules is the same |
| # between interpreters. |
| raise unittest.SkipTest('platform prints pointers as 0x0') |
| |
| for sub in subs: |
| # A new subinterpreter may have the same |
| # PyInterpreterState pointer as a previous one if |
| # the earlier one has already been destroyed. So |
| # we compare with the main interpreter. The same |
| # applies to tstate. |
| self.assertNotEqual(sub.interp, main.interp) |
| self.assertNotEqual(sub.tstate, main.tstate) |
| self.assertNotEqual(sub.modules, main.modules) |
| |
| def test_forced_io_encoding(self): |
| # Checks forced configuration of embedded interpreter IO streams |
| env = dict(os.environ, PYTHONIOENCODING="utf-8:surrogateescape") |
| out, err = self.run_embedded_interpreter("forced_io_encoding", env=env) |
| if support.verbose > 1: |
| print() |
| print(out) |
| print(err) |
| expected_stream_encoding = "utf-8" |
| expected_errors = "surrogateescape" |
| expected_output = '\n'.join([ |
| "--- Use defaults ---", |
| "Expected encoding: default", |
| "Expected errors: default", |
| "stdin: {in_encoding}:{errors}", |
| "stdout: {out_encoding}:{errors}", |
| "stderr: {out_encoding}:backslashreplace", |
| "--- Set errors only ---", |
| "Expected encoding: default", |
| "Expected errors: ignore", |
| "stdin: {in_encoding}:ignore", |
| "stdout: {out_encoding}:ignore", |
| "stderr: {out_encoding}:backslashreplace", |
| "--- Set encoding only ---", |
| "Expected encoding: latin-1", |
| "Expected errors: default", |
| "stdin: latin-1:{errors}", |
| "stdout: latin-1:{errors}", |
| "stderr: latin-1:backslashreplace", |
| "--- Set encoding and errors ---", |
| "Expected encoding: latin-1", |
| "Expected errors: replace", |
| "stdin: latin-1:replace", |
| "stdout: latin-1:replace", |
| "stderr: latin-1:backslashreplace"]) |
| expected_output = expected_output.format( |
| in_encoding=expected_stream_encoding, |
| out_encoding=expected_stream_encoding, |
| errors=expected_errors) |
| # This is useful if we ever trip over odd platform behaviour |
| self.maxDiff = None |
| self.assertEqual(out.strip(), expected_output) |
| |
| def test_pre_initialization_api(self): |
| """ |
| Checks the few parts of the C-API that work before the runtine |
| is initialized (via Py_Initialize()). |
| """ |
| env = dict(os.environ, PYTHONPATH=os.pathsep.join(sys.path)) |
| out, err = self.run_embedded_interpreter("pre_initialization_api", env=env) |
| self.assertEqual(out, '') |
| self.assertEqual(err, '') |
| |
| def test_bpo20891(self): |
| """ |
| bpo-20891: Calling PyGILState_Ensure in a non-Python thread before |
| calling PyEval_InitThreads() must not crash. PyGILState_Ensure() must |
| call PyEval_InitThreads() for us in this case. |
| """ |
| out, err = self.run_embedded_interpreter("bpo20891") |
| self.assertEqual(out, '') |
| self.assertEqual(err, '') |
| |
| |
| if __name__ == "__main__": |
| unittest.main() |