blob: 6f75b776bc711b73f08c516924ba397b685f590e [file] [log] [blame]
# Run the _testcapi module tests (tests for the Python/C API): by definition,
# these are all functions _testcapi exports whose name begins with 'test_'.
Tim Peters9ea17ac2001-02-02 05:57:15 +00003
Antoine Pitrou8e605772011-04-25 21:21:07 +02004import os
Antoine Pitrou2f828f22012-01-18 00:21:11 +01005import pickle
Jeffrey Yasskin8e0bdfd2010-05-13 18:31:05 +00006import random
7import subprocess
Martin v. Löwis6ce7ed22005-03-03 12:26:35 +00008import sys
Benjamin Petersona54c9092009-01-13 02:11:23 +00009import time
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +000010import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +000011from test import support
Victor Stinner45df8202010-04-28 22:31:17 +000012try:
Stefan Krahfd24f9e2012-08-20 11:04:24 +020013 import _posixsubprocess
14except ImportError:
15 _posixsubprocess = None
16try:
Victor Stinner45df8202010-04-28 22:31:17 +000017 import threading
18except ImportError:
19 threading = None
Tim Petersd66595f2001-02-04 03:09:53 +000020import _testcapi
Tim Peters9ea17ac2001-02-02 05:57:15 +000021
Benjamin Petersona54c9092009-01-13 02:11:23 +000022
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +000023def testfunction(self):
24 """some doc"""
25 return self
26
class InstanceMethod:
    # Both attributes are callables wrapped with _testcapi.instancemethod.
    # On the right-hand side ``id`` still names the builtin, because the
    # class attribute of the same name is only bound by this assignment.
    id = _testcapi.instancemethod(id)
    testfunction = _testcapi.instancemethod(testfunction)
