blob: 2d4395e7cd081c642ca189151acd8668fe284302 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00006import queue as pyqueue
7import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00008import io
Antoine Pitroude911b22011-12-21 11:03:24 +01009import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000010import sys
11import os
12import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020013import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000014import signal
15import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import socket
17import random
18import logging
Richard Oudkerk3730a172012-06-15 18:26:07 +010019import struct
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +010020import operator
R. David Murraya21e4ca2009-03-31 23:16:50 +000021import test.support
Richard Oudkerke88a2442012-08-14 11:41:32 +010022import test.script_helper
Benjamin Petersone711caf2008-06-11 16:44:04 +000023
Benjamin Petersone5384b02008-10-04 22:00:42 +000024
R. David Murraya21e4ca2009-03-31 23:16:50 +000025# Skip tests if _multiprocessing wasn't built.
26_multiprocessing = test.support.import_module('_multiprocessing')
27# Skip tests if sem_open implementation is broken.
28test.support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
32import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000033
Benjamin Petersone711caf2008-06-11 16:44:04 +000034import multiprocessing.dummy
35import multiprocessing.connection
36import multiprocessing.managers
37import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000038import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000039
Charles-François Natalibc8f0822011-09-20 20:36:51 +020040from multiprocessing import util
41
# Optional features: each probe falls back to a flag (or None) instead of
# letting ImportError escape at import time.
try:
    from multiprocessing import reduction
    HAS_REDUCTION = reduction.HAVE_SEND_HANDLE
except ImportError:
    HAS_REDUCTION = False

try:
    from multiprocessing.sharedctypes import Value, copy
    HAS_SHAREDCTYPES = True
except ImportError:
    HAS_SHAREDCTYPES = False

try:
    import msvcrt
except ImportError:
    msvcrt = None
58
Benjamin Petersone711caf2008-06-11 16:44:04 +000059#
60#
61#
62
def latin(s):
    """Return the str *s* encoded to bytes with the 'latin' codec."""
    return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000065
Benjamin Petersone711caf2008-06-11 16:44:04 +000066#
67# Constants
68#
69
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# True unless the C extension reports that sem_getvalue() is broken.
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = (sys.platform == "win32")
Antoine Pitrou176f07d2011-06-06 19:35:31 +020087
Richard Oudkerk59d54042012-05-10 16:11:12 +010088from multiprocessing.connection import wait
Antoine Pitrou176f07d2011-06-06 19:35:31 +020089
def wait_for_handle(handle, timeout):
    """Wait for a single *handle* using multiprocessing.connection.wait().

    A negative *timeout* is treated as "no timeout" (it is replaced by
    None before calling wait()).  Returns whatever wait() returns for the
    one-element list: a list that contains *handle* if it became ready,
    otherwise an empty list.
    """
    if timeout is not None and timeout < 0.0:
        timeout = None
    return wait([handle], timeout)
Jesse Noller6214edd2009-01-19 16:23:53 +000094
# Upper bound on file descriptor numbers; fall back to 256 when sysconf()
# is missing (AttributeError), does not know the name (ValueError) or
# fails at the OS level (OSError).
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    MAXFD = 256

# To speed up tests when using the forkserver, we can preload these:
PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver']
102
Benjamin Petersone711caf2008-06-11 16:44:04 +0000103#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000104# Some tests require ctypes
105#
106
try:
    from ctypes import Structure, c_int, c_double
except ImportError:
    # Without ctypes, keep the names importable: Structure degrades to a
    # plain base class and the C types to None.
    Structure = object
    c_int = c_double = None
112
Charles-François Natali221ef672011-11-22 18:55:22 +0100113
def check_enough_semaphores():
    """Check that the system supports enough semaphores to run the test.

    Returns None when the limit cannot be queried, is reported as -1, or
    is at least the POSIX minimum of 256; otherwise raises
    unittest.SkipTest.
    """
    # minimum number of semaphores available according to POSIX
    nsems_min = 256
    try:
        nsems = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems == -1 or nsems >= nsems_min:
        return
    raise unittest.SkipTest("The OS doesn't support enough semaphores "
                            "to run the test (required: %d)." % nsems_min)
127
128
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000129#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000130# Creates a wrapper for a function which records the time it takes to finish
131#
132
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* took.

    After a call, ``elapsed`` holds the duration in seconds of the most
    recent call (it is None until the first call).  The duration is
    recorded even when *func* raises.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        # Use the monotonic clock: wall-clock adjustments must not be able
        # to produce a negative or wildly wrong elapsed time.
        start = time.monotonic()
        try:
            return self.func(*args, **kwds)
        finally:
            self.elapsed = time.monotonic() - start
145
146#
147# Base class for test cases
148#
149
class BaseTestCase(object):
    """Mixin with assertion helpers shared by the test case classes.

    It calls unittest assertion methods on ``self``, so it is meant to be
    combined with a class that provides them.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timing assertions are only enforced when CHECK_TIMINGS is on.
        if CHECK_TIMINGS:
            self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Assert func(*args) == value, unless func is not implemented.
        try:
            res = func(*args)
        except NotImplementedError:
            pass
        else:
            return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
172
Benjamin Petersone711caf2008-06-11 16:44:04 +0000173#
174# Return the value of a semaphore
175#
176
def get_value(self):
    """Return the current value of the semaphore-like object *self*.

    Tries the public ``get_value()`` method first, then the private
    ``_Semaphore__value`` and ``_value`` attributes.  Raises
    NotImplementedError when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
188
189#
190# Testcases
191#
192
class _TestProcess(BaseTestCase):
    """Tests for the basic Process API.

    The class relies on attributes that are not defined here (``TYPE``,
    ``Process``, ``Queue``, ``Event``, ``Pipe``, ``current_process``,
    ``active_children``) and are expected to be supplied by whatever
    combines it with a concrete test case.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertGreater(len(authkey), 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertIsNone(current.exitcode)

    def test_daemon_argument(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # By default uses the current process's daemon flag.
        proc0 = self.Process(target=self._test)
        self.assertEqual(proc0.daemon, self.current_process().daemon)
        proc1 = self.Process(target=self._test, daemon=True)
        self.assertTrue(proc1.daemon)
        proc2 = self.Process(target=self._test, daemon=False)
        self.assertFalse(proc2.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Child body for test_process: report what the child received and
        # how it sees itself back to the parent through the queue.
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello': 23, 'bye': 2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # State before start().
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIs(type(self.active_children()), list)
        self.assertIsNone(p.exitcode)

        p.start()

        # State while running.
        self.assertIsNone(p.exitcode)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child echoes its arguments (minus the queue itself) first.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # State after join().
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        time.sleep(100)

    def test_terminate(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertIsNone(p.exitcode)

        join = TimingWrapper(p.join)

        self.assertEqual(join(0), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        self.assertEqual(join(-1), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        # XXX maybe terminating too soon causes the problems on Gentoo...
        time.sleep(1)

        p.terminate()

        if hasattr(signal, 'alarm'):
            # On the Gentoo buildbot waitpid() often seems to block forever.
            # We use alarm() to interrupt it if it blocks for too long.
            def handler(*args):
                raise RuntimeError('join took too long: %s' % p)
            old_handler = signal.signal(signal.SIGALRM, handler)
            try:
                signal.alarm(10)
                self.assertEqual(join(), None)
            finally:
                # Always cancel the alarm and restore the previous handler.
                signal.alarm(0)
                signal.signal(signal.SIGALRM, old_handler)
        else:
            self.assertEqual(join(), None)

        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIs(type(cpus), int)
        self.assertGreaterEqual(cpus, 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # Send our own id, then start two children one after the other
        # (each is joined before the next starts) until ids have length 2.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sentinel(cls, event):
        event.wait(10.0)

    def test_sentinel(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        event = self.Event()
        p = self.Process(target=self._test_sentinel, args=(event,))
        # The sentinel is not available before the process is started.
        with self.assertRaises(ValueError):
            p.sentinel
        p.start()
        self.addCleanup(p.join)
        sentinel = p.sentinel
        self.assertIsInstance(sentinel, int)
        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
        event.set()
        p.join()
        self.assertTrue(wait_for_handle(sentinel, timeout=1))
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200399
Benjamin Petersone711caf2008-06-11 16:44:04 +0000400#
401#
402#
403
404class _UpperCaser(multiprocessing.Process):
405
406 def __init__(self):
407 multiprocessing.Process.__init__(self)
408 self.child_conn, self.parent_conn = multiprocessing.Pipe()
409
410 def run(self):
411 self.parent_conn.close()
412 for s in iter(self.child_conn.recv, None):
413 self.child_conn.send(s.upper())
414 self.child_conn.close()
415
416 def submit(self, s):
417 assert type(s) is str
418 self.parent_conn.send(s)
419 return self.parent_conn.recv()
420
421 def stop(self):
422 self.parent_conn.send(None)
423 self.parent_conn.close()
424 self.child_conn.close()
425
class _TestSubclassingProcess(BaseTestCase):
    """Tests for Process subclasses and for process shutdown behaviour."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        uppercaser = _UpperCaser()
        uppercaser.daemon = True
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        uppercaser.stop()
        uppercaser.join()

    def test_stderr_flush(self):
        # sys.stderr is flushed at process shutdown (issue #13812)
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)
        proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
        proc.start()
        proc.join()
        with open(testfn, 'r') as f:
            err = f.read()
            # The whole traceback was printed
            self.assertIn("ZeroDivisionError", err)
            self.assertIn("test_multiprocessing.py", err)
            self.assertIn("1/0 # MARKER", err)

    @classmethod
    def _test_stderr_flush(cls, testfn):
        # The file is deliberately left open: the test checks that it is
        # flushed when the process shuts down.  The next source line is
        # matched verbatim by test_stderr_flush.
        sys.stderr = open(testfn, 'w')
        1/0 # MARKER

    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        sys.stderr = open(testfn, 'w')
        sys.exit(reason)

    def test_sys_exit(self):
        # See Issue 13854
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)

        # Non-integer exit reasons: exit code 1, reason written to stderr.
        for reason, code in (([1, 2, 3], 1), ('ignore this', 1)):
            p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, code)

            with open(testfn, 'r') as f:
                self.assertEqual(f.read().rstrip(), str(reason))

        # Integer-like exit reasons become the exit code itself.
        for reason in (True, False, 8):
            p = self.Process(target=sys.exit, args=(reason,))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, reason)
491
Benjamin Petersone711caf2008-06-11 16:44:04 +0000492#
493#
494#
495
def queue_empty(q):
    """Return whether *q* is empty, via ``empty()`` if present, else ``qsize()``."""
    if hasattr(q, 'empty'):
        return q.empty()
    return q.qsize() == 0
501
def queue_full(q, maxsize):
    """Return whether *q* is full.

    Uses ``q.full()`` when available; otherwise compares ``q.qsize()``
    with *maxsize*.
    """
    if hasattr(q, 'full'):
        return q.full()
    return q.qsize() == maxsize
