blob: e24bd6fb8e212b485c5cc8cb4196731617161083 [file] [log] [blame]
# Run the _testcapi module tests (tests for the Python/C API): by definition,
# these are all functions _testcapi exports whose name begins with 'test_'.
Tim Peters9ea17ac2001-02-02 05:57:15 +00003
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00004from __future__ import with_statement
Antoine Pitrou8e605772011-04-25 21:21:07 +02005import os
Antoine Pitrou2f828f22012-01-18 00:21:11 +01006import pickle
Jeffrey Yasskin8e0bdfd2010-05-13 18:31:05 +00007import random
8import subprocess
Martin v. Löwis6ce7ed22005-03-03 12:26:35 +00009import sys
Benjamin Petersona54c9092009-01-13 02:11:23 +000010import time
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +000011import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +000012from test import support
Victor Stinner45df8202010-04-28 22:31:17 +000013try:
14 import threading
15except ImportError:
16 threading = None
Tim Petersd66595f2001-02-04 03:09:53 +000017import _testcapi
Tim Peters9ea17ac2001-02-02 05:57:15 +000018
Benjamin Petersona54c9092009-01-13 02:11:23 +000019
# Plain Python function that InstanceMethod wraps with
# _testcapi.instancemethod.  CAPITest.test_instancemethod compares its
# docstring with the wrapper's and sets an attribute on it, so the docstring
# text below is test data rather than documentation.
def testfunction(self):
    """some doc"""
    return self
23
class InstanceMethod:
    # Both attributes wrap a callable with _testcapi.instancemethod.
    # CAPITest.test_instancemethod expects inst.id() to equal id(inst) and
    # inst.testfunction() to return inst.
    # The `id` on the right-hand side is the builtin: the class attribute of
    # the same name is not bound yet when the expression is evaluated.
    id = _testcapi.instancemethod(id)
    testfunction = _testcapi.instancemethod(testfunction)
27
class CAPITest(unittest.TestCase):
    # Miscellaneous checks of helpers exported by _testcapi.  Comments are
    # used instead of docstrings so unittest's verbose output keeps showing
    # the test names.

    def test_instancemethod(self):
        # Objects created by _testcapi.instancemethod are looked up on an
        # instance and called without passing the instance explicitly.
        inst = InstanceMethod()
        self.assertEqual(id(inst), inst.id())
        self.assertIs(inst.testfunction(), inst)
        self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__)
        self.assertEqual(InstanceMethod.testfunction.__doc__,
                         testfunction.__doc__)

        # An attribute set through the class-level wrapper must be visible on
        # the wrapped function, while setting one through the object obtained
        # from the instance must raise AttributeError.
        InstanceMethod.testfunction.attribute = "test"
        self.assertEqual(testfunction.attribute, "test")
        self.assertRaises(AttributeError, setattr, inst.testfunction,
                          "attribute", "test")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_no_FatalError_infinite_loop(self):
        # The crash is triggered in a child interpreter so that only the
        # child is affected; its stdout/stderr are captured for inspection.
        p = subprocess.Popen([sys.executable, "-c",
                              'import _testcapi;'
                              '_testcapi.crash_no_current_thread()'],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        (out, err) = p.communicate()
        self.assertEqual(out, b'')
        # This used to cause an infinite loop.
        # %r keeps the bytes readable and avoids an implicit str(bytes).
        self.assertTrue(err.rstrip().startswith(
                            b'Fatal Python error:'
                            b' PyThreadState_Get: no current thread'),
                        "unexpected stderr: %r" % (err,))

    def test_memoryview_from_NULL_pointer(self):
        self.assertRaises(ValueError,
                          _testcapi.make_memoryview_from_NULL_pointer)

    def test_exc_info(self):
        raised_exception = ValueError("5")
        new_exc = TypeError("TEST")
        try:
            raise raised_exception
        except ValueError as e:
            # Everything below must run inside the handler: the checks
            # compare against the exception state that is active here.
            tb = e.__traceback__
            orig_sys_exc_info = sys.exc_info()
            # The test expects set_exc_info() to install the given triple
            # and to return the triple that was active before the call.
            orig_exc_info = _testcapi.set_exc_info(new_exc.__class__,
                                                   new_exc, None)
            new_sys_exc_info = sys.exc_info()
            new_exc_info = _testcapi.set_exc_info(*orig_exc_info)
            reset_sys_exc_info = sys.exc_info()

            self.assertEqual(orig_exc_info[1], e)

            self.assertSequenceEqual(
                orig_exc_info,
                (raised_exception.__class__, raised_exception, tb))
            self.assertSequenceEqual(orig_sys_exc_info, orig_exc_info)
            self.assertSequenceEqual(reset_sys_exc_info, orig_exc_info)
            self.assertSequenceEqual(new_exc_info,
                                     (new_exc.__class__, new_exc, None))
            self.assertSequenceEqual(new_sys_exc_info, new_exc_info)
        else:
            # Only reachable if the raise above did not raise.
            self.fail("ValueError was not raised")
80
@unittest.skipUnless(threading, 'Threading required for this test.')
class TestPendingCalls(unittest.TestCase):
    # Exercises _testcapi._pending_threadfunc, which is used here to queue a
    # Python callable; the tests then wait for the callables to be run.

    def pendingcalls_submit(self, l, n):
        # Queue n callbacks, each of which appends one item to l.
        def callback():
            #this function can be interrupted by thread switching so let's
            #use an atomic operation
            l.append(None)

        for i in range(n):
            time.sleep(random.random()*0.02) #0.01 secs on average
            #try submitting callback until successful.
            #rely on regular interrupt to flush queue if we are
            #unsuccessful.
            while not _testcapi._pending_threadfunc(callback):
                pass

    def pendingcalls_wait(self, l, n, context = None):
        # Busy-wait until l holds exactly n items.  When a context is given,
        # iterations are not counted against the timeout until
        # context.event has been set.
        count = 0
        while len(l) != n:
            #this busy loop is where we expect to be interrupted to
            #run our callbacks.  Note that callbacks are only run on the
            #main thread
            for i in range(1000):
                a = i*i
            if context and not context.event.is_set():
                continue
            count += 1
            self.assertLess(count, 10000,
                "timeout waiting for %i callbacks, got %i"%(n, len(l)))

    def test_pendingcalls_threaded(self):

        #do every callback on a separate thread
        n = 32 #total callbacks
        threads = []
        class foo(object):pass
        # Plain attribute bag shared by the worker threads and the waiter.
        context = foo()
        context.l = []
        context.n = 2 #submits per thread
        context.nThreads = n // context.n
        context.nFinished = 0
        context.lock = threading.Lock()
        context.event = threading.Event()

        for i in range(context.nThreads):
            t = threading.Thread(target=self.pendingcalls_thread,
                                 args = (context,))
            t.start()
            threads.append(t)

        self.pendingcalls_wait(context.l, n, context)

        for t in threads:
            t.join()

    def pendingcalls_thread(self, context):
        # Worker body: submit this thread's share of callbacks, then record
        # completion.  The last worker to finish sets context.event.
        try:
            self.pendingcalls_submit(context.l, context.n)
        finally:
            with context.lock:
                context.nFinished += 1
                nFinished = context.nFinished
            if nFinished == context.nThreads:
                context.event.set()

    def test_pendingcalls_non_threaded(self):
        #again, just using the main thread, likely they will all be dispatched at
        #once.  It is ok to ask for too many, because we loop until we find a slot.
        #the loop can be interrupted to dispatch.
        #there are only 32 dispatch slots, so we go for twice that!
        l = []
        n = 64
        self.pendingcalls_submit(l, n)
        self.pendingcalls_wait(l, n)

    def test_subinterps(self):
        # XXX this test leaks in refleak runs
        import builtins
        r, w = os.pipe()
        # The code run via run_in_subinterp opens the write end by file
        # descriptor number and pickles the ids of its own sys.modules and
        # builtins; they are read back below and must differ from ours.
        code = """if 1:
            import sys, builtins, pickle
            with open({:d}, "wb") as f:
                pickle.dump(id(sys.modules), f)
                pickle.dump(id(builtins), f)
            """.format(w)
        with open(r, "rb") as f:
            ret = _testcapi.run_in_subinterp(code)
            self.assertEqual(ret, 0)
            self.assertNotEqual(pickle.load(f), id(sys.modules))
            self.assertNotEqual(pickle.load(f), id(builtins))
179
Martin v. Löwisc15bdef2009-05-29 14:47:46 +0000180# Bug #6012
class Test6012(unittest.TestCase):
    def test(self):
        # _testcapi.argparsing() called with two strings must return 1.
        outcome = _testcapi.argparsing("Hello", "World")
        self.assertEqual(outcome, 1)
Benjamin Petersona54c9092009-01-13 02:11:23 +0000184
Antoine Pitrou8e605772011-04-25 21:21:07 +0200185
class EmbeddingTest(unittest.TestCase):

    @unittest.skipIf(
        sys.platform.startswith('win'),
        "test doesn't work under Windows")
    def test_subinterps(self):
        # XXX only tested under Unix checkouts
        # Go three directory levels up from this file; the "_testembed"
        # program is looked up under "Modules" in that directory.
        root = __file__
        for _ in range(3):
            root = os.path.dirname(root)
        saved_cwd = os.getcwd()
        # The child has to be started from that directory, otherwise we get
        # a fatal error:
        # "Py_Initialize: Unable to get the locale encoding
        # LookupError: no codec search functions registered: can't find encoding"
        os.chdir(root)
        try:
            binary = os.path.join(root, "Modules", "_testembed")
            if not os.path.exists(binary):
                self.skipTest("%r doesn't exist" % binary)
            child = subprocess.Popen([binary],
                                     stdout=subprocess.PIPE,
                                     stderr=subprocess.PIPE)
            stdout_data, stderr_data = child.communicate()
            status = child.returncode
            failure_note = ("bad returncode %d, stderr is %r" %
                            (status, stderr_data))
            self.assertEqual(status, 0, failure_note)
            if support.verbose:
                print()
                for captured in (stdout_data, stderr_data):
                    print(captured.decode('latin1'))
        finally:
            # Always restore the working directory for later tests.
            os.chdir(saved_cwd)
216
class SkipitemTest(unittest.TestCase):

    def test_skipitem(self):
        """
        If this test failed, you probably added a new "format unit"
        in Python/getargs.c, but neglected to update our poor friend
        skipitem() in the same file.  (If so, shame on you!)

        This function brute-force tests all** ASCII characters (1 to 127
        inclusive) as format units, checking to see that
        PyArg_ParseTupleAndKeywords() returns consistent errors both when
        the unit is attempted to be used and when it is skipped.  If the
        format unit doesn't exist, we'll get one of two specific error
        messages (one for used, one for skipped); if it does exist we
        *won't* get that error--we'll get either no error or some other
        error.  If we get the "does not exist" error for one test and
        not for the other, there's a mismatch, and the test fails.

           ** Okay, it actually skips some ASCII characters.  Some characters
              have special funny semantics, and it would be difficult to
              accommodate them here.
        """
        empty_tuple = ()
        tuple_1 = (0,)
        dict_b = {'b':1}
        keywords = ["a", "b"]
        # Error text expected when an unknown format unit is actually used;
        # it does not depend on the unit, so build it once.
        unknown_when_used = (
            "argument 1 must be impossible<bad format char>, not int")

        # Python C source files must be ASCII,
        # therefore we'll never have a format unit > 127
        for i in range(1, 128):
            c = chr(i)

            # skip non-printable characters, no one is insane enough to define
            #    one as a format unit
            # skip parentheses, the error reporting is inconsistent about them
            # skip 'e', it's always a two-character code
            # skip '|' and '$', they don't represent arguments anyway
            if (not c.isprintable()) or (c in '()e|$'):
                continue

            # test the format unit when not skipped
            fmt = c + "i"
            try:
                # (note: the format string must be bytes!)
                _testcapi.parse_tuple_and_keywords(tuple_1, dict_b,
                    fmt.encode("ascii"), keywords)
                when_not_skipped = False
            except TypeError as e:
                when_not_skipped = (str(e) == unknown_when_used)
            except RuntimeError:
                when_not_skipped = False

            # test the format unit when skipped
            optional_format = "|" + fmt
            try:
                _testcapi.parse_tuple_and_keywords(empty_tuple, dict_b,
                    optional_format.encode("ascii"), keywords)
                when_skipped = False
            except RuntimeError as e:
                s = "impossible<bad format char>: '{}'".format(fmt)
                when_skipped = (str(e) == s)

            # The arguments follow the order of the placeholders:
            # "not skipped" first, then "skipped".
            message = ("test_skipitem_parity: "
                "detected mismatch between convertsimple and skipitem "
                "for format unit '{}' ({}), not skipped {}, skipped {}".format(
                    c, i, when_not_skipped, when_skipped))
            self.assertIs(when_skipped, when_not_skipped, message)
Antoine Pitrou8e605772011-04-25 21:21:07 +0200285
def test_main():
    """Run the unittest classes, then the tests exported by _testcapi.

    After the unittest-based classes, every attribute of _testcapi whose
    name starts with 'test_' is called with no arguments.  If the threading
    module is available, the thread-state check is then run twice: once in
    the calling thread and once in a newly started thread.
    """
    support.run_unittest(CAPITest, TestPendingCalls,
                         Test6012, EmbeddingTest, SkipitemTest)

    for name in dir(_testcapi):
        if not name.startswith('test_'):
            continue
        test = getattr(_testcapi, name)
        if support.verbose:
            print("internal", name)
        test()

    # some extra thread-state tests driven via _testcapi
    def TestThreadState():
        if support.verbose:
            print("auto-thread-state")

        idents = []

        def callback():
            idents.append(threading.get_ident())

        _testcapi._test_thread_state(callback)
        # NOTE(review): these extra references to callback are not used
        # afterwards; their purpose is not evident from this file, so they
        # are kept unchanged.
        a = b = callback
        time.sleep(1)
        # Check that the thread running this function is in the list
        # exactly 3 times.
        if idents.count(threading.get_ident()) != 3:
            raise support.TestFailed(
                "Couldn't find main thread correctly in the list")

    if threading:
        # `time` is the module-level import; no local import is needed.
        TestThreadState()
        t = threading.Thread(target=TestThreadState)
        t.start()
        t.join()
Mark Hammond8d98d2c2003-04-19 15:41:53 +0000321
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +0000322
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()