30
class CAPITest(unittest.TestCase):
    # Assorted checks of C-API behaviour exposed through _testcapi.

    def test_instancemethod(self):
        # Callables wrapped with _testcapi.instancemethod are expected to
        # receive the instance as first argument when called through it.
        inst = InstanceMethod()
        self.assertEqual(id(inst), inst.id())
        self.assertIs(inst.testfunction(), inst)
        self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__)
        self.assertEqual(InstanceMethod.testfunction.__doc__, testfunction.__doc__)

        # An attribute set through the class-level wrapper must show up on
        # the wrapped function, while setting one through the instance-level
        # object must raise AttributeError.
        InstanceMethod.testfunction.attribute = "test"
        self.assertEqual(testfunction.attribute, "test")
        self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_no_FatalError_infinite_loop(self):
        # Run the crashing call in a child interpreter so that only the
        # child dies; its stderr is inspected afterwards.
        with support.SuppressCrashReport():
            p = subprocess.Popen([sys.executable, "-c",
                                  'import _testcapi;'
                                  '_testcapi.crash_no_current_thread()'],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        (out, err) = p.communicate()
        self.assertEqual(out, b'')
        # This used to cause an infinite loop.
        self.assertTrue(err.rstrip().startswith(
                         b'Fatal Python error:'
                         b' PyThreadState_Get: no current thread'))

    def test_memoryview_from_NULL_pointer(self):
        self.assertRaises(ValueError, _testcapi.make_memoryview_from_NULL_pointer)

    def test_exc_info(self):
        # Swap the currently handled exception via _testcapi.set_exc_info
        # and check what sys.exc_info() reports before, during and after.
        raised_exception = ValueError("5")
        new_exc = TypeError("TEST")
        try:
            raise raised_exception
        except ValueError as e:
            tb = e.__traceback__
            orig_sys_exc_info = sys.exc_info()
            orig_exc_info = _testcapi.set_exc_info(new_exc.__class__, new_exc, None)
            new_sys_exc_info = sys.exc_info()
            new_exc_info = _testcapi.set_exc_info(*orig_exc_info)
            reset_sys_exc_info = sys.exc_info()

            self.assertEqual(orig_exc_info[1], e)

            self.assertSequenceEqual(orig_exc_info, (raised_exception.__class__, raised_exception, tb))
            self.assertSequenceEqual(orig_sys_exc_info, orig_exc_info)
            self.assertSequenceEqual(reset_sys_exc_info, orig_exc_info)
            self.assertSequenceEqual(new_exc_info, (new_exc.__class__, new_exc, None))
            self.assertSequenceEqual(new_sys_exc_info, new_exc_info)
        else:
            # Only reachable if the ``raise`` above did not raise at all.
            self.fail("raising ValueError did not enter the except clause")

    @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
    def test_seq_bytes_to_charp_array(self):
        # Issue #15732: crash in _PySequence_BytesToCharpArray()
        # Z reports a length but provides no __getitem__.
        class Z(object):
            def __len__(self):
                return 1
        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
                          1,Z(),3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17)
        # Issue #15736: overflow in _PySequence_BytesToCharpArray()
        # This Z claims sys.maxsize items.
        class Z(object):
            def __len__(self):
                return sys.maxsize
            def __getitem__(self, i):
                return b'x'
        self.assertRaises(MemoryError, _posixsubprocess.fork_exec,
                          1,Z(),3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17)

    @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
    def test_subprocess_fork_exec(self):
        class Z(object):
            def __len__(self):
                return 1

        # Issue #15738: crash in subprocess_fork_exec()
        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
                          Z(),[b'1'],3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17)

    def test_docstring_signature_parsing(self):
        # Each _testcapi helper below is checked for the (__doc__,
        # __text_signature__) pair it is expected to expose.

        self.assertIsNone(_testcapi.no_docstring.__doc__)
        self.assertIsNone(_testcapi.no_docstring.__text_signature__)

        self.assertEqual(_testcapi.docstring_empty.__doc__, "")
        self.assertIsNone(_testcapi.docstring_empty.__text_signature__)

        self.assertEqual(_testcapi.docstring_no_signature.__doc__,
            "This docstring has no signature.")
        self.assertIsNone(_testcapi.docstring_no_signature.__text_signature__)

        self.assertEqual(_testcapi.docstring_with_invalid_signature.__doc__,
            "docstring_with_invalid_signature (boo)\n"
            "\n"
            "This docstring has an invalid signature."
            )
        self.assertIsNone(_testcapi.docstring_with_invalid_signature.__text_signature__)

        self.assertEqual(_testcapi.docstring_with_signature.__doc__,
            "This docstring has a valid signature.")
        self.assertEqual(_testcapi.docstring_with_signature.__text_signature__, "(sig)")

        self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__doc__,
            "This docstring has a valid signature and some extra newlines.")
        self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__text_signature__,
            "(parameter)")