507
508
class _TestQueue(BaseTestCase):
    """Tests for Queue and JoinableQueue.

    ``Queue``, ``JoinableQueue``, ``Event`` and ``Process`` are read from
    ``self`` and are not defined in this class.
    """

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child body for test_put: drain the six queued items once allowed.
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue using every spelling of put().
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Non-blocking puts on a full queue fail immediately; blocking
        # puts fail after their timeout.
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Non-blocking gets on an empty queue fail immediately; blocking
        # gets fail after their timeout.
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            self.skipTest('qsize method not implemented')
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        # Worker body: acknowledge every item until the None sentinel.
        for _ in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for _ in range(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # One None sentinel per worker makes each of them exit its loop.
        for _ in workers:
            queue.put(None)

        for p in workers:
            p.join()

    def test_timeout(self):
        q = multiprocessing.Queue()
        try:
            # Measure with the monotonic clock so that wall-clock
            # adjustments cannot distort the elapsed time.
            start = time.monotonic()
            self.assertRaises(pyqueue.Empty, q.get, True, 0.200)
            delta = time.monotonic() - start
            # Tolerate a delta of 30 ms because of the bad clock resolution
            # on Windows (usually 15.6 ms)
            self.assertGreaterEqual(delta, 0.170)
        finally:
            # Release the queue's pipe instead of leaving it to the GC.
            q.close()
            q.join_thread()
Giampaolo Rodola'30830712013-04-17 13:12:27 +0200724
Benjamin Petersone711caf2008-06-11 16:44:04 +0000725#
726#
727#
728
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock (both are read from ``self``)."""

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # A second, non-blocking acquire must fail while the lock is held.
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # Releasing an unlocked lock is an error.
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        # An RLock can be acquired repeatedly ...
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        # ... and must be released the same number of times.
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        with self.Lock():
            pass
751
Benjamin Petersone711caf2008-06-11 16:44:04 +0000752
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore (both read from ``self``)."""

    def _test_semaphore(self, sem):
        # Shared scenario for a semaphore created with an initial value
        # of 2: take it down to 0, check that a non-blocking acquire then
        # fails, and release back up to 2.
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # An unbounded semaphore may be released past its initial value.
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # Non-blocking acquires return at once, whatever the timeout.
        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        # Blocking acquires give up after the timeout.
        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
805
806
class _TestCondition(BaseTestCase):
    """Tests for Condition objects (notify, notify_all, wait, wait_for)."""

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        """Worker body: signal *sleeping*, wait on *cond*, signal *woken*."""
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        """Assert that *cond* has no outstanding sleepers ('processes' only)."""
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                # get_value() is not implemented on every platform
                pass

    def test_notify(self):
        """Each notify() call wakes exactly one of the two waiting workers."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Keep the process and the thread under separate names so that both
        # can be joined at the end of the test.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Both workers have been notified, so neither join can block forever.
        thread.join()
        proc.join()

    def test_notify_all(self):
        """Workers either time out on their own or are all woken by notify_all()."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)
        workers = []    # every process/thread started here, joined at the end

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        for i in range(10):
            try:
                if get_value(woken) == 6:
                    break
            except NotImplementedError:
                break
            time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # The first batch timed out and the second was notified, so every
        # worker is finishing; reap them instead of leaving them dangling.
        for worker in workers:
            worker.join()

    def test_timeout(self):
        """wait(timeout) without a notify returns False after about *timeout*."""
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

    @classmethod
    def _test_waitfor_f(cls, cond, state):
        """Child: set state to 0, then wait until the parent raises it to 4."""
        with cond:
            state.value = 0
            cond.notify()
            result = cond.wait_for(lambda: state.value == 4)
            if not result or state.value != 4:
                # a non-zero exit code is how failure is reported to the parent
                sys.exit(1)

    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
    def test_waitfor(self):
        """wait_for() returns True once the predicate becomes true."""
        # based on test in test/lock_tests.py
        cond = self.Condition()
        state = self.Value('i', -1)

        p = self.Process(target=self._test_waitfor_f, args=(cond, state))
        p.daemon = True
        p.start()

        with cond:
            result = cond.wait_for(lambda: state.value == 0)
            self.assertTrue(result)
            self.assertEqual(state.value, 0)

        for i in range(4):
            time.sleep(0.01)
            with cond:
                state.value += 1
                cond.notify()

        p.join(5)
        self.assertFalse(p.is_alive())
        self.assertEqual(p.exitcode, 0)

    @classmethod
    def _test_waitfor_timeout_f(cls, cond, state, success, sem):
        """Child: wait_for() a state that is never reached and time the wait."""
        sem.release()
        with cond:
            expected = 0.1
            # Use the monotonic clock so that a wall-clock adjustment during
            # the wait cannot distort the measured duration.
            dt = time.monotonic()
            result = cond.wait_for(lambda: state.value == 4, timeout=expected)
            dt = time.monotonic() - dt
            # borrow logic in assertTimeout() from test/lock_tests.py
            if not result and expected * 0.6 < dt < expected * 10.0:
                success.value = True

    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
    def test_waitfor_timeout(self):
        """wait_for(timeout=...) returns a false value when the predicate stays false."""
        # based on test in test/lock_tests.py
        cond = self.Condition()
        state = self.Value('i', 0)
        success = self.Value('i', False)
        sem = self.Semaphore(0)

        p = self.Process(target=self._test_waitfor_timeout_f,
                         args=(cond, state, success, sem))
        p.daemon = True
        p.start()
        self.assertTrue(sem.acquire(timeout=10))

        # Only increment 3 times, so state == 4 is never reached.
        for i in range(3):
            time.sleep(0.01)
            with cond:
                state.value += 1
                cond.notify()

        p.join(5)
        self.assertTrue(success.value)

    @classmethod
    def _test_wait_result(cls, c, pid):
        """Child: notify *c*, then (if *pid* is given) send it SIGINT after 1s."""
        with c:
            c.notify()
        time.sleep(1)
        if pid is not None:
            os.kill(pid, signal.SIGINT)

    def test_wait_result(self):
        """wait() returns False on timeout, True on notify, and is interruptible."""
        if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
            pid = os.getpid()
        else:
            pid = None

        c = self.Condition()
        with c:
            self.assertFalse(c.wait(0))
            self.assertFalse(c.wait(0.1))

            p = self.Process(target=self._test_wait_result, args=(c, pid))
            p.start()

            self.assertTrue(c.wait(10))
            if pid is not None:
                self.assertRaises(KeyboardInterrupt, c.wait, 10)

            p.join()
1042
Benjamin Petersone711caf2008-06-11 16:44:04 +00001043
class _TestEvent(BaseTestCase):
    """Tests for Event objects."""

    @classmethod
    def _test_event(cls, event):
        """Child: set *event* after sleeping for TIMEOUT2."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        """Exercise is_set(), wait(), set() and clear(), locally and from a child."""
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A freshly created event is not set.
        self.assertEqual(event.is_set(), False)

        # While the event is unset, wait() returns False after the timeout.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once the event is set, wait() returns True without blocking.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)

        event.clear()

        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # The child exits right after setting the event; reap it so that no
        # process is left dangling once the test returns.
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001084
1085#
Richard Oudkerk3730a172012-06-15 18:26:07 +01001086# Tests for Barrier - adapted from tests in test/lock_tests.py
1087#
1088
# Many of the tests for threading.Barrier use a list as an atomic
# counter: a value is appended to increment the counter, and the
# length of the list gives the value.  We use the class _DummyList
# for the same purpose.
1093
class _DummyList(object):
    """Counter with a list-like append()/len() API backed by a heap buffer.

    The instance state is the (buffer wrapper, lock) pair, which is what
    __getstate__ returns and __setstate__ accepts.
    """

    def __init__(self):
        int_size = struct.calcsize('i')
        storage = multiprocessing.heap.BufferWrapper(int_size)
        guard = multiprocessing.Lock()
        # Reuse the unpickling path to attach the buffer and the lock.
        self.__setstate__((storage, guard))
        self._lengthbuf[0] = 0

    def __setstate__(self, state):
        self._wrapper, self._lock = state
        raw_view = self._wrapper.create_memoryview()
        # View the buffer as C ints; element 0 holds the "length".
        self._lengthbuf = raw_view.cast('i')

    def __getstate__(self):
        return (self._wrapper, self._lock)

    def append(self, _):
        """Increment the counter; the appended value itself is ignored."""
        with self._lock:
            self._lengthbuf[0] += 1

    def __len__(self):
        """Return the number of append() calls made so far."""
        with self._lock:
            return self._lengthbuf[0]
1116
def _wait():
    """Sleep for 10 ms as a crude wait/yield."""
    # Deliberately avoids synchronization primitives.
    pause = 0.01
    time.sleep(pause)
1120
1121
class Bunch(object):
    """
    A bunch of threads.
    """
    def __init__(self, namespace, f, args, n, wait_before_exit=False):
        """
        Start `n` workers, each calling `f(*args)`.

        `namespace` supplies the DummyList(), Event() and Process()
        factories.  If `wait_before_exit` is True, the workers won't
        terminate until do_finish() is called.
        """
        self.f = f
        self.args = args
        self.n = n
        self.started = namespace.DummyList()
        self.finished = namespace.DummyList()
        self._can_exit = namespace.Event()
        if not wait_before_exit:
            self._can_exit.set()
        for _ in range(n):
            worker = namespace.Process(target=self.task)
            worker.daemon = True
            worker.start()

    def task(self):
        """Worker body: record start, run `f`, record finish, await exit."""
        ident = os.getpid()
        self.started.append(ident)
        try:
            self.f(*self.args)
        finally:
            self.finished.append(ident)
            self._can_exit.wait(30)
            assert self._can_exit.is_set()

    def wait_for_started(self):
        """Poll until all `n` workers have recorded that they started."""
        while True:
            if len(self.started) >= self.n:
                break
            _wait()

    def wait_for_finished(self):
        """Poll until all `n` workers have recorded that they finished."""
        while True:
            if len(self.finished) >= self.n:
                break
            _wait()

    def do_finish(self):
        """Allow workers blocked at the end of task() to exit."""
        self._can_exit.set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001165
1166
class AppendTrue(object):
    """Callable that appends True to the wrapped object each time it is called."""

    def __init__(self, obj):
        self.obj = obj

    def __call__(self):
        target = self.obj
        target.append(True)
1172
1173
class _TestBarrier(BaseTestCase):
    """
    Tests for Barrier objects.
    """
    N = 5
    defaultTimeout = 30.0  # XXX Slow Windows buildbots need generous timeout

    def setUp(self):
        self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)

    def tearDown(self):
        self.barrier.abort()
        self.barrier = None

    def DummyList(self):
        """Return a counter-like list suitable for the current TYPE."""
        if self.TYPE == 'threads':
            return []
        elif self.TYPE == 'manager':
            return self.manager.list()
        else:
            return _DummyList()

    def run_threads(self, f, args):
        """Run f(*args) in N-1 workers plus the caller, then wait for all."""
        b = Bunch(self, f, args, self.N-1)
        f(*args)
        b.wait_for_finished()

    @classmethod
    def multipass(cls, barrier, results, n):
        """Pass the barrier 2*n times, checking the lockstep counters."""
        m = barrier.parties
        assert m == cls.N
        for i in range(n):
            results[0].append(True)
            assert len(results[1]) == i * m
            barrier.wait()
            results[1].append(True)
            assert len(results[0]) == (i + 1) * m
            barrier.wait()
        try:
            assert barrier.n_waiting == 0
        except NotImplementedError:
            pass
        assert not barrier.broken

    def test_barrier(self, passes=1):
        """
        Test that a barrier is passed in lockstep
        """
        results = [self.DummyList(), self.DummyList()]
        self.run_threads(self.multipass, (self.barrier, results, passes))

    def test_barrier_10(self):
        """
        Test that a barrier works for 10 consecutive runs
        """
        return self.test_barrier(10)

    @classmethod
    def _test_wait_return_f(cls, barrier, queue):
        res = barrier.wait()
        queue.put(res)

    def test_wait_return(self):
        """
        test the return value from barrier.wait
        """
        queue = self.Queue()
        self.run_threads(self._test_wait_return_f, (self.barrier, queue))
        results = [queue.get() for i in range(self.N)]
        self.assertEqual(results.count(0), 1)

    @classmethod
    def _test_action_f(cls, barrier, results):
        barrier.wait()
        if len(results) != 1:
            raise RuntimeError

    def test_action(self):
        """
        Test the 'action' callback
        """
        results = self.DummyList()
        barrier = self.Barrier(self.N, action=AppendTrue(results))
        self.run_threads(self._test_action_f, (barrier, results))
        self.assertEqual(len(results), 1)

    @classmethod
    def _test_abort_f(cls, barrier, results1, results2):
        try:
            i = barrier.wait()
            if i == cls.N//2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()

    def test_abort(self):
        """
        Test that an abort will put the barrier in a broken state
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        self.run_threads(self._test_abort_f,
                         (self.barrier, results1, results2))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertTrue(self.barrier.broken)

    @classmethod
    def _test_reset_f(cls, barrier, results1, results2, results3):
        i = barrier.wait()
        if i == cls.N//2:
            # Wait until the other threads are all in the barrier.
            while barrier.n_waiting < cls.N-1:
                time.sleep(0.001)
            barrier.reset()
        else:
            try:
                barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
        # Now, pass the barrier again
        barrier.wait()
        results3.append(True)

    def test_reset(self):
        """
        Test that a 'reset' on a barrier frees the waiting threads
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        self.run_threads(self._test_reset_f,
                         (self.barrier, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_abort_and_reset_f(cls, barrier, barrier2,
                                results1, results2, results3):
        try:
            i = barrier.wait()
            if i == cls.N//2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()
        # Synchronize and reset the barrier.  Must synchronize first so
        # that everyone has left it when we reset, and after so that no
        # one enters it before the reset.
        if barrier2.wait() == cls.N//2:
            barrier.reset()
        barrier2.wait()
        barrier.wait()
        results3.append(True)

    def test_abort_and_reset(self):
        """
        Test that a barrier can be reset after being broken.
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        barrier2 = self.Barrier(self.N)

        self.run_threads(self._test_abort_and_reset_f,
                         (self.barrier, barrier2, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_timeout_f(cls, barrier, results):
        i = barrier.wait()
        if i == cls.N//2:
            # One thread is late!
            time.sleep(1.0)
        try:
            barrier.wait(0.5)
        except threading.BrokenBarrierError:
            results.append(True)

    def test_timeout(self):
        """
        Test wait(timeout)
        """
        results = self.DummyList()
        self.run_threads(self._test_timeout_f, (self.barrier, results))
        self.assertEqual(len(results), self.barrier.parties)

    @classmethod
    def _test_default_timeout_f(cls, barrier, results):
        i = barrier.wait(cls.defaultTimeout)
        if i == cls.N//2:
            # One thread is later than the default timeout
            time.sleep(1.0)
        try:
            barrier.wait()
        except threading.BrokenBarrierError:
            results.append(True)

    def test_default_timeout(self):
        """
        Test the barrier's default timeout
        """
        barrier = self.Barrier(self.N, timeout=0.5)
        results = self.DummyList()
        self.run_threads(self._test_default_timeout_f, (barrier, results))
        self.assertEqual(len(results), barrier.parties)

    def test_single_thread(self):
        """A barrier with a single party never blocks."""
        b = self.Barrier(1)
        b.wait()
        b.wait()

    @classmethod
    def _test_thousand_f(cls, barrier, passes, conn, lock):
        """Worker: pass the barrier *passes* times, sending the pass index."""
        for i in range(passes):
            barrier.wait()
            with lock:
                conn.send(i)

    def test_thousand(self):
        """N workers pass the barrier 1000 times without getting out of step."""
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        passes = 1000
        lock = self.Lock()
        conn, child_conn = self.Pipe(False)
        workers = []
        for j in range(self.N):
            p = self.Process(target=self._test_thousand_f,
                             args=(self.barrier, passes, child_conn, lock))
            p.start()
            workers.append(p)

        for i in range(passes):
            for j in range(self.N):
                self.assertEqual(conn.recv(), i)

        # Every message has been received, so each worker has finished its
        # loop; join them so no process outlives the test.
        for p in workers:
            p.join()
1418
1419#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001420#
1421#
1422
class _TestValue(BaseTestCase):
    """Tests for shared Value and RawValue objects."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        """Child: overwrite every shared value with its third table entry."""
        for shared, entry in zip(values, cls.codes_values):
            shared.value = entry[2]

    def test_value(self, raw=False):
        """Values hold their initial content and reflect writes from a child."""
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, entry in zip(values, self.codes_values):
            self.assertEqual(shared.value, entry[1])

        child = self.Process(target=self._test, args=(values,))
        child.daemon = True
        child.start()
        child.join()

        for shared, entry in zip(values, self.codes_values):
            self.assertEqual(shared.value, entry[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        """get_lock()/get_obj() exist only on synchronized values."""
        # default locking
        val1 = self.Value('i', 5)
        val1.get_lock()
        val1.get_obj()

        # explicit lock=None
        val2 = self.Value('i', 5, lock=None)
        val2.get_lock()
        val2.get_obj()

        # caller-supplied lock is handed back by get_lock()
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False gives an object without the accessors
        val4 = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(val4, 'get_lock'))
        self.assertFalse(hasattr(val4, 'get_obj'))

        with self.assertRaises(AttributeError):
            self.Value('i', 5, lock='navalue')

        val5 = self.RawValue('i', 5)
        self.assertFalse(hasattr(val5, 'get_lock'))
        self.assertFalse(hasattr(val5, 'get_obj'))
1490
Benjamin Petersone711caf2008-06-11 16:44:04 +00001491
class _TestArray(BaseTestCase):
    """Tests for shared Array and RawArray objects."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        """Replace *seq* in place by its running (prefix) sums."""
        for idx in range(1, len(seq)):
            seq[idx] += seq[idx - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        """Indexing, slicing and slice assignment match a plain list."""
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make_array = self.RawArray if raw else self.Array
        arr = make_array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # apply the same slice assignment to the shared array and the list
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # compute the expected result locally, then the same in a child
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        zeros = [0] * size
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), zeros)
            arr[:] = range(10)
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        """get_lock()/get_obj() exist only on synchronized arrays."""
        # default locking
        arr1 = self.Array('i', list(range(10)))
        arr1.get_lock()
        arr1.get_obj()

        # explicit lock=None
        arr2 = self.Array('i', list(range(10)), lock=None)
        arr2.get_lock()
        arr2.get_obj()

        # caller-supplied lock is handed back by get_lock()
        lock = self.Lock()
        arr3 = self.Array('i', list(range(10)), lock=lock)
        lock3 = arr3.get_lock()
        arr3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False gives an object without the accessors
        arr4 = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))
        with self.assertRaises(AttributeError):
            self.Array('i', range(10), lock='notalock')

        arr5 = self.RawArray('i', range(10))
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001570
1571#
1572#
1573#
1574
class _TestContainers(BaseTestCase):
    """Tests for the manager-provided list, dict and Namespace objects."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        digits = list(range(10))
        a = self.list(list(range(10)))
        self.assertEqual(a[:], digits)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], [0, 1, 2, 3, 4])

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2, 3, 4])

        b *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        # a must be unaffected by the operations on b
        self.assertEqual(a[:], digits)

        nested = [a, b]
        e = self.list(nested)
        self.assertEqual(e[:], [digits, doubled])

        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [digits + ['hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            d[code] = chr(code)
        letters = [chr(code) for code in indices]
        self.assertEqual(d.copy(), {code: chr(code) for code in indices})
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), list(zip(indices, letters)))

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # the expected string lists 'name' only, not the underscore attribute
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1630
1631#
1632#
1633#
1634
def sqr(x, wait=0.0):
    """Sleep for *wait* seconds, then return x multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00001638
def mul(x, y):
    """Return the product of *x* and *y*."""
    product = x * y
    return product
1641
Benjamin Petersone711caf2008-06-11 16:44:04 +00001642class _TestPool(BaseTestCase):
1643
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01001644 @classmethod
1645 def setUpClass(cls):
1646 super().setUpClass()
1647 cls.pool = cls.Pool(4)
1648
1649 @classmethod
1650 def tearDownClass(cls):
1651 cls.pool.terminate()
1652 cls.pool.join()
1653 cls.pool = None
1654 super().tearDownClass()
1655
Benjamin Petersone711caf2008-06-11 16:44:04 +00001656 def test_apply(self):
1657 papply = self.pool.apply
1658 self.assertEqual(papply(sqr, (5,)), sqr(5))
1659 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1660
1661 def test_map(self):
1662 pmap = self.pool.map
1663 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1664 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1665 list(map(sqr, list(range(100)))))
1666
Antoine Pitroude911b22011-12-21 11:03:24 +01001667 def test_starmap(self):
1668 psmap = self.pool.starmap
1669 tuples = list(zip(range(10), range(9,-1, -1)))
1670 self.assertEqual(psmap(mul, tuples),
1671 list(itertools.starmap(mul, tuples)))
1672 tuples = list(zip(range(100), range(99,-1, -1)))
1673 self.assertEqual(psmap(mul, tuples, chunksize=20),
1674 list(itertools.starmap(mul, tuples)))
1675
1676 def test_starmap_async(self):
1677 tuples = list(zip(range(100), range(99,-1, -1)))
1678 self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
1679 list(itertools.starmap(mul, tuples)))
1680
Hynek Schlawack254af262012-10-27 12:53:02 +02001681 def test_map_async(self):
1682 self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
1683 list(map(sqr, list(range(10)))))
1684
1685 def test_map_async_callbacks(self):
1686 call_args = self.manager.list() if self.TYPE == 'manager' else []
1687 self.pool.map_async(int, ['1'],
1688 callback=call_args.append,
1689 error_callback=call_args.append).wait()
1690 self.assertEqual(1, len(call_args))
1691 self.assertEqual([1], call_args[0])
1692 self.pool.map_async(int, ['a'],
1693 callback=call_args.append,
1694 error_callback=call_args.append).wait()
1695 self.assertEqual(2, len(call_args))
1696 self.assertIsInstance(call_args[1], ValueError)
1697
Richard Oudkerke90cedb2013-10-28 23:11:58 +00001698 def test_map_unplicklable(self):
1699 # Issue #19425 -- failure to pickle should not cause a hang
1700 if self.TYPE == 'threads':
Zachary Ware9fe6d862013-12-08 00:20:35 -06001701 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerke90cedb2013-10-28 23:11:58 +00001702 class A(object):
1703 def __reduce__(self):
1704 raise RuntimeError('cannot pickle')
1705 with self.assertRaises(RuntimeError):
1706 self.pool.map(sqr, [A()]*10)
1707
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001708 def test_map_chunksize(self):
1709 try:
1710 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1711 except multiprocessing.TimeoutError:
1712 self.fail("pool.map_async with chunksize stalled on null list")
1713
Benjamin Petersone711caf2008-06-11 16:44:04 +00001714 def test_async(self):
1715 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1716 get = TimingWrapper(res.get)
1717 self.assertEqual(get(), 49)
1718 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1719
1720 def test_async_timeout(self):
Richard Oudkerk46b4a5e2013-11-17 17:45:16 +00001721 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001722 get = TimingWrapper(res.get)
1723 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1724 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1725
1726 def test_imap(self):
1727 it = self.pool.imap(sqr, list(range(10)))
1728 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1729
1730 it = self.pool.imap(sqr, list(range(10)))
1731 for i in range(10):
1732 self.assertEqual(next(it), i*i)
1733 self.assertRaises(StopIteration, it.__next__)
1734
1735 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1736 for i in range(1000):
1737 self.assertEqual(next(it), i*i)
1738 self.assertRaises(StopIteration, it.__next__)
1739
1740 def test_imap_unordered(self):
1741 it = self.pool.imap_unordered(sqr, list(range(1000)))
1742 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1743
1744 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1745 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1746
1747 def test_make_pool(self):
Victor Stinner2fae27b2011-06-20 17:53:35 +02001748 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1749 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1750
Benjamin Petersone711caf2008-06-11 16:44:04 +00001751 p = multiprocessing.Pool(3)
1752 self.assertEqual(3, len(p._pool))
1753 p.close()
1754 p.join()
1755
1756 def test_terminate(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001757 result = self.pool.map_async(
1758 time.sleep, [0.1 for i in range(10000)], chunksize=1
1759 )
1760 self.pool.terminate()
1761 join = TimingWrapper(self.pool.join)
1762 join()
Victor Stinner900189b2011-03-24 16:39:07 +01001763 self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001764
Richard Oudkerke41682b2012-06-06 19:04:57 +01001765 def test_empty_iterable(self):
1766 # See Issue 12157
1767 p = self.Pool(1)
1768
1769 self.assertEqual(p.map(sqr, []), [])
1770 self.assertEqual(list(p.imap(sqr, [])), [])
1771 self.assertEqual(list(p.imap_unordered(sqr, [])), [])
1772 self.assertEqual(p.map_async(sqr, []).get(), [])
1773
1774 p.close()
1775 p.join()
1776
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01001777 def test_context(self):
1778 if self.TYPE == 'processes':
1779 L = list(range(10))
1780 expected = [sqr(i) for i in L]
1781 with multiprocessing.Pool(2) as p:
1782 r = p.map_async(sqr, L)
1783 self.assertEqual(r.get(), expected)
Benjamin Peterson3095f472012-09-25 12:45:42 -04001784 self.assertRaises(ValueError, p.map_async, sqr, L)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01001785
@classmethod
def _test_traceback(cls):
    # NOTE: test_traceback() looks for the exact text of the next line,
    # trailing comment included, so keep the two in sync.
    raise RuntimeError(123) # some comment

def test_traceback(self):
    # We want to ensure that the traceback from the child process is
    # contained in the traceback raised in the main process.
    if self.TYPE != 'processes':
        return

    # Source text of the raising statement in _test_traceback().
    raising_line = 'raise RuntimeError(123) # some comment'

    with self.Pool(1) as pool:
        try:
            pool.apply(self._test_traceback)
        except Exception as err:
            # Rebind: the 'as' name is unbound once the handler ends.
            exc = err
        else:
            raise AssertionError('expected RuntimeError')

    self.assertIs(type(exc), RuntimeError)
    self.assertEqual(exc.args, (123,))
    cause = exc.__cause__
    self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback)
    self.assertIn(raising_line, cause.tb)

    # Re-raise locally and let sys.excepthook report it; the captured
    # stderr text must also show the raising line.
    with test.support.captured_stderr() as captured:
        try:
            raise exc
        except RuntimeError:
            sys.excepthook(*sys.exc_info())
    self.assertIn(raising_line, captured.getvalue())
1814
@classmethod
def _test_wrapped_exception(cls):
    # Task used by test_wrapped_exception(): always fails.
    raise RuntimeError('foo')

def test_wrapped_exception(self):
    # Issue #20980: Should not wrap exception when using thread pool
    with self.Pool(1) as pool, self.assertRaises(RuntimeError):
        pool.apply(self._test_wrapped_exception)
1824
1825
def raising():
    """Always fail with ``KeyError("key")``."""
    failure = KeyError("key")
    raise failure
Jesse Noller1f0b6582010-01-27 03:36:01 +00001828
def unpickleable_result():
    """Return a freshly created lambda whose call result is 42."""
    # The pool tests using this expect the returned object to fail
    # encoding (MaybeEncodingError), so it has to stay a lambda.
    return (lambda: 42)
1831
class _TestPoolWorkerErrors(BaseTestCase):
    # Checks how errors raised by, or while returning from, pool tasks
    # are reported through AsyncResult.get() and error_callback.

    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        pool = multiprocessing.Pool(2)

        # Last exception handed to the error callback (None until then).
        seen = None

        def remember(exc):
            nonlocal seen
            seen = exc

        outcome = pool.apply_async(raising, error_callback=remember)
        self.assertRaises(KeyError, outcome.get)
        self.assertTrue(seen)
        self.assertIsInstance(seen, KeyError)

        pool.close()
        pool.join()

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        pool = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        for _ in range(20):

            # Fresh one-slot holder per round, bound as a default so the
            # callback writes into this round's holder only.
            holder = [None]

            def remember(exc, holder=holder):
                holder[0] = exc

            outcome = pool.apply_async(unpickleable_result,
                                       error_callback=remember)
            self.assertRaises(MaybeEncodingError, outcome.get)
            wrapped = holder[0]
            self.assertTrue(wrapped)
            self.assertIsInstance(holder[0], MaybeEncodingError)
            self.assertIsNotNone(wrapped.exc)
            self.assertIsNotNone(wrapped.value)

        pool.close()
        pool.join()
1871
class _TestPoolWorkerLifetime(BaseTestCase):
    # Tests for pools created with a maxtasksperchild limit.

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        pids_before = [worker.pid for worker in pool._pool]

        # Run many tasks so each worker gets replaced (hopefully)
        pending = [pool.apply_async(sqr, (n, )) for n in range(100)]

        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for n, outcome in enumerate(pending):
            self.assertEqual(outcome.get(), sqr(n))

        # Refill the pool
        pool._repopulate_pool()

        # Wait until all workers are alive: at most 50 checks, sleeping
        # DELTA seconds after each unsuccessful one.
        for _ in range(50):
            if all(worker.is_alive() for worker in pool._pool):
                break
            time.sleep(DELTA)
        pids_after = [worker.pid for worker in pool._pool]

        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, pids_before)
        self.assertNotIn(None, pids_after)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(pids_before), sorted(pids_after))
        pool.close()
        pool.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        pool = multiprocessing.Pool(3, maxtasksperchild=1)
        pending = [pool.apply_async(sqr, (n, 0.3)) for n in range(6)]
        pool.close()
        pool.join()
        # check the results
        for n, outcome in enumerate(pending):
            self.assertEqual(outcome.get(), sqr(n))
1916
Benjamin Petersone711caf2008-06-11 16:44:04 +00001917#
1918# Test of creating a customized manager class
1919#
1920
1921from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1922
class FooBar:
    """Small object with two public methods and one underscore method."""

    def f(self):
        """Return the fixed string ``'f()'``."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError

    def _h(self):
        """Return the fixed string ``'_h()'``."""
        return '_h()'
