blob: 2ef5d4a9c9a6cc27260ee896d4e79f7f40dbf60e [file] [log] [blame]
# Run the _testcapi module tests (tests for the Python/C API): by definition,
# these are all functions _testcapi exports whose name begins with 'test_'.
Tim Peters9ea17ac2001-02-02 05:57:15 +00003
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00004from __future__ import with_statement
Antoine Pitrou8e605772011-04-25 21:21:07 +02005import os
Antoine Pitrou2f828f22012-01-18 00:21:11 +01006import pickle
Jeffrey Yasskin8e0bdfd2010-05-13 18:31:05 +00007import random
8import subprocess
Martin v. Löwis6ce7ed22005-03-03 12:26:35 +00009import sys
Benjamin Petersona54c9092009-01-13 02:11:23 +000010import time
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +000011import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +000012from test import support
Victor Stinner45df8202010-04-28 22:31:17 +000013try:
Stefan Krahfd24f9e2012-08-20 11:04:24 +020014 import _posixsubprocess
15except ImportError:
16 _posixsubprocess = None
17try:
Victor Stinner45df8202010-04-28 22:31:17 +000018 import threading
19except ImportError:
20 threading = None
Tim Petersd66595f2001-02-04 03:09:53 +000021import _testcapi
Tim Peters9ea17ac2001-02-02 05:57:15 +000022
Benjamin Petersona54c9092009-01-13 02:11:23 +000023
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +000024def testfunction(self):
25 """some doc"""
26 return self
27
class InstanceMethod:
    # Holder for callables wrapped with _testcapi.instancemethod so they can
    # be invoked through an instance (see CAPITest.test_instancemethod).

    # The builtin id(), wrapped.
    id = _testcapi.instancemethod(id)

    # The module-level Python helper, wrapped the same way.
    testfunction = _testcapi.instancemethod(testfunction)
31
class CAPITest(unittest.TestCase):
    # Assorted checks of C API behaviour exposed through _testcapi.

    def test_instancemethod(self):
        inst = InstanceMethod()
        self.assertEqual(id(inst), inst.id())
        self.assertIs(inst.testfunction(), inst)
        self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__)
        self.assertEqual(InstanceMethod.testfunction.__doc__, testfunction.__doc__)

        # An attribute set through the class-level wrapper shows up on the
        # wrapped function; setting one through the instance is rejected.
        InstanceMethod.testfunction.attribute = "test"
        self.assertEqual(testfunction.attribute, "test")
        self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_no_FatalError_infinite_loop(self):
        with support.SuppressCrashReport():
            p = subprocess.Popen([sys.executable, "-c",
                                  'import _testcapi;'
                                  '_testcapi.crash_no_current_thread()'],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        (out, err) = p.communicate()
        self.assertEqual(out, b'')
        # This used to cause an infinite loop.
        self.assertTrue(err.rstrip().startswith(
                         b'Fatal Python error:'
                         b' PyThreadState_Get: no current thread'))

    def test_memoryview_from_NULL_pointer(self):
        self.assertRaises(ValueError, _testcapi.make_memoryview_from_NULL_pointer)

    def test_exc_info(self):
        raised_exception = ValueError("5")
        new_exc = TypeError("TEST")
        try:
            raise raised_exception
        except ValueError as e:
            tb = e.__traceback__
            # Swap in a new exception state, then restore the original one,
            # sampling sys.exc_info() at each step.
            orig_sys_exc_info = sys.exc_info()
            orig_exc_info = _testcapi.set_exc_info(new_exc.__class__, new_exc, None)
            new_sys_exc_info = sys.exc_info()
            new_exc_info = _testcapi.set_exc_info(*orig_exc_info)
            reset_sys_exc_info = sys.exc_info()

            self.assertEqual(orig_exc_info[1], e)

            self.assertSequenceEqual(orig_exc_info, (raised_exception.__class__, raised_exception, tb))
            self.assertSequenceEqual(orig_sys_exc_info, orig_exc_info)
            self.assertSequenceEqual(reset_sys_exc_info, orig_exc_info)
            self.assertSequenceEqual(new_exc_info, (new_exc.__class__, new_exc, None))
            self.assertSequenceEqual(new_sys_exc_info, new_exc_info)
        else:
            self.fail("raising raised_exception did not reach the except clause")

    @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
    def test_seq_bytes_to_charp_array(self):
        # Issue #15732: crash in _PySequence_BytesToCharpArray()
        class Z(object):
            def __len__(self):
                return 1
        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
                          1,Z(),3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17)
        # Issue #15736: overflow in _PySequence_BytesToCharpArray()
        class Z(object):
            def __len__(self):
                return sys.maxsize
            def __getitem__(self, i):
                return b'x'
        self.assertRaises(MemoryError, _posixsubprocess.fork_exec,
                          1,Z(),3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17)

    @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
    def test_subprocess_fork_exec(self):
        class Z(object):
            def __len__(self):
                return 1

        # Issue #15738: crash in subprocess_fork_exec()
        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
                          Z(),[b'1'],3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17)
112
@unittest.skipUnless(threading, 'Threading required for this test.')
class TestPendingCalls(unittest.TestCase):

    def pendingcalls_submit(self, l, n):
        """Queue n pending calls, each of which appends None to the list l."""
        def callback():
            #this function can be interrupted by thread switching so let's
            #use an atomic operation
            l.append(None)

        for i in range(n):
            time.sleep(random.random()*0.02) #0.01 secs on average
            #try submitting callback until successful.
            #rely on regular interrupt to flush queue if we are
            #unsuccessful.
            while not _testcapi._pending_threadfunc(callback):
                pass

    def pendingcalls_wait(self, l, n, context=None):
        """Busy-wait until l holds n entries, failing after 10000 counted spins.

        When a context is given, spins are not counted until its event is set.
        """
        #now, stick around until l has grown to n entries
        count = 0
        while len(l) != n:
            #this busy loop is where we expect to be interrupted to
            #run our callbacks.  Note that callbacks are only run on the
            #main thread
            if False and support.verbose:
                print("(%i)"%(len(l),),)
            for i in range(1000):
                a = i*i
            if context and not context.event.is_set():
                continue
            count += 1
            self.assertLess(count, 10000,
                "timeout waiting for %i callbacks, got %i"%(n, len(l)))
        if False and support.verbose:
            print("(%i)"%(len(l),))

    def test_pendingcalls_threaded(self):

        #do every callback on a separate thread
        n = 32 #total callbacks
        threads = []
        class foo(object):pass
        context = foo()
        context.l = []
        context.n = 2 #submits per thread
        context.nThreads = n // context.n
        context.nFinished = 0
        context.lock = threading.Lock()
        context.event = threading.Event()

        for i in range(context.nThreads):
            t = threading.Thread(target=self.pendingcalls_thread, args = (context,))
            t.start()
            threads.append(t)

        try:
            self.pendingcalls_wait(context.l, n, context)
        finally:
            #always collect the workers, even if the wait above failed,
            #so no thread outlives the test
            for t in threads:
                t.join()

    def pendingcalls_thread(self, context):
        """Worker body: submit context.n callbacks, then record completion.

        The last worker to finish sets context.event.
        """
        try:
            self.pendingcalls_submit(context.l, context.n)
        finally:
            with context.lock:
                context.nFinished += 1
                nFinished = context.nFinished
                if False and support.verbose:
                    print("finished threads: ", nFinished)
            if nFinished == context.nThreads:
                context.event.set()

    def test_pendingcalls_non_threaded(self):
        #again, just using the main thread, likely they will all be dispatched at
        #once.  It is ok to ask for too many, because we loop until we find a slot.
        #the loop can be interrupted to dispatch.
        #there are only 32 dispatch slots, so we go for twice that!
        l = []
        n = 64
        self.pendingcalls_submit(l, n)
        self.pendingcalls_wait(l, n)
195
Antoine Pitrou7a2572c2013-08-01 20:43:26 +0200196
class SubinterpreterTest(unittest.TestCase):

    def test_subinterps(self):
        import builtins
        r, w = os.pipe()
        # The subinterpreter writes the ids of its own sys.modules and
        # builtins to the pipe.  The leading "if 1:" lets the snippet keep
        # the indentation of this method; the with-body must stay nested
        # one level deeper than the with statement itself.
        code = """if 1:
            import sys, builtins, pickle
            with open({:d}, "wb") as f:
                pickle.dump(id(sys.modules), f)
                pickle.dump(id(builtins), f)
            """.format(w)
        with open(r, "rb") as f:
            ret = _testcapi.run_in_subinterp(code)
            self.assertEqual(ret, 0)
            # Values are read back in the order they were dumped.
            self.assertNotEqual(pickle.load(f), id(sys.modules))
            self.assertNotEqual(pickle.load(f), id(builtins))
