# Run the _testcapi module tests (tests for the Python/C API):  by definition,
# these are all functions _testcapi exports whose name begins with 'test_'.

from __future__ import with_statement
import os
import pickle
import random
import subprocess
import sys
import textwrap
import time
import unittest
from test import support
try:
    import _posixsubprocess
except ImportError:
    _posixsubprocess = None
try:
    import threading
except ImportError:
    threading = None
import _testcapi


# Fixture for CAPITest.test_instancemethod: it returns its first argument,
# so calling it through an instance hands back that very instance.  The
# docstring text is compared by the test, so keep it as it is.
def testfunction(self):
    """some doc"""
    return self


class InstanceMethod:
    # Both attributes are wrapped with _testcapi.instancemethod so that
    # CAPITest.test_instancemethod can exercise the wrapper when it is
    # looked up on the class and on an instance.
    id = _testcapi.instancemethod(id)
    testfunction = _testcapi.instancemethod(testfunction)

class CAPITest(unittest.TestCase):
    """Assorted checks of helpers exported by the _testcapi module."""

    def test_instancemethod(self):
        inst = InstanceMethod()
        self.assertEqual(id(inst), inst.id())
        self.assertIs(inst.testfunction(), inst)
        self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__)
        self.assertEqual(InstanceMethod.testfunction.__doc__,
                         testfunction.__doc__)

        # Setting an attribute through the class reaches the wrapped
        # function, whereas setting it through an instance must fail.
        InstanceMethod.testfunction.attribute = "test"
        self.assertEqual(testfunction.attribute, "test")
        self.assertRaises(AttributeError, setattr, inst.testfunction,
                          "attribute", "test")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_no_FatalError_infinite_loop(self):
        with support.SuppressCrashReport():
            p = subprocess.Popen([sys.executable, "-c",
                                  'import _testcapi;'
                                  '_testcapi.crash_no_current_thread()'],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        (out, err) = p.communicate()
        self.assertEqual(out, b'')
        # This used to cause an infinite loop.
        self.assertTrue(err.rstrip().startswith(
                         b'Fatal Python error:'
                         b' PyThreadState_Get: no current thread'))

    def test_memoryview_from_NULL_pointer(self):
        self.assertRaises(ValueError,
                          _testcapi.make_memoryview_from_NULL_pointer)

    def test_exc_info(self):
        raised_exception = ValueError("5")
        new_exc = TypeError("TEST")
        try:
            raise raised_exception
        except ValueError as e:
            tb = e.__traceback__
            # Swap in a new exception state, then restore the original one,
            # sampling sys.exc_info() at every step.
            orig_sys_exc_info = sys.exc_info()
            orig_exc_info = _testcapi.set_exc_info(new_exc.__class__,
                                                   new_exc, None)
            new_sys_exc_info = sys.exc_info()
            new_exc_info = _testcapi.set_exc_info(*orig_exc_info)
            reset_sys_exc_info = sys.exc_info()

            self.assertEqual(orig_exc_info[1], e)

            self.assertSequenceEqual(
                orig_exc_info,
                (raised_exception.__class__, raised_exception, tb))
            self.assertSequenceEqual(orig_sys_exc_info, orig_exc_info)
            self.assertSequenceEqual(reset_sys_exc_info, orig_exc_info)
            self.assertSequenceEqual(new_exc_info,
                                     (new_exc.__class__, new_exc, None))
            self.assertSequenceEqual(new_sys_exc_info, new_exc_info)
        else:
            self.fail("the raised ValueError was not caught")

    @unittest.skipUnless(_posixsubprocess,
                         '_posixsubprocess required for this test.')
    def test_seq_bytes_to_charp_array(self):
        # Issue #15732: crash in _PySequence_BytesToCharpArray()
        class Z(object):
            def __len__(self):
                return 1
        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
                          1, Z(), 3, [1, 2], 5, 6, 7, 8, 9, 10, 11, 12, 13,
                          14, 15, 16, 17)
        # Issue #15736: overflow in _PySequence_BytesToCharpArray()
        class Z(object):
            def __len__(self):
                return sys.maxsize
            def __getitem__(self, i):
                return b'x'
        self.assertRaises(MemoryError, _posixsubprocess.fork_exec,
                          1, Z(), 3, [1, 2], 5, 6, 7, 8, 9, 10, 11, 12, 13,
                          14, 15, 16, 17)

    @unittest.skipUnless(_posixsubprocess,
                         '_posixsubprocess required for this test.')
    def test_subprocess_fork_exec(self):
        class Z(object):
            def __len__(self):
                return 1

        # Issue #15738: crash in subprocess_fork_exec()
        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
                          Z(), [b'1'], 3, [1, 2], 5, 6, 7, 8, 9, 10, 11, 12,
                          13, 14, 15, 16, 17)