1930
def baz():
    """Generate the squares of 0 through 9, in order."""
    for number in range(10):
        yield number * number
1934
class IteratorProxy(BaseProxy):
    """Proxy that is its own iterator and forwards ``__next__`` calls."""

    # Only __next__ is exposed on the referent.
    _exposed_ = ('__next__',)

    def __iter__(self):
        # The proxy acts as the iterator object itself.
        return self

    def __next__(self):
        # Delegate each step to the referent through _callmethod().
        return self._callmethod('__next__')
1941
class MyManager(BaseManager):
    """Manager subclass used to hold the custom registrations below."""


# 'Foo' uses register()'s default for ``exposed``; 'Bar' exposes exactly
# f and _h; 'baz' hands out results through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1948
1949
class _TestMyManager(BaseTestCase):
    # Runs the shared checks in common() against a MyManager that is
    # started explicitly, via 'with', or started first and then used
    # in a 'with' block.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        mgr = MyManager()
        mgr.start()
        self.common(mgr)
        mgr.shutdown()

        # If the manager process exited cleanly then the exitcode
        # will be zero.  Otherwise (after a short timeout)
        # terminate() is used, resulting in an exitcode of -SIGTERM.
        self.assertEqual(mgr._process.exitcode, 0)

    def test_mymanager_context(self):
        with MyManager() as mgr:
            self.common(mgr)
        self.assertEqual(mgr._process.exitcode, 0)

    def test_mymanager_context_prestarted(self):
        mgr = MyManager()
        mgr.start()
        with mgr:
            self.common(mgr)
        self.assertEqual(mgr._process.exitcode, 0)

    def common(self, manager):
        # Shared assertions: which methods each proxy shows, what they
        # return or raise, and the values produced by the baz proxy.
        foo_proxy = manager.Foo()
        bar_proxy = manager.Bar()
        squares = manager.baz()

        candidates = ('f', 'g', '_h')
        foo_methods = [name for name in candidates
                       if hasattr(foo_proxy, name)]
        bar_methods = [name for name in candidates
                       if hasattr(bar_proxy, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo_proxy.f(), 'f()')
        self.assertRaises(ValueError, foo_proxy.g)
        self.assertEqual(foo_proxy._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo_proxy._callmethod, '_h')

        self.assertEqual(bar_proxy.f(), 'f()')
        self.assertEqual(bar_proxy._h(), '_h()')
        self.assertEqual(bar_proxy._callmethod('f'), 'f()')
        self.assertEqual(bar_proxy._callmethod('_h'), '_h()')

        self.assertEqual(list(squares), [i*i for i in range(10)])
1999
Richard Oudkerk73d9a292012-06-14 15:30:10 +01002000
Benjamin Petersone711caf2008-06-11 16:44:04 +00002001#
2002# Test of connecting to a remote server and using xmlrpclib for serialization
2003#
2004
# Single module-level queue returned by every get_queue() call.
_queue = pyqueue.Queue()


def get_queue():
    """Return the module-level queue object."""
    return _queue
2008
class QueueManager(BaseManager):
    '''manager class used by server process'''


# Registered together with the callable that produces the queue.
QueueManager.register('get_queue', callable=get_queue)
2012
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''


# Same typeid as on QueueManager, but registered without a callable.
QueueManager2.register('get_queue')
2016
2017
# Serializer name handed to every manager created in these tests.
SERIALIZER = 'xmlrpclib'

class _TestRemoteManager(BaseTestCase):
    # Starts a QueueManager server and talks to it from a child process
    # and from the test itself through QueueManager2 clients.

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        # Child side: connect to the server and enqueue one tuple.
        client = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        shared_queue = client.get_queue()
        shared_queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        server = QueueManager(
            address=(test.support.HOST, 0),
            authkey=authkey,
            serializer=SERIALIZER,
            )
        server.start()

        putter = self.Process(target=self._putter,
                              args=(server.address, authkey))
        putter.daemon = True
        putter.start()

        client = QueueManager2(
            address=server.address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        shared_queue = client.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(shared_queue.get(),
                         ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, shared_queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del shared_queue
        server.shutdown()
2061
class _TestManagerRestart(BaseTestCase):
    # Checks that a manager can be started again on an address that was
    # in use moments earlier.

    @classmethod
    def _putter(cls, address, authkey):
        # Child side: connect to the running manager and enqueue a string.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        # Remember the server's concrete address for the restart below.
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # Drop the proxy before the manager it refers to is shut down.
        del queue
        manager.shutdown()

        # Start a second manager on the remembered address.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except OSError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # Actually perform the retry: without this the replacement
            # manager is never started and shutdown() below would be
            # called on a manager that was never running.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002103
Benjamin Petersone711caf2008-06-11 16:44:04 +00002104#
2105#
2106#
2107
# Empty message used to tell the echo child to stop.
SENTINEL = latin('')

class _TestConnection(BaseTestCase):
    # Tests for the connection objects returned by self.Pipe(): object
    # and byte transfer, polling, closing, and handle/fd passing.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Send every received byte message straight back until SENTINEL
        # arrives, then close the connection.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        # Round trip of an object, then of raw bytes, through the echo.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zeroed 10-item buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Same again, but at an offset of three items.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A 40-byte buffer is too small for longmsg; the exception
            # is expected to carry the complete message.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing is pending yet: poll() is False, and only the call
        # with TIMEOUT1 is expected to take measurable time.
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(-1), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)
        # Give the child a moment to echo the message back.
        time.sleep(.1)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        # With duplex=False the first end only reads and the second
        # end only writes.
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(OSError, reader.send, 2)
            self.assertRaises(OSError, writer.recv)
            self.assertRaises(OSError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        # Exercises the optional offset and size arguments of
        # send_bytes(), including out-of-range values.
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the message length sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Offsets or sizes outside the message must raise ValueError.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        # Return False when os.fstat() fails with EBADF, True when it
        # succeeds; any other OSError is propagated.
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        # Child side: receive a handle over conn, write data to it and
        # close it.
        if create_dummy_fds:
            # Occupy every unassigned descriptor below 256 first.
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        # The child must have written its payload into the file.
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Pick the first unassigned descriptor number from 256 up.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        # Write a single raw byte to the connection's descriptor.
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()

    def test_context(self):
        # Both ends are usable inside the with-block and, for the
        # 'processes' flavour, closed and unusable afterwards.
        a, b = self.Pipe()

        with a, b:
            a.send(1729)
            self.assertEqual(b.recv(), 1729)
            if self.TYPE == 'processes':
                self.assertFalse(a.closed)
                self.assertFalse(b.closed)

        if self.TYPE == 'processes':
            self.assertTrue(a.closed)
            self.assertTrue(b.closed)
            self.assertRaises(OSError, a.recv)
            self.assertRaises(OSError, b.recv)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002374
class _TestListener(BaseTestCase):
    # Tests for Listener objects: double binding and use as a context
    # manager.

    ALLOWED_TYPES = ('processes',)

    def test_multiple_bind(self):
        # Binding a second listener to an address already in use must
        # fail with OSError, for every available family.
        make_listener = self.connection.Listener
        for family in self.connection.families:
            first = make_listener(family=family)
            self.addCleanup(first.close)
            self.assertRaises(OSError, make_listener, first.address, family)

    def test_context(self):
        with self.connection.Listener() as listener, \
                self.connection.Client(listener.address) as client, \
                listener.accept() as server_side:
            client.send(1729)
            self.assertEqual(server_side.recv(), 1729)

        # After the with-block the listener must refuse to accept.
        if self.TYPE == 'processes':
            self.assertRaises(OSError, listener.accept)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002395
class _TestListenerClient(BaseTestCase):
    # Tests pairing a Listener with a Client connection.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child side: connect, send one greeting and close.
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test,
                                 args=(listener.address,))
            child.daemon = True
            child.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            child.join()
            listener.close()

    def test_issue14725(self):
        listener = self.connection.Listener()
        child = self.Process(target=self._test, args=(listener.address,))
        child.daemon = True
        child.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        accepted = listener.accept()
        self.assertEqual(accepted.recv(), 'hello')
        accepted.close()
        child.join()
        listener.close()

    def test_issue16955(self):
        # Data sent from the accepted end must make poll() on the client
        # end report True, for every available family.
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            client = self.connection.Client(listener.address)
            accepted = listener.accept()
            accepted.send_bytes(b"hello")
            self.assertTrue(client.poll(1))
            accepted.close()
            client.close()
            listener.close()
2443
class _TestPoll(BaseTestCase):
    # Tests for Connection.poll() and how it interacts with message
    # boundaries.

    ALLOWED_TYPES = ('processes', 'threads')

    def test_empty_string(self):
        # An empty byte message still counts as pending data, and
        # polling repeatedly does not consume it.
        near, far = self.Pipe()
        self.assertEqual(near.poll(), False)
        far.send_bytes(b'')
        for _ in range(2):
            self.assertEqual(near.poll(), True)

    @classmethod
    def _child_strings(cls, conn, strings):
        # Child side: send each message after a 0.1s pause, then close.
        for payload in strings:
            time.sleep(0.1)
            conn.send_bytes(payload)
        conn.close()

    def test_strings(self):
        strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
        near, far = self.Pipe()
        child = self.Process(target=self._child_strings,
                             args=(far, strings))
        child.start()

        for wanted in strings:
            # Poll up to 200 times, stopping at the first success; the
            # message is read afterwards whatever the outcome.
            any(near.poll(0.01) for _ in range(200))
            received = near.recv_bytes()
            self.assertEqual(wanted, received)

        child.join()

    @classmethod
    def _child_boundaries(cls, r):
        # Polling may "pull" a message in to the child process, but we
        # don't want it to pull only part of a message, as that would
        # corrupt the pipe for any other processes which might later
        # read from it.
        r.poll(5)

    def test_boundaries(self):
        reader, writer = self.Pipe(False)
        child = self.Process(target=self._child_boundaries, args=(reader,))
        child.start()
        time.sleep(2)
        messages = [b"first", b"second"]
        for message in messages:
            writer.send_bytes(message)
        writer.close()
        child.join()
        # Whatever is read here must be one complete message.
        self.assertIn(reader.recv_bytes(), messages)

    @classmethod
    def _child_dont_merge(cls, b):
        # Child side: three separate messages sent back to back.
        for payload in (b'a', b'b', b'cd'):
            b.send_bytes(payload)

    def test_dont_merge(self):
        near, far = self.Pipe()
        for timeout in (0.0, 0.1):
            self.assertEqual(near.poll(timeout), False)

        child = self.Process(target=self._child_dont_merge, args=(far,))
        child.start()

        # Polling between reads must leave each message separate.
        self.assertEqual(near.recv_bytes(), b'a')
        for timeout in (1.0, 1.0):
            self.assertEqual(near.poll(timeout), True)
        self.assertEqual(near.recv_bytes(), b'b')
        for timeout in (1.0, 1.0, 0.0):
            self.assertEqual(near.poll(timeout), True)
        self.assertEqual(near.recv_bytes(), b'cd')

        child.join()
2521
Benjamin Petersone711caf2008-06-11 16:44:04 +00002522#
2523# Test of sending connection and socket objects between processes
2524#
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002525
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
class _TestPicklingConnections(BaseTestCase):
    # Tests sending connection and socket objects through a Pipe to
    # another process.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def tearDownClass(cls):
        from multiprocessing import resource_sharer
        resource_sharer.stop(timeout=5)

    @classmethod
    def _listener(cls, conn, families):
        # Child side: for each family, publish a listener address, accept
        # one connection and send the accepted connection back over conn.
        for family in families:
            listener = cls.connection.Listener(family=family)
            conn.send(listener.address)
            accepted = listener.accept()
            conn.send(accepted)
            accepted.close()
            listener.close()

        # Same exchange once more with a plain socket.
        server_sock = socket.socket()
        server_sock.bind((test.support.HOST, 0))
        server_sock.listen(1)
        conn.send(server_sock.getsockname())
        accepted, _peer = server_sock.accept()
        conn.send(accepted)
        accepted.close()
        server_sock.close()

        # Block until the parent sends its final message.
        conn.recv()

    @classmethod
    def _remote(cls, conn):
        # Child side: for each (address, msg) pair received before the
        # None marker, connect to address and send msg upper-cased.
        for address, msg in iter(conn.recv, None):
            client = cls.connection.Client(address)
            client.send(msg.upper())
            client.close()

        # One more pair, answered over a plain socket.
        address, msg = conn.recv()
        sock = socket.socket()
        sock.connect(address)
        sock.sendall(msg.upper())
        sock.close()

        conn.close()

    def test_pickling(self):
        families = self.connection.families

        listener_conn, listener_child_end = self.Pipe()
        listener_proc = self.Process(target=self._listener,
                                     args=(listener_child_end, families))
        listener_proc.daemon = True
        listener_proc.start()
        listener_child_end.close()

        remote_conn, remote_child_end = self.Pipe()
        remote_proc = self.Process(target=self._remote,
                                   args=(remote_child_end,))
        remote_proc.daemon = True
        remote_proc.start()
        remote_child_end.close()

        for family in families:
            msg = ('This connection uses family %s' % family).encode('ascii')
            address = listener_conn.recv()
            remote_conn.send((address, msg))
            transferred = listener_conn.recv()
            self.assertEqual(transferred.recv(), msg.upper())

        # Marks the end of the per-family pairs for _remote().
        remote_conn.send(None)

        msg = latin('This connection uses a normal socket')
        address = listener_conn.recv()
        remote_conn.send((address, msg))
        transferred = listener_conn.recv()
        # Read in chunks of up to 100 bytes until an empty read.
        chunks = list(iter(lambda: transferred.recv(100), b''))
        payload = b''.join(chunks)
        self.assertEqual(payload, msg.upper())
        transferred.close()

        # Releases _listener() from its final recv().
        listener_conn.send(None)

        remote_conn.close()
        listener_conn.close()

        listener_proc.join()
        remote_proc.join()

    @classmethod
    def child_access(cls, conn):
        # Child side: write through a received connection, then read
        # from a second received connection and reply with the message
        # doubled.
        writer = conn.recv()
        writer.send('all is well')
        writer.close()

        reader = conn.recv()
        msg = reader.recv()
        conn.send(msg*2)

        conn.close()

    def test_access(self):
        # On Windows, if we do not specify a destination pid when
        # using DupHandle then we need to be careful to use the
        # correct access flags for DuplicateHandle(), or else
        # DupHandle.detach() will raise PermissionError.  For example,
        # for a read only pipe handle we should use
        # access=FILE_GENERIC_READ.  (Unfortunately
        # DUPLICATE_SAME_ACCESS does not work.)
        conn, child_conn = self.Pipe()
        child = self.Process(target=self.child_access, args=(child_conn,))
        child.daemon = True
        child.start()
        child_conn.close()

        # First hand over the write end of a one-way pipe.
        reader, writer = self.Pipe(duplex=False)
        conn.send(writer)
        writer.close()
        self.assertEqual(reader.recv(), 'all is well')
        reader.close()

        # Then hand over the read end of another one-way pipe.
        reader, writer = self.Pipe(duplex=False)
        conn.send(reader)
        reader.close()
        writer.send('foobar')
        writer.close()
        self.assertEqual(conn.recv(), 'foobar'*2)
2656
Benjamin Petersone711caf2008-06-11 16:44:04 +00002657#
2658#
2659#
2660
class _TestHeap(BaseTestCase):
    """Tests for the block allocator behind multiprocessing.heap.BufferWrapper."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        live = []

        # Churn the heap: allocate blocks of random size and, whenever
        # more than maxblocks are held, drop a randomly chosen one.
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            live.append(multiprocessing.heap.BufferWrapper(size))
            if len(live) > maxblocks:
                del live[random.randrange(maxblocks)]

        # The heap object shared by all BufferWrapper instances.
        heap = multiprocessing.heap.BufferWrapper._heap

        # Inspect the heap while holding its lock; the lock is released
        # by a cleanup once the test is over.
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)

        layout = []
        for segment in list(heap._len_to_seq.values()):
            for arena, start, stop in segment:
                layout.append((heap._arenas.index(arena), start, stop,
                               stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            layout.append((heap._arenas.index(arena), start, stop,
                           stop-start, 'occupied'))
        layout.sort()

        # Each entry must either end where the next one begins, or the
        # next one must be the start (offset 0) of a different arena.
        for current, following in zip(layout, layout[1:]):
            arena, start, stop = current[:3]
            narena, nstart, nstop = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't
        # deadlock (issue #12352).
        # The GC is switched on if necessary and given a low collection
        # threshold so that collections happen more often (raising the
        # probability of a deadlock); both settings are restored by
        # cleanups.
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        saved_thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *saved_thresholds)
        gc.set_threshold(10)

        # Allocate many pairs of blocks that reference each other, so
        # that they can only be reclaimed by the cyclic collector.
        for _ in range(5000):
            first = multiprocessing.heap.BufferWrapper(1)
            second = multiprocessing.heap.BufferWrapper(1)
            first.buddy = second
            second.buddy = first
2725
Benjamin Petersone711caf2008-06-11 16:44:04 +00002726#
2727#
2728#
2729
Benjamin Petersone711caf2008-06-11 16:44:04 +00002730class _Foo(Structure):
2731 _fields_ = [
2732 ('x', c_int),
2733 ('y', c_double)
2734 ]
2735
class _TestSharedCTypes(BaseTestCase):
    """Check that shared ctypes values modified in a child are seen by the parent."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        """Double every shared object in place (runs in the child process)."""
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        # Build one shared object of each flavour, let a child double
        # them all, then verify the new values from the parent.
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        worker = self.Process(target=self._double,
                              args=(x, y, foo, arr, string))
        worker.daemon = True
        worker.start()
        worker.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario, with lock-protected shared objects.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # A copy must keep its values when the source is changed afterwards.
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
2785
2786#
2787#
2788#
2789
class _TestFinalize(BaseTestCase):
    """Check when and in which order util.Finalize callbacks are invoked."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        """Register finalizers that report their tag over ``conn``, then exit.

        Runs in the child process.  The process leaves through
        os._exit(0), so locals are never garbage collected.
        """
        class Foo(object):
            pass

        first = Foo()
        util.Finalize(first, conn.send, args=('a',))
        del first       # triggers callback for a

        second = Foo()
        close_second = util.Finalize(second, conn.send, args=('b',))
        close_second()  # triggers callback for b
        close_second()  # does nothing because callback has already been called
        del second      # does nothing because callback has already been called

        # Everything registered from here on is kept referenced until
        # the process exits.
        survivors = []

        third = Foo()
        util.Finalize(third, conn.send, args=('c',))
        survivors.append(third)

        high = Foo()
        util.Finalize(high, conn.send, args=('d10',), exitpriority=1)
        survivors.append(high)

        for tag in ('d01', 'd02', 'd03'):
            obj = Foo()
            util.Finalize(obj, conn.send, args=(tag,), exitpriority=0)
            survivors.append(obj)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.daemon = True
        child.start()
        child.join()

        # Read tags until the 'STOP' sentinel sent by the last finalizer.
        result = list(iter(conn.recv, 'STOP'))
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2842
2843#
2844# Test that from ... import * works for each module
2845#
2846
class _TestImportStar(unittest.TestCase):
    """Check that every multiprocessing module defines a consistent __all__."""

    def get_module_names(self):
        """Return dotted names for each ``*.py`` file in the package, plus the package itself."""
        import glob
        package_dir = os.path.dirname(multiprocessing.__file__)
        names = []
        for path in glob.glob(os.path.join(package_dir, '*.py')):
            stem = os.path.splitext(os.path.basename(path))[0]
            names.append('multiprocessing.' + stem)
        names.remove('multiprocessing.__init__')
        names.append('multiprocessing')
        return names

    def test_import(self):
        modules = self.get_module_names()

        # Collect the modules that are not checked on this platform.
        if sys.platform == 'win32':
            skipped = ['multiprocessing.popen_fork',
                       'multiprocessing.popen_forkserver',
                       'multiprocessing.popen_spawn_posix']
        else:
            skipped = ['multiprocessing.popen_spawn_win32']
            if not HAS_REDUCTION:
                skipped.append('multiprocessing.popen_forkserver')

        if c_int is None:
            # This module requires _ctypes
            skipped.append('multiprocessing.sharedctypes')

        for name in skipped:
            modules.remove(name)

        for name in modules:
            __import__(name)
            mod = sys.modules[name]
            self.assertTrue(hasattr(mod, '__all__'), name)

            # Every exported name must actually exist on the module.
            for attr in mod.__all__:
                self.assertTrue(
                    hasattr(mod, attr),
                    '%r does not have attribute %r' % (mod, attr)
                    )
2885
2886#
2887# Quick test that logging works -- does not test logging output
2888#
2889
class _TestLogging(BaseTestCase):
    """Quick checks that multiprocessing logging works (output is not inspected)."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        self.assertIsNotNone(logger)
        # Put the module-wide level back even if a logging call raises.
        self.addCleanup(logger.setLevel, LOG_LEVEL)
        logger.setLevel(util.SUBWARNING)
        logger.debug('this will not be printed')
        logger.info('nor will this')

    @classmethod
    def _test_level(cls, conn):
        """Send the effective level of the multiprocessing logger over ``conn``."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level
        # Registered before any level is changed, so that both loggers
        # are restored even when one of the assertions below fails.
        self.addCleanup(logger.setLevel, level=LOG_LEVEL)
        self.addCleanup(root_logger.setLevel, root_level)

        reader, writer = multiprocessing.Pipe(duplex=False)
        self.addCleanup(reader.close)
        self.addCleanup(writer.close)

        # A level set on the multiprocessing logger itself is reported
        # by the child.
        logger.setLevel(LEVEL1)
        p = self.Process(target=self._test_level, args=(writer,))
        p.daemon = True
        p.start()
        self.addCleanup(p.join)
        self.assertEqual(LEVEL1, reader.recv())

        # With the multiprocessing logger at NOTSET the child reports
        # the root logger's level instead.
        logger.setLevel(logging.NOTSET)
        root_logger.setLevel(LEVEL2)
        p = self.Process(target=self._test_level, args=(writer,))
        p.daemon = True
        p.start()
        self.addCleanup(p.join)
        self.assertEqual(LEVEL2, reader.recv())
2932
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002933
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002934# class _TestLoggingProcessName(BaseTestCase):
2935#
2936# def handle(self, record):
2937# assert record.processName == multiprocessing.current_process().name
2938# self.__handled = True
2939#
2940# def test_logging(self):
2941# handler = logging.Handler()
2942# handler.handle = self.handle
2943# self.__handled = False
2944# # Bypass getLogger() and side-effects
2945# logger = logging.getLoggerClass()(
2946# 'multiprocessing.test.TestLoggingProcessName')
2947# logger.addHandler(handler)
2948# logger.propagate = False
2949#
2950# logger.warn('foo')
2951# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002952
Benjamin Petersone711caf2008-06-11 16:44:04 +00002953#
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00002954# Check that Process.join() retries if os.waitpid() fails with EINTR
2955#
2956
class _TestPollEintr(BaseTestCase):
    """Check that Process.join() retries if os.waitpid() fails with EINTR."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _killer(cls, pid):
        """Wait a moment, then send SIGUSR1 to the process ``pid``."""
        time.sleep(0.1)
        os.kill(pid, signal.SIGUSR1)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_poll_eintr(self):
        signalled = False

        def record(*args):
            nonlocal signalled
            signalled = True

        own_pid = os.getpid()
        previous_handler = signal.signal(signal.SIGUSR1, record)
        try:
            killer = self.Process(target=self._killer, args=(own_pid,))
            killer.start()
            try:
                # The signal from the killer arrives while this join()
                # is waiting for the sleeping child.
                p = self.Process(target=time.sleep, args=(2,))
                p.start()
                p.join()
            finally:
                killer.join()
            self.assertTrue(signalled)
            self.assertEqual(p.exitcode, 0)
        finally:
            signal.signal(signal.SIGUSR1, previous_handler)
2986
2987#
Jesse Noller6214edd2009-01-19 16:23:53 +00002988# Test to verify handle verification, see issue 3321
2989#
2990
class TestInvalidHandle(unittest.TestCase):
    """Handle verification for Connection objects, see issue 3321."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        conn = multiprocessing.connection.Connection(44977608)
        try:
            # poll() may fail on the bogus handle, but it must not crash
            conn.poll()
        except (ValueError, OSError):
            pass
        finally:
            # Hack private attribute _handle to avoid printing an error
            # in conn.__del__
            conn._handle = None
        with self.assertRaises((ValueError, OSError)):
            multiprocessing.connection.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00003007
Benjamin Petersone711caf2008-06-11 16:44:04 +00003008
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01003009
class OtherTest(unittest.TestCase):
    """Authentication-failure paths of deliver_challenge()/answer_challenge()."""

    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        # The peer answers the challenge with garbage.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.deliver_challenge,
                          _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # The challenge prefix is an undocumented module constant.
        # NOTE(review): newer CPython releases appear to expose it only
        # as ``_CHALLENGE`` - confirm; the fallback keeps the fake
        # connection usable under either spelling.
        connection = multiprocessing.connection
        challenge = getattr(connection, 'CHALLENGE', None)
        if challenge is None:
            challenge = connection._CHALLENGE

        class _FakeConnection(object):
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                # First read: the bare challenge prefix; second read: a
                # bogus reply; afterwards: nothing.
                self.count += 1
                if self.count == 1:
                    return challenge
                elif self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.answer_challenge,
                          _FakeConnection(), b'abc')
3038
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003039#
3040# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
3041#
3042
def initializer(ns):
    """Increment the ``test`` attribute of the namespace ``ns`` by one."""
    ns.test = ns.test + 1
3045
class TestInitializers(unittest.TestCase):
    """Initializer support of Manager.start() and Pool() - see issue 5585."""

    def setUp(self):
        # A managed namespace whose counter the initializer increments.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()
        self.mgr.join()

    def test_manager_initializer(self):
        manager = multiprocessing.managers.SyncManager()
        # A non-callable initializer is rejected.
        with self.assertRaises(TypeError):
            manager.start(1)
        manager.start(initializer, (self.ns,))
        self.assertEqual(self.ns.test, 1)
        manager.shutdown()
        manager.join()

    def test_pool_initializer(self):
        # A non-callable initializer is rejected.
        with self.assertRaises(TypeError):
            multiprocessing.Pool(initializer=1)
        pool = multiprocessing.Pool(1, initializer, (self.ns,))
        pool.close()
        pool.join()
        self.assertEqual(self.ns.test, 1)
3070
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003071#
3072# Issue 5155, 5313, 5331: Test process in processes
3073# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
3074#
3075
Richard Oudkerk8b3f5aa2013-09-29 17:29:56 +01003076def _this_sub_process(q):
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003077 try:
3078 item = q.get(block=False)
3079 except pyqueue.Empty:
3080 pass
3081
def _test_process(q):
    """Start a daemon child running _this_sub_process on a new queue and wait for it.

    The ``q`` argument is accepted but not used.
    """
    fresh_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_this_sub_process,
                                    args=(fresh_queue,))
    child.daemon = True
    child.start()
    child.join()
3088
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003089def _afunc(x):
3090 return x*x
3091
def pool_in_process():
    """Map _afunc over 1..7 on a four-worker pool, then shut the pool down."""
    workers = multiprocessing.Pool(processes=4)
    workers.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    workers.close()
    workers.join()
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003097
3098class _file_like(object):
3099 def __init__(self, delegate):
3100 self._delegate = delegate
3101 self._pid = None
3102
3103 @property
3104 def cache(self):
3105 pid = os.getpid()
3106 # There are no race conditions since fork keeps only the running thread
3107 if pid != self._pid:
3108 self._pid = pid
3109 self._cache = []
3110 return self._cache
3111
3112 def write(self, data):
3113 self.cache.append(data)
3114
3115 def flush(self):
3116 self._delegate.write(''.join(self.cache))
3117 self._cache = []
3118
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Issues 5155, 5313, 5331: multiprocessing objects used inside a child process."""

    def test_queue_in_process(self):
        # The child creates its own queue and grandchild (see _test_process).
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_test_process, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this process object is created but never started.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # A unittest assertion rather than a bare ``assert``, so the
        # check is still performed when Python runs with -O.
        self.assertEqual(sio.getvalue(), 'foo')
3139
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003140
class TestWait(unittest.TestCase):
    """Tests for multiprocessing.connection.wait()."""

    @staticmethod
    def _timed_wait(objects, timeout):
        # Call wait() and return (result, elapsed seconds).
        from multiprocessing.connection import wait
        began = time.time()
        ready = wait(objects, timeout)
        return ready, time.time() - began

    @classmethod
    def _child_test_wait(cls, w, slow):
        """Send ten ``(index, pid)`` tuples over ``w``, optionally pausing before each."""
        for index in range(10):
            if slow:
                time.sleep(random.random()*0.1)
            w.send((index, os.getpid()))
        w.close()

    def test_wait(self, slow=False):
        from multiprocessing.connection import wait
        readers = []
        procs = []
        messages = []

        # Four children, each writing to its own one-way pipe.
        for _ in range(4):
            r, w = multiprocessing.Pipe(duplex=False)
            p = multiprocessing.Process(target=self._child_test_wait,
                                        args=(w, slow))
            p.daemon = True
            p.start()
            w.close()
            readers.append(r)
            procs.append(p)
            self.addCleanup(p.join)

        # Drain whichever pipes are ready until every one has hit EOF.
        while readers:
            for r in wait(readers):
                try:
                    msg = r.recv()
                except EOFError:
                    readers.remove(r)
                    r.close()
                else:
                    messages.append(msg)

        messages.sort()
        expected = sorted((i, p.pid) for i in range(10) for p in procs)
        self.assertEqual(messages, expected)

    @classmethod
    def _child_test_wait_socket(cls, address, slow):
        """Connect to ``address`` and send the lines '0' to '9', optionally pausing before each."""
        s = socket.socket()
        s.connect(address)
        for index in range(10):
            if slow:
                time.sleep(random.random()*0.1)
            s.sendall(('%s\n' % index).encode('ascii'))
        s.close()

    def test_wait_socket(self, slow=False):
        from multiprocessing.connection import wait
        listener = socket.socket()
        listener.bind((test.support.HOST, 0))
        listener.listen(4)
        addr = listener.getsockname()
        readers = []
        procs = []
        received = {}

        for _ in range(4):
            p = multiprocessing.Process(target=self._child_test_wait_socket,
                                        args=(addr, slow))
            p.daemon = True
            p.start()
            procs.append(p)
            self.addCleanup(p.join)

        for _ in range(4):
            r, _peer = listener.accept()
            readers.append(r)
            received[r] = []
        listener.close()

        # Collect chunks per socket until each peer has closed its end.
        while readers:
            for r in wait(readers):
                msg = r.recv(32)
                if not msg:
                    readers.remove(r)
                    r.close()
                else:
                    received[r].append(msg)

        expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
        for chunks in received.values():
            self.assertEqual(b''.join(chunks), expected)

    def test_wait_slow(self):
        self.test_wait(True)

    def test_wait_socket_slow(self):
        self.test_wait_socket(True)

    def test_wait_timeout(self):
        expected = 5
        a, b = multiprocessing.Pipe()

        # Nothing is readable: wait() must return empty after roughly
        # the requested timeout.
        res, delta = self._timed_wait([a, b], expected)

        self.assertEqual(res, [])
        self.assertLess(delta, expected * 2)
        self.assertGreater(delta, expected * 0.5)

        b.send(None)

        # Now ``a`` is readable, so wait() must return promptly.
        res, delta = self._timed_wait([a, b], 20)

        self.assertEqual(res, [a])
        self.assertLess(delta, 0.4)

    @classmethod
    def signal_and_sleep(cls, sem, period):
        """Release ``sem``, then sleep for ``period`` seconds."""
        sem.release()
        time.sleep(period)

    def test_wait_integer(self):
        expected = 3

        def by_identity(items):
            return sorted(items, key=id)

        sem = multiprocessing.Semaphore(0)
        a, b = multiprocessing.Pipe()
        p = multiprocessing.Process(target=self.signal_and_sleep,
                                    args=(sem, expected))

        p.start()
        self.assertIsInstance(p.sentinel, int)
        self.assertTrue(sem.acquire(timeout=20))

        # Only the sentinel becomes ready, once the child has finished
        # its sleep.
        res, delta = self._timed_wait([a, p.sentinel, b], expected + 20)

        self.assertEqual(res, [p.sentinel])
        self.assertLess(delta, expected + 2)
        self.assertGreater(delta, expected - 2)

        a.send(None)

        res, delta = self._timed_wait([a, p.sentinel, b], 20)

        self.assertEqual(by_identity(res), by_identity([p.sentinel, b]))
        self.assertLess(delta, 0.4)

        b.send(None)

        res, delta = self._timed_wait([a, p.sentinel, b], 20)

        self.assertEqual(by_identity(res), by_identity([a, p.sentinel, b]))
        self.assertLess(delta, 0.4)

        p.terminate()
        p.join()

    def test_neg_timeout(self):
        # A negative timeout must return immediately with no result.
        a, b = multiprocessing.Pipe()
        res, elapsed = self._timed_wait([a], -1)
        self.assertEqual(res, [])
        self.assertLess(elapsed, 1)
        a.close()
        b.close()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003315
Antoine Pitrou709176f2012-04-01 17:19:09 +02003316#
3317# Issue 14151: Test invalid family on invalid environment
3318#
3319
class TestInvalidFamily(unittest.TestCase):
    """Issue 14151: a Listener address of the other platform's style is rejected."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_family(self):
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener, r'\\.\test')

    @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
    def test_invalid_family_win32(self):
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener,
                          '/var/test.pipe')
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02003331
Richard Oudkerk77c84f22012-05-18 14:28:02 +01003332#
3333# Issue 12098: check sys.flags of child matches that for parent
3334#
3335
class TestFlags(unittest.TestCase):
    """Issue 12098: sys.flags of a child process must match its parent's."""

    @classmethod
    def run_in_grandchild(cls, conn):
        """Send this process's sys.flags, as a tuple, over ``conn``."""
        conn.send(tuple(sys.flags))

    @classmethod
    def run_in_child(cls):
        """Print a JSON pair: this process's sys.flags and those of a child it starts."""
        import json
        reader, writer = multiprocessing.Pipe(duplex=False)
        grandchild = multiprocessing.Process(target=cls.run_in_grandchild,
                                             args=(writer,))
        grandchild.start()
        grandchild_flags = reader.recv()
        grandchild.join()
        reader.close()
        writer.close()
        own_flags = tuple(sys.flags)
        print(json.dumps((own_flags, grandchild_flags)))

    def test_flags(self):
        import json, subprocess
        # start child process using unusual flags
        prog = ('from test._test_multiprocessing import TestFlags; '
                'TestFlags.run_in_child()')
        command = [sys.executable, '-E', '-S', '-O', '-c', prog]
        output = subprocess.check_output(command)
        child_flags, grandchild_flags = json.loads(output.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)
3363
Richard Oudkerkb15e6222012-07-27 14:19:00 +01003364#
3365# Test interaction with socket timeouts - see Issue #6056
3366#
3367
class TestTimeouts(unittest.TestCase):
    """Interaction with the default socket timeout - see Issue #6056."""

    @classmethod
    def _test_timeout(cls, child, address):
        """Sleep a second, send 123 over ``child``, then send 456 to ``address``."""
        time.sleep(1)
        child.send(123)
        child.close()
        client = multiprocessing.connection.Client(address)
        client.send(456)
        client.close()

    def test_timeout(self):
        saved_timeout = socket.getdefaulttimeout()
        try:
            # The default timeout is shorter than the child's initial
            # sleep of one second.
            socket.setdefaulttimeout(0.1)
            parent, child = multiprocessing.Pipe(duplex=True)
            listener = multiprocessing.connection.Listener(family='AF_INET')
            p = multiprocessing.Process(target=self._test_timeout,
                                        args=(child, listener.address))
            p.start()
            child.close()
            self.assertEqual(parent.recv(), 123)
            parent.close()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 456)
            accepted.close()
            listener.close()
            p.join(10)
        finally:
            socket.setdefaulttimeout(saved_timeout)
3397
Richard Oudkerke88a2442012-08-14 11:41:32 +01003398#
3399# Test what happens with no "if __name__ == '__main__'"
3400#
3401
class TestNoForkBomb(unittest.TestCase):
    """Run mp_fork_bomb.py, a script with no "if __name__ == '__main__'" guard."""

    def test_noforkbomb(self):
        sm = multiprocessing.get_start_method()
        script = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
        if sm == 'fork':
            # With fork the script is expected to succeed and print 123.
            rc, out, err = test.script_helper.assert_python_ok(script, sm)
            self.assertEqual('123', out.decode('ascii').rstrip())
            self.assertEqual('', err.decode('ascii'))
        else:
            # Otherwise it is expected to fail with a RuntimeError and
            # print nothing on stdout.
            rc, out, err = test.script_helper.assert_python_failure(script, sm)
            self.assertEqual('', out.decode('ascii'))
            self.assertIn('RuntimeError', err.decode('ascii'))
3414
3415#
Richard Oudkerk409c3132013-04-17 20:58:00 +01003416# Issue #17555: ForkAwareThreadLock
3417#
3418
class TestForkAwareThreadLock(unittest.TestCase):
    # We recursively start processes.  Issue #17555 meant that the
    # after fork registry would get duplicate entries for the same
    # lock.  The size of the registry at generation n was ~2**n.

    @classmethod
    def child(cls, n, conn):
        """Recurse through ``n`` generations; the last one reports its registry size over ``conn``."""
        if n <= 1:
            conn.send(len(util._afterfork_registry))
            conn.close()
            return
        descendant = multiprocessing.Process(target=cls.child,
                                             args=(n-1, conn))
        descendant.start()
        conn.close()
        descendant.join(timeout=5)

    def test_lock(self):
        reader, writer = multiprocessing.Pipe(False)
        # NOTE(review): the lock is bound to a local but never used
        # directly; presumably it has to stay referenced so that its
        # registry entry persists - confirm.
        lock = util.ForkAwareThreadLock()
        old_size = len(util._afterfork_registry)
        p = multiprocessing.Process(target=self.child, args=(5, writer))
        p.start()
        writer.close()
        new_size = reader.recv()
        p.join(timeout=5)
        # The registry must not have grown after five generations.
        self.assertLessEqual(new_size, old_size)
3445
3446#
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003447# Check that non-forked child processes do not inherit unneeded fds/handles
3448#
3449
class TestCloseFds(unittest.TestCase):
    """Check that non-forked child processes do not inherit unneeded fds/handles."""

    def get_high_socket_fd(self):
        """Return a detached socket fd/handle that the child should not have."""
        if WIN32:
            # The child process will not have any socket handles, so
            # calling socket.fromfd() should produce WSAENOTSOCK even
            # if there is a handle of the same number.
            return socket.socket().detach()
        else:
            # We want to produce a socket with an fd high enough that a
            # freshly created child process will not have any fds as high.
            fd = socket.socket().detach()
            to_close = []
            while fd < 50:
                to_close.append(fd)
                fd = os.dup(fd)
            for x in to_close:
                os.close(x)
            return fd

    def close(self, fd):
        """Close a descriptor obtained from get_high_socket_fd()."""
        if WIN32:
            socket.socket(fileno=fd).close()
        else:
            os.close(fd)

    @classmethod
    def _test_closefds(cls, conn, fd):
        """Try to wrap ``fd`` as a socket; send the exception raised, or None, over ``conn``."""
        try:
            s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
        except Exception as e:
            conn.send(e)
        else:
            s.close()
            conn.send(None)

    def test_closefd(self):
        if not HAS_REDUCTION:
            raise unittest.SkipTest('requires fd pickling')

        reader, writer = multiprocessing.Pipe()
        fd = self.get_high_socket_fd()
        try:
            p = multiprocessing.Process(target=self._test_closefds,
                                        args=(writer, fd))
            p.start()
            writer.close()
            e = reader.recv()
            p.join(timeout=5)
        finally:
            self.close(fd)
            # Repeated on purpose: covers the case where an error
            # occurred before the close() inside the try block.
            writer.close()
            reader.close()

        if multiprocessing.get_start_method() == 'fork':
            # A forked child inherits the descriptor, so wrapping works.
            self.assertIs(e, None)
        else:
            WSAENOTSOCK = 10038
            self.assertIsInstance(e, OSError)
            # ``winerror`` exists only on Windows; look it up defensively
            # so that an unexpected errno on other platforms produces an
            # assertion failure showing ``e`` rather than AttributeError.
            self.assertTrue(e.errno == errno.EBADF or
                            getattr(e, 'winerror', None) == WSAENOTSOCK, e)
3511
3512#
Richard Oudkerkcca8c532013-07-01 18:59:26 +01003513# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
3514#
3515
class TestIgnoreEINTR(unittest.TestCase):
    """Issue #17097: EINTR should be ignored by recv(), send(), accept() etc."""

    @classmethod
    def _test_ignore(cls, conn):
        """Install a no-op SIGUSR1 handler, then echo one message and send 1 MB."""
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        conn.send('ready')
        echoed = conn.recv()
        conn.send(echoed)
        conn.send_bytes(b'x'*(1024*1024))   # sending 1 MB should block

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            p = multiprocessing.Process(target=self._test_ignore,
                                        args=(child_conn,))
            p.daemon = True
            p.start()
            child_conn.close()
            self.assertEqual(conn.recv(), 'ready')
            # Signal the child a moment after it has reported 'ready'.
            time.sleep(0.1)
            os.kill(p.pid, signal.SIGUSR1)
            time.sleep(0.1)
            conn.send(1234)
            self.assertEqual(conn.recv(), 1234)
            # Signal it again before reading the 1 MB payload.
            time.sleep(0.1)
            os.kill(p.pid, signal.SIGUSR1)
            self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
            time.sleep(0.1)
            p.join()
        finally:
            conn.close()

    @classmethod
    def _test_ignore_listener(cls, conn):
        """Install a no-op SIGUSR1 handler, publish a listener address, greet one client."""
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        with multiprocessing.connection.Listener() as listener:
            conn.send(listener.address)
            accepted = listener.accept()
            accepted.send('welcome')

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore_listener(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            p = multiprocessing.Process(target=self._test_ignore_listener,
                                        args=(child_conn,))
            p.daemon = True
            p.start()
            child_conn.close()
            address = conn.recv()
            # Signal the child a moment after it has published its address.
            time.sleep(0.1)
            os.kill(p.pid, signal.SIGUSR1)
            time.sleep(0.1)
            client = multiprocessing.connection.Client(address)
            self.assertEqual(client.recv(), 'welcome')
            p.join()
        finally:
            conn.close()
3579
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003580class TestStartMethod(unittest.TestCase):
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01003581 @classmethod
3582 def _check_context(cls, conn):
3583 conn.send(multiprocessing.get_start_method())
3584
3585 def check_context(self, ctx):
3586 r, w = ctx.Pipe(duplex=False)
3587 p = ctx.Process(target=self._check_context, args=(w,))
3588 p.start()
3589 w.close()
3590 child_method = r.recv()
3591 r.close()
3592 p.join()
3593 self.assertEqual(child_method, ctx.get_start_method())
3594
3595 def test_context(self):
3596 for method in ('fork', 'spawn', 'forkserver'):
3597 try:
3598 ctx = multiprocessing.get_context(method)
3599 except ValueError:
3600 continue
3601 self.assertEqual(ctx.get_start_method(), method)
3602 self.assertIs(ctx.get_context(), ctx)
3603 self.assertRaises(ValueError, ctx.set_start_method, 'spawn')
3604 self.assertRaises(ValueError, ctx.set_start_method, None)
3605 self.check_context(ctx)
3606
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003607 def test_set_get(self):
3608 multiprocessing.set_forkserver_preload(PRELOAD)
3609 count = 0
3610 old_method = multiprocessing.get_start_method()
Jesse Nollerd00df3c2008-06-18 14:22:48 +00003611 try:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003612 for method in ('fork', 'spawn', 'forkserver'):
3613 try:
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01003614 multiprocessing.set_start_method(method, force=True)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003615 except ValueError:
3616 continue
3617 self.assertEqual(multiprocessing.get_start_method(), method)
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01003618 ctx = multiprocessing.get_context()
3619 self.assertEqual(ctx.get_start_method(), method)
3620 self.assertTrue(type(ctx).__name__.lower().startswith(method))
3621 self.assertTrue(
3622 ctx.Process.__name__.lower().startswith(method))
3623 self.check_context(multiprocessing)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003624 count += 1
3625 finally:
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01003626 multiprocessing.set_start_method(old_method, force=True)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003627 self.assertGreaterEqual(count, 1)
3628
3629 def test_get_all(self):
3630 methods = multiprocessing.get_all_start_methods()
3631 if sys.platform == 'win32':
3632 self.assertEqual(methods, ['spawn'])
3633 else:
3634 self.assertTrue(methods == ['fork', 'spawn'] or
3635 methods == ['fork', 'spawn', 'forkserver'])
3636
#
# Check that killing a process does not leak named semaphores
#
3640
3641@unittest.skipIf(sys.platform == "win32",
3642 "test semantics don't make sense on Windows")
3643class TestSemaphoreTracker(unittest.TestCase):
3644 def test_semaphore_tracker(self):
3645 import subprocess
3646 cmd = '''if 1:
3647 import multiprocessing as mp, time, os
3648 mp.set_start_method("spawn")
3649 lock1 = mp.Lock()
3650 lock2 = mp.Lock()
3651 os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n")
3652 os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n")
3653 time.sleep(10)
3654 '''
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003655 r, w = os.pipe()
3656 p = subprocess.Popen([sys.executable,
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003657 '-c', cmd % (w, w)],
Richard Oudkerk67e51982013-08-22 23:37:23 +01003658 pass_fds=[w],
3659 stderr=subprocess.PIPE)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003660 os.close(w)
3661 with open(r, 'rb', closefd=True) as f:
3662 name1 = f.readline().rstrip().decode('ascii')
3663 name2 = f.readline().rstrip().decode('ascii')
3664 _multiprocessing.sem_unlink(name1)
3665 p.terminate()
3666 p.wait()
Richard Oudkerk42a526c2014-02-21 22:29:58 +00003667 time.sleep(2.0)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003668 with self.assertRaises(OSError) as ctx:
3669 _multiprocessing.sem_unlink(name2)
3670 # docs say it should be ENOENT, but OSX seems to give EINVAL
3671 self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL))
Richard Oudkerk67e51982013-08-22 23:37:23 +01003672 err = p.stderr.read().decode('utf-8')
3673 p.stderr.close()
3674 expected = 'semaphore_tracker: There appear to be 2 leaked semaphores'
3675 self.assertRegex(err, expected)
3676 self.assertRegex(err, 'semaphore_tracker: %r: \[Errno' % name1)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003677
#
# Mixins
#
3681
class ProcessesMixin(object):
    """Mixin binding the generic test names to real-process objects.

    Every attribute is taken directly from the multiprocessing package.
    The callables are wrapped in staticmethod() so that looking them up
    through a test instance does not pass the instance as first argument.
    """

    TYPE = 'processes'

    # Process machinery and the connection module.
    Process = multiprocessing.Process
    connection = multiprocessing.connection
    current_process = staticmethod(multiprocessing.current_process)
    active_children = staticmethod(multiprocessing.active_children)
    Pool = staticmethod(multiprocessing.Pool)

    # Communication channels.
    Pipe = staticmethod(multiprocessing.Pipe)
    Queue = staticmethod(multiprocessing.Queue)
    JoinableQueue = staticmethod(multiprocessing.JoinableQueue)

    # Synchronization primitives.
    Lock = staticmethod(multiprocessing.Lock)
    RLock = staticmethod(multiprocessing.RLock)
    Semaphore = staticmethod(multiprocessing.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.Condition)
    Event = staticmethod(multiprocessing.Event)
    Barrier = staticmethod(multiprocessing.Barrier)

    # Shared ctypes objects.
    Value = staticmethod(multiprocessing.Value)
    Array = staticmethod(multiprocessing.Array)
    RawValue = staticmethod(multiprocessing.RawValue)
    RawArray = staticmethod(multiprocessing.RawArray)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003703
