blob: 5002c77b6a1eba80935ed351888667d29a6a2371 [file] [log] [blame]
# Run the _testcapi module tests (tests for the Python/C API): by definition,
# these are all functions _testcapi exports whose name begins with 'test_'.
Tim Peters9ea17ac2001-02-02 05:57:15 +00003
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00004from __future__ import with_statement
Antoine Pitrou8e605772011-04-25 21:21:07 +02005import os
Antoine Pitrou2f828f22012-01-18 00:21:11 +01006import pickle
Jeffrey Yasskin8e0bdfd2010-05-13 18:31:05 +00007import random
8import subprocess
Martin v. Löwis6ce7ed22005-03-03 12:26:35 +00009import sys
Benjamin Petersona54c9092009-01-13 02:11:23 +000010import time
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +000011import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +000012from test import support
Victor Stinner45df8202010-04-28 22:31:17 +000013try:
Stefan Krahfd24f9e2012-08-20 11:04:24 +020014 import _posixsubprocess
15except ImportError:
16 _posixsubprocess = None
17try:
Victor Stinner45df8202010-04-28 22:31:17 +000018 import threading
19except ImportError:
20 threading = None
Tim Petersd66595f2001-02-04 03:09:53 +000021import _testcapi
Tim Peters9ea17ac2001-02-02 05:57:15 +000022
Benjamin Petersona54c9092009-01-13 02:11:23 +000023
# Plain Python function that InstanceMethod wraps with
# _testcapi.instancemethod; CAPITest.test_instancemethod compares against
# its __doc__ and checks that it hands back the object it was called on.
def testfunction(self):
    """some doc"""
    return self
27
class InstanceMethod:
    """Fixture class whose attributes are _testcapi.instancemethod wrappers."""
    # Wraps the builtin id(); test_instancemethod expects inst.id() to
    # equal id(inst).
    id = _testcapi.instancemethod(id)
    # Wraps the module-level testfunction(), which returns its argument.
    testfunction = _testcapi.instancemethod(testfunction)
31
class CAPITest(unittest.TestCase):
    """Assorted checks driven through helper functions exported by _testcapi."""

    def test_instancemethod(self):
        """Check binding, __doc__ and attribute handling of instancemethod."""
        inst = InstanceMethod()
        self.assertEqual(id(inst), inst.id())
        # assertIs reports both operands on failure, unlike assertTrue(a is b).
        self.assertIs(inst.testfunction(), inst)
        self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__)
        self.assertEqual(InstanceMethod.testfunction.__doc__,
                         testfunction.__doc__)

        # Setting an attribute through the class-level object is expected to
        # land on the wrapped function; doing so through the object looked
        # up on an instance must be refused.
        InstanceMethod.testfunction.attribute = "test"
        self.assertEqual(testfunction.attribute, "test")
        self.assertRaises(AttributeError, setattr, inst.testfunction,
                          "attribute", "test")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_no_FatalError_infinite_loop(self):
        """Crash a child interpreter and check the fatal error is reported."""
        p = subprocess.Popen([sys.executable, "-c",
                              'import _testcapi;'
                              '_testcapi.crash_no_current_thread()'],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        (out, err) = p.communicate()
        self.assertEqual(out, b'')
        # This used to cause an infinite loop.
        self.assertTrue(err.rstrip().startswith(
                         b'Fatal Python error:'
                         b' PyThreadState_Get: no current thread'))

    def test_memoryview_from_NULL_pointer(self):
        """The _testcapi helper must raise ValueError."""
        self.assertRaises(ValueError,
                          _testcapi.make_memoryview_from_NULL_pointer)

    def test_exc_info(self):
        """Swap the handled exception via _testcapi.set_exc_info and back."""
        raised_exception = ValueError("5")
        new_exc = TypeError("TEST")
        try:
            raise raised_exception
        except ValueError as e:
            tb = e.__traceback__
            orig_sys_exc_info = sys.exc_info()
            # set_exc_info() returns the triple that was in place before the
            # call, so the second call both restores the original state and
            # hands back the temporary one.
            orig_exc_info = _testcapi.set_exc_info(new_exc.__class__,
                                                   new_exc, None)
            new_sys_exc_info = sys.exc_info()
            new_exc_info = _testcapi.set_exc_info(*orig_exc_info)
            reset_sys_exc_info = sys.exc_info()

            self.assertEqual(orig_exc_info[1], e)

            self.assertSequenceEqual(
                orig_exc_info,
                (raised_exception.__class__, raised_exception, tb))
            self.assertSequenceEqual(orig_sys_exc_info, orig_exc_info)
            self.assertSequenceEqual(reset_sys_exc_info, orig_exc_info)
            self.assertSequenceEqual(new_exc_info,
                                     (new_exc.__class__, new_exc, None))
            self.assertSequenceEqual(new_sys_exc_info, new_exc_info)
        else:
            # Only reachable if the raise above somehow did not propagate.
            self.fail("raising ValueError did not enter the except clause")

    @unittest.skipUnless(_posixsubprocess,
                         '_posixsubprocess required for this test.')
    def test_seq_bytes_to_charp_array(self):
        """Malformed sequences passed to fork_exec must raise, not crash."""
        # Issue #15732: crash in _PySequence_BytesToCharpArray()
        class Z(object):
            def __len__(self):
                return 1
        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
                          1,Z(),3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17)
        # Issue #15736: overflow in _PySequence_BytesToCharpArray()
        class Z(object):
            def __len__(self):
                return sys.maxsize
            def __getitem__(self, i):
                return b'x'
        self.assertRaises(MemoryError, _posixsubprocess.fork_exec,
                          1,Z(),3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17)

    @unittest.skipUnless(_posixsubprocess,
                         '_posixsubprocess required for this test.')
    def test_subprocess_fork_exec(self):
        """A first argument with __len__ but no items must raise TypeError."""
        class Z(object):
            def __len__(self):
                return 1

        # Issue #15738: crash in subprocess_fork_exec()
        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
                          Z(),[b'1'],3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17)