@unittest.skipUnless(threading, 'Threading required for this test.')
class TestPendingCalls(unittest.TestCase):
    """Exercise _testcapi._pending_threadfunc from one and many threads."""

    def pendingcalls_submit(self, l, n):
        """Submit n pending calls, each of which appends None to l."""
        def callback():
            # This function can be interrupted by thread switching, so
            # let's use an atomic operation.
            l.append(None)

        for _ in range(n):
            time.sleep(random.random() * 0.02)  # 0.01 secs on average
            # Try submitting the callback until successful.  Rely on the
            # regular interrupt to flush the queue if we are unsuccessful.
            while not _testcapi._pending_threadfunc(callback):
                pass

    def pendingcalls_wait(self, l, n, context=None):
        """Busy-wait until l contains exactly n entries.

        When a context is given, the timeout counter only advances once
        context.event has been set, i.e. after every submitting thread has
        finished.  The test fails if the counter reaches 10000 first.
        """
        count = 0
        while len(l) != n:
            # This busy loop is where we expect to be interrupted to
            # run our callbacks.  Note that callbacks are only run on the
            # main thread.
            if False and support.verbose:  # debug output, disabled on purpose
                print("(%i)" % (len(l),),)
            for i in range(1000):
                a = i * i
            if context and not context.event.is_set():
                continue
            count += 1
            self.assertLess(count, 10000,
                "timeout waiting for %i callbacks, got %i" % (n, len(l)))
        if False and support.verbose:  # debug output, disabled on purpose
            print("(%i)" % (len(l),))

    def test_pendingcalls_threaded(self):

        # Do every callback on a separate thread.
        n = 32  # total callbacks
        threads = []
        class foo(object): pass
        context = foo()
        context.l = []
        context.n = 2  # submits per thread
        context.nThreads = n // context.n
        context.nFinished = 0
        context.lock = threading.Lock()
        context.event = threading.Event()

        for _ in range(context.nThreads):
            t = threading.Thread(target=self.pendingcalls_thread,
                                 args=(context,))
            t.start()
            threads.append(t)

        self.pendingcalls_wait(context.l, n, context)

        for t in threads:
            t.join()

    def pendingcalls_thread(self, context):
        """Thread body: submit context.n calls, then record completion."""
        try:
            self.pendingcalls_submit(context.l, context.n)
        finally:
            with context.lock:
                context.nFinished += 1
                nFinished = context.nFinished
                if False and support.verbose:  # debug output, disabled
                    print("finished threads: ", nFinished)
            # The last thread to finish releases the waiter's timeout.
            if nFinished == context.nThreads:
                context.event.set()

    def test_pendingcalls_non_threaded(self):
        # Again, just using the main thread, likely they will all be
        # dispatched at once.  It is ok to ask for too many, because we loop
        # until we find a slot.  The loop can be interrupted to dispatch.
        # There are only 32 dispatch slots, so we go for twice that!
        l = []
        n = 64
        self.pendingcalls_submit(l, n)
        self.pendingcalls_wait(l, n)


class SubinterpreterTest(unittest.TestCase):

    def test_subinterps(self):
        """Check that a subinterpreter gets its own sys.modules and builtins.

        The subinterpreter pickles id(sys.modules) and id(builtins) into the
        write end of a pipe; both must differ from the ids seen here.
        """
        import builtins
        r, w = os.pipe()
        # The snippet opens (and thereby closes) the write end itself.
        code = """if 1:
            import sys, builtins, pickle
            with open({:d}, "wb") as f:
                pickle.dump(id(sys.modules), f)
                pickle.dump(id(builtins), f)
            """.format(w)
        with open(r, "rb") as f:
            ret = _testcapi.run_in_subinterp(code)
            self.assertEqual(ret, 0)
            self.assertNotEqual(pickle.load(f), id(sys.modules))
            self.assertNotEqual(pickle.load(f), id(builtins))