Benjamin Petersone711caf2008-06-11 16:44:04 +00003704
class ManagerMixin(object):
    """Mixin binding the generic test names to manager proxies.

    The factory names are properties that forward to the attribute of the
    same name on ``self.manager``; the manager itself is created once per
    test class in setUpClass() and shut down in tearDownClass().
    """

    TYPE = 'manager'
    Process = multiprocessing.Process

    # Each property resolves <instance>.manager.<name> at access time.
    Queue = property(operator.attrgetter('manager.Queue'))
    JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
    Lock = property(operator.attrgetter('manager.Lock'))
    RLock = property(operator.attrgetter('manager.RLock'))
    Semaphore = property(operator.attrgetter('manager.Semaphore'))
    BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
    Condition = property(operator.attrgetter('manager.Condition'))
    Event = property(operator.attrgetter('manager.Event'))
    Barrier = property(operator.attrgetter('manager.Barrier'))
    Value = property(operator.attrgetter('manager.Value'))
    Array = property(operator.attrgetter('manager.Array'))
    list = property(operator.attrgetter('manager.list'))
    dict = property(operator.attrgetter('manager.dict'))
    Namespace = property(operator.attrgetter('manager.Namespace'))

    @classmethod
    def Pool(cls, *args, **kwds):
        """Create a pool through the class-level manager."""
        make_pool = cls.manager.Pool
        return make_pool(*args, **kwds)

    @classmethod
    def setUpClass(cls):
        """Start the manager shared by all tests of the class."""
        cls.manager = multiprocessing.Manager()

    @classmethod
    def tearDownClass(cls):
        """Wait for stray children, report leftovers, stop the manager."""
        # only the manager process should be returned by active_children()
        # but this can take a bit on slow machines, so wait a few seconds
        # if there are other children too (see #17395)
        delay = 0.01
        while len(multiprocessing.active_children()) > 1 and delay < 5:
            time.sleep(delay)
            delay *= 2
        gc.collect()                       # do garbage collection
        remaining = cls.manager._number_of_objects()
        if remaining != 0:
            # This is not really an error since some tests do not
            # ensure that all processes which hold a reference to a
            # managed object have been joined.
            print('Shared objects which still exist at manager shutdown:')
            print(cls.manager._debug_info())
        cls.manager.shutdown()
        cls.manager.join()
        cls.manager = None