111
@unittest.skipUnless(threading, 'Threading required for this test.')
class TestPendingCalls(unittest.TestCase):
    """Submit callbacks via _testcapi._pending_threadfunc and wait for them."""

    def pendingcalls_submit(self, l, n):
        """Submit *n* callbacks; each one appends a single item to *l*."""
        def callback():
            # This function can be interrupted by thread switching, so
            # stick to one atomic operation.
            l.append(None)

        for _ in range(n):
            time.sleep(random.random()*0.02)  # 0.01 secs on average
            # Keep retrying until the submission is accepted.  We rely on
            # the regular interrupt to flush the queue when we are
            # unsuccessful.
            while not _testcapi._pending_threadfunc(callback):
                pass

    def pendingcalls_wait(self, l, n, context = None):
        """Spin until *l* holds *n* items, failing after 10000 counted spins.

        When *context* is given, spins are only counted once
        ``context.event`` has been set.
        """
        spins = 0
        while len(l) != n:
            # This busy loop is where we expect to be interrupted to run
            # our callbacks.  Note that callbacks are only run on the
            # main thread.
            for i in range(1000):
                a = i*i
            still_submitting = context and not context.event.is_set()
            if not still_submitting:
                spins += 1
                self.assertTrue(
                    spins < 10000,
                    "timeout waiting for %i callbacks, got %i"%(n, len(l)))

    def test_pendingcalls_threaded(self):
        """Submit the callbacks from several threads at once."""
        class Context(object):
            pass

        n = 32  # total callbacks
        context = Context()
        context.l = []
        context.n = 2  # submits per thread
        context.nThreads = n // context.n
        context.nFinished = 0
        context.lock = threading.Lock()
        context.event = threading.Event()

        workers = []
        for _ in range(context.nThreads):
            worker = threading.Thread(target=self.pendingcalls_thread,
                                      args=(context,))
            worker.start()
            workers.append(worker)

        self.pendingcalls_wait(context.l, n, context)

        for worker in workers:
            worker.join()

    def pendingcalls_thread(self, context):
        """Thread body: submit, then set the event if this is the last one."""
        try:
            self.pendingcalls_submit(context.l, context.n)
        finally:
            # Read the counter back under the lock; the comparison below
            # then works on this thread's own snapshot.
            with context.lock:
                context.nFinished += 1
                finished = context.nFinished
            if finished == context.nThreads:
                context.event.set()

    def test_pendingcalls_non_threaded(self):
        """Submit and wait using only the calling thread."""
        # Again, just using the main thread, likely they will all be
        # dispatched at once.  It is ok to ask for too many, because we loop
        # until we find a slot.  The loop can be interrupted to dispatch.
        # There are only 32 dispatch slots, so we go for twice that!
        n = 64
        l = []
        self.pendingcalls_submit(l, n)
        self.pendingcalls_wait(l, n)

    def test_subinterps(self):
        """A subinterpreter must get its own sys.modules and builtins."""
        import builtins
        read_fd, write_fd = os.pipe()
        # The subinterpreter pickles the ids of its own objects into the
        # write end of the pipe; we read them back from the other end.
        code = """if 1:
            import sys, builtins, pickle
            with open({:d}, "wb") as f:
                pickle.dump(id(sys.modules), f)
                pickle.dump(id(builtins), f)
            """.format(write_fd)
        with open(read_fd, "rb") as reader:
            status = _testcapi.run_in_subinterp(code)
            self.assertEqual(status, 0)
            self.assertNotEqual(pickle.load(reader), id(sys.modules))
            self.assertNotEqual(pickle.load(reader), id(builtins))