# Bug #6012
class Test6012(unittest.TestCase):
    def test(self):
        # _testcapi.argparsing is expected to return 1 for two str arguments.
        self.assertEqual(_testcapi.argparsing("Hello", "World"), 1)


@unittest.skipIf(
    sys.platform.startswith('win'),
    "interpreter embedding tests aren't built under Windows")
class EmbeddingTests(unittest.TestCase):
    """Run the Modules/_testembed helper program and check its output."""
    # XXX only tested under Unix checkouts

    def setUp(self):
        # Resolve __file__ first so a relative module path cannot collapse
        # the three dirname() calls into an empty string.
        here = os.path.abspath(__file__)
        basepath = os.path.dirname(os.path.dirname(os.path.dirname(here)))
        self.test_exe = exe = os.path.join(basepath, "Modules", "_testembed")
        if not os.path.exists(exe):
            self.skipTest("%r doesn't exist" % exe)
        # This is needed otherwise we get a fatal error:
        # "Py_Initialize: Unable to get the locale encoding
        # LookupError: no codec search functions registered: can't find encoding"
        self.oldcwd = os.getcwd()
        os.chdir(basepath)

    def tearDown(self):
        os.chdir(self.oldcwd)

    def run_embedded_interpreter(self, *args):
        """Runs a test in the embedded interpreter

        args are appended to the self.test_exe command line.  Returns the
        (stdout, stderr) pair decoded as latin1; the test fails if the
        process exits with a non-zero return code.
        """
        cmd = [self.test_exe]
        cmd.extend(args)
        p = subprocess.Popen(cmd,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        (out, err) = p.communicate()
        self.assertEqual(p.returncode, 0,
                         "bad returncode %d, stderr is %r" %
                         (p.returncode, err))
        return out.decode("latin1"), err.decode("latin1")

    def test_subinterps(self):
        # This is just a "don't crash" test
        out, err = self.run_embedded_interpreter()
        if support.verbose:
            print()
            print(out)
            print(err)

    def test_forced_io_encoding(self):
        # Checks forced configuration of embedded interpreter IO streams
        out, err = self.run_embedded_interpreter("forced_io_encoding")
        if support.verbose:
            print()
            print(out)
            print(err)
        expected_output = textwrap.dedent("""\
        --- Use defaults ---
        Expected encoding: default
        Expected errors: default
        stdin: {0.stdin.encoding}:strict
        stdout: {0.stdout.encoding}:strict
        stderr: {0.stderr.encoding}:backslashreplace
        --- Set errors only ---
        Expected encoding: default
        Expected errors: surrogateescape
        stdin: {0.stdin.encoding}:surrogateescape
        stdout: {0.stdout.encoding}:surrogateescape
        stderr: {0.stderr.encoding}:backslashreplace
        --- Set encoding only ---
        Expected encoding: latin-1
        Expected errors: default
        stdin: latin-1:strict
        stdout: latin-1:strict
        stderr: latin-1:backslashreplace
        --- Set encoding and errors ---
        Expected encoding: latin-1
        Expected errors: surrogateescape
        stdin: latin-1:surrogateescape
        stdout: latin-1:surrogateescape
        stderr: latin-1:backslashreplace""").format(sys)

        self.assertEqual(out.strip(), expected_output)

class SkipitemTest(unittest.TestCase):

    def test_skipitem(self):
        """
        If this test failed, you probably added a new "format unit"
        in Python/getargs.c, but neglected to update our poor friend
        skipitem() in the same file.  (If so, shame on you!)

        With a few exceptions**, this function brute-force tests all
        printable ASCII*** characters (32 to 126 inclusive) as format units,
        checking to see that PyArg_ParseTupleAndKeywords() returns consistent
        errors both when the unit is attempted to be used and when it is
        skipped.  If the format unit doesn't exist, we'll get one of two
        specific error messages (one for used, one for skipped); if it does
        exist we *won't* get that error--we'll get either no error or some
        other error.  If we get the specific "does not exist" error for one
        test and not for the other, there's a mismatch, and the test fails.

           ** Some format units have special funny semantics and it would
              be difficult to accommodate them here.  Since these are all
              well-established and properly skipped in skipitem() we can
              get away with not testing them--this test is really intended
              to catch *new* format units.

          *** Python C source files must be ASCII.  Therefore it's impossible
              to have non-ASCII format units.

        """
        empty_tuple = ()
        tuple_1 = (0,)
        dict_b = {'b': 1}
        keywords = ["a", "b"]

        for i in range(32, 127):
            c = chr(i)

            # skip parentheses, the error reporting is inconsistent about them
            # skip 'e', it's always a two-character code
            # skip '|' and '$', they don't represent arguments anyway
            if c in '()e|$':
                continue

            # test the format unit when not skipped
            fmt = c + "i"
            try:
                # (note: the format string must be bytes!)
                _testcapi.parse_tuple_and_keywords(tuple_1, dict_b,
                    fmt.encode("ascii"), keywords)
                when_not_skipped = False
            except TypeError as e:
                s = "argument 1 must be impossible<bad format char>, not int"
                when_not_skipped = (str(e) == s)
            except RuntimeError:
                when_not_skipped = False

            # test the format unit when skipped
            optional_format = "|" + fmt
            try:
                _testcapi.parse_tuple_and_keywords(empty_tuple, dict_b,
                    optional_format.encode("ascii"), keywords)
                when_skipped = False
            except RuntimeError as e:
                s = "impossible<bad format char>: '{}'".format(fmt)
                when_skipped = (str(e) == s)

            # Argument order follows the labels in the message: the
            # "not skipped" result first, then the "skipped" one.
            message = ("test_skipitem_parity: "
                "detected mismatch between convertsimple and skipitem "
                "for format unit '{}' ({}), not skipped {}, skipped {}".format(
                    c, i, when_not_skipped, when_skipped))
            self.assertIs(when_skipped, when_not_skipped, message)

    def test_parse_tuple_and_keywords(self):
        # parse_tuple_and_keywords error handling tests
        self.assertRaises(TypeError, _testcapi.parse_tuple_and_keywords,
                          (), {}, 42, [])
        self.assertRaises(ValueError, _testcapi.parse_tuple_and_keywords,
                          (), {}, b'', 42)
        self.assertRaises(ValueError, _testcapi.parse_tuple_and_keywords,
                          (), {}, b'', [''] * 42)
        self.assertRaises(ValueError, _testcapi.parse_tuple_and_keywords,
                          (), {}, b'', [42])

@unittest.skipUnless(threading, 'Threading required for this test.')
class TestThreadState(unittest.TestCase):

    @support.reap_threads
    def test_thread_state(self):
        # some extra thread-state tests driven via _testcapi
        def target():
            idents = []

            def callback():
                idents.append(threading.get_ident())

            _testcapi._test_thread_state(callback)
            # NOTE(review): these extra references to callback are kept
            # unchanged; their purpose is not evident from this file.
            a = b = callback
            time.sleep(1)
            # Check that the thread running target() is in the list
            # exactly 3 times.
            self.assertEqual(idents.count(threading.get_ident()), 3,
                             "Couldn't find main thread correctly in the list")

        # Run once in the current thread, then once more in a new thread.
        target()
        t = threading.Thread(target=target)
        t.start()
        t.join()


def test_main():
    """Run the unittest classes, then every _testcapi 'test_*' function."""
    support.run_unittest(CAPITest, TestPendingCalls, Test6012,
                         EmbeddingTests, SkipitemTest, TestThreadState,
                         SubinterpreterTest)

    # The remaining tests are exported by _testcapi itself and are simply
    # called without arguments.
    for name in dir(_testcapi):
        if name.startswith('test_'):
            test = getattr(_testcapi, name)
            if support.verbose:
                print("internal", name)
            test()

if __name__ == "__main__":
    test_main()