Richard Oudkerk14f5ee02013-07-19 22:53:42 +01003750
3751
class ThreadsMixin(object):
    """Mixin binding the generic test names to thread-based objects.

    Every attribute comes from multiprocessing.dummy.  The callables are
    wrapped in staticmethod() so that looking them up through a test
    instance does not pass the instance as first argument.
    """

    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    connection = multiprocessing.dummy.connection
    current_process = staticmethod(multiprocessing.dummy.current_process)
    active_children = staticmethod(multiprocessing.dummy.active_children)
    # Use the pool from multiprocessing.dummy like every other name here;
    # binding multiprocessing.Pool would make the 'threads' variants of the
    # pool tests run against the process-based pool instead.
    Pool = staticmethod(multiprocessing.dummy.Pool)
    Pipe = staticmethod(multiprocessing.dummy.Pipe)
    Queue = staticmethod(multiprocessing.dummy.Queue)
    JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
    Lock = staticmethod(multiprocessing.dummy.Lock)
    RLock = staticmethod(multiprocessing.dummy.RLock)
    Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.dummy.Condition)
    Event = staticmethod(multiprocessing.dummy.Event)
    Barrier = staticmethod(multiprocessing.dummy.Barrier)
    Value = staticmethod(multiprocessing.dummy.Value)
    Array = staticmethod(multiprocessing.dummy.Array)
