blob: 07ec623210e8f18c4c2a81aa0a9047bdbef2d6de [file] [log] [blame]
# Run the _testcapi module tests (tests for the Python/C API): by definition,
# these are all functions _testcapi exports whose name begins with 'test_'.
Tim Peters9ea17ac2001-02-02 05:57:15 +00003
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00004from __future__ import with_statement
Antoine Pitrou8e605772011-04-25 21:21:07 +02005import os
Antoine Pitrou2f828f22012-01-18 00:21:11 +01006import pickle
Jeffrey Yasskin8e0bdfd2010-05-13 18:31:05 +00007import random
8import subprocess
Martin v. Löwis6ce7ed22005-03-03 12:26:35 +00009import sys
Benjamin Petersona54c9092009-01-13 02:11:23 +000010import time
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +000011import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +000012from test import support
Victor Stinner45df8202010-04-28 22:31:17 +000013try:
14 import threading
15except ImportError:
16 threading = None
Tim Petersd66595f2001-02-04 03:09:53 +000017import _testcapi
Tim Peters9ea17ac2001-02-02 05:57:15 +000018
Benjamin Petersona54c9092009-01-13 02:11:23 +000019
# Plain Python function that InstanceMethod wraps with
# _testcapi.instancemethod.  CAPITest.test_instancemethod compares its
# __doc__ and attributes with those seen through the wrapper, so the
# docstring text below is part of the test data.
def testfunction(self):
    """some doc"""
    return self
23
class InstanceMethod:
    # Fixture for CAPITest.test_instancemethod: both attributes are callables
    # wrapped with _testcapi.instancemethod and are called through an
    # instance (inst.id(), inst.testfunction()).

    # The right-hand side is the builtin id(); the attribute reuses the name
    # "id" only inside this class namespace.
    id = _testcapi.instancemethod(id)
    # Wraps the module-level testfunction defined above this class.
    testfunction = _testcapi.instancemethod(testfunction)
