blob: 000079e4a6ba50f56e8821a10f478f754ee3318d [file] [log] [blame]
Tim Petersd66595f2001-02-04 03:09:53 +00001# Run the _testcapi module tests (tests for the Python/C API): by defn,
Guido van Rossum361c5352001-04-13 17:03:04 +00002# these are all functions _testcapi exports whose name begins with 'test_'.
Tim Peters9ea17ac2001-02-02 05:57:15 +00003
Antoine Pitrou8e605772011-04-25 21:21:07 +02004import os
Antoine Pitrou2f828f22012-01-18 00:21:11 +01005import pickle
Jeffrey Yasskin8e0bdfd2010-05-13 18:31:05 +00006import random
7import subprocess
Martin v. Löwis6ce7ed22005-03-03 12:26:35 +00008import sys
Benjamin Petersona54c9092009-01-13 02:11:23 +00009import time
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +000010import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +000011from test import support
Victor Stinner45df8202010-04-28 22:31:17 +000012try:
Stefan Krahfd24f9e2012-08-20 11:04:24 +020013 import _posixsubprocess
14except ImportError:
15 _posixsubprocess = None
16try:
Victor Stinner45df8202010-04-28 22:31:17 +000017 import threading
18except ImportError:
19 threading = None
Tim Petersd66595f2001-02-04 03:09:53 +000020import _testcapi
Tim Peters9ea17ac2001-02-02 05:57:15 +000021
Benjamin Petersona54c9092009-01-13 02:11:23 +000022
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +000023def testfunction(self):
24 """some doc"""
25 return self
26
27class InstanceMethod:
28 id = _testcapi.instancemethod(id)
29 testfunction = _testcapi.instancemethod(testfunction)
30
31class CAPITest(unittest.TestCase):
32
33 def test_instancemethod(self):
34 inst = InstanceMethod()
35 self.assertEqual(id(inst), inst.id())
Benjamin Petersonc9c0f202009-06-30 23:06:06 +000036 self.assertTrue(inst.testfunction() is inst)
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +000037 self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__)
38 self.assertEqual(InstanceMethod.testfunction.__doc__, testfunction.__doc__)
39
40 InstanceMethod.testfunction.attribute = "test"
41 self.assertEqual(testfunction.attribute, "test")
42 self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test")
43
Stefan Krah0ca46242010-06-09 08:56:28 +000044 @unittest.skipUnless(threading, 'Threading required for this test.')
Jeffrey Yasskin8e0bdfd2010-05-13 18:31:05 +000045 def test_no_FatalError_infinite_loop(self):
Antoine Pitrou77e904e2013-10-08 23:04:32 +020046 with support.SuppressCrashReport():
Ezio Melotti25a40452013-03-05 20:26:17 +020047 p = subprocess.Popen([sys.executable, "-c",
Ezio Melottie1857d92013-03-05 20:31:34 +020048 'import _testcapi;'
49 '_testcapi.crash_no_current_thread()'],
50 stdout=subprocess.PIPE,
51 stderr=subprocess.PIPE)
Jeffrey Yasskin8e0bdfd2010-05-13 18:31:05 +000052 (out, err) = p.communicate()
53 self.assertEqual(out, b'')
54 # This used to cause an infinite loop.
Vinay Sajip73954042012-05-06 11:34:50 +010055 self.assertTrue(err.rstrip().startswith(
Jeffrey Yasskin8e0bdfd2010-05-13 18:31:05 +000056 b'Fatal Python error:'
Vinay Sajip73954042012-05-06 11:34:50 +010057 b' PyThreadState_Get: no current thread'))
Jeffrey Yasskin8e0bdfd2010-05-13 18:31:05 +000058
Antoine Pitrou915605c2011-02-24 20:53:48 +000059 def test_memoryview_from_NULL_pointer(self):
60 self.assertRaises(ValueError, _testcapi.make_memoryview_from_NULL_pointer)
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +000061
Martin v. Löwisaa2efcb2012-04-19 14:33:43 +020062 def test_exc_info(self):
63 raised_exception = ValueError("5")
64 new_exc = TypeError("TEST")
65 try:
66 raise raised_exception
67 except ValueError as e:
68 tb = e.__traceback__
69 orig_sys_exc_info = sys.exc_info()
70 orig_exc_info = _testcapi.set_exc_info(new_exc.__class__, new_exc, None)
71 new_sys_exc_info = sys.exc_info()
72 new_exc_info = _testcapi.set_exc_info(*orig_exc_info)
73 reset_sys_exc_info = sys.exc_info()
74
75 self.assertEqual(orig_exc_info[1], e)
76
77 self.assertSequenceEqual(orig_exc_info, (raised_exception.__class__, raised_exception, tb))
78 self.assertSequenceEqual(orig_sys_exc_info, orig_exc_info)
79 self.assertSequenceEqual(reset_sys_exc_info, orig_exc_info)
80 self.assertSequenceEqual(new_exc_info, (new_exc.__class__, new_exc, None))
81 self.assertSequenceEqual(new_sys_exc_info, new_exc_info)
82 else:
83 self.assertTrue(False)
84
Stefan Krahfd24f9e2012-08-20 11:04:24 +020085 @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
86 def test_seq_bytes_to_charp_array(self):
87 # Issue #15732: crash in _PySequence_BytesToCharpArray()
88 class Z(object):
89 def __len__(self):
90 return 1
91 self.assertRaises(TypeError, _posixsubprocess.fork_exec,
92 1,Z(),3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17)
Stefan Krah7cacd2e2012-08-21 08:16:09 +020093 # Issue #15736: overflow in _PySequence_BytesToCharpArray()
94 class Z(object):
95 def __len__(self):
96 return sys.maxsize
97 def __getitem__(self, i):
98 return b'x'
99 self.assertRaises(MemoryError, _posixsubprocess.fork_exec,
100 1,Z(),3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17)
Stefan Krahfd24f9e2012-08-20 11:04:24 +0200101
Stefan Krahdb579d72012-08-20 14:36:47 +0200102 @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
103 def test_subprocess_fork_exec(self):
104 class Z(object):
105 def __len__(self):
106 return 1
107
108 # Issue #15738: crash in subprocess_fork_exec()
109 self.assertRaises(TypeError, _posixsubprocess.fork_exec,
110 Z(),[b'1'],3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17)
111
Victor Stinner45df8202010-04-28 22:31:17 +0000112@unittest.skipUnless(threading, 'Threading required for this test.')
Benjamin Petersona54c9092009-01-13 02:11:23 +0000113class TestPendingCalls(unittest.TestCase):
114
115 def pendingcalls_submit(self, l, n):
116 def callback():
117 #this function can be interrupted by thread switching so let's
118 #use an atomic operation
119 l.append(None)
120
121 for i in range(n):
122 time.sleep(random.random()*0.02) #0.01 secs on average
123 #try submitting callback until successful.
124 #rely on regular interrupt to flush queue if we are
125 #unsuccessful.
126 while True:
127 if _testcapi._pending_threadfunc(callback):
128 break;
129
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000130 def pendingcalls_wait(self, l, n, context = None):
Benjamin Petersona54c9092009-01-13 02:11:23 +0000131 #now, stick around until l[0] has grown to 10
132 count = 0;
133 while len(l) != n:
134 #this busy loop is where we expect to be interrupted to
135 #run our callbacks. Note that callbacks are only run on the
136 #main thread
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000137 if False and support.verbose:
Benjamin Petersona54c9092009-01-13 02:11:23 +0000138 print("(%i)"%(len(l),),)
139 for i in range(1000):
140 a = i*i
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000141 if context and not context.event.is_set():
142 continue
Benjamin Petersona54c9092009-01-13 02:11:23 +0000143 count += 1
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000144 self.assertTrue(count < 10000,
Benjamin Petersona54c9092009-01-13 02:11:23 +0000145 "timeout waiting for %i callbacks, got %i"%(n, len(l)))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000146 if False and support.verbose:
Benjamin Petersona54c9092009-01-13 02:11:23 +0000147 print("(%i)"%(len(l),))
148
149 def test_pendingcalls_threaded(self):
Benjamin Petersona54c9092009-01-13 02:11:23 +0000150
151 #do every callback on a separate thread
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000152 n = 32 #total callbacks
Benjamin Petersona54c9092009-01-13 02:11:23 +0000153 threads = []
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000154 class foo(object):pass
155 context = foo()
156 context.l = []
157 context.n = 2 #submits per thread
158 context.nThreads = n // context.n
159 context.nFinished = 0
160 context.lock = threading.Lock()
161 context.event = threading.Event()
162
163 for i in range(context.nThreads):
164 t = threading.Thread(target=self.pendingcalls_thread, args = (context,))
Benjamin Petersona54c9092009-01-13 02:11:23 +0000165 t.start()
166 threads.append(t)
167
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000168 self.pendingcalls_wait(context.l, n, context)
Benjamin Petersona54c9092009-01-13 02:11:23 +0000169
170 for t in threads:
171 t.join()
172
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000173 def pendingcalls_thread(self, context):
174 try:
175 self.pendingcalls_submit(context.l, context.n)
176 finally:
177 with context.lock:
178 context.nFinished += 1
179 nFinished = context.nFinished
180 if False and support.verbose:
181 print("finished threads: ", nFinished)
182 if nFinished == context.nThreads:
183 context.event.set()
184
Benjamin Petersona54c9092009-01-13 02:11:23 +0000185 def test_pendingcalls_non_threaded(self):
Ezio Melotti13925002011-03-16 11:05:33 +0200186 #again, just using the main thread, likely they will all be dispatched at
Benjamin Petersona54c9092009-01-13 02:11:23 +0000187 #once. It is ok to ask for too many, because we loop until we find a slot.
188 #the loop can be interrupted to dispatch.
189 #there are only 32 dispatch slots, so we go for twice that!
190 l = []
191 n = 64
192 self.pendingcalls_submit(l, n)
193 self.pendingcalls_wait(l, n)
194
Antoine Pitrou7a2572c2013-08-01 20:43:26 +0200195
196class SubinterpreterTest(unittest.TestCase):
197
Antoine Pitrou2f828f22012-01-18 00:21:11 +0100198 def test_subinterps(self):
Antoine Pitrou2f828f22012-01-18 00:21:11 +0100199 import builtins
200 r, w = os.pipe()
201 code = """if 1:
202 import sys, builtins, pickle
203 with open({:d}, "wb") as f:
204 pickle.dump(id(sys.modules), f)
205 pickle.dump(id(builtins), f)
206 """.format(w)
207 with open(r, "rb") as f:
Victor Stinnered3b0bc2013-11-23 12:27:24 +0100208 ret = support.run_in_subinterp(code)
Antoine Pitrou2f828f22012-01-18 00:21:11 +0100209 self.assertEqual(ret, 0)
210 self.assertNotEqual(pickle.load(f), id(sys.modules))
211 self.assertNotEqual(pickle.load(f), id(builtins))
212
Antoine Pitrou7a2572c2013-08-01 20:43:26 +0200213
Martin v. Löwisc15bdef2009-05-29 14:47:46 +0000214# Bug #6012
215class Test6012(unittest.TestCase):
216 def test(self):
217 self.assertEqual(_testcapi.argparsing("Hello", "World"), 1)
Benjamin Petersona54c9092009-01-13 02:11:23 +0000218
Antoine Pitrou8e605772011-04-25 21:21:07 +0200219
Nick Coghlan7d270ee2013-10-17 22:35:35 +1000220class EmbeddingTests(unittest.TestCase):
Nick Coghlan7d270ee2013-10-17 22:35:35 +1000221 def setUp(self):
Antoine Pitrou8e605772011-04-25 21:21:07 +0200222 basepath = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
Nick Coghlan4e641df2013-11-03 16:54:46 +1000223 exename = "_testembed"
224 if sys.platform.startswith("win"):
225 ext = ("_d" if "_d" in sys.executable else "") + ".exe"
226 exename += ext
227 exepath = os.path.dirname(sys.executable)
228 else:
229 exepath = os.path.join(basepath, "Modules")
230 self.test_exe = exe = os.path.join(exepath, exename)
Nick Coghlan7d270ee2013-10-17 22:35:35 +1000231 if not os.path.exists(exe):
232 self.skipTest("%r doesn't exist" % exe)
Antoine Pitrou8e605772011-04-25 21:21:07 +0200233 # This is needed otherwise we get a fatal error:
234 # "Py_Initialize: Unable to get the locale encoding
235 # LookupError: no codec search functions registered: can't find encoding"
Nick Coghlan7d270ee2013-10-17 22:35:35 +1000236 self.oldcwd = os.getcwd()
Antoine Pitrou8e605772011-04-25 21:21:07 +0200237 os.chdir(basepath)
Nick Coghlan7d270ee2013-10-17 22:35:35 +1000238
239 def tearDown(self):
240 os.chdir(self.oldcwd)
241
242 def run_embedded_interpreter(self, *args):
243 """Runs a test in the embedded interpreter"""
244 cmd = [self.test_exe]
245 cmd.extend(args)
246 p = subprocess.Popen(cmd,
247 stdout=subprocess.PIPE,
248 stderr=subprocess.PIPE)
249 (out, err) = p.communicate()
250 self.assertEqual(p.returncode, 0,
251 "bad returncode %d, stderr is %r" %
252 (p.returncode, err))
253 return out.decode("latin1"), err.decode("latin1")
254
255 def test_subinterps(self):
256 # This is just a "don't crash" test
257 out, err = self.run_embedded_interpreter()
258 if support.verbose:
259 print()
260 print(out)
261 print(err)
262
Nick Coghlan4e641df2013-11-03 16:54:46 +1000263 @staticmethod
264 def _get_default_pipe_encoding():
265 rp, wp = os.pipe()
266 try:
267 with os.fdopen(wp, 'w') as w:
268 default_pipe_encoding = w.encoding
269 finally:
270 os.close(rp)
271 return default_pipe_encoding
272
Nick Coghlan7d270ee2013-10-17 22:35:35 +1000273 def test_forced_io_encoding(self):
274 # Checks forced configuration of embedded interpreter IO streams
275 out, err = self.run_embedded_interpreter("forced_io_encoding")
276 if support.verbose:
277 print()
278 print(out)
279 print(err)
Nick Coghlan4e641df2013-11-03 16:54:46 +1000280 expected_stdin_encoding = sys.__stdin__.encoding
281 expected_pipe_encoding = self._get_default_pipe_encoding()
282 expected_output = os.linesep.join([
283 "--- Use defaults ---",
284 "Expected encoding: default",
285 "Expected errors: default",
286 "stdin: {0}:strict",
287 "stdout: {1}:strict",
288 "stderr: {1}:backslashreplace",
289 "--- Set errors only ---",
290 "Expected encoding: default",
291 "Expected errors: surrogateescape",
292 "stdin: {0}:surrogateescape",
293 "stdout: {1}:surrogateescape",
294 "stderr: {1}:backslashreplace",
295 "--- Set encoding only ---",
296 "Expected encoding: latin-1",
297 "Expected errors: default",
298 "stdin: latin-1:strict",
299 "stdout: latin-1:strict",
300 "stderr: latin-1:backslashreplace",
301 "--- Set encoding and errors ---",
302 "Expected encoding: latin-1",
303 "Expected errors: surrogateescape",
304 "stdin: latin-1:surrogateescape",
305 "stdout: latin-1:surrogateescape",
306 "stderr: latin-1:backslashreplace"]).format(expected_stdin_encoding,
307 expected_pipe_encoding)
Nick Coghlan3321fb82013-10-18 23:59:58 +1000308 # This is useful if we ever trip over odd platform behaviour
Nick Coghlan6508dc52013-10-18 01:44:22 +1000309 self.maxDiff = None
Nick Coghlan7d270ee2013-10-17 22:35:35 +1000310 self.assertEqual(out.strip(), expected_output)
Antoine Pitrou8e605772011-04-25 21:21:07 +0200311
Larry Hastings8f904da2012-06-22 03:56:29 -0700312class SkipitemTest(unittest.TestCase):
313
314 def test_skipitem(self):
315 """
316 If this test failed, you probably added a new "format unit"
317 in Python/getargs.c, but neglected to update our poor friend
318 skipitem() in the same file. (If so, shame on you!)
319
Larry Hastings48ed3602012-06-22 12:58:36 -0700320 With a few exceptions**, this function brute-force tests all
321 printable ASCII*** characters (32 to 126 inclusive) as format units,
322 checking to see that PyArg_ParseTupleAndKeywords() return consistent
323 errors both when the unit is attempted to be used and when it is
324 skipped. If the format unit doesn't exist, we'll get one of two
325 specific error messages (one for used, one for skipped); if it does
326 exist we *won't* get that error--we'll get either no error or some
327 other error. If we get the specific "does not exist" error for one
328 test and not for the other, there's a mismatch, and the test fails.
Larry Hastings8f904da2012-06-22 03:56:29 -0700329
Larry Hastings48ed3602012-06-22 12:58:36 -0700330 ** Some format units have special funny semantics and it would
331 be difficult to accomodate them here. Since these are all
332 well-established and properly skipped in skipitem() we can
333 get away with not testing them--this test is really intended
334 to catch *new* format units.
335
336 *** Python C source files must be ASCII. Therefore it's impossible
337 to have non-ASCII format units.
338
Larry Hastings8f904da2012-06-22 03:56:29 -0700339 """
340 empty_tuple = ()
341 tuple_1 = (0,)
342 dict_b = {'b':1}
343 keywords = ["a", "b"]
344
Larry Hastings48ed3602012-06-22 12:58:36 -0700345 for i in range(32, 127):
Larry Hastings8f904da2012-06-22 03:56:29 -0700346 c = chr(i)
347
Larry Hastings8f904da2012-06-22 03:56:29 -0700348 # skip parentheses, the error reporting is inconsistent about them
349 # skip 'e', it's always a two-character code
350 # skip '|' and '$', they don't represent arguments anyway
Larry Hastings48ed3602012-06-22 12:58:36 -0700351 if c in '()e|$':
Larry Hastings8f904da2012-06-22 03:56:29 -0700352 continue
353
354 # test the format unit when not skipped
355 format = c + "i"
356 try:
357 # (note: the format string must be bytes!)
358 _testcapi.parse_tuple_and_keywords(tuple_1, dict_b,
359 format.encode("ascii"), keywords)
360 when_not_skipped = False
361 except TypeError as e:
362 s = "argument 1 must be impossible<bad format char>, not int"
363 when_not_skipped = (str(e) == s)
364 except RuntimeError as e:
365 when_not_skipped = False
366
367 # test the format unit when skipped
368 optional_format = "|" + format
369 try:
370 _testcapi.parse_tuple_and_keywords(empty_tuple, dict_b,
371 optional_format.encode("ascii"), keywords)
372 when_skipped = False
373 except RuntimeError as e:
374 s = "impossible<bad format char>: '{}'".format(format)
375 when_skipped = (str(e) == s)
376
377 message = ("test_skipitem_parity: "
378 "detected mismatch between convertsimple and skipitem "
379 "for format unit '{}' ({}), not skipped {}, skipped {}".format(
380 c, i, when_skipped, when_not_skipped))
381 self.assertIs(when_skipped, when_not_skipped, message)
Antoine Pitrou8e605772011-04-25 21:21:07 +0200382
Jesus Cea6e1d2b62012-10-04 16:06:30 +0200383 def test_parse_tuple_and_keywords(self):
384 # parse_tuple_and_keywords error handling tests
385 self.assertRaises(TypeError, _testcapi.parse_tuple_and_keywords,
386 (), {}, 42, [])
387 self.assertRaises(ValueError, _testcapi.parse_tuple_and_keywords,
388 (), {}, b'', 42)
389 self.assertRaises(ValueError, _testcapi.parse_tuple_and_keywords,
390 (), {}, b'', [''] * 42)
391 self.assertRaises(ValueError, _testcapi.parse_tuple_and_keywords,
392 (), {}, b'', [42])
393
Ezio Melotti29267c82013-02-23 05:52:46 +0200394@unittest.skipUnless(threading, 'Threading required for this test.')
395class TestThreadState(unittest.TestCase):
396
397 @support.reap_threads
398 def test_thread_state(self):
399 # some extra thread-state tests driven via _testcapi
400 def target():
401 idents = []
402
403 def callback():
Ezio Melotti35246bd2013-02-23 05:58:38 +0200404 idents.append(threading.get_ident())
Ezio Melotti29267c82013-02-23 05:52:46 +0200405
406 _testcapi._test_thread_state(callback)
407 a = b = callback
408 time.sleep(1)
409 # Check our main thread is in the list exactly 3 times.
Ezio Melotti35246bd2013-02-23 05:58:38 +0200410 self.assertEqual(idents.count(threading.get_ident()), 3,
Ezio Melotti29267c82013-02-23 05:52:46 +0200411 "Couldn't find main thread correctly in the list")
412
413 target()
414 t = threading.Thread(target=target)
415 t.start()
416 t.join()
417
Zachary Warec12f09e2013-11-11 22:47:04 -0600418class Test_testcapi(unittest.TestCase):
419 def test__testcapi(self):
420 for name in dir(_testcapi):
421 if name.startswith('test_'):
Zachary Waredfcd6942013-11-11 22:59:23 -0600422 with self.subTest("internal", name=name):
423 test = getattr(_testcapi, name)
424 test()
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000425
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000426if __name__ == "__main__":
Zachary Warec12f09e2013-11-11 22:47:04 -0600427 unittest.main()