blob: 612e639846d42c183cace7af0be48277d4d8363f [file] [log] [blame]
# Run the _testcapi module tests (tests for the Python/C API):  by definition,
# these are all functions _testcapi exports whose name begins with 'test_'.
Tim Peters9ea17ac2001-02-02 05:57:15 +00003
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00004from __future__ import with_statement
Antoine Pitrou8e605772011-04-25 21:21:07 +02005import os
Antoine Pitrou2f828f22012-01-18 00:21:11 +01006import pickle
Jeffrey Yasskin8e0bdfd2010-05-13 18:31:05 +00007import random
8import subprocess
Martin v. Löwis6ce7ed22005-03-03 12:26:35 +00009import sys
Benjamin Petersona54c9092009-01-13 02:11:23 +000010import time
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +000011import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +000012from test import support
Victor Stinner45df8202010-04-28 22:31:17 +000013try:
Stefan Krahfd24f9e2012-08-20 11:04:24 +020014 import _posixsubprocess
15except ImportError:
16 _posixsubprocess = None
17try:
Victor Stinner45df8202010-04-28 22:31:17 +000018 import threading
19except ImportError:
20 threading = None
Tim Petersd66595f2001-02-04 03:09:53 +000021import _testcapi
Tim Peters9ea17ac2001-02-02 05:57:15 +000022
Benjamin Petersona54c9092009-01-13 02:11:23 +000023
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +000024def testfunction(self):
25 """some doc"""
26 return self
27
class InstanceMethod:
    # Fixture for CAPITest.test_instancemethod: one builtin (id) and one
    # pure-Python function, each wrapped with _testcapi.instancemethod.
    # That test expects both wrappers to receive the instance as their
    # first argument when called through an instance.
    id = _testcapi.instancemethod(id)
    testfunction = _testcapi.instancemethod(testfunction)
