Nick Coghlan | 39f0bb5 | 2017-11-28 08:11:51 +1000 | [diff] [blame] | 1 | # Run the tests in Programs/_testembed.c (tests for the CPython embedding APIs) |
| 2 | from test import support |
| 3 | import unittest |
| 4 | |
| 5 | from collections import namedtuple |
| 6 | import os |
| 7 | import re |
| 8 | import subprocess |
| 9 | import sys |
| 10 | |
| 11 | |
class EmbeddingTests(unittest.TestCase):
    """Tests that drive the ``_testembed`` helper program.

    Every test launches the helper executable (built from
    Programs/_testembed.c) with a scenario name as its argument and then
    checks the exit status, stdout and stderr of that child process.
    """

    @staticmethod
    def _is_debug_build(program):
        """Return True if *program* is named like a Windows debug build.

        Only the file name is inspected (``python_d.exe`` -> True), so a
        directory that merely contains ``_d`` somewhere in its name (for
        example ``my_dir``) is not mistaken for a debug build.
        """
        stem = os.path.splitext(os.path.basename(program))[0]
        return stem.lower().endswith("_d")

    def setUp(self):
        """Locate the helper executable and chdir to the source root.

        The test is skipped when the helper executable does not exist.
        """
        here = os.path.abspath(__file__)
        basepath = os.path.dirname(os.path.dirname(os.path.dirname(here)))
        exename = "_testembed"
        if sys.platform.startswith("win"):
            # On Windows the helper sits next to the running interpreter and
            # carries the same "_d" suffix when that interpreter has one.
            debug_suffix = "_d" if self._is_debug_build(sys.executable) else ""
            exename += debug_suffix + ".exe"
            exepath = os.path.dirname(sys.executable)
        else:
            exepath = os.path.join(basepath, "Programs")
        self.test_exe = exe = os.path.join(exepath, exename)
        if not os.path.exists(exe):
            self.skipTest("%r doesn't exist" % exe)
        # This is needed otherwise we get a fatal error:
        # "Py_Initialize: Unable to get the locale encoding
        # LookupError: no codec search functions registered: can't find encoding"
        self.oldcwd = os.getcwd()
        os.chdir(basepath)

    def tearDown(self):
        """Restore the working directory saved by setUp()."""
        os.chdir(self.oldcwd)

    def run_embedded_interpreter(self, *args, env=None):
        """Run the helper executable and return its ``(stdout, stderr)``.

        *args* are passed to the executable as command-line arguments.
        *env*, when given, is used as the environment of the child process.
        The test fails if the child exits with a non-zero return code.
        """
        cmd = [self.test_exe]
        cmd.extend(args)
        if env is not None and sys.platform == 'win32':
            # Windows requires at least the SYSTEMROOT environment variable to
            # start Python.
            env = env.copy()
            env['SYSTEMROOT'] = os.environ['SYSTEMROOT']

        # The context manager closes the pipes and reaps the child on exit.
        with subprocess.Popen(cmd,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              universal_newlines=True,
                              env=env) as p:
            try:
                out, err = p.communicate()
            except BaseException:
                # Do not leave the child running if we are interrupted
                # (e.g. KeyboardInterrupt) while waiting for its output.
                p.terminate()
                raise

        if p.returncode != 0 and support.verbose:
            print(f"--- {cmd} failed ---")
            print(f"stdout:\n{out}")
            print(f"stderr:\n{err}")
            print("------")

        self.assertEqual(p.returncode, 0,
                         "bad returncode %d, stderr is %r" %
                         (p.returncode, err))
        return out, err

    def run_repeated_init_and_subinterpreters(self):
        """Run the "repeated_init_and_subinterpreters" scenario.

        This is a generator: for every pass reported by the helper it yields
        a list of five ``Interp`` namedtuples (fields ``id``, ``interp``,
        ``tstate`` and ``modules``, all strings).  The first and the last
        entry of each list describe the main interpreter and are checked to
        be equal; the three in between are the subinterpreters.
        """
        out, err = self.run_embedded_interpreter("repeated_init_and_subinterpreters")
        self.assertEqual(err, "")

        # The output from _testembed looks like this:
        # --- Pass 0 ---
        # interp 0 <0x1cf9330>, thread state <0x1cf9700>: id(modules) = 139650431942728
        # interp 1 <0x1d4f690>, thread state <0x1d35350>: id(modules) = 139650431165784
        # interp 2 <0x1d5a690>, thread state <0x1d99ed0>: id(modules) = 139650413140368
        # interp 3 <0x1d4f690>, thread state <0x1dc3340>: id(modules) = 139650412862200
        # interp 0 <0x1cf9330>, thread state <0x1cf9700>: id(modules) = 139650431942728
        # --- Pass 1 ---
        # ...
        #
        # NOTE(review): the sample above shows lowercase hex digits while the
        # pattern below only accepts uppercase ones -- presumably the sample
        # is stale; confirm against the helper's actual output format.

        interp_pat = (r"^interp (\d+) <(0x[\dA-F]+)>, "
                      r"thread state <(0x[\dA-F]+)>: "
                      r"id\(modules\) = ([\d]+)$")
        # Compile once instead of looking the pattern up for every line.
        interp_re = re.compile(interp_pat)
        Interp = namedtuple("Interp", "id interp tstate modules")

        numloops = 0
        current_run = []
        for line in out.splitlines():
            if line == "--- Pass {} ---".format(numloops):
                # A new pass may only start once the previous one is complete.
                self.assertEqual(len(current_run), 0)
                if support.verbose > 1:
                    print(line)
                numloops += 1
                continue

            self.assertLess(len(current_run), 5)
            match = interp_re.match(line)
            if match is None:
                # Fails with a message showing the line and the pattern.
                self.assertRegex(line, interp_pat)

            # Parse the line from the loop.  The first line is the main
            # interpreter and the 3 afterward are subinterpreters.
            interp = Interp(*match.groups())
            if support.verbose > 1:
                print(interp)
            self.assertTrue(interp.interp)
            self.assertTrue(interp.tstate)
            self.assertTrue(interp.modules)
            current_run.append(interp)

            # The last line in the loop should be the same as the first.
            if len(current_run) == 5:
                main = current_run[0]
                self.assertEqual(interp, main)
                yield current_run
                current_run = []

    def test_subinterps_main(self):
        """The first interpreter of every pass has id '0'."""
        for run in self.run_repeated_init_and_subinterpreters():
            main = run[0]

            self.assertEqual(main.id, '0')

    def test_subinterps_different_ids(self):
        """Subinterpreter ids count up consecutively from the main id."""
        for run in self.run_repeated_init_and_subinterpreters():
            main, *subs, _ = run

            mainid = int(main.id)
            for i, sub in enumerate(subs):
                self.assertEqual(sub.id, str(mainid + i + 1))

    def test_subinterps_distinct_state(self):
        """Subinterpreters do not share state pointers with the main one."""
        for run in self.run_repeated_init_and_subinterpreters():
            main, *subs, _ = run

            if '0x0' in main:
                # XXX Fix on Windows (and other platforms): something
                # is going on with the pointers in Programs/_testembed.c.
                # interp.interp is 0x0 and interp.modules is the same
                # between interpreters.
                raise unittest.SkipTest('platform prints pointers as 0x0')

            for sub in subs:
                # A new subinterpreter may have the same
                # PyInterpreterState pointer as a previous one if
                # the earlier one has already been destroyed.  So
                # we compare with the main interpreter.  The same
                # applies to tstate.
                self.assertNotEqual(sub.interp, main.interp)
                self.assertNotEqual(sub.tstate, main.tstate)
                self.assertNotEqual(sub.modules, main.modules)

    def test_forced_io_encoding(self):
        """Checks forced configuration of embedded interpreter IO streams."""
        env = dict(os.environ, PYTHONIOENCODING="utf-8:surrogateescape")
        out, err = self.run_embedded_interpreter("forced_io_encoding", env=env)
        if support.verbose > 1:
            print()
            print(out)
            print(err)
        expected_stream_encoding = "utf-8"
        expected_errors = "surrogateescape"
        expected_output = '\n'.join([
            "--- Use defaults ---",
            "Expected encoding: default",
            "Expected errors: default",
            "stdin: {in_encoding}:{errors}",
            "stdout: {out_encoding}:{errors}",
            "stderr: {out_encoding}:backslashreplace",
            "--- Set errors only ---",
            "Expected encoding: default",
            "Expected errors: ignore",
            "stdin: {in_encoding}:ignore",
            "stdout: {out_encoding}:ignore",
            "stderr: {out_encoding}:backslashreplace",
            "--- Set encoding only ---",
            "Expected encoding: latin-1",
            "Expected errors: default",
            "stdin: latin-1:{errors}",
            "stdout: latin-1:{errors}",
            "stderr: latin-1:backslashreplace",
            "--- Set encoding and errors ---",
            "Expected encoding: latin-1",
            "Expected errors: replace",
            "stdin: latin-1:replace",
            "stdout: latin-1:replace",
            "stderr: latin-1:backslashreplace"])
        expected_output = expected_output.format(
            in_encoding=expected_stream_encoding,
            out_encoding=expected_stream_encoding,
            errors=expected_errors)
        # This is useful if we ever trip over odd platform behaviour
        self.maxDiff = None
        self.assertEqual(out.strip(), expected_output)

    def test_pre_initialization_api(self):
        """
        Checks some key parts of the C-API that need to work before the runtime
        is initialized (via Py_Initialize()).
        """
        env = dict(os.environ, PYTHONPATH=os.pathsep.join(sys.path))
        out, err = self.run_embedded_interpreter("pre_initialization_api", env=env)
        if sys.platform == "win32":
            expected_path = self.test_exe
        else:
            expected_path = os.path.join(os.getcwd(), "spam")
        expected_output = f"sys.executable: {expected_path}\n"
        self.assertIn(expected_output, out)
        self.assertEqual(err, '')

    def test_pre_initialization_sys_options(self):
        """
        Checks that sys.warnoptions and sys._xoptions can be set before the
        runtime is initialized (otherwise they won't be effective).
        """
        env = dict(os.environ, PYTHONPATH=os.pathsep.join(sys.path))
        out, err = self.run_embedded_interpreter(
            "pre_initialization_sys_options", env=env)
        expected_output = (
            "sys.warnoptions: ['once', 'module', 'default']\n"
            "sys._xoptions: {'not_an_option': '1', 'also_not_an_option': '2'}\n"
            "warnings.filters[:3]: ['default', 'module', 'once']\n"
        )
        self.assertIn(expected_output, out)
        self.assertEqual(err, '')

    def test_bpo20891(self):
        """
        bpo-20891: Calling PyGILState_Ensure in a non-Python thread before
        calling PyEval_InitThreads() must not crash. PyGILState_Ensure() must
        call PyEval_InitThreads() for us in this case.
        """
        out, err = self.run_embedded_interpreter("bpo20891")
        self.assertEqual(out, '')
        self.assertEqual(err, '')

    def test_initialize_twice(self):
        """
        bpo-33932: Calling Py_Initialize() twice should do nothing (and not
        crash!).
        """
        out, err = self.run_embedded_interpreter("initialize_twice")
        self.assertEqual(out, '')
        self.assertEqual(err, '')

    def test_initialize_pymain(self):
        """
        bpo-34008: Calling Py_Main() after Py_Initialize() must not fail.
        """
        out, err = self.run_embedded_interpreter("initialize_pymain")
        self.assertEqual(out.rstrip(), "Py_Main() after Py_Initialize: sys.argv=['-c', 'arg2']")
        self.assertEqual(err, '')
| 248 | |
Nick Coghlan | 39f0bb5 | 2017-11-28 08:11:51 +1000 | [diff] [blame] | 249 | |
# Run the tests through unittest's command-line runner when this file is
# executed directly as a script.
if __name__ == "__main__":
    unittest.main()