3771
#
# Functions used to create test cases from the base ones in this module
#
3775
def install_tests_in_module_dict(remote_globs, start_method):
    """Populate *remote_globs* with concrete test classes and fixtures.

    For every BaseTestCase subclass defined in this module, one subclass
    per entry of its ALLOWED_TYPES is generated by combining it with the
    matching ``<Type>Mixin`` and unittest.TestCase; it is published as
    ``With<Type><name without its first character>``.  Every other
    unittest.TestCase subclass is published under its own name through a
    trivial subclass.  All published classes have their ``__module__`` set
    to ``remote_globs['__name__']``.

    setUpModule/tearDownModule functions are installed as well: they switch
    to *start_method* for the duration of the module (skipping the module
    if it is unsupported) and report processes/threads left dangling.
    """
    target_module = remote_globs['__name__']
    local_globs = globals()
    ALL_TYPES = {'processes', 'threads', 'manager'}

    def publish(klass, public_name):
        # Rename the generated class and expose it in the target module.
        klass.__name__ = klass.__qualname__ = public_name
        klass.__module__ = target_module
        remote_globs[public_name] = klass

    for name, base in local_globs.items():
        if not isinstance(base, type):
            continue
        if issubclass(base, BaseTestCase):
            if base is BaseTestCase:
                continue
            assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES
            for type_ in base.ALLOWED_TYPES:
                prefix = type_.capitalize()
                Mixin = local_globs[prefix + 'Mixin']
                class Temp(base, Mixin, unittest.TestCase):
                    pass
                publish(Temp, 'With' + prefix + name[1:])
        elif issubclass(base, unittest.TestCase):
            class Temp(base, object):
                pass
            publish(Temp, name)

    # State captured by setUpModule() and consumed by tearDownModule().
    dangling_processes = None
    dangling_threads = None
    saved_start_method = None

    def setUpModule():
        nonlocal dangling_processes, dangling_threads, saved_start_method
        multiprocessing.set_forkserver_preload(PRELOAD)
        multiprocessing.process._cleanup()
        dangling_processes = multiprocessing.process._dangling.copy()
        dangling_threads = threading._dangling.copy()
        saved_start_method = multiprocessing.get_start_method(allow_none=True)
        try:
            multiprocessing.set_start_method(start_method, force=True)
        except ValueError:
            raise unittest.SkipTest(start_method +
                                    ' start method not supported')

        if sys.platform.startswith("linux"):
            try:
                probe = multiprocessing.RLock()
            except OSError:
                raise unittest.SkipTest("OSError raises on RLock creation, "
                                        "see issue 3111!")
        check_enough_semaphores()
        util.get_temp_dir()     # creates temp directory
        multiprocessing.get_logger().setLevel(LOG_LEVEL)

    def report_dangling(label, live, baseline):
        # Print whatever is alive now but was not alive before setUpModule().
        extra = set(live) - set(baseline)
        if extra:
            print(label, extra, file=sys.stderr)

    def tearDownModule():
        multiprocessing.set_start_method(saved_start_method, force=True)
        # pause a bit so we don't get warning about dangling threads/processes
        time.sleep(0.5)
        multiprocessing.process._cleanup()
        gc.collect()
        report_dangling('Dangling processes:',
                        multiprocessing.process._dangling, dangling_processes)
        report_dangling('Dangling threads:',
                        threading._dangling, dangling_threads)

    remote_globs['setUpModule'] = setUpModule
    remote_globs['tearDownModule'] = tearDownModule