31
32class CAPITest(unittest.TestCase):
33
34 def test_instancemethod(self):
35 inst = InstanceMethod()
36 self.assertEqual(id(inst), inst.id())
Benjamin Petersonc9c0f202009-06-30 23:06:06 +000037 self.assertTrue(inst.testfunction() is inst)
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +000038 self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__)
39 self.assertEqual(InstanceMethod.testfunction.__doc__, testfunction.__doc__)
40
41 InstanceMethod.testfunction.attribute = "test"
42 self.assertEqual(testfunction.attribute, "test")
43 self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test")
44
Stefan Krah0ca46242010-06-09 08:56:28 +000045 @unittest.skipUnless(threading, 'Threading required for this test.')
Jeffrey Yasskin8e0bdfd2010-05-13 18:31:05 +000046 def test_no_FatalError_infinite_loop(self):
47 p = subprocess.Popen([sys.executable, "-c",
48 'import _testcapi;'
49 '_testcapi.crash_no_current_thread()'],
50 stdout=subprocess.PIPE,
51 stderr=subprocess.PIPE)
52 (out, err) = p.communicate()
53 self.assertEqual(out, b'')
54 # This used to cause an infinite loop.
Victor Stinner4000ffa2010-05-15 01:40:41 +000055 self.assertEqual(err.rstrip(),
Jeffrey Yasskin8e0bdfd2010-05-13 18:31:05 +000056 b'Fatal Python error:'
Victor Stinner4000ffa2010-05-15 01:40:41 +000057 b' PyThreadState_Get: no current thread')
Jeffrey Yasskin8e0bdfd2010-05-13 18:31:05 +000058
Antoine Pitrou915605c2011-02-24 20:53:48 +000059 def test_memoryview_from_NULL_pointer(self):
60 self.assertRaises(ValueError, _testcapi.make_memoryview_from_NULL_pointer)
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +000061
Stefan Krahfd24f9e2012-08-20 11:04:24 +020062 @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
63 def test_seq_bytes_to_charp_array(self):
64 # Issue #15732: crash in _PySequence_BytesToCharpArray()
65 class Z(object):
66 def __len__(self):
67 return 1
68 self.assertRaises(TypeError, _posixsubprocess.fork_exec,
69 1,Z(),3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17)
70
Stefan Krahdb579d72012-08-20 14:36:47 +020071 @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
72 def test_subprocess_fork_exec(self):
73 class Z(object):
74 def __len__(self):
75 return 1
76
77 # Issue #15738: crash in subprocess_fork_exec()
78 self.assertRaises(TypeError, _posixsubprocess.fork_exec,
79 Z(),[b'1'],3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17)
80
Victor Stinner45df8202010-04-28 22:31:17 +000081@unittest.skipUnless(threading, 'Threading required for this test.')
Benjamin Petersona54c9092009-01-13 02:11:23 +000082class TestPendingCalls(unittest.TestCase):
83
84 def pendingcalls_submit(self, l, n):
85 def callback():
86 #this function can be interrupted by thread switching so let's
87 #use an atomic operation
88 l.append(None)
89
90 for i in range(n):
91 time.sleep(random.random()*0.02) #0.01 secs on average
92 #try submitting callback until successful.
93 #rely on regular interrupt to flush queue if we are
94 #unsuccessful.
95 while True:
96 if _testcapi._pending_threadfunc(callback):
97 break;
98
Benjamin Petersone1cdfd72009-01-18 21:02:37 +000099 def pendingcalls_wait(self, l, n, context = None):
Benjamin Petersona54c9092009-01-13 02:11:23 +0000100 #now, stick around until l[0] has grown to 10
101 count = 0;
102 while len(l) != n:
103 #this busy loop is where we expect to be interrupted to
104 #run our callbacks. Note that callbacks are only run on the
105 #main thread
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000106 if False and support.verbose:
Benjamin Petersona54c9092009-01-13 02:11:23 +0000107 print("(%i)"%(len(l),),)
108 for i in range(1000):
109 a = i*i
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000110 if context and not context.event.is_set():
111 continue
Benjamin Petersona54c9092009-01-13 02:11:23 +0000112 count += 1
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000113 self.assertTrue(count < 10000,
Benjamin Petersona54c9092009-01-13 02:11:23 +0000114 "timeout waiting for %i callbacks, got %i"%(n, len(l)))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000115 if False and support.verbose:
Benjamin Petersona54c9092009-01-13 02:11:23 +0000116 print("(%i)"%(len(l),))
117
118 def test_pendingcalls_threaded(self):
Benjamin Petersona54c9092009-01-13 02:11:23 +0000119
120 #do every callback on a separate thread
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000121 n = 32 #total callbacks
Benjamin Petersona54c9092009-01-13 02:11:23 +0000122 threads = []
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000123 class foo(object):pass
124 context = foo()
125 context.l = []
126 context.n = 2 #submits per thread
127 context.nThreads = n // context.n
128 context.nFinished = 0
129 context.lock = threading.Lock()
130 context.event = threading.Event()
131
132 for i in range(context.nThreads):
133 t = threading.Thread(target=self.pendingcalls_thread, args = (context,))
Benjamin Petersona54c9092009-01-13 02:11:23 +0000134 t.start()
135 threads.append(t)
136
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000137 self.pendingcalls_wait(context.l, n, context)
Benjamin Petersona54c9092009-01-13 02:11:23 +0000138
139 for t in threads:
140 t.join()
141
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000142 def pendingcalls_thread(self, context):
143 try:
144 self.pendingcalls_submit(context.l, context.n)
145 finally:
146 with context.lock:
147 context.nFinished += 1
148 nFinished = context.nFinished
149 if False and support.verbose:
150 print("finished threads: ", nFinished)
151 if nFinished == context.nThreads:
152 context.event.set()
153
Benjamin Petersona54c9092009-01-13 02:11:23 +0000154 def test_pendingcalls_non_threaded(self):
Ezio Melotti13925002011-03-16 11:05:33 +0200155 #again, just using the main thread, likely they will all be dispatched at
Benjamin Petersona54c9092009-01-13 02:11:23 +0000156 #once. It is ok to ask for too many, because we loop until we find a slot.
157 #the loop can be interrupted to dispatch.
158 #there are only 32 dispatch slots, so we go for twice that!
159 l = []
160 n = 64
161 self.pendingcalls_submit(l, n)
162 self.pendingcalls_wait(l, n)
163
Antoine Pitrou2f828f22012-01-18 00:21:11 +0100164 def test_subinterps(self):
165 # XXX this test leaks in refleak runs
166 import builtins
167 r, w = os.pipe()
168 code = """if 1:
169 import sys, builtins, pickle
170 with open({:d}, "wb") as f:
171 pickle.dump(id(sys.modules), f)
172 pickle.dump(id(builtins), f)
173 """.format(w)
174 with open(r, "rb") as f:
175 ret = _testcapi.run_in_subinterp(code)
176 self.assertEqual(ret, 0)
177 self.assertNotEqual(pickle.load(f), id(sys.modules))
178 self.assertNotEqual(pickle.load(f), id(builtins))
179
Martin v. Löwisc15bdef2009-05-29 14:47:46 +0000180# Bug #6012
181class Test6012(unittest.TestCase):
182 def test(self):
183 self.assertEqual(_testcapi.argparsing("Hello", "World"), 1)
Benjamin Petersona54c9092009-01-13 02:11:23 +0000184
Antoine Pitrou8e605772011-04-25 21:21:07 +0200185
class EmbeddingTest(unittest.TestCase):
    # Runs the Modules/_testembed helper binary from the source checkout
    # and checks that it exits successfully.

    @unittest.skipIf(
        sys.platform.startswith('win'),
        "test doesn't work under Windows")
    def test_subinterps(self):
        # XXX only tested under Unix checkouts
        # Resolve __file__ first: with a relative __file__ the three
        # dirname() calls could collapse to '' and os.chdir('') would fail.
        here = os.path.abspath(__file__)
        basepath = os.path.dirname(os.path.dirname(os.path.dirname(here)))
        exe = os.path.join(basepath, "Modules", "_testembed")
        if not os.path.exists(exe):
            self.skipTest("%r doesn't exist" % exe)

        oldcwd = os.getcwd()
        # This is needed otherwise we get a fatal error:
        # "Py_Initialize: Unable to get the locale encoding
        # LookupError: no codec search functions registered: can't find encoding"
        os.chdir(basepath)
        try:
            with subprocess.Popen([exe],
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE) as p:
                out, err = p.communicate()
            self.assertEqual(p.returncode, 0,
                             "bad returncode %d, stderr is %r" %
                             (p.returncode, err))
            if support.verbose:
                print()
                print(out.decode('latin1'))
                print(err.decode('latin1'))
        finally:
            # Always restore the working directory for the following tests.
            os.chdir(oldcwd)
