blob: 91b89514758be45c75118f0ae7c91952bf5acad1 [file] [log] [blame]
Tim Petersd66595f2001-02-04 03:09:53 +00001# Run the _testcapi module tests (tests for the Python/C API): by defn,
Guido van Rossum361c5352001-04-13 17:03:04 +00002# these are all functions _testcapi exports whose name begins with 'test_'.
Tim Peters9ea17ac2001-02-02 05:57:15 +00003
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00004from __future__ import with_statement
Antoine Pitrou8e605772011-04-25 21:21:07 +02005import os
Antoine Pitrou2f828f22012-01-18 00:21:11 +01006import pickle
Jeffrey Yasskin8e0bdfd2010-05-13 18:31:05 +00007import random
8import subprocess
Martin v. Löwis6ce7ed22005-03-03 12:26:35 +00009import sys
Benjamin Petersona54c9092009-01-13 02:11:23 +000010import time
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +000011import unittest
Nick Coghlan7d270ee2013-10-17 22:35:35 +100012import textwrap
Benjamin Petersonee8712c2008-05-20 21:35:26 +000013from test import support
Victor Stinner45df8202010-04-28 22:31:17 +000014try:
Stefan Krahfd24f9e2012-08-20 11:04:24 +020015 import _posixsubprocess
16except ImportError:
17 _posixsubprocess = None
18try:
Victor Stinner45df8202010-04-28 22:31:17 +000019 import threading
20except ImportError:
21 threading = None
Tim Petersd66595f2001-02-04 03:09:53 +000022import _testcapi
Tim Peters9ea17ac2001-02-02 05:57:15 +000023
Benjamin Petersona54c9092009-01-13 02:11:23 +000024
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +000025def testfunction(self):
26 """some doc"""
27 return self
28
29class InstanceMethod:
30 id = _testcapi.instancemethod(id)
31 testfunction = _testcapi.instancemethod(testfunction)
32
33class CAPITest(unittest.TestCase):
34
35 def test_instancemethod(self):
36 inst = InstanceMethod()
37 self.assertEqual(id(inst), inst.id())
Benjamin Petersonc9c0f202009-06-30 23:06:06 +000038 self.assertTrue(inst.testfunction() is inst)
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +000039 self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__)
40 self.assertEqual(InstanceMethod.testfunction.__doc__, testfunction.__doc__)
41
42 InstanceMethod.testfunction.attribute = "test"
43 self.assertEqual(testfunction.attribute, "test")
44 self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test")
45
Stefan Krah0ca46242010-06-09 08:56:28 +000046 @unittest.skipUnless(threading, 'Threading required for this test.')
Jeffrey Yasskin8e0bdfd2010-05-13 18:31:05 +000047 def test_no_FatalError_infinite_loop(self):
Antoine Pitrou77e904e2013-10-08 23:04:32 +020048 with support.SuppressCrashReport():
Ezio Melotti25a40452013-03-05 20:26:17 +020049 p = subprocess.Popen([sys.executable, "-c",
Ezio Melottie1857d92013-03-05 20:31:34 +020050 'import _testcapi;'
51 '_testcapi.crash_no_current_thread()'],
52 stdout=subprocess.PIPE,
53 stderr=subprocess.PIPE)
Jeffrey Yasskin8e0bdfd2010-05-13 18:31:05 +000054 (out, err) = p.communicate()
55 self.assertEqual(out, b'')
56 # This used to cause an infinite loop.
Vinay Sajip73954042012-05-06 11:34:50 +010057 self.assertTrue(err.rstrip().startswith(
Jeffrey Yasskin8e0bdfd2010-05-13 18:31:05 +000058 b'Fatal Python error:'
Vinay Sajip73954042012-05-06 11:34:50 +010059 b' PyThreadState_Get: no current thread'))
Jeffrey Yasskin8e0bdfd2010-05-13 18:31:05 +000060
Antoine Pitrou915605c2011-02-24 20:53:48 +000061 def test_memoryview_from_NULL_pointer(self):
62 self.assertRaises(ValueError, _testcapi.make_memoryview_from_NULL_pointer)
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +000063
Martin v. Löwisaa2efcb2012-04-19 14:33:43 +020064 def test_exc_info(self):
65 raised_exception = ValueError("5")
66 new_exc = TypeError("TEST")
67 try:
68 raise raised_exception
69 except ValueError as e:
70 tb = e.__traceback__
71 orig_sys_exc_info = sys.exc_info()
72 orig_exc_info = _testcapi.set_exc_info(new_exc.__class__, new_exc, None)
73 new_sys_exc_info = sys.exc_info()
74 new_exc_info = _testcapi.set_exc_info(*orig_exc_info)
75 reset_sys_exc_info = sys.exc_info()
76
77 self.assertEqual(orig_exc_info[1], e)
78
79 self.assertSequenceEqual(orig_exc_info, (raised_exception.__class__, raised_exception, tb))
80 self.assertSequenceEqual(orig_sys_exc_info, orig_exc_info)
81 self.assertSequenceEqual(reset_sys_exc_info, orig_exc_info)
82 self.assertSequenceEqual(new_exc_info, (new_exc.__class__, new_exc, None))
83 self.assertSequenceEqual(new_sys_exc_info, new_exc_info)
84 else:
85 self.assertTrue(False)
86
Stefan Krahfd24f9e2012-08-20 11:04:24 +020087 @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
88 def test_seq_bytes_to_charp_array(self):
89 # Issue #15732: crash in _PySequence_BytesToCharpArray()
90 class Z(object):
91 def __len__(self):
92 return 1
93 self.assertRaises(TypeError, _posixsubprocess.fork_exec,
94 1,Z(),3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17)
Stefan Krah7cacd2e2012-08-21 08:16:09 +020095 # Issue #15736: overflow in _PySequence_BytesToCharpArray()
96 class Z(object):
97 def __len__(self):
98 return sys.maxsize
99 def __getitem__(self, i):
100 return b'x'
101 self.assertRaises(MemoryError, _posixsubprocess.fork_exec,
102 1,Z(),3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17)
Stefan Krahfd24f9e2012-08-20 11:04:24 +0200103
Stefan Krahdb579d72012-08-20 14:36:47 +0200104 @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
105 def test_subprocess_fork_exec(self):
106 class Z(object):
107 def __len__(self):
108 return 1
109
110 # Issue #15738: crash in subprocess_fork_exec()
111 self.assertRaises(TypeError, _posixsubprocess.fork_exec,
112 Z(),[b'1'],3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17)
113
Victor Stinner45df8202010-04-28 22:31:17 +0000114@unittest.skipUnless(threading, 'Threading required for this test.')
Benjamin Petersona54c9092009-01-13 02:11:23 +0000115class TestPendingCalls(unittest.TestCase):
116
117 def pendingcalls_submit(self, l, n):
118 def callback():
119 #this function can be interrupted by thread switching so let's
120 #use an atomic operation
121 l.append(None)
122
123 for i in range(n):
124 time.sleep(random.random()*0.02) #0.01 secs on average
125 #try submitting callback until successful.
126 #rely on regular interrupt to flush queue if we are
127 #unsuccessful.
128 while True:
129 if _testcapi._pending_threadfunc(callback):
130 break;
131
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000132 def pendingcalls_wait(self, l, n, context = None):
Benjamin Petersona54c9092009-01-13 02:11:23 +0000133 #now, stick around until l[0] has grown to 10
134 count = 0;
135 while len(l) != n:
136 #this busy loop is where we expect to be interrupted to
137 #run our callbacks. Note that callbacks are only run on the
138 #main thread
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000139 if False and support.verbose:
Benjamin Petersona54c9092009-01-13 02:11:23 +0000140 print("(%i)"%(len(l),),)
141 for i in range(1000):
142 a = i*i
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000143 if context and not context.event.is_set():
144 continue
Benjamin Petersona54c9092009-01-13 02:11:23 +0000145 count += 1
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000146 self.assertTrue(count < 10000,
Benjamin Petersona54c9092009-01-13 02:11:23 +0000147 "timeout waiting for %i callbacks, got %i"%(n, len(l)))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000148 if False and support.verbose:
Benjamin Petersona54c9092009-01-13 02:11:23 +0000149 print("(%i)"%(len(l),))
150
151 def test_pendingcalls_threaded(self):
Benjamin Petersona54c9092009-01-13 02:11:23 +0000152
153 #do every callback on a separate thread
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000154 n = 32 #total callbacks
Benjamin Petersona54c9092009-01-13 02:11:23 +0000155 threads = []
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000156 class foo(object):pass
157 context = foo()
158 context.l = []
159 context.n = 2 #submits per thread
160 context.nThreads = n // context.n
161 context.nFinished = 0
162 context.lock = threading.Lock()
163 context.event = threading.Event()
164
165 for i in range(context.nThreads):
166 t = threading.Thread(target=self.pendingcalls_thread, args = (context,))
Benjamin Petersona54c9092009-01-13 02:11:23 +0000167 t.start()
168 threads.append(t)
169
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000170 self.pendingcalls_wait(context.l, n, context)
Benjamin Petersona54c9092009-01-13 02:11:23 +0000171
172 for t in threads:
173 t.join()
174
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000175 def pendingcalls_thread(self, context):
176 try:
177 self.pendingcalls_submit(context.l, context.n)
178 finally:
179 with context.lock:
180 context.nFinished += 1
181 nFinished = context.nFinished
182 if False and support.verbose:
183 print("finished threads: ", nFinished)
184 if nFinished == context.nThreads:
185 context.event.set()
186
Benjamin Petersona54c9092009-01-13 02:11:23 +0000187 def test_pendingcalls_non_threaded(self):
Ezio Melotti13925002011-03-16 11:05:33 +0200188 #again, just using the main thread, likely they will all be dispatched at
Benjamin Petersona54c9092009-01-13 02:11:23 +0000189 #once. It is ok to ask for too many, because we loop until we find a slot.
190 #the loop can be interrupted to dispatch.
191 #there are only 32 dispatch slots, so we go for twice that!
192 l = []
193 n = 64
194 self.pendingcalls_submit(l, n)
195 self.pendingcalls_wait(l, n)
196
Antoine Pitrou7a2572c2013-08-01 20:43:26 +0200197
198class SubinterpreterTest(unittest.TestCase):
199
Antoine Pitrou2f828f22012-01-18 00:21:11 +0100200 def test_subinterps(self):
Antoine Pitrou2f828f22012-01-18 00:21:11 +0100201 import builtins
202 r, w = os.pipe()
203 code = """if 1:
204 import sys, builtins, pickle
205 with open({:d}, "wb") as f:
206 pickle.dump(id(sys.modules), f)
207 pickle.dump(id(builtins), f)
208 """.format(w)
209 with open(r, "rb") as f:
210 ret = _testcapi.run_in_subinterp(code)
211 self.assertEqual(ret, 0)
212 self.assertNotEqual(pickle.load(f), id(sys.modules))
213 self.assertNotEqual(pickle.load(f), id(builtins))
214
Antoine Pitrou7a2572c2013-08-01 20:43:26 +0200215
Martin v. Löwisc15bdef2009-05-29 14:47:46 +0000216# Bug #6012
217class Test6012(unittest.TestCase):
218 def test(self):
219 self.assertEqual(_testcapi.argparsing("Hello", "World"), 1)
Benjamin Petersona54c9092009-01-13 02:11:23 +0000220
Antoine Pitrou8e605772011-04-25 21:21:07 +0200221
Nick Coghlan7d270ee2013-10-17 22:35:35 +1000222@unittest.skipIf(
223 sys.platform.startswith('win'),
224 "interpreter embedding tests aren't built under Windows")
225class EmbeddingTests(unittest.TestCase):
226 # XXX only tested under Unix checkouts
Antoine Pitrou8e605772011-04-25 21:21:07 +0200227
Nick Coghlan7d270ee2013-10-17 22:35:35 +1000228 def setUp(self):
Antoine Pitrou8e605772011-04-25 21:21:07 +0200229 basepath = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
Nick Coghlan7d270ee2013-10-17 22:35:35 +1000230 self.test_exe = exe = os.path.join(basepath, "Modules", "_testembed")
231 if not os.path.exists(exe):
232 self.skipTest("%r doesn't exist" % exe)
Antoine Pitrou8e605772011-04-25 21:21:07 +0200233 # This is needed otherwise we get a fatal error:
234 # "Py_Initialize: Unable to get the locale encoding
235 # LookupError: no codec search functions registered: can't find encoding"
Nick Coghlan7d270ee2013-10-17 22:35:35 +1000236 self.oldcwd = os.getcwd()
Antoine Pitrou8e605772011-04-25 21:21:07 +0200237 os.chdir(basepath)
Nick Coghlan7d270ee2013-10-17 22:35:35 +1000238
239 def tearDown(self):
240 os.chdir(self.oldcwd)
241
242 def run_embedded_interpreter(self, *args):
243 """Runs a test in the embedded interpreter"""
244 cmd = [self.test_exe]
245 cmd.extend(args)
246 p = subprocess.Popen(cmd,
247 stdout=subprocess.PIPE,
248 stderr=subprocess.PIPE)
249 (out, err) = p.communicate()
250 self.assertEqual(p.returncode, 0,
251 "bad returncode %d, stderr is %r" %
252 (p.returncode, err))
253 return out.decode("latin1"), err.decode("latin1")
254
255 def test_subinterps(self):
256 # This is just a "don't crash" test
257 out, err = self.run_embedded_interpreter()
258 if support.verbose:
259 print()
260 print(out)
261 print(err)
262
263 def test_forced_io_encoding(self):
264 # Checks forced configuration of embedded interpreter IO streams
265 out, err = self.run_embedded_interpreter("forced_io_encoding")
266 if support.verbose:
267 print()
268 print(out)
269 print(err)
270 expected_output = textwrap.dedent("""\
271 --- Use defaults ---
272 Expected encoding: default
273 Expected errors: default
274 stdin: {0.stdin.encoding}:strict
275 stdout: {0.stdout.encoding}:strict
276 stderr: {0.stderr.encoding}:backslashreplace
277 --- Set errors only ---
278 Expected encoding: default
279 Expected errors: surrogateescape
280 stdin: {0.stdin.encoding}:surrogateescape
281 stdout: {0.stdout.encoding}:surrogateescape
282 stderr: {0.stderr.encoding}:backslashreplace
283 --- Set encoding only ---
284 Expected encoding: latin-1
285 Expected errors: default
286 stdin: latin-1:strict
287 stdout: latin-1:strict
288 stderr: latin-1:backslashreplace
289 --- Set encoding and errors ---
290 Expected encoding: latin-1
291 Expected errors: surrogateescape
292 stdin: latin-1:surrogateescape
293 stdout: latin-1:surrogateescape
294 stderr: latin-1:backslashreplace""").format(sys)
Nick Coghlan6508dc52013-10-18 01:44:22 +1000295 # Looks like this overspecifies the output :(
296 self.maxDiff = None
Nick Coghlan7d270ee2013-10-17 22:35:35 +1000297 self.assertEqual(out.strip(), expected_output)
Antoine Pitrou8e605772011-04-25 21:21:07 +0200298
Larry Hastings8f904da2012-06-22 03:56:29 -0700299class SkipitemTest(unittest.TestCase):
300
301 def test_skipitem(self):
302 """
303 If this test failed, you probably added a new "format unit"
304 in Python/getargs.c, but neglected to update our poor friend
305 skipitem() in the same file. (If so, shame on you!)
306
Larry Hastings48ed3602012-06-22 12:58:36 -0700307 With a few exceptions**, this function brute-force tests all
308 printable ASCII*** characters (32 to 126 inclusive) as format units,
309 checking to see that PyArg_ParseTupleAndKeywords() return consistent
310 errors both when the unit is attempted to be used and when it is
311 skipped. If the format unit doesn't exist, we'll get one of two
312 specific error messages (one for used, one for skipped); if it does
313 exist we *won't* get that error--we'll get either no error or some
314 other error. If we get the specific "does not exist" error for one
315 test and not for the other, there's a mismatch, and the test fails.
Larry Hastings8f904da2012-06-22 03:56:29 -0700316
Larry Hastings48ed3602012-06-22 12:58:36 -0700317 ** Some format units have special funny semantics and it would
318 be difficult to accomodate them here. Since these are all
319 well-established and properly skipped in skipitem() we can
320 get away with not testing them--this test is really intended
321 to catch *new* format units.
322
323 *** Python C source files must be ASCII. Therefore it's impossible
324 to have non-ASCII format units.
325
Larry Hastings8f904da2012-06-22 03:56:29 -0700326 """
327 empty_tuple = ()
328 tuple_1 = (0,)
329 dict_b = {'b':1}
330 keywords = ["a", "b"]
331
Larry Hastings48ed3602012-06-22 12:58:36 -0700332 for i in range(32, 127):
Larry Hastings8f904da2012-06-22 03:56:29 -0700333 c = chr(i)
334
Larry Hastings8f904da2012-06-22 03:56:29 -0700335 # skip parentheses, the error reporting is inconsistent about them
336 # skip 'e', it's always a two-character code
337 # skip '|' and '$', they don't represent arguments anyway
Larry Hastings48ed3602012-06-22 12:58:36 -0700338 if c in '()e|$':
Larry Hastings8f904da2012-06-22 03:56:29 -0700339 continue
340
341 # test the format unit when not skipped
342 format = c + "i"
343 try:
344 # (note: the format string must be bytes!)
345 _testcapi.parse_tuple_and_keywords(tuple_1, dict_b,
346 format.encode("ascii"), keywords)
347 when_not_skipped = False
348 except TypeError as e:
349 s = "argument 1 must be impossible<bad format char>, not int"
350 when_not_skipped = (str(e) == s)
351 except RuntimeError as e:
352 when_not_skipped = False
353
354 # test the format unit when skipped
355 optional_format = "|" + format
356 try:
357 _testcapi.parse_tuple_and_keywords(empty_tuple, dict_b,
358 optional_format.encode("ascii"), keywords)
359 when_skipped = False
360 except RuntimeError as e:
361 s = "impossible<bad format char>: '{}'".format(format)
362 when_skipped = (str(e) == s)
363
364 message = ("test_skipitem_parity: "
365 "detected mismatch between convertsimple and skipitem "
366 "for format unit '{}' ({}), not skipped {}, skipped {}".format(
367 c, i, when_skipped, when_not_skipped))
368 self.assertIs(when_skipped, when_not_skipped, message)
Antoine Pitrou8e605772011-04-25 21:21:07 +0200369
Jesus Cea6e1d2b62012-10-04 16:06:30 +0200370 def test_parse_tuple_and_keywords(self):
371 # parse_tuple_and_keywords error handling tests
372 self.assertRaises(TypeError, _testcapi.parse_tuple_and_keywords,
373 (), {}, 42, [])
374 self.assertRaises(ValueError, _testcapi.parse_tuple_and_keywords,
375 (), {}, b'', 42)
376 self.assertRaises(ValueError, _testcapi.parse_tuple_and_keywords,
377 (), {}, b'', [''] * 42)
378 self.assertRaises(ValueError, _testcapi.parse_tuple_and_keywords,
379 (), {}, b'', [42])
380
Ezio Melotti29267c82013-02-23 05:52:46 +0200381@unittest.skipUnless(threading, 'Threading required for this test.')
382class TestThreadState(unittest.TestCase):
383
384 @support.reap_threads
385 def test_thread_state(self):
386 # some extra thread-state tests driven via _testcapi
387 def target():
388 idents = []
389
390 def callback():
Ezio Melotti35246bd2013-02-23 05:58:38 +0200391 idents.append(threading.get_ident())
Ezio Melotti29267c82013-02-23 05:52:46 +0200392
393 _testcapi._test_thread_state(callback)
394 a = b = callback
395 time.sleep(1)
396 # Check our main thread is in the list exactly 3 times.
Ezio Melotti35246bd2013-02-23 05:58:38 +0200397 self.assertEqual(idents.count(threading.get_ident()), 3,
Ezio Melotti29267c82013-02-23 05:52:46 +0200398 "Couldn't find main thread correctly in the list")
399
400 target()
401 t = threading.Thread(target=target)
402 t.start()
403 t.join()
404
405
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000406def test_main():
Ezio Melotti29267c82013-02-23 05:52:46 +0200407 support.run_unittest(CAPITest, TestPendingCalls, Test6012,
Nick Coghlan7d270ee2013-10-17 22:35:35 +1000408 EmbeddingTests, SkipitemTest, TestThreadState,
Antoine Pitrou7a2572c2013-08-01 20:43:26 +0200409 SubinterpreterTest)
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000410
411 for name in dir(_testcapi):
412 if name.startswith('test_'):
413 test = getattr(_testcapi, name)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000414 if support.verbose:
Guido van Rossumbe19ed72007-02-09 05:37:30 +0000415 print("internal", name)
Collin Winter3add4d72007-08-29 23:37:32 +0000416 test()
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000417
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000418if __name__ == "__main__":
419 test_main()