209
# Bug #6012
class Test6012(unittest.TestCase):
    """Regression test for bug #6012, run through _testcapi.argparsing."""

    def test(self):
        outcome = _testcapi.argparsing("Hello", "World")
        self.assertEqual(outcome, 1)
Benjamin Petersona54c9092009-01-13 02:11:23 +0000214
Antoine Pitrou8e605772011-04-25 21:21:07 +0200215
class EmbeddingTest(unittest.TestCase):
    """Run the Modules/_testembed helper program from the source checkout."""

    @unittest.skipIf(
        sys.platform.startswith('win'),
        "test doesn't work under Windows")
    def test_subinterps(self):
        """Run _testembed from the checkout root and require exit status 0.

        The test is skipped when the executable does not exist.
        """
        # XXX only tested under Unix checkouts
        # Make the path absolute *before* changing directory: __file__ may be
        # relative, and a relative basepath would be resolved against the new
        # working directory once os.chdir() has run.
        here = os.path.abspath(__file__)
        basepath = os.path.dirname(os.path.dirname(os.path.dirname(here)))
        exe = os.path.join(basepath, "Modules", "_testembed")
        oldcwd = os.getcwd()
        # This is needed otherwise we get a fatal error:
        # "Py_Initialize: Unable to get the locale encoding
        # LookupError: no codec search functions registered: can't find encoding"
        os.chdir(basepath)
        try:
            if not os.path.exists(exe):
                self.skipTest("%r doesn't exist" % exe)
            p = subprocess.Popen([exe],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
            (out, err) = p.communicate()
            self.assertEqual(p.returncode, 0,
                             "bad returncode %d, stderr is %r" %
                             (p.returncode, err))
            if support.verbose:
                print()
                print(out.decode('latin1'))
                print(err.decode('latin1'))
        finally:
            # Always restore the caller's working directory.
            os.chdir(oldcwd)
246
class SkipitemTest(unittest.TestCase):
    """Checks of PyArg_ParseTupleAndKeywords() via _testcapi."""

    def test_skipitem(self):
        """
        If this test failed, you probably added a new "format unit"
        in Python/getargs.c, but neglected to update our poor friend
        skipitem() in the same file.  (If so, shame on you!)

        With a few exceptions**, this function brute-force tests all
        printable ASCII*** characters (32 to 126 inclusive) as format units,
        checking to see that PyArg_ParseTupleAndKeywords() returns consistent
        errors both when the unit is attempted to be used and when it is
        skipped.  If the format unit doesn't exist, we'll get one of two
        specific error messages (one for used, one for skipped); if it does
        exist we *won't* get that error--we'll get either no error or some
        other error.  If we get the specific "does not exist" error for one
        test and not for the other, there's a mismatch, and the test fails.

           ** Some format units have special funny semantics and it would
              be difficult to accommodate them here.  Since these are all
              well-established and properly skipped in skipitem() we can
              get away with not testing them--this test is really intended
              to catch *new* format units.

          *** Python C source files must be ASCII.  Therefore it's impossible
              to have non-ASCII format units.

        """
        empty_tuple = ()
        tuple_1 = (0,)
        dict_b = {'b':1}
        keywords = ["a", "b"]

        for i in range(32, 127):
            c = chr(i)

            # skip parentheses, the error reporting is inconsistent about them
            # skip 'e', it's always a two-character code
            # skip '|' and '$', they don't represent arguments anyway
            if c in '()e|$':
                continue

            # test the format unit when not skipped
            # (named fmt so the builtin format() is not shadowed)
            fmt = c + "i"
            try:
                # (note: the format string must be bytes!)
                _testcapi.parse_tuple_and_keywords(tuple_1, dict_b,
                    fmt.encode("ascii"), keywords)
                when_not_skipped = False
            except TypeError as e:
                s = "argument 1 must be impossible<bad format char>, not int"
                when_not_skipped = (str(e) == s)
            except RuntimeError:
                when_not_skipped = False

            # test the format unit when skipped
            optional_format = "|" + fmt
            try:
                _testcapi.parse_tuple_and_keywords(empty_tuple, dict_b,
                    optional_format.encode("ascii"), keywords)
                when_skipped = False
            except RuntimeError as e:
                s = "impossible<bad format char>: '{}'".format(fmt)
                when_skipped = (str(e) == s)

            # The values are passed in the same order as the labels
            # ("not skipped" first, then "skipped") so that a failure report
            # attributes each result to the right code path.
            message = ("test_skipitem_parity: "
                "detected mismatch between convertsimple and skipitem "
                "for format unit '{}' ({}), not skipped {}, skipped {}".format(
                    c, i, when_not_skipped, when_skipped))
            self.assertIs(when_skipped, when_not_skipped, message)

    def test_parse_tuple_and_keywords(self):
        """Bad format or keyword-list arguments must raise, not crash."""
        # parse_tuple_and_keywords error handling tests
        self.assertRaises(TypeError, _testcapi.parse_tuple_and_keywords,
                          (), {}, 42, [])
        self.assertRaises(ValueError, _testcapi.parse_tuple_and_keywords,
                          (), {}, b'', 42)
        self.assertRaises(ValueError, _testcapi.parse_tuple_and_keywords,
                          (), {}, b'', [''] * 42)
        self.assertRaises(ValueError, _testcapi.parse_tuple_and_keywords,
                          (), {}, b'', [42])
328
def test_main():
    """Run the TestCase classes, the _testcapi test_* functions, and the
    thread-state checks (the latter only when threading is available)."""
    support.run_unittest(CAPITest, TestPendingCalls,
                         Test6012, EmbeddingTest, SkipitemTest)

    # Call every attribute of _testcapi whose name begins with 'test_'.
    for name in dir(_testcapi):
        if not name.startswith('test_'):
            continue
        test = getattr(_testcapi, name)
        if support.verbose:
            print("internal", name)
        test()

    # some extra thread-state tests driven via _testcapi
    def TestThreadState():
        if support.verbose:
            print("auto-thread-state")

        idents = []

        def callback():
            idents.append(threading.get_ident())

        _testcapi._test_thread_state(callback)
        # NOTE(review): these extra references are never read; their purpose
        # is not evident from this file, so they are kept as they were.
        a = b = callback
        time.sleep(1)
        # Check our main thread is in the list exactly 3 times.
        current = threading.get_ident()
        if idents.count(current) != 3:
            raise support.TestFailed(
                "Couldn't find main thread correctly in the list")

    if threading:
        import time
        # Once from the calling thread, then once from a fresh thread.
        TestThreadState()
        t = threading.Thread(target=TestThreadState)
        t.start()
        t.join()
Mark Hammond8d98d2c2003-04-19 15:41:53 +0000364
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +0000365
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()