blob: 4d931f8d3c6e416bc606b6509e20c312235a08c5 [file] [log] [blame]
# Run the _testcapi module tests (tests for the Python/C API): by definition,
# these are all functions _testcapi exports whose name begins with 'test_'.
Tim Peters9ea17ac2001-02-02 05:57:15 +00003
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00004from __future__ import with_statement
Antoine Pitrou8e605772011-04-25 21:21:07 +02005import os
Antoine Pitrou2f828f22012-01-18 00:21:11 +01006import pickle
Jeffrey Yasskin8e0bdfd2010-05-13 18:31:05 +00007import random
8import subprocess
Martin v. Löwis6ce7ed22005-03-03 12:26:35 +00009import sys
Benjamin Petersona54c9092009-01-13 02:11:23 +000010import time
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +000011import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +000012from test import support
Victor Stinner45df8202010-04-28 22:31:17 +000013try:
Stefan Krahfd24f9e2012-08-20 11:04:24 +020014 import _posixsubprocess
15except ImportError:
16 _posixsubprocess = None
17try:
Victor Stinner45df8202010-04-28 22:31:17 +000018 import threading
19except ImportError:
20 threading = None
Tim Petersd66595f2001-02-04 03:09:53 +000021import _testcapi
Tim Peters9ea17ac2001-02-02 05:57:15 +000022
Benjamin Petersona54c9092009-01-13 02:11:23 +000023
# Plain Python function that InstanceMethod wraps with
# _testcapi.instancemethod.  CAPITest.test_instancemethod compares the
# wrapper's __doc__ and attributes against this function, so the docstring
# below is part of what the test observes.
def testfunction(self):
    """some doc"""
    return self
27
class InstanceMethod:
    # Fixture for CAPITest.test_instancemethod, which checks that calling
    # either attribute on an instance passes that instance as the first
    # argument.

    # Wraps the builtin id(): the class attribute named "id" does not exist
    # yet when the right-hand side is evaluated, so the name still resolves
    # to the builtin.
    id = _testcapi.instancemethod(id)
    # Wraps the module-level Python function testfunction.
    testfunction = _testcapi.instancemethod(testfunction)
31
class CAPITest(unittest.TestCase):
    """Assorted checks of C-level behaviour reached through _testcapi and
    _posixsubprocess."""

    def test_instancemethod(self):
        # Calling a wrapper through an instance must pass that instance as
        # the first argument, for a builtin (id) and a Python function alike.
        inst = InstanceMethod()
        self.assertEqual(id(inst), inst.id())
        # assertIs reports both operands on failure, unlike assertTrue(a is b).
        self.assertIs(inst.testfunction(), inst)
        self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__)
        self.assertEqual(InstanceMethod.testfunction.__doc__,
                         testfunction.__doc__)

        # An attribute set through the class-level wrapper shows up on the
        # wrapped function; setting one through the instance must fail.
        InstanceMethod.testfunction.attribute = "test"
        self.assertEqual(testfunction.attribute, "test")
        self.assertRaises(AttributeError, setattr, inst.testfunction,
                          "attribute", "test")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_no_FatalError_infinite_loop(self):
        # Run the crash in a child interpreter so that the fatal error does
        # not take down the test process itself.
        p = subprocess.Popen([sys.executable, "-c",
                              'import _testcapi;'
                              '_testcapi.crash_no_current_thread()'],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        (out, err) = p.communicate()
        self.assertEqual(out, b'')
        # This used to cause an infinite loop.
        self.assertEqual(err.rstrip(),
                         b'Fatal Python error:'
                         b' PyThreadState_Get: no current thread')

    def test_memoryview_from_NULL_pointer(self):
        self.assertRaises(ValueError,
                          _testcapi.make_memoryview_from_NULL_pointer)

    @unittest.skipUnless(_posixsubprocess,
                         '_posixsubprocess required for this test.')
    def test_seq_bytes_to_charp_array(self):
        # Issue #15732: crash in _PySequence_BytesToCharpArray()
        class LenOnly(object):
            # Reports a length but cannot be indexed.
            def __len__(self):
                return 1
        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
                          1,LenOnly(),3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17)

        # Issue #15736: overflow in _PySequence_BytesToCharpArray()
        class HugeLen(object):
            # Indexable, but claims the largest possible length.
            def __len__(self):
                return sys.maxsize
            def __getitem__(self, i):
                return b'x'
        self.assertRaises(MemoryError, _posixsubprocess.fork_exec,
                          1,HugeLen(),3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17)

    @unittest.skipUnless(_posixsubprocess,
                         '_posixsubprocess required for this test.')
    def test_subprocess_fork_exec(self):
        class Z(object):
            def __len__(self):
                return 1

        # Issue #15738: crash in subprocess_fork_exec()
        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
                          Z(),[b'1'],3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17)