216
217
def test_main():
    # Entry point: run the unittest classes, then every _testcapi function
    # whose name starts with 'test_', then the thread-state checks.
    support.run_unittest(CAPITest, TestPendingCalls, Test6012, EmbeddingTest)

    for name in dir(_testcapi):
        if name.startswith('test_'):
            test = getattr(_testcapi, name)
            if support.verbose:
                print("internal", name)
            test()

    if not threading:
        return

    # Bound before the helper is defined so that the names it closes over
    # are visibly available by the time it runs.
    import _thread
    import time

    # some extra thread-state tests driven via _testcapi
    def TestThreadState():
        if support.verbose:
            print("auto-thread-state")

        idents = []

        def callback():
            idents.append(_thread.get_ident())

        _testcapi._test_thread_state(callback)
        time.sleep(1)
        # Check that the calling thread is in the list exactly 3 times.
        if idents.count(_thread.get_ident()) != 3:
            raise support.TestFailed(
                "Couldn't find main thread correctly in the list")

    # Run once on the current thread and once on a freshly started one.
    TestThreadState()
    t = threading.Thread(target=TestThreadState)
    t.start()
    t.join()
Mark Hammond8d98d2c2003-04-19 15:41:53 +0000253
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +0000254
# Allow running this test file directly as a script.
if __name__ == "__main__":
    test_main()