139
140
@unittest.skipUnless(threading, 'Threading required for this test.')
class TestPendingCalls(unittest.TestCase):
    # Submits callbacks through _testcapi._pending_threadfunc, from worker
    # threads and from the main thread, and waits until all of them ran.

    def pendingcalls_submit(self, l, n):
        # Submit ``n`` callbacks; each one appends a single item to ``l``.
        def callback():
            # This function can be interrupted by thread switching, so
            # let's use an atomic operation.
            l.append(None)

        for i in range(n):
            time.sleep(random.random()*0.02)  # 0.01 secs on average
            # Try submitting the callback until successful.  Rely on the
            # regular interrupt to flush the queue if we are unsuccessful.
            while not _testcapi._pending_threadfunc(callback):
                pass

    def pendingcalls_wait(self, l, n, context=None):
        # Spin until ``l`` holds exactly ``n`` items.  When ``context`` is
        # given, the timeout counter only advances once context.event is set.
        count = 0
        while len(l) != n:
            # This busy loop is where we expect to be interrupted to
            # run our callbacks.  Note that callbacks are only run on the
            # main thread.
            for i in range(1000):
                a = i*i
            if context and not context.event.is_set():
                continue
            count += 1
            self.assertLess(count, 10000,
                "timeout waiting for %i callbacks, got %i" % (n, len(l)))

    def test_pendingcalls_threaded(self):
        # Do every callback on a separate thread.
        n = 32  # total callbacks
        threads = []

        # Bare attribute holder shared with the worker threads.
        class foo(object):
            pass
        context = foo()
        context.l = []
        context.n = 2  # submits per thread
        context.nThreads = n // context.n
        context.nFinished = 0
        context.lock = threading.Lock()
        context.event = threading.Event()

        for i in range(context.nThreads):
            t = threading.Thread(target=self.pendingcalls_thread, args=(context,))
            t.start()
            threads.append(t)

        self.pendingcalls_wait(context.l, n, context)

        for t in threads:
            t.join()

    def pendingcalls_thread(self, context):
        # Worker body: submit this thread's share, then record completion.
        try:
            self.pendingcalls_submit(context.l, context.n)
        finally:
            with context.lock:
                context.nFinished += 1
                nFinished = context.nFinished
            # The last worker to finish signals the waiting main thread.
            if nFinished == context.nThreads:
                context.event.set()

    def test_pendingcalls_non_threaded(self):
        # Again, just using the main thread, likely they will all be
        # dispatched at once.  It is ok to ask for too many, because we loop
        # until we find a slot.  The loop can be interrupted to dispatch.
        # There are only 32 dispatch slots, so we go for twice that!
        l = []
        n = 64
        self.pendingcalls_submit(l, n)
        self.pendingcalls_wait(l, n)
223
Antoine Pitrou7a2572c2013-08-01 20:43:26 +0200224
class SubinterpreterTest(unittest.TestCase):

    def test_subinterps(self):
        import builtins
        read_fd, write_fd = os.pipe()
        # The script runs in a sub-interpreter and pickles the ids of its
        # own sys.modules and builtins into the write end of the pipe.
        script = """if 1:
            import sys, builtins, pickle
            with open({:d}, "wb") as f:
                pickle.dump(id(sys.modules), f)
                pickle.dump(id(builtins), f)
            """.format(write_fd)
        with open(read_fd, "rb") as reader:
            status = support.run_in_subinterp(script)
            self.assertEqual(status, 0)
            # Values are read back in the order they were dumped and must
            # differ from the ids seen in this interpreter.
            sub_modules_id = pickle.load(reader)
            self.assertNotEqual(sub_modules_id, id(sys.modules))
            sub_builtins_id = pickle.load(reader)
            self.assertNotEqual(sub_builtins_id, id(builtins))
241
Antoine Pitrou7a2572c2013-08-01 20:43:26 +0200242
# Bug #6012
class Test6012(unittest.TestCase):
    def test(self):
        # _testcapi.argparsing is expected to return 1 for two str arguments.
        outcome = _testcapi.argparsing("Hello", "World")
        self.assertEqual(outcome, 1)
