blob: 2692b89d77712fb7001d724a8ed31f81d46aff58 [file] [log] [blame]
Tim Petersd66595f2001-02-04 03:09:53 +00001# Run the _testcapi module tests (tests for the Python/C API): by defn,
Guido van Rossum361c5352001-04-13 17:03:04 +00002# these are all functions _testcapi exports whose name begins with 'test_'.
Tim Peters9ea17ac2001-02-02 05:57:15 +00003
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00004from __future__ import with_statement
Antoine Pitrou8e605772011-04-25 21:21:07 +02005import os
Antoine Pitrou2f828f22012-01-18 00:21:11 +01006import pickle
Jeffrey Yasskin8e0bdfd2010-05-13 18:31:05 +00007import random
8import subprocess
Martin v. Löwis6ce7ed22005-03-03 12:26:35 +00009import sys
Benjamin Petersona54c9092009-01-13 02:11:23 +000010import time
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +000011import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +000012from test import support
Victor Stinner45df8202010-04-28 22:31:17 +000013try:
14 import threading
15except ImportError:
16 threading = None
Tim Petersd66595f2001-02-04 03:09:53 +000017import _testcapi
Tim Peters9ea17ac2001-02-02 05:57:15 +000018
Benjamin Petersona54c9092009-01-13 02:11:23 +000019
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +000020def testfunction(self):
21 """some doc"""
22 return self
23
24class InstanceMethod:
25 id = _testcapi.instancemethod(id)
26 testfunction = _testcapi.instancemethod(testfunction)
27
28class CAPITest(unittest.TestCase):
29
30 def test_instancemethod(self):
31 inst = InstanceMethod()
32 self.assertEqual(id(inst), inst.id())
Benjamin Petersonc9c0f202009-06-30 23:06:06 +000033 self.assertTrue(inst.testfunction() is inst)
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +000034 self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__)
35 self.assertEqual(InstanceMethod.testfunction.__doc__, testfunction.__doc__)
36
37 InstanceMethod.testfunction.attribute = "test"
38 self.assertEqual(testfunction.attribute, "test")
39 self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test")
40
Stefan Krah0ca46242010-06-09 08:56:28 +000041 @unittest.skipUnless(threading, 'Threading required for this test.')
Jeffrey Yasskin8e0bdfd2010-05-13 18:31:05 +000042 def test_no_FatalError_infinite_loop(self):
43 p = subprocess.Popen([sys.executable, "-c",
44 'import _testcapi;'
45 '_testcapi.crash_no_current_thread()'],
46 stdout=subprocess.PIPE,
47 stderr=subprocess.PIPE)
48 (out, err) = p.communicate()
49 self.assertEqual(out, b'')
50 # This used to cause an infinite loop.
Vinay Sajip73954042012-05-06 11:34:50 +010051 self.assertTrue(err.rstrip().startswith(
Jeffrey Yasskin8e0bdfd2010-05-13 18:31:05 +000052 b'Fatal Python error:'
Vinay Sajip73954042012-05-06 11:34:50 +010053 b' PyThreadState_Get: no current thread'))
Jeffrey Yasskin8e0bdfd2010-05-13 18:31:05 +000054
Antoine Pitrou915605c2011-02-24 20:53:48 +000055 def test_memoryview_from_NULL_pointer(self):
56 self.assertRaises(ValueError, _testcapi.make_memoryview_from_NULL_pointer)
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +000057
Martin v. Löwisaa2efcb2012-04-19 14:33:43 +020058 def test_exc_info(self):
59 raised_exception = ValueError("5")
60 new_exc = TypeError("TEST")
61 try:
62 raise raised_exception
63 except ValueError as e:
64 tb = e.__traceback__
65 orig_sys_exc_info = sys.exc_info()
66 orig_exc_info = _testcapi.set_exc_info(new_exc.__class__, new_exc, None)
67 new_sys_exc_info = sys.exc_info()
68 new_exc_info = _testcapi.set_exc_info(*orig_exc_info)
69 reset_sys_exc_info = sys.exc_info()
70
71 self.assertEqual(orig_exc_info[1], e)
72
73 self.assertSequenceEqual(orig_exc_info, (raised_exception.__class__, raised_exception, tb))
74 self.assertSequenceEqual(orig_sys_exc_info, orig_exc_info)
75 self.assertSequenceEqual(reset_sys_exc_info, orig_exc_info)
76 self.assertSequenceEqual(new_exc_info, (new_exc.__class__, new_exc, None))
77 self.assertSequenceEqual(new_sys_exc_info, new_exc_info)
78 else:
79 self.assertTrue(False)
80
Victor Stinner45df8202010-04-28 22:31:17 +000081@unittest.skipUnless(threading, 'Threading required for this test.')
Benjamin Petersona54c9092009-01-13 02:11:23 +000082class TestPendingCalls(unittest.TestCase):
83
84 def pendingcalls_submit(self, l, n):
85 def callback():
86 #this function can be interrupted by thread switching so let's
87 #use an atomic operation
88 l.append(None)
89
90 for i in range(n):
91 time.sleep(random.random()*0.02) #0.01 secs on average
92 #try submitting callback until successful.
93 #rely on regular interrupt to flush queue if we are
94 #unsuccessful.
95 while True:
96 if _testcapi._pending_threadfunc(callback):
97 break;
98
Benjamin Petersone1cdfd72009-01-18 21:02:37 +000099 def pendingcalls_wait(self, l, n, context = None):
Benjamin Petersona54c9092009-01-13 02:11:23 +0000100 #now, stick around until l[0] has grown to 10
101 count = 0;
102 while len(l) != n:
103 #this busy loop is where we expect to be interrupted to
104 #run our callbacks. Note that callbacks are only run on the
105 #main thread
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000106 if False and support.verbose:
Benjamin Petersona54c9092009-01-13 02:11:23 +0000107 print("(%i)"%(len(l),),)
108 for i in range(1000):
109 a = i*i
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000110 if context and not context.event.is_set():
111 continue
Benjamin Petersona54c9092009-01-13 02:11:23 +0000112 count += 1
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000113 self.assertTrue(count < 10000,
Benjamin Petersona54c9092009-01-13 02:11:23 +0000114 "timeout waiting for %i callbacks, got %i"%(n, len(l)))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000115 if False and support.verbose:
Benjamin Petersona54c9092009-01-13 02:11:23 +0000116 print("(%i)"%(len(l),))
117
118 def test_pendingcalls_threaded(self):
Benjamin Petersona54c9092009-01-13 02:11:23 +0000119
120 #do every callback on a separate thread
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000121 n = 32 #total callbacks
Benjamin Petersona54c9092009-01-13 02:11:23 +0000122 threads = []
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000123 class foo(object):pass
124 context = foo()
125 context.l = []
126 context.n = 2 #submits per thread
127 context.nThreads = n // context.n
128 context.nFinished = 0
129 context.lock = threading.Lock()
130 context.event = threading.Event()
131
132 for i in range(context.nThreads):
133 t = threading.Thread(target=self.pendingcalls_thread, args = (context,))
Benjamin Petersona54c9092009-01-13 02:11:23 +0000134 t.start()
135 threads.append(t)
136
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000137 self.pendingcalls_wait(context.l, n, context)
Benjamin Petersona54c9092009-01-13 02:11:23 +0000138
139 for t in threads:
140 t.join()
141
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000142 def pendingcalls_thread(self, context):
143 try:
144 self.pendingcalls_submit(context.l, context.n)
145 finally:
146 with context.lock:
147 context.nFinished += 1
148 nFinished = context.nFinished
149 if False and support.verbose:
150 print("finished threads: ", nFinished)
151 if nFinished == context.nThreads:
152 context.event.set()
153
Benjamin Petersona54c9092009-01-13 02:11:23 +0000154 def test_pendingcalls_non_threaded(self):
Ezio Melotti13925002011-03-16 11:05:33 +0200155 #again, just using the main thread, likely they will all be dispatched at
Benjamin Petersona54c9092009-01-13 02:11:23 +0000156 #once. It is ok to ask for too many, because we loop until we find a slot.
157 #the loop can be interrupted to dispatch.
158 #there are only 32 dispatch slots, so we go for twice that!
159 l = []
160 n = 64
161 self.pendingcalls_submit(l, n)
162 self.pendingcalls_wait(l, n)
163
Antoine Pitrou2f828f22012-01-18 00:21:11 +0100164 def test_subinterps(self):
165 # XXX this test leaks in refleak runs
166 import builtins
167 r, w = os.pipe()
168 code = """if 1:
169 import sys, builtins, pickle
170 with open({:d}, "wb") as f:
171 pickle.dump(id(sys.modules), f)
172 pickle.dump(id(builtins), f)
173 """.format(w)
174 with open(r, "rb") as f:
175 ret = _testcapi.run_in_subinterp(code)
176 self.assertEqual(ret, 0)
177 self.assertNotEqual(pickle.load(f), id(sys.modules))
178 self.assertNotEqual(pickle.load(f), id(builtins))
179
Martin v. Löwisc15bdef2009-05-29 14:47:46 +0000180# Bug #6012
181class Test6012(unittest.TestCase):
182 def test(self):
183 self.assertEqual(_testcapi.argparsing("Hello", "World"), 1)
Benjamin Petersona54c9092009-01-13 02:11:23 +0000184
Antoine Pitrou8e605772011-04-25 21:21:07 +0200185
186class EmbeddingTest(unittest.TestCase):
187
Antoine Pitrou71cbafb2011-06-30 20:02:54 +0200188 @unittest.skipIf(
189 sys.platform.startswith('win'),
190 "test doesn't work under Windows")
Antoine Pitrou8e605772011-04-25 21:21:07 +0200191 def test_subinterps(self):
192 # XXX only tested under Unix checkouts
193 basepath = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
194 oldcwd = os.getcwd()
195 # This is needed otherwise we get a fatal error:
196 # "Py_Initialize: Unable to get the locale encoding
197 # LookupError: no codec search functions registered: can't find encoding"
198 os.chdir(basepath)
199 try:
200 exe = os.path.join(basepath, "Modules", "_testembed")
201 if not os.path.exists(exe):
202 self.skipTest("%r doesn't exist" % exe)
203 p = subprocess.Popen([exe],
204 stdout=subprocess.PIPE,
205 stderr=subprocess.PIPE)
206 (out, err) = p.communicate()
207 self.assertEqual(p.returncode, 0,
208 "bad returncode %d, stderr is %r" %
209 (p.returncode, err))
210 if support.verbose:
211 print()
212 print(out.decode('latin1'))
213 print(err.decode('latin1'))
214 finally:
215 os.chdir(oldcwd)
216
Larry Hastings8f904da2012-06-22 03:56:29 -0700217class SkipitemTest(unittest.TestCase):
218
219 def test_skipitem(self):
220 """
221 If this test failed, you probably added a new "format unit"
222 in Python/getargs.c, but neglected to update our poor friend
223 skipitem() in the same file. (If so, shame on you!)
224
Larry Hastings48ed3602012-06-22 12:58:36 -0700225 With a few exceptions**, this function brute-force tests all
226 printable ASCII*** characters (32 to 126 inclusive) as format units,
227 checking to see that PyArg_ParseTupleAndKeywords() return consistent
228 errors both when the unit is attempted to be used and when it is
229 skipped. If the format unit doesn't exist, we'll get one of two
230 specific error messages (one for used, one for skipped); if it does
231 exist we *won't* get that error--we'll get either no error or some
232 other error. If we get the specific "does not exist" error for one
233 test and not for the other, there's a mismatch, and the test fails.
Larry Hastings8f904da2012-06-22 03:56:29 -0700234
Larry Hastings48ed3602012-06-22 12:58:36 -0700235 ** Some format units have special funny semantics and it would
236 be difficult to accomodate them here. Since these are all
237 well-established and properly skipped in skipitem() we can
238 get away with not testing them--this test is really intended
239 to catch *new* format units.
240
241 *** Python C source files must be ASCII. Therefore it's impossible
242 to have non-ASCII format units.
243
Larry Hastings8f904da2012-06-22 03:56:29 -0700244 """
245 empty_tuple = ()
246 tuple_1 = (0,)
247 dict_b = {'b':1}
248 keywords = ["a", "b"]
249
Larry Hastings48ed3602012-06-22 12:58:36 -0700250 for i in range(32, 127):
Larry Hastings8f904da2012-06-22 03:56:29 -0700251 c = chr(i)
252
Larry Hastings8f904da2012-06-22 03:56:29 -0700253 # skip parentheses, the error reporting is inconsistent about them
254 # skip 'e', it's always a two-character code
255 # skip '|' and '$', they don't represent arguments anyway
Larry Hastings48ed3602012-06-22 12:58:36 -0700256 if c in '()e|$':
Larry Hastings8f904da2012-06-22 03:56:29 -0700257 continue
258
259 # test the format unit when not skipped
260 format = c + "i"
261 try:
262 # (note: the format string must be bytes!)
263 _testcapi.parse_tuple_and_keywords(tuple_1, dict_b,
264 format.encode("ascii"), keywords)
265 when_not_skipped = False
266 except TypeError as e:
267 s = "argument 1 must be impossible<bad format char>, not int"
268 when_not_skipped = (str(e) == s)
269 except RuntimeError as e:
270 when_not_skipped = False
271
272 # test the format unit when skipped
273 optional_format = "|" + format
274 try:
275 _testcapi.parse_tuple_and_keywords(empty_tuple, dict_b,
276 optional_format.encode("ascii"), keywords)
277 when_skipped = False
278 except RuntimeError as e:
279 s = "impossible<bad format char>: '{}'".format(format)
280 when_skipped = (str(e) == s)
281
282 message = ("test_skipitem_parity: "
283 "detected mismatch between convertsimple and skipitem "
284 "for format unit '{}' ({}), not skipped {}, skipped {}".format(
285 c, i, when_skipped, when_not_skipped))
286 self.assertIs(when_skipped, when_not_skipped, message)
Antoine Pitrou8e605772011-04-25 21:21:07 +0200287
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000288def test_main():
Larry Hastings8f904da2012-06-22 03:56:29 -0700289 support.run_unittest(CAPITest, TestPendingCalls,
290 Test6012, EmbeddingTest, SkipitemTest)
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000291
292 for name in dir(_testcapi):
293 if name.startswith('test_'):
294 test = getattr(_testcapi, name)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000295 if support.verbose:
Guido van Rossumbe19ed72007-02-09 05:37:30 +0000296 print("internal", name)
Collin Winter3add4d72007-08-29 23:37:32 +0000297 test()
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000298
299 # some extra thread-state tests driven via _testcapi
300 def TestThreadState():
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000301 if support.verbose:
Guido van Rossumbe19ed72007-02-09 05:37:30 +0000302 print("auto-thread-state")
Mark Hammond8d98d2c2003-04-19 15:41:53 +0000303
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000304 idents = []
Mark Hammond8d98d2c2003-04-19 15:41:53 +0000305
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000306 def callback():
Victor Stinner2a129742011-05-30 23:02:52 +0200307 idents.append(threading.get_ident())
Mark Hammond8d98d2c2003-04-19 15:41:53 +0000308
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000309 _testcapi._test_thread_state(callback)
310 a = b = callback
311 time.sleep(1)
312 # Check our main thread is in the list exactly 3 times.
Victor Stinner2a129742011-05-30 23:02:52 +0200313 if idents.count(threading.get_ident()) != 3:
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000314 raise support.TestFailed(
Collin Winter3add4d72007-08-29 23:37:32 +0000315 "Couldn't find main thread correctly in the list")
Mark Hammond8d98d2c2003-04-19 15:41:53 +0000316
Victor Stinner45df8202010-04-28 22:31:17 +0000317 if threading:
Christian Heimes5e696852008-04-09 08:37:03 +0000318 import time
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000319 TestThreadState()
Georg Brandl2067bfd2008-05-25 13:05:15 +0000320 t = threading.Thread(target=TestThreadState)
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000321 t.start()
322 t.join()
Mark Hammond8d98d2c2003-04-19 15:41:53 +0000323
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +0000324
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000325if __name__ == "__main__":
326 test_main()