27
class CAPITest(unittest.TestCase):
    # Checks of individual helpers exported by _testcapi.  Comments are used
    # instead of method docstrings so unittest's verbose output keeps showing
    # the test method names.

    def test_instancemethod(self):
        # Calls made through an instance must receive that instance.
        inst = InstanceMethod()
        self.assertEqual(id(inst), inst.id())
        # assertIs reports both operands on failure, unlike assertTrue(a is b).
        self.assertIs(inst.testfunction(), inst)
        self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__)
        self.assertEqual(InstanceMethod.testfunction.__doc__,
                         testfunction.__doc__)

        # An attribute set through the class-level wrapper shows up on the
        # wrapped function; setting one through the instance lookup must
        # raise AttributeError.
        InstanceMethod.testfunction.attribute = "test"
        self.assertEqual(testfunction.attribute, "test")
        self.assertRaises(AttributeError, setattr, inst.testfunction,
                          "attribute", "test")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_no_FatalError_infinite_loop(self):
        # Run the crashing call in a child interpreter so the fatal error
        # does not take down the test process itself.
        p = subprocess.Popen([sys.executable, "-c",
                              'import _testcapi;'
                              '_testcapi.crash_no_current_thread()'],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        (out, err) = p.communicate()
        self.assertEqual(out, b'')
        # This used to cause an infinite loop.
        self.assertEqual(err.rstrip(),
                         b'Fatal Python error:'
                         b' PyThreadState_Get: no current thread')

    def test_memoryview_from_NULL_pointer(self):
        self.assertRaises(ValueError,
                          _testcapi.make_memoryview_from_NULL_pointer)

    def test_exc_info(self):
        # _testcapi.set_exc_info(type, value, tb) is expected to install the
        # given triple as the current exception state and return the previous
        # triple; sys.exc_info() is sampled before and after each swap.
        raised_exception = ValueError("5")
        new_exc = TypeError("TEST")
        try:
            raise raised_exception
        except ValueError as e:
            tb = e.__traceback__
            orig_sys_exc_info = sys.exc_info()
            orig_exc_info = _testcapi.set_exc_info(new_exc.__class__,
                                                   new_exc, None)
            new_sys_exc_info = sys.exc_info()
            # Restore the original state; the return value is what was
            # installed by the previous call.
            new_exc_info = _testcapi.set_exc_info(*orig_exc_info)
            reset_sys_exc_info = sys.exc_info()

            self.assertEqual(orig_exc_info[1], e)

            self.assertSequenceEqual(
                orig_exc_info,
                (raised_exception.__class__, raised_exception, tb))
            self.assertSequenceEqual(orig_sys_exc_info, orig_exc_info)
            self.assertSequenceEqual(reset_sys_exc_info, orig_exc_info)
            self.assertSequenceEqual(new_exc_info,
                                     (new_exc.__class__, new_exc, None))
            self.assertSequenceEqual(new_sys_exc_info, new_exc_info)
        else:
            # Only reachable if the raise above did not raise.
            self.fail("ValueError was not raised")
80
@unittest.skipUnless(threading, 'Threading required for this test.')
class TestPendingCalls(unittest.TestCase):
    # Exercises _testcapi._pending_threadfunc, both from worker threads and
    # from the main thread, and the run_in_subinterp helper.

    def pendingcalls_submit(self, l, n):
        # Submit n callbacks through _testcapi._pending_threadfunc; each
        # callback, once run, appends one entry to l.
        def callback():
            #this function can be interrupted by thread switching so let's
            #use an atomic operation
            l.append(None)

        for i in range(n):
            time.sleep(random.random()*0.02) #0.01 secs on average
            #try submitting callback until successful.
            #rely on regular interrupt to flush queue if we are
            #unsuccessful.
            while not _testcapi._pending_threadfunc(callback):
                pass

    def pendingcalls_wait(self, l, n, context=None):
        # Spin until l holds exactly n entries.  Fails the test once 10000
        # counted rounds have passed without reaching n.
        count = 0
        while len(l) != n:
            #this busy loop is where we expect to be interrupted to
            #run our callbacks.  Note that callbacks are only run on the
            #main thread
            for i in range(1000):
                a = i*i
            # While a context is given and its event is not yet set (the
            # worker threads have not all finished), the round is not
            # counted towards the timeout.
            if context and not context.event.is_set():
                continue
            count += 1
            self.assertLess(count, 10000,
                "timeout waiting for %i callbacks, got %i"%(n, len(l)))

    def test_pendingcalls_threaded(self):

        #do every callback on a separate thread
        n = 32 #total callbacks
        threads = []
        class foo(object):pass
        # Plain attribute bag shared between this method and the workers.
        context = foo()
        context.l = []
        context.n = 2 #submits per thread
        context.nThreads = n // context.n
        context.nFinished = 0
        context.lock = threading.Lock()
        context.event = threading.Event()

        for i in range(context.nThreads):
            t = threading.Thread(target=self.pendingcalls_thread,
                                 args=(context,))
            t.start()
            threads.append(t)

        self.pendingcalls_wait(context.l, n, context)

        for t in threads:
            t.join()

    def pendingcalls_thread(self, context):
        # Worker body: submit this thread's share of callbacks, then record
        # completion.  The last worker to finish sets context.event.
        try:
            self.pendingcalls_submit(context.l, context.n)
        finally:
            with context.lock:
                context.nFinished += 1
                nFinished = context.nFinished
            if nFinished == context.nThreads:
                context.event.set()

    def test_pendingcalls_non_threaded(self):
        #again, just using the main thread, likely they will all be dispatched at
        #once.  It is ok to ask for too many, because we loop until we find a slot.
        #the loop can be interrupted to dispatch.
        #there are only 32 dispatch slots, so we go for twice that!
        l = []
        n = 64
        self.pendingcalls_submit(l, n)
        self.pendingcalls_wait(l, n)

    def test_subinterps(self):
        # XXX this test leaks in refleak runs
        import builtins
        # The subinterpreter pickles id(sys.modules) and id(builtins) into the
        # write end of the pipe; they must differ from this interpreter's.
        r, w = os.pipe()
        code = """if 1:
            import sys, builtins, pickle
            with open({:d}, "wb") as f:
                pickle.dump(id(sys.modules), f)
                pickle.dump(id(builtins), f)
            """.format(w)
        with open(r, "rb") as f:
            ret = _testcapi.run_in_subinterp(code)
            self.assertEqual(ret, 0)
            self.assertNotEqual(pickle.load(f), id(sys.modules))
            self.assertNotEqual(pickle.load(f), id(builtins))
179
# Bug #6012
class Test6012(unittest.TestCase):
    def test(self):
        # _testcapi.argparsing is called with two string arguments and is
        # expected to return 1.
        outcome = _testcapi.argparsing("Hello", "World")
        self.assertEqual(outcome, 1)
Benjamin Petersona54c9092009-01-13 02:11:23 +0000184
Antoine Pitrou8e605772011-04-25 21:21:07 +0200185
class EmbeddingTest(unittest.TestCase):
    # Runs the Modules/_testembed program from the directory three levels
    # above this file and checks that it exits with status 0.

    @unittest.skipIf(
        sys.platform.startswith('win'),
        "test doesn't work under Windows")
    def test_subinterps(self):
        # XXX only tested under Unix checkouts
        # Make __file__ absolute before walking up the tree.  A relative
        # __file__ (e.g. when this file is run as a script) can reduce to ""
        # so that os.chdir() fails, and a relative basepath would also be
        # resolved against the new directory after the chdir below.
        basepath = os.path.dirname(os.path.dirname(os.path.dirname(
            os.path.abspath(__file__))))
        oldcwd = os.getcwd()
        # This is needed otherwise we get a fatal error:
        # "Py_Initialize: Unable to get the locale encoding
        # LookupError: no codec search functions registered: can't find encoding"
        os.chdir(basepath)
        try:
            exe = os.path.join(basepath, "Modules", "_testembed")
            if not os.path.exists(exe):
                self.skipTest("%r doesn't exist" % exe)
            p = subprocess.Popen([exe],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
            (out, err) = p.communicate()
            self.assertEqual(p.returncode, 0,
                             "bad returncode %d, stderr is %r" %
                             (p.returncode, err))
            if support.verbose:
                print()
                print(out.decode('latin1'))
                print(err.decode('latin1'))
        finally:
            # Always restore the working directory, including on skip/failure.
            os.chdir(oldcwd)
216
217
def test_main():
    # Entry point: run the unittest classes, then every callable exported by
    # _testcapi whose name starts with 'test_', then the thread-state checks.
    support.run_unittest(CAPITest, TestPendingCalls, Test6012, EmbeddingTest)

    for name in dir(_testcapi):
        if name.startswith('test_'):
            test = getattr(_testcapi, name)
            if support.verbose:
                print("internal", name)
            test()

    # some extra thread-state tests driven via _testcapi
    def TestThreadState():
        if support.verbose:
            print("auto-thread-state")

        idents = []

        def callback():
            idents.append(threading.get_ident())

        _testcapi._test_thread_state(callback)
        # NOTE(review): these extra bindings look like they exist only to
        # hold additional references to callback -- confirm before removing.
        a = b = callback
        time.sleep(1)
        # Check our main thread is in the list exactly 3 times.
        if idents.count(threading.get_ident()) != 3:
            raise support.TestFailed(
                "Couldn't find main thread correctly in the list")

    if threading:
        # Uses the module-level ``time`` import; a second function-local
        # import here would only shadow it.
        # Run once on the current thread and once on a fresh thread.
        TestThreadState()
        t = threading.Thread(target=TestThreadState)
        t.start()
        t.join()
Mark Hammond8d98d2c2003-04-19 15:41:53 +0000252
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +0000253
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()