88
Victor Stinner45df8202010-04-28 22:31:17 +000089@unittest.skipUnless(threading, 'Threading required for this test.')
Benjamin Petersona54c9092009-01-13 02:11:23 +000090class TestPendingCalls(unittest.TestCase):
91
92 def pendingcalls_submit(self, l, n):
93 def callback():
94 #this function can be interrupted by thread switching so let's
95 #use an atomic operation
96 l.append(None)
97
98 for i in range(n):
99 time.sleep(random.random()*0.02) #0.01 secs on average
100 #try submitting callback until successful.
101 #rely on regular interrupt to flush queue if we are
102 #unsuccessful.
103 while True:
104 if _testcapi._pending_threadfunc(callback):
105 break;
106
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000107 def pendingcalls_wait(self, l, n, context = None):
Benjamin Petersona54c9092009-01-13 02:11:23 +0000108 #now, stick around until l[0] has grown to 10
109 count = 0;
110 while len(l) != n:
111 #this busy loop is where we expect to be interrupted to
112 #run our callbacks. Note that callbacks are only run on the
113 #main thread
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000114 if False and support.verbose:
Benjamin Petersona54c9092009-01-13 02:11:23 +0000115 print("(%i)"%(len(l),),)
116 for i in range(1000):
117 a = i*i
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000118 if context and not context.event.is_set():
119 continue
Benjamin Petersona54c9092009-01-13 02:11:23 +0000120 count += 1
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000121 self.assertTrue(count < 10000,
Benjamin Petersona54c9092009-01-13 02:11:23 +0000122 "timeout waiting for %i callbacks, got %i"%(n, len(l)))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000123 if False and support.verbose:
Benjamin Petersona54c9092009-01-13 02:11:23 +0000124 print("(%i)"%(len(l),))
125
126 def test_pendingcalls_threaded(self):
Benjamin Petersona54c9092009-01-13 02:11:23 +0000127
128 #do every callback on a separate thread
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000129 n = 32 #total callbacks
Benjamin Petersona54c9092009-01-13 02:11:23 +0000130 threads = []
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000131 class foo(object):pass
132 context = foo()
133 context.l = []
134 context.n = 2 #submits per thread
135 context.nThreads = n // context.n
136 context.nFinished = 0
137 context.lock = threading.Lock()
138 context.event = threading.Event()
139
140 for i in range(context.nThreads):
141 t = threading.Thread(target=self.pendingcalls_thread, args = (context,))
Benjamin Petersona54c9092009-01-13 02:11:23 +0000142 t.start()
143 threads.append(t)
144
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000145 self.pendingcalls_wait(context.l, n, context)
Benjamin Petersona54c9092009-01-13 02:11:23 +0000146
147 for t in threads:
148 t.join()
149
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000150 def pendingcalls_thread(self, context):
151 try:
152 self.pendingcalls_submit(context.l, context.n)
153 finally:
154 with context.lock:
155 context.nFinished += 1
156 nFinished = context.nFinished
157 if False and support.verbose:
158 print("finished threads: ", nFinished)
159 if nFinished == context.nThreads:
160 context.event.set()
161
Benjamin Petersona54c9092009-01-13 02:11:23 +0000162 def test_pendingcalls_non_threaded(self):
Ezio Melotti13925002011-03-16 11:05:33 +0200163 #again, just using the main thread, likely they will all be dispatched at
Benjamin Petersona54c9092009-01-13 02:11:23 +0000164 #once. It is ok to ask for too many, because we loop until we find a slot.
165 #the loop can be interrupted to dispatch.
166 #there are only 32 dispatch slots, so we go for twice that!
167 l = []
168 n = 64
169 self.pendingcalls_submit(l, n)
170 self.pendingcalls_wait(l, n)
171
Antoine Pitrou2f828f22012-01-18 00:21:11 +0100172 def test_subinterps(self):
173 # XXX this test leaks in refleak runs
174 import builtins
175 r, w = os.pipe()
176 code = """if 1:
177 import sys, builtins, pickle
178 with open({:d}, "wb") as f:
179 pickle.dump(id(sys.modules), f)
180 pickle.dump(id(builtins), f)
181 """.format(w)
182 with open(r, "rb") as f:
183 ret = _testcapi.run_in_subinterp(code)
184 self.assertEqual(ret, 0)
185 self.assertNotEqual(pickle.load(f), id(sys.modules))
186 self.assertNotEqual(pickle.load(f), id(builtins))
187
# Bug #6012
class Test6012(unittest.TestCase):
    # Regression test for bug #6012, driven through _testcapi.argparsing.
    def test(self):
        result = _testcapi.argparsing("Hello", "World")
        self.assertEqual(result, 1)