213
Antoine Pitrou7a2572c2013-08-01 20:43:26 +0200214
Martin v. Löwisc15bdef2009-05-29 14:47:46 +0000215# Bug #6012
class Test6012(unittest.TestCase):
    def test(self):
        # Regression check for bug #6012: argparsing() must report 1 when
        # handed these two string arguments.
        outcome = _testcapi.argparsing("Hello", "World")
        self.assertEqual(outcome, 1)
Benjamin Petersona54c9092009-01-13 02:11:23 +0000219
Antoine Pitrou8e605772011-04-25 21:21:07 +0200220
class EmbeddingTests(unittest.TestCase):
    def setUp(self):
        # basepath is three directory levels above this file.
        here = os.path.dirname(__file__)
        basepath = os.path.dirname(os.path.dirname(here))
        if sys.platform.startswith("win"):
            debug_suffix = "_d" if "_d" in sys.executable else ""
            exename = "_testembed" + debug_suffix + ".exe"
            exepath = os.path.dirname(sys.executable)
        else:
            exename = "_testembed"
            exepath = os.path.join(basepath, "Modules")
        exe = os.path.join(exepath, exename)
        self.test_exe = exe
        if not os.path.exists(exe):
            self.skipTest("%r doesn't exist" % exe)
        # This is needed otherwise we get a fatal error:
        # "Py_Initialize: Unable to get the locale encoding
        # LookupError: no codec search functions registered: can't find encoding"
        self.oldcwd = os.getcwd()
        os.chdir(basepath)

    def tearDown(self):
        # Undo the chdir() performed by setUp().
        os.chdir(self.oldcwd)

    def run_embedded_interpreter(self, *args):
        """Runs a test in the embedded interpreter"""
        command = [self.test_exe] + list(args)
        proc = subprocess.Popen(command,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        stdout_data, stderr_data = proc.communicate()
        status = proc.returncode
        self.assertEqual(status, 0,
                         "bad returncode %d, stderr is %r" %
                         (status, stderr_data))
        return stdout_data.decode("latin1"), stderr_data.decode("latin1")

    def test_subinterps(self):
        # This is just a "don't crash" test
        out, err = self.run_embedded_interpreter()
        if support.verbose:
            print()
            print(out)
            print(err)

    @staticmethod
    def _get_default_pipe_encoding():
        # Report the encoding a text-mode writer gets when opened on a
        # fresh pipe; both pipe ends are closed before returning.
        read_fd, write_fd = os.pipe()
        try:
            with os.fdopen(write_fd, 'w') as writer:
                return writer.encoding
        finally:
            os.close(read_fd)

    def test_forced_io_encoding(self):
        # Checks forced configuration of embedded interpreter IO streams
        out, err = self.run_embedded_interpreter("forced_io_encoding")
        if support.verbose:
            print()
            print(out)
            print(err)
        stdin_encoding = sys.__stdin__.encoding
        pipe_encoding = self._get_default_pipe_encoding()
        # {0} is the stdin encoding, {1} the default pipe encoding.
        template_lines = [
            "--- Use defaults ---",
            "Expected encoding: default",
            "Expected errors: default",
            "stdin: {0}:strict",
            "stdout: {1}:strict",
            "stderr: {1}:backslashreplace",
            "--- Set errors only ---",
            "Expected encoding: default",
            "Expected errors: surrogateescape",
            "stdin: {0}:surrogateescape",
            "stdout: {1}:surrogateescape",
            "stderr: {1}:backslashreplace",
            "--- Set encoding only ---",
            "Expected encoding: latin-1",
            "Expected errors: default",
            "stdin: latin-1:strict",
            "stdout: latin-1:strict",
            "stderr: latin-1:backslashreplace",
            "--- Set encoding and errors ---",
            "Expected encoding: latin-1",
            "Expected errors: surrogateescape",
            "stdin: latin-1:surrogateescape",
            "stdout: latin-1:surrogateescape",
            "stderr: latin-1:backslashreplace",
        ]
        template = os.linesep.join(template_lines)
        expected_output = template.format(stdin_encoding, pipe_encoding)
        # This is useful if we ever trip over odd platform behaviour
        self.maxDiff = None
        self.assertEqual(out.strip(), expected_output)
Antoine Pitrou8e605772011-04-25 21:21:07 +0200312
class SkipitemTest(unittest.TestCase):

    def test_skipitem(self):
        """
        If this test failed, you probably added a new "format unit"
        in Python/getargs.c, but neglected to update our poor friend
        skipitem() in the same file.  (If so, shame on you!)

        With a few exceptions**, this function brute-force tests all
        printable ASCII*** characters (32 to 126 inclusive) as format units,
        checking to see that PyArg_ParseTupleAndKeywords() returns consistent
        errors both when the unit is attempted to be used and when it is
        skipped.  If the format unit doesn't exist, we'll get one of two
        specific error messages (one for used, one for skipped); if it does
        exist we *won't* get that error--we'll get either no error or some
        other error.  If we get the specific "does not exist" error for one
        test and not for the other, there's a mismatch, and the test fails.

           ** Some format units have special funny semantics and it would
              be difficult to accommodate them here.  Since these are all
              well-established and properly skipped in skipitem() we can
              get away with not testing them--this test is really intended
              to catch *new* format units.

          *** Python C source files must be ASCII.  Therefore it's impossible
              to have non-ASCII format units.

        """
        empty_tuple = ()
        tuple_1 = (0,)
        dict_b = {'b':1}
        keywords = ["a", "b"]

        for i in range(32, 127):
            c = chr(i)

            # skip parentheses, the error reporting is inconsistent about them
            # skip 'e', it's always a two-character code
            # skip '|' and '$', they don't represent arguments anyway
            if c in '()e|$':
                continue

            # test the format unit when not skipped
            fmt = c + "i"
            try:
                # (note: the format string must be bytes!)
                _testcapi.parse_tuple_and_keywords(tuple_1, dict_b,
                    fmt.encode("ascii"), keywords)
                when_not_skipped = False
            except TypeError as e:
                expected = "argument 1 must be impossible<bad format char>, not int"
                when_not_skipped = (str(e) == expected)
            except RuntimeError:
                when_not_skipped = False

            # test the format unit when skipped
            optional_format = "|" + fmt
            try:
                _testcapi.parse_tuple_and_keywords(empty_tuple, dict_b,
                    optional_format.encode("ascii"), keywords)
                when_skipped = False
            except RuntimeError as e:
                expected = "impossible<bad format char>: '{}'".format(fmt)
                when_skipped = (str(e) == expected)

            # The values are passed in the same order as the labels in the
            # message: "not skipped" first, then "skipped".
            message = ("test_skipitem_parity: "
                "detected mismatch between convertsimple and skipitem "
                "for format unit '{}' ({}), not skipped {}, skipped {}".format(
                    c, i, when_not_skipped, when_skipped))
            self.assertIs(when_skipped, when_not_skipped, message)

    def test_parse_tuple_and_keywords(self):
        # parse_tuple_and_keywords error handling tests
        self.assertRaises(TypeError, _testcapi.parse_tuple_and_keywords,
                          (), {}, 42, [])
        self.assertRaises(ValueError, _testcapi.parse_tuple_and_keywords,
                          (), {}, b'', 42)
        self.assertRaises(ValueError, _testcapi.parse_tuple_and_keywords,
                          (), {}, b'', [''] * 42)
        self.assertRaises(ValueError, _testcapi.parse_tuple_and_keywords,
                          (), {}, b'', [42])
393 (), {}, b'', [42])
394
Ezio Melotti29267c82013-02-23 05:52:46 +0200395@unittest.skipUnless(threading, 'Threading required for this test.')
396class TestThreadState(unittest.TestCase):
397
398 @support.reap_threads
399 def test_thread_state(self):
400 # some extra thread-state tests driven via _testcapi
401 def target():
402 idents = []
403
404 def callback():
Ezio Melotti35246bd2013-02-23 05:58:38 +0200405 idents.append(threading.get_ident())
Ezio Melotti29267c82013-02-23 05:52:46 +0200406
407 _testcapi._test_thread_state(callback)
408 a = b = callback
409 time.sleep(1)
410 # Check our main thread is in the list exactly 3 times.
Ezio Melotti35246bd2013-02-23 05:58:38 +0200411 self.assertEqual(idents.count(threading.get_ident()), 3,
Ezio Melotti29267c82013-02-23 05:52:46 +0200412 "Couldn't find main thread correctly in the list")
413
414 target()
415 t = threading.Thread(target=target)
416 t.start()
417 t.join()
418
419
def test_main():
    """Run the unittest classes, then every test_* function in _testcapi."""
    support.run_unittest(CAPITest, TestPendingCalls, Test6012,
                         EmbeddingTests, SkipitemTest, TestThreadState,
                         SubinterpreterTest)

    # Every attribute of _testcapi whose name starts with 'test_' is called
    # with no arguments, in dir() order.
    internal_names = [name for name in dir(_testcapi)
                      if name.startswith('test_')]
    for name in internal_names:
        test = getattr(_testcapi, name)
        if support.verbose:
            print("internal", name)
        test()


if __name__ == "__main__":
    test_main()