Benjamin Petersona54c9092009-01-13 02:11:23 +0000247
Antoine Pitrou8e605772011-04-25 21:21:07 +0200248
class EmbeddingTests(unittest.TestCase):
    # Drives the separate ``_testembed`` executable and inspects its output.

    def setUp(self):
        # The base path is three directory levels above this file.
        basepath = __file__
        for _ in range(3):
            basepath = os.path.dirname(basepath)

        if sys.platform.startswith("win"):
            # Next to the running interpreter, with a "_d" suffix when the
            # interpreter path itself contains "_d".
            debug_suffix = "_d" if "_d" in sys.executable else ""
            exename = "_testembed" + debug_suffix + ".exe"
            exepath = os.path.dirname(sys.executable)
        else:
            exename = "_testembed"
            exepath = os.path.join(basepath, "Modules")

        exe = os.path.join(exepath, exename)
        self.test_exe = exe
        if not os.path.exists(exe):
            self.skipTest("%r doesn't exist" % exe)
        # This is needed otherwise we get a fatal error:
        # "Py_Initialize: Unable to get the locale encoding
        # LookupError: no codec search functions registered: can't find encoding"
        self.oldcwd = os.getcwd()
        os.chdir(basepath)

    def tearDown(self):
        # Undo the chdir() done in setUp.
        os.chdir(self.oldcwd)

    def run_embedded_interpreter(self, *args):
        """Run the embedding executable with *args*.

        Asserts a zero exit status and returns (stdout, stderr), each
        decoded as latin1.
        """
        command = [self.test_exe] + list(args)
        proc = subprocess.Popen(command,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        raw_out, raw_err = proc.communicate()
        failure_note = ("bad returncode %d, stderr is %r" %
                        (proc.returncode, raw_err))
        self.assertEqual(proc.returncode, 0, failure_note)
        return raw_out.decode("latin1"), raw_err.decode("latin1")

    def test_subinterps(self):
        # This is just a "don't crash" test
        out, err = self.run_embedded_interpreter()
        if support.verbose:
            # Blank line, then stdout, then stderr, each newline-terminated.
            print("", out, err, sep="\n")

    @staticmethod
    def _get_default_pipe_encoding():
        # Open the write end of a fresh pipe in text mode and report the
        # encoding that was picked for it; both ends are closed on exit.
        read_fd, write_fd = os.pipe()
        try:
            with os.fdopen(write_fd, 'w') as writer:
                return writer.encoding
        finally:
            os.close(read_fd)

    def test_forced_io_encoding(self):
        # Checks forced configuration of embedded interpreter IO streams
        out, err = self.run_embedded_interpreter("forced_io_encoding")
        if support.verbose:
            print("", out, err, sep="\n")
        stdin_encoding = sys.__stdin__.encoding
        pipe_encoding = self._get_default_pipe_encoding()
        # {0} is filled with the stdin encoding, {1} with the pipe encoding.
        template_lines = [
            "--- Use defaults ---",
            "Expected encoding: default",
            "Expected errors: default",
            "stdin: {0}:strict",
            "stdout: {1}:strict",
            "stderr: {1}:backslashreplace",
            "--- Set errors only ---",
            "Expected encoding: default",
            "Expected errors: surrogateescape",
            "stdin: {0}:surrogateescape",
            "stdout: {1}:surrogateescape",
            "stderr: {1}:backslashreplace",
            "--- Set encoding only ---",
            "Expected encoding: latin-1",
            "Expected errors: default",
            "stdin: latin-1:strict",
            "stdout: latin-1:strict",
            "stderr: latin-1:backslashreplace",
            "--- Set encoding and errors ---",
            "Expected encoding: latin-1",
            "Expected errors: surrogateescape",
            "stdin: latin-1:surrogateescape",
            "stdout: latin-1:surrogateescape",
            "stderr: latin-1:backslashreplace",
        ]
        expected_output = os.linesep.join(template_lines).format(
            stdin_encoding, pipe_encoding)
        # This is useful if we ever trip over odd platform behaviour
        self.maxDiff = None
        self.assertEqual(out.strip(), expected_output)
Antoine Pitrou8e605772011-04-25 21:21:07 +0200340
class SkipitemTest(unittest.TestCase):

    def test_skipitem(self):
        """
        If this test failed, you probably added a new "format unit"
        in Python/getargs.c, but neglected to update our poor friend
        skipitem() in the same file.  (If so, shame on you!)

        With a few exceptions**, this function brute-force tests all
        printable ASCII*** characters (32 to 126 inclusive) as format units,
        checking to see that PyArg_ParseTupleAndKeywords() returns consistent
        errors both when the unit is attempted to be used and when it is
        skipped.  If the format unit doesn't exist, we'll get one of two
        specific error messages (one for used, one for skipped); if it does
        exist we *won't* get that error--we'll get either no error or some
        other error.  If we get the specific "does not exist" error for one
        test and not for the other, there's a mismatch, and the test fails.

           ** Some format units have special funny semantics and it would
              be difficult to accommodate them here.  Since these are all
              well-established and properly skipped in skipitem() we can
              get away with not testing them--this test is really intended
              to catch *new* format units.

          *** Python C source files must be ASCII.  Therefore it's impossible
              to have non-ASCII format units.

        """
        empty_tuple = ()
        tuple_1 = (0,)
        dict_b = {'b':1}
        keywords = ["a", "b"]

        for i in range(32, 127):
            c = chr(i)

            # skip parentheses, the error reporting is inconsistent about them
            # skip 'e', it's always a two-character code
            # skip '|' and '$', they don't represent arguments anyway
            if c in '()e|$':
                continue

            # test the format unit when not skipped
            fmt = c + "i"
            try:
                # (note: the format string must be bytes!)
                _testcapi.parse_tuple_and_keywords(tuple_1, dict_b,
                    fmt.encode("ascii"), keywords)
                when_not_skipped = False
            except TypeError as e:
                s = "argument 1 must be impossible<bad format char>, not int"
                when_not_skipped = (str(e) == s)
            except RuntimeError:
                when_not_skipped = False

            # test the format unit when skipped
            optional_format = "|" + fmt
            try:
                _testcapi.parse_tuple_and_keywords(empty_tuple, dict_b,
                    optional_format.encode("ascii"), keywords)
                when_skipped = False
            except RuntimeError as e:
                s = "impossible<bad format char>: '{}'".format(fmt)
                when_skipped = (str(e) == s)

            # The values are passed in the order the labels name them:
            # "not skipped" first, then "skipped".
            message = ("test_skipitem_parity: "
                "detected mismatch between convertsimple and skipitem "
                "for format unit '{}' ({}), not skipped {}, skipped {}".format(
                    c, i, when_not_skipped, when_skipped))
            self.assertIs(when_skipped, when_not_skipped, message)

    def test_parse_tuple_and_keywords(self):
        # parse_tuple_and_keywords error handling tests
        self.assertRaises(TypeError, _testcapi.parse_tuple_and_keywords,
                          (), {}, 42, [])
        self.assertRaises(ValueError, _testcapi.parse_tuple_and_keywords,
                          (), {}, b'', 42)
        self.assertRaises(ValueError, _testcapi.parse_tuple_and_keywords,
                          (), {}, b'', [''] * 42)
        self.assertRaises(ValueError, _testcapi.parse_tuple_and_keywords,
                          (), {}, b'', [42])
422
@unittest.skipUnless(threading, 'Threading required for this test.')
class TestThreadState(unittest.TestCase):

    @support.reap_threads
    def test_thread_state(self):
        # some extra thread-state tests driven via _testcapi
        def target():
            seen_idents = []

            def record_ident():
                seen_idents.append(threading.get_ident())

            _testcapi._test_thread_state(record_ident)
            # NOTE(review): two extra references to the callback; why they
            # are needed is not evident from this file -- kept as found.
            a = b = record_ident
            time.sleep(1)
            # The thread running target() must have been recorded exactly
            # 3 times.
            own_ident = threading.get_ident()
            self.assertEqual(seen_idents.count(own_ident), 3,
                             "Couldn't find main thread correctly in the list")

        # Once on the calling thread, then once more on a fresh thread.
        target()
        worker = threading.Thread(target=target)
        worker.start()
        worker.join()
446
class Test_testcapi(unittest.TestCase):
    def test__testcapi(self):
        # Call every attribute of _testcapi whose name starts with 'test_',
        # each inside its own subTest so one failure does not hide the rest.
        exported = [name for name in dir(_testcapi)
                    if name.startswith('test_')]
        for name in exported:
            with self.subTest("internal", name=name):
                getattr(_testcapi, name)()
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000454
# Hand the module to unittest when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()