Benjamin Petersona54c9092009-01-13 02:11:23 +0000192
Antoine Pitrou8e605772011-04-25 21:21:07 +0200193
class EmbeddingTest(unittest.TestCase):
    """Run the _testembed helper program from a source checkout."""

    @unittest.skipIf(
        sys.platform.startswith('win'),
        "test doesn't work under Windows")
    def test_subinterps(self):
        # XXX only tested under Unix checkouts
        # Make __file__ absolute first: with a relative path the three
        # dirname() calls can collapse to '' and point nowhere useful.
        here = os.path.abspath(__file__)
        basepath = os.path.dirname(os.path.dirname(os.path.dirname(here)))
        exe = os.path.join(basepath, "Modules", "_testembed")
        if not os.path.exists(exe):
            self.skipTest("%r doesn't exist" % exe)
        # The child has to start in basepath, otherwise we get a fatal error:
        # "Py_Initialize: Unable to get the locale encoding
        # LookupError: no codec search functions registered: can't find encoding"
        # Passing cwd= gives the child that directory without changing the
        # working directory of the test process itself.
        p = subprocess.Popen([exe],
                             cwd=basepath,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        (out, err) = p.communicate()
        self.assertEqual(p.returncode, 0,
                         "bad returncode %d, stderr is %r" %
                         (p.returncode, err))
        if support.verbose:
            print()
            print(out.decode('latin1'))
            print(err.decode('latin1'))
224
225
def test_main():
    """Run the unittest classes, then every 'test_*' function exported by
    _testcapi, then (if threading is available) the thread-state check."""
    support.run_unittest(CAPITest, TestPendingCalls, Test6012, EmbeddingTest)

    for attr_name in dir(_testcapi):
        if not attr_name.startswith('test_'):
            continue
        internal_test = getattr(_testcapi, attr_name)
        if support.verbose:
            print("internal", attr_name)
        internal_test()

    # some extra thread-state tests driven via _testcapi
    def check_thread_state():
        if support.verbose:
            print("auto-thread-state")

        seen_idents = []

        def record_ident():
            seen_idents.append(_thread.get_ident())

        _testcapi._test_thread_state(record_ident)
        # Extra references to the callback, kept exactly as the original
        # code has them.  NOTE(review): purpose is not evident from here.
        a = b = record_ident
        time.sleep(1)
        # Check our main thread is in the list exactly 3 times.
        if seen_idents.count(_thread.get_ident()) == 3:
            return
        raise support.TestFailed(
            "Couldn't find main thread correctly in the list")

    if not threading:
        return

    # Bound as locals of test_main; check_thread_state reads them through
    # its closure, so they must be imported before it is called.
    import _thread
    import time

    # Once on the current thread, once on a freshly started one.
    check_thread_state()
    worker = threading.Thread(target=check_thread_state)
    worker.start()
    worker.join()
Mark Hammond8d98d2c2003-04-19 15:41:53 +0000261
Benjamin Peterson9b6df6a2008-10-16 23:56:29 +0000262
# Allow running this file directly as a script: execute the whole suite.
if __name__ == "__main__":
    test_main()