#!/usr/bin/env python

#
# Unit tests for the multiprocessing package
#
6
7import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00008import Queue
9import time
10import sys
11import os
12import gc
13import signal
14import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000015import socket
16import random
17import logging
Antoine Pitroua1a8da82011-08-23 19:54:20 +020018import errno
Mark Dickinsonc4920e82009-11-20 19:30:22 +000019from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000020from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000021_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020022# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000023# message: "No module named _multiprocessing". _multiprocessing is not compiled
24# without thread support.
25import threading
R. David Murray3db8a342009-03-30 23:05:48 +000026
Jesse Noller37040cd2008-09-30 00:15:45 +000027# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000028test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000029
Benjamin Petersondfd79492008-06-13 19:13:39 +000030import multiprocessing.dummy
31import multiprocessing.connection
32import multiprocessing.managers
33import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000034import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000035
Charles-François Natalif8413b22011-09-21 18:44:49 +020036from multiprocessing import util
37
38try:
39 from multiprocessing import reduction
40 HAS_REDUCTION = True
41except ImportError:
42 HAS_REDUCTION = False
Benjamin Petersondfd79492008-06-13 19:13:39 +000043
Brian Curtina06e9b82010-10-07 02:27:41 +000044try:
45 from multiprocessing.sharedctypes import Value, copy
46 HAS_SHAREDCTYPES = True
47except ImportError:
48 HAS_SHAREDCTYPES = False
49
Antoine Pitroua1a8da82011-08-23 19:54:20 +020050try:
51 import msvcrt
52except ImportError:
53 msvcrt = None
54
Benjamin Petersondfd79492008-06-13 19:13:39 +000055#
56#
57#
58
# Constructor used for the single-character sample values of the 'c'
# typecode in _TestValue.codes_values.
latin = str

#
# Constants
#

LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# True unless the extension module flags sem_getvalue() as broken.
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = (sys.platform == "win32")

# Fall back to 256 when the limit cannot be queried.  Only the errors
# os.sysconf() itself can produce are caught (missing function, unknown
# name, OS-level failure) so that KeyboardInterrupt/SystemExit propagate.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    MAXFD = 256

#
# Some tests require ctypes
#

try:
    from ctypes import Structure, c_int, c_double
except ImportError:
    # Placeholders so the module still imports; tests that need ctypes
    # check ``c_int is None`` and skip themselves.
    Structure = object
    c_int = c_double = None
97
Charles-François Natali6392d7f2011-11-22 18:35:18 +010098
def check_enough_semaphores():
    """Check that the system supports enough semaphores to run the test.

    Returns None when ``os.sysconf`` or the ``SC_SEM_NSEMS_MAX`` setting is
    not available, when the reported limit is -1, or when the limit is at
    least 256.  Raises ``unittest.SkipTest`` otherwise.
    """
    # minimum number of semaphores available according to POSIX
    nsems_min = 256
    try:
        nsems = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems == -1 or nsems >= nsems_min:
        return
    raise unittest.SkipTest("The OS doesn't support enough semaphores "
                            "to run the test (required: %d)." % nsems_min)
112
113
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000114#
Benjamin Petersondfd79492008-06-13 19:13:39 +0000115# Creates a wrapper for a function which records the time it takes to finish
116#
117
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    After every call, ``elapsed`` holds the wall-clock duration (in seconds,
    as measured by ``time.time()``) of the most recent call.  It is None
    until the wrapper has been called once.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            return self.func(*args, **kwds)
        finally:
            # Recorded even when func raises, so callers can time calls
            # that are expected to fail (e.g. via assertRaises).
            self.elapsed = time.time() - started
130
131#
132# Base class for test cases
133#
134
class BaseTestCase(object):
    """Shared helpers for the test-case classes in this file.

    The assert* methods used here (assertEqual, assertAlmostEqual) are not
    defined on this class; presumably a unittest.TestCase is mixed in when
    the concrete test classes are built -- confirm against the rest of the
    file.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timing assertions are only enforced when CHECK_TIMINGS is enabled;
        # otherwise this is a no-op.
        if CHECK_TIMINGS:
            self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Assert func(*args) == value, but treat NotImplementedError from
        # the call itself as "nothing to check".
        try:
            res = func(*args)
        except NotImplementedError:
            pass
        else:
            return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
157
Benjamin Petersondfd79492008-06-13 19:13:39 +0000158#
159# Return the value of a semaphore
160#
161
def get_value(self):
    """Return the current counter of the semaphore-like object *self*.

    Tries, in order, a ``get_value()`` method, then the attributes
    ``_Semaphore__value`` and ``_value``.  Raises NotImplementedError when
    none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    # Fall back to the private counter attributes.
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
173
174#
175# Testcases
176#
177
class _TestProcess(BaseTestCase):
    # Tests for creating, inspecting, terminating and nesting processes.
    #
    # self.TYPE, self.Process, self.Queue, self.Event, self.Pipe,
    # self.current_process and self.active_children are not defined here;
    # presumably they are supplied by the concrete classes generated from
    # this mixin -- confirm against the rest of the file.

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for %s' % self.TYPE)

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Child body for test_process: report what the child received and
        # who it is, in the order in which the parent reads it back.
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # State before start()
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # State while running
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The queue itself is args[0]; the child only echoes the rest.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # State after a clean exit
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        # Sleeps long enough that the child only ends by being terminated.
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for %s' % self.TYPE)

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # Send our own id, then (down to depth 2) run two children one
        # after the other, each extending the id by its index.
        # NOTE(review): ``forking`` is imported but not used below --
        # confirm whether the import is needed for its side effects.
        from multiprocessing import forking
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        # Each child is joined before the next one starts, so the ids
        # arrive in depth-first order.
        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
327
328#
329#
330#
331
class _UpperCaser(multiprocessing.Process):
    """Process subclass that upper-cases strings sent to it over a pipe.

    The parent talks through ``parent_conn``; the child loop in ``run()``
    serves ``child_conn`` until it receives None.
    """

    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.child_conn, self.parent_conn = multiprocessing.Pipe()

    def run(self):
        # Runs in the child: drop the parent's end, then echo every
        # received string upper-cased until the None sentinel arrives.
        self.parent_conn.close()
        for s in iter(self.child_conn.recv, None):
            self.child_conn.send(s.upper())
        self.child_conn.close()

    def submit(self, s):
        # Send one string to the child and block for its reply.
        assert type(s) is str
        self.parent_conn.send(s)
        return self.parent_conn.recv()

    def stop(self):
        # Ask the child loop to finish, then close both ends on this side.
        self.parent_conn.send(None)
        self.parent_conn.close()
        self.child_conn.close()
353
class _TestSubclassingProcess(BaseTestCase):
    # Checks that a multiprocessing.Process subclass with its own run()
    # can be started and talked to.

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        uppercaser = _UpperCaser()
        uppercaser.daemon = True
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        uppercaser.stop()
        uppercaser.join()
366
367#
368#
369#
370
def queue_empty(q):
    """Return whether *q* is empty.

    Uses ``q.empty()`` when the queue has that method and falls back to
    comparing ``q.qsize()`` with zero otherwise.
    """
    if hasattr(q, 'empty'):
        return q.empty()
    return q.qsize() == 0
376
def queue_full(q, maxsize):
    """Return whether *q* is full.

    Uses ``q.full()`` when the queue has that method; otherwise the queue
    counts as full when ``q.qsize()`` equals *maxsize*.
    """
    if hasattr(q, 'full'):
        return q.full()
    return q.qsize() == maxsize
382
383
class _TestQueue(BaseTestCase):
    # Tests for Queue put/get semantics (blocking, non-blocking, timeouts),
    # for queue use across a newly started process, qsize() and
    # JoinableQueue.task_done().
    #
    # self.Queue, self.JoinableQueue, self.Event and self.Process are not
    # defined here; presumably the concrete classes generated from this
    # mixin supply them -- confirm against the rest of the file.

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child body for test_put: once released, drain the six items the
        # parent queued and signal completion.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue using every spelling of put().
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Every further put must fail with Queue.Full; the blocking forms
        # only after their timeout has elapsed.
        self.assertRaises(Queue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        # Let the child drain the queue and wait until it has finished.
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child body for test_get: once released, queue the values the
        # parent expects to read and signal completion.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Every further get must fail with Queue.Empty; the blocking forms
        # only after their timeout has elapsed.
        self.assertRaises(Queue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(Queue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # qsize() is not available for this queue; nothing to check.
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        # Worker body: acknowledge each item after a short delay and stop
        # at the None sentinel.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in xrange(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in xrange(10):
            queue.put(i)

        # Returns once every item put so far has been acknowledged.
        queue.join()

        # One sentinel per worker so that each of them exits its loop.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
593
594#
595#
596#
597
class _TestLock(BaseTestCase):
    # Tests for Lock and RLock acquire/release behaviour.  self.Lock and
    # self.RLock are not defined here; presumably the concrete classes
    # generated from this mixin supply them -- confirm.

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # A second, non-blocking acquire of a held lock must fail.
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # Releasing an unheld lock is an error.
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        # The owner may acquire repeatedly and must release as often.
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        with self.Lock():
            pass
620
Benjamin Petersondfd79492008-06-13 19:13:39 +0000621
class _TestSemaphore(BaseTestCase):
    # Tests for Semaphore and BoundedSemaphore counting and timeouts.
    # self.Semaphore, self.BoundedSemaphore and self.TYPE are not defined
    # here; presumably the concrete classes generated from this mixin
    # supply them -- confirm.

    def _test_semaphore(self, sem):
        # Expects *sem* to start at 2: take it down to 0, check that a
        # non-blocking acquire then fails, and release back up to 2.
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # An unbounded semaphore may be released past its initial value.
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for %s' % self.TYPE)

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # Non-blocking acquires return at once, whatever the timeout.
        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        # Blocking acquires give up after the timeout.
        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
674
675
class _TestCondition(BaseTestCase):
    # Tests for Condition notify(), notify_all() and wait() timeouts, with
    # waiters running both as self.Process instances and as threads.
    # self.Condition, self.Semaphore, self.Process and self.TYPE are not
    # defined here; presumably the concrete classes generated from this
    # mixin supply them -- confirm.

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Waiter body: announce via *sleeping* that we are about to wait,
        # wait on *cond*, then announce via *woken* that we are awake again.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Separate names for the two waiters so that both can be joined
        # at the end of the test.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f,
                                  args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        proc.join()
        thread.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for i in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        # With nobody notifying, wait() must come back by itself after
        # the timeout.
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
807
808
class _TestEvent(BaseTestCase):
    # Tests for Event set/clear/wait.  self.Event and self.Process are not
    # defined here; presumably the concrete classes generated from this
    # mixin supply them -- confirm.

    @classmethod
    def _test_event(cls, event):
        # Child body: set the event after a short delay so that the
        # parent's blocking wait() is released.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A new event starts out cleared.
        self.assertEqual(event.is_set(), False)

        # While the event is cleared, wait() with a timeout must report
        # False once the timeout has passed.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once set, wait() must return True without blocking.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # The child has set the event and has nothing left to do, so
        # collect it instead of leaving it behind.
        p.join()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000849
850#
851#
852#
853
class _TestValue(BaseTestCase):
    # Tests for shared Value/RawValue objects.  self.Value, self.RawValue,
    # self.Lock and self.Process are not defined here; presumably the
    # concrete classes generated from this mixin supply them -- confirm.

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value assigned by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Child body: overwrite each shared value with its third entry.
        for sv, cv in zip(values, cls.codes_values):
            sv.value = cv[2]


    def test_value(self, raw=False):
        if raw:
            values = [self.RawValue(code, value)
                      for code, value, _ in self.codes_values]
        else:
            values = [self.Value(code, value)
                      for code, value, _ in self.codes_values]

        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.daemon = True
        proc.start()
        proc.join()

        # The assignments made by the child must be visible here.
        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # Default locking: both accessors must be callable.
        val1 = self.Value('i', 5)
        val1.get_lock()
        val1.get_obj()

        # lock=None: both accessors must be callable as well.
        val2 = self.Value('i', 5, lock=None)
        val2.get_lock()
        val2.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields an object without the accessors.
        arr4 = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue has no accessors either.
        arr5 = self.RawValue('i', 5)
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))
921
Benjamin Petersondfd79492008-06-13 19:13:39 +0000922
class _TestArray(BaseTestCase):
    # Tests for the Array / RawArray factories; restricted to the
    # 'processes' flavour.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # Turn seq into its running sum, in place.
        for pos in range(1, len(seq)):
            seq[pos] = seq[pos] + seq[pos - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        factory = self.RawArray if raw else self.Array
        arr = factory('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Slice assignment from an array.array must work on both the
        # shared array and the reference list.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Apply the running sum locally to the list and in a child
        # process to the array; the results must agree.
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the
        # chances of previously allocated non-zero memory being used for
        # the new array on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = list(range(10))
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_accepts_long(self):
        # The size argument may be a Python 2 long as well as an int.
        size = long(10)
        arr = self.Array('i', size)
        self.assertEqual(len(arr), 10)
        raw_arr = self.RawArray('i', size)
        self.assertEqual(len(raw_arr), 10)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default lock: both accessors exist and can be called.
        default_arr = self.Array('i', range(10))
        default_lock = default_arr.get_lock()
        default_obj = default_arr.get_obj()

        # lock=None: both accessors exist and can be called.
        none_arr = self.Array('i', range(10), lock=None)
        none_lock = none_arr.get_lock()
        none_obj = none_arr.get_obj()

        # A caller-supplied lock is returned unchanged by get_lock().
        supplied = self.Lock()
        locked_arr = self.Array('i', range(10), lock=supplied)
        returned_lock = locked_arr.get_lock()
        locked_obj = locked_arr.get_obj()
        self.assertEqual(supplied, returned_lock)

        # lock=False yields an object without the accessors.
        unlocked_arr = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked_arr, accessor))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # RawArray objects never have the accessors.
        raw_arr = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw_arr, accessor))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001008
1009#
1010#
1011#
1012
class _TestContainers(BaseTestCase):
    # Tests for the list, dict and Namespace objects provided by a
    # manager.

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        digits = list(range(10))

        a = self.list(digits)
        self.assertEqual(a[:], digits)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(range(5))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2, 3, 4])

        # In-place repetition and concatenation with a plain list.
        b *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        # None of the operations on b may have touched a.
        self.assertEqual(a[:], digits)

        # A shared list built from other shared lists.
        d = [a, b]
        e = self.list(d)
        self.assertEqual(e[:], [digits, doubled])

        # A later change to a is visible through the list holding it.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [digits + ['hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        letters = [chr(code) for code in indices]
        for code, letter in zip(indices, letters):
            d[code] = letter
        pairs = list(zip(indices, letters))
        self.assertEqual(d.copy(), dict(pairs))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), pairs)

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The expected text omits both the deleted attribute and the
        # underscore-prefixed one.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1068
1069#
1070#
1071#
1072
def sqr(x, wait=0.0):
    """Return x squared after sleeping for `wait` seconds."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    # Exercises the pool API through self.pool: apply, map, imap and
    # their asynchronous variants, plus pool construction and
    # termination.

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), [sqr(i) for i in range(10)])
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         [sqr(i) for i in range(100)])

    def test_map_chunksize(self):
        # An empty iterable combined with an explicit chunksize must
        # complete rather than stall.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the timeout given to get().
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), [sqr(i) for i in range(10)])

        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

    def test_imap_unordered(self):
        expected = [sqr(i) for i in range(1000)]

        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), expected)

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), expected)

    def test_make_pool(self):
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            # Release the workers even when the assertion fails.
            p.close()
            p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            self.skipTest("terminating workers would leak references "
                          "to the manager's shared objects")

        # Queue far more work than can finish, then check that
        # terminate() does not wait for it.
        self.pool.map_async(time.sleep, [0.1] * 10000, chunksize=1)
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertLess(join.elapsed, 0.2)

    def test_empty_iterable(self):
        # See Issue 12157
        p = self.Pool(1)
        try:
            self.assertEqual(p.map(sqr, []), [])
            self.assertEqual(list(p.imap(sqr, [])), [])
            self.assertEqual(list(p.imap_unordered(sqr, [])), [])
            self.assertEqual(p.map_async(sqr, []).get(), [])
        finally:
            # Release the worker even when an assertion fails.
            p.close()
            p.join()
1166
def unpickleable_result():
    """Return a lambda that evaluates to 42.

    Used as a pool task by _TestPoolWorkerErrors, which expects fetching
    the result to fail with MaybeEncodingError.
    """
    answer = lambda: 42
    return answer
1169
class _TestPoolWorkerErrors(BaseTestCase):
    # Checks how a pool reports a task whose result cannot be sent back.

    ALLOWED_TYPES = ('processes', )

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        p = multiprocessing.Pool(2)
        try:
            # Make sure we don't lose pool processes because of encoding
            # errors: with only two workers, twenty failing tasks would
            # otherwise exhaust the pool.
            for iteration in range(20):
                res = p.apply_async(unpickleable_result)
                self.assertRaises(MaybeEncodingError, res.get)
        finally:
            # Release the workers even when an assertion fails.
            p.close()
            p.join()
1184
class _TestPoolWorkerLifetime(BaseTestCase):
    # Tests for pools whose workers are replaced after a fixed number of
    # tasks (maxtasksperchild).

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        try:
            self.assertEqual(3, len(p._pool))
            origworkerpids = [w.pid for w in p._pool]
            # Run many tasks so each worker gets replaced (hopefully)
            results = [p.apply_async(sqr, (i, )) for i in range(100)]
            # Fetch the results and verify we got the right answers,
            # also ensuring all the tasks have completed.
            for (j, res) in enumerate(results):
                self.assertEqual(res.get(), sqr(j))
            # Refill the pool
            p._repopulate_pool()
            # Wait until all workers are alive
            # (countdown * DELTA = 5 seconds max startup process time)
            countdown = 50
            while countdown and not all(w.is_alive() for w in p._pool):
                countdown -= 1
                time.sleep(DELTA)
            finalworkerpids = [w.pid for w in p._pool]
            # All pids should be assigned.  See issue #7805.
            self.assertNotIn(None, origworkerpids)
            self.assertNotIn(None, finalworkerpids)
            # Finally, check that the worker pids have changed
            self.assertNotEqual(sorted(origworkerpids),
                                sorted(finalworkerpids))
        finally:
            # Release the workers even when an assertion fails.
            p.close()
            p.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        p = multiprocessing.Pool(3, maxtasksperchild=1)
        results = [p.apply_async(sqr, (i, 0.3)) for i in range(6)]
        # close()/join() are the operations under test here, so they are
        # deliberately called before the results are fetched.
        p.close()
        p.join()
        # check the results
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
1229
1230
Benjamin Petersondfd79492008-06-13 19:13:39 +00001231#
1232# Test that manager has expected number of shared objects left
1233#
1234
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Show the manager's debug info once, to help diagnose the
            # leak.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1253
1254#
1255# Test of creating a customized manager class
1256#
1257
1258from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1259
class FooBar(object):
    """Plain object registered with MyManager under 'Foo' and 'Bar'."""

    def _h(self):
        # Underscore-prefixed method returning a fixed marker string.
        return '_h()'

    def f(self):
        # Returns a fixed marker string.
        return 'f()'

    def g(self):
        # Always fails.
        raise ValueError()
1267
def baz():
    """Generate the squares of 0 through 9, in increasing order."""
    for number in range(10):
        yield number * number
1271
class IteratorProxy(BaseProxy):
    # Proxy that lets a remote iterator be consumed locally; each step
    # is forwarded to the referent with _callmethod.

    # Both spellings of the "advance" method are exposed.
    _exposed_ = ('next', '__next__')

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def __next__(self):
        return self._callmethod('__next__')

    def next(self):
        return self._callmethod('next')
1280
class MyManager(BaseManager):
    """Customized manager; its shared types are registered just below."""


# 'Foo' is registered without an explicit `exposed` list.
MyManager.register('Foo', callable=FooBar)
# 'Bar' shares the same class but names its exposed methods explicitly.
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
# 'baz' hands out a generator, accessed through IteratorProxy.
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1287
1288
class _TestMyManager(BaseTestCase):
    # Exercises the customized MyManager class and its registered types.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            # Named `squares` so the module-level baz() is not shadowed.
            squares = manager.baz()

            candidates = ('f', 'g', '_h')
            foo_methods = [name for name in candidates if hasattr(foo, name)]
            bar_methods = [name for name in candidates if hasattr(bar, name)]

            # Foo (no `exposed` list) offers f and g; Bar offers exactly
            # the names it was registered with.
            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            # _h is not exposed on Foo, so the call is refused.
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(squares), [i*i for i in range(10)])
        finally:
            # Stop the manager process even when an assertion fails.
            manager.shutdown()
1320
1321#
1322# Test of connecting to a remote server and using xmlrpclib for serialization
1323#
1324
1325_queue = Queue.Queue()
1326def get_queue():
1327 return _queue
1328
class QueueManager(BaseManager):
    '''manager class used by server process'''


# 'get_queue' is backed by the get_queue() callable.
QueueManager.register('get_queue', callable=get_queue)
1332
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''


# Registered by name only: no callable is supplied here.
QueueManager2.register('get_queue')
1336
1337
# Serializer name handed to every QueueManager/QueueManager2 in the
# remote-manager tests (instead of the default).
SERIALIZER = 'xmlrpclib'
1339
class _TestRemoteManager(BaseTestCase):
    # Connects to a manager served from another process, using
    # SERIALIZER for the transport.

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        # Child target: connect as a client and enqueue one tuple.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.daemon = True
            p.start()

            manager2 = QueueManager2(
                address=manager.address, authkey=authkey,
                serializer=SERIALIZER
                )
            manager2.connect()
            queue = manager2.get_queue()

            received = queue.get()
            # The item has arrived, so the child has finished its work;
            # reap it instead of leaving it unjoined.
            p.join()

            # Note that xmlrpclib will deserialize object as a list not
            # a tuple
            self.assertEqual(received, ['hello world', None, True, 2.25])

            # Because we are using xmlrpclib for serialization instead of
            # pickle this will cause a serialization error.
            self.assertRaises(Exception, queue.put, time.sleep)

            # Make queue finalizer run before the server is stopped
            del queue
        finally:
            # Stop the server process even when an assertion fails.
            manager.shutdown()
1381
class _TestManagerRestart(BaseTestCase):
    # Checks that a manager can be restarted on an address right after a
    # previous manager was shut down.

    @classmethod
    def _putter(cls, address, authkey):
        # Child target: connect as a client and enqueue one string.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server().  It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        # Wait for the child so no client process is still running when
        # the manager is shut down and restarted below.
        p.join()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        del queue
        manager.shutdown()

        # Start a second manager on the address obtained above.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001414
Benjamin Petersondfd79492008-06-13 19:13:39 +00001415#
1416#
1417#
1418
# Empty message that tells _TestConnection._echo to stop echoing.
SENTINEL = latin('')
1420
class _TestConnection(BaseTestCase):
    # Tests for the connection objects returned by self.Pipe(): object
    # and byte transfer, polling, one-way pipes and passing file
    # descriptors between processes.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Child target: send every received byte message straight back
        # until SENTINEL arrives, then close this end.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', range(4))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zero-filled buffer.
            buf = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buf),
                             len(arr) * buf.itemsize)
            self.assertEqual(list(buf), expected)

            # Receive at an offset of three items.
            buf = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buf, 3 * buf.itemsize),
                             len(arr) * buf.itemsize)
            self.assertEqual(list(buf), expected)

            # A message longer than the buffer must raise BufferTooShort
            # carrying the complete message.
            buf = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buf)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing is pending: poll() returns at once, poll(timeout)
        # waits for the whole timeout.
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # Once the echo is on its way poll(timeout) returns early.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16 MiB
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            # Each end of a one-way pipe only supports its own direction.
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            # Report a skip, as the fd-transfer tests in this class do,
            # instead of silently passing.
            self.skipTest("only makes sense with processes")

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # Offset only.
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        # Offset and size.
        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset at the very end yields an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Out-of-range or negative offset/size combinations are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        # True if fd is an open descriptor in this process: fstat()
        # fails with EBADF on an unassigned one.  Other errors propagate.
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        # Child target: receive a handle over conn, write data to it and
        # close it.
        if create_dummy_fds:
            # Occupy every free descriptor below 256 first.
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        # Remove the scratch file when the test ends, pass or fail.
        self.addCleanup(test_support.unlink, test_support.TESTFN)
        with open(test_support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        with open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        # Remove the scratch file when the test ends, pass or fail.
        self.addCleanup(test_support.unlink, test_support.TESTFN)
        with open(test_support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Pick the first free descriptor number at or above 256.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        # Child target: write one plain byte to the connection's
        # descriptor, with no handle attached.
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001663
class _TestListenerClient(BaseTestCase):
    # Tests for the Listener / Client pair of self.connection.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child target: connect, send one greeting and hang up.
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        # One listener/client round trip per available address family.
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test, args=(listener.address,))
            child.daemon = True
            child.start()
            conn = listener.accept()
            self.assertEqual(conn.recv(), 'hello')
            child.join()
            listener.close()

    def test_issue14725(self):
        listener = self.connection.Listener()
        child = self.Process(target=self._test, args=(listener.address,))
        child.daemon = True
        child.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        conn = listener.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        child.join()
        listener.close()
1700
Benjamin Petersondfd79492008-06-13 19:13:39 +00001701#
1702# Test of sending connection and socket objects between processes
1703#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001704"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001705class _TestPicklingConnections(BaseTestCase):
1706
1707 ALLOWED_TYPES = ('processes',)
1708
1709 def _listener(self, conn, families):
1710 for fam in families:
1711 l = self.connection.Listener(family=fam)
1712 conn.send(l.address)
1713 new_conn = l.accept()
1714 conn.send(new_conn)
1715
1716 if self.TYPE == 'processes':
1717 l = socket.socket()
1718 l.bind(('localhost', 0))
1719 conn.send(l.getsockname())
1720 l.listen(1)
1721 new_conn, addr = l.accept()
1722 conn.send(new_conn)
1723
1724 conn.recv()
1725
1726 def _remote(self, conn):
1727 for (address, msg) in iter(conn.recv, None):
1728 client = self.connection.Client(address)
1729 client.send(msg.upper())
1730 client.close()
1731
1732 if self.TYPE == 'processes':
1733 address, msg = conn.recv()
1734 client = socket.socket()
1735 client.connect(address)
1736 client.sendall(msg.upper())
1737 client.close()
1738
1739 conn.close()
1740
1741 def test_pickling(self):
1742 try:
1743 multiprocessing.allow_connection_pickling()
1744 except ImportError:
1745 return
1746
1747 families = self.connection.families
1748
1749 lconn, lconn0 = self.Pipe()
1750 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001751 lp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001752 lp.start()
1753 lconn0.close()
1754
1755 rconn, rconn0 = self.Pipe()
1756 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001757 rp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001758 rp.start()
1759 rconn0.close()
1760
1761 for fam in families:
1762 msg = ('This connection uses family %s' % fam).encode('ascii')
1763 address = lconn.recv()
1764 rconn.send((address, msg))
1765 new_conn = lconn.recv()
1766 self.assertEqual(new_conn.recv(), msg.upper())
1767
1768 rconn.send(None)
1769
1770 if self.TYPE == 'processes':
1771 msg = latin('This connection uses a normal socket')
1772 address = lconn.recv()
1773 rconn.send((address, msg))
1774 if hasattr(socket, 'fromfd'):
1775 new_conn = lconn.recv()
1776 self.assertEqual(new_conn.recv(100), msg.upper())
1777 else:
1778 # XXX On Windows with Py2.6 need to backport fromfd()
1779 discard = lconn.recv_bytes()
1780
1781 lconn.send(None)
1782
1783 rconn.close()
1784 lconn.close()
1785
1786 lp.join()
1787 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001788"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001789#
1790#
1791#
1792
class _TestHeap(BaseTestCase):
    # Tests for the allocator behind multiprocessing.heap.BufferWrapper.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in xrange(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                # drop a randomly chosen block
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap: gather every free and occupied
        # block as (arena index, start, stop, length, state)
        layout = []
        occupied = 0
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for free_blocks in heap._len_to_seq.values():
            for arena, start, stop in free_blocks:
                layout.append((heap._arenas.index(arena), start, stop,
                               stop - start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            layout.append((heap._arenas.index(arena), start, stop,
                           stop - start, 'occupied'))
            occupied += stop - start

        layout.sort()

        # Neighbouring entries must either touch or begin a new arena at
        # offset 0.
        for current, following in zip(layout, layout[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, nstop) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't
        # deadlock (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds
        # to make collections more frequent (and increase the probability
        # of deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        saved_thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *saved_thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to
        # make sure objects are collected asynchronously by the gc
        for _ in range(5000):
            first = multiprocessing.heap.BufferWrapper(1)
            second = multiprocessing.heap.BufferWrapper(1)
            # circular references
            first.buddy = second
            second.buddy = first
1857
Benjamin Petersondfd79492008-06-13 19:13:39 +00001858#
1859#
1860#
1861
Benjamin Petersondfd79492008-06-13 19:13:39 +00001862class _Foo(Structure):
1863 _fields_ = [
1864 ('x', c_int),
1865 ('y', c_double)
1866 ]
1867
class _TestSharedCTypes(BaseTestCase):
    # Tests for ctypes objects placed in shared memory.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Runs in the child: double every shared value in place.
        for scalar in (x, y):
            scalar.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        # Build one shared object of each flavour, let a child double them
        # all, then check that the parent observes the doubled values.
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        shared_args = (x, y, foo, arr, string)
        child = self.Process(target=self._double, args=shared_args)
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario as above, with lock-wrapped shared objects.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # copy() must produce an independent structure.
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1917
1918#
1919#
1920#
1921
class _TestFinalize(BaseTestCase):
    # Tests for util.Finalize callbacks and their exit-time ordering.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Runs in the child.  The statement order and the lifetime of every
        # local below are significant: they decide which callbacks fire
        # immediately and which are left for util._exit_function().
        class Foo(object):
            pass

        obj_a = Foo()
        util.Finalize(obj_a, conn.send, args=('a',))
        del obj_a           # triggers callback for a

        obj_b = Foo()
        finalize_b = util.Finalize(obj_b, conn.send, args=('b',))
        finalize_b()        # triggers callback for b
        finalize_b()        # does nothing because callback has already been called
        del obj_b           # does nothing because callback has already been called

        # no exitpriority given
        obj_c = Foo()
        util.Finalize(obj_c, conn.send, args=('c',))

        obj_d10 = Foo()
        util.Finalize(obj_d10, conn.send, args=('d10',), exitpriority=1)

        obj_d01 = Foo()
        util.Finalize(obj_d01, conn.send, args=('d01',), exitpriority=0)
        obj_d02 = Foo()
        util.Finalize(obj_d02, conn.send, args=('d02',), exitpriority=0)
        obj_d03 = Foo()
        util.Finalize(obj_d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        # Collect everything the child sent before the 'STOP' sentinel and
        # compare it with the expected callback order.
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.daemon = True
        child.start()
        child.join()

        received = list(iter(conn.recv, 'STOP'))
        expected = ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e']
        self.assertEqual(received, expected)
1974
1975#
1976# Test that from ... import * works for each module
1977#
1978
class _TestImportStar(BaseTestCase):
    # Checks that every name a module lists in __all__ really exists on it.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = [
            'multiprocessing',
            'multiprocessing.connection',
            'multiprocessing.heap',
            'multiprocessing.managers',
            'multiprocessing.pool',
            'multiprocessing.process',
            'multiprocessing.synchronize',
            'multiprocessing.util',
        ]

        if HAS_REDUCTION:
            modules.append('multiprocessing.reduction')

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for module_name in modules:
            __import__(module_name)
            mod = sys.modules[module_name]

            exported = getattr(mod, '__all__', ())
            for attr in exported:
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
2007
2008#
2009# Quick test that logging works -- does not test logging output
2010#
2011
class _TestLogging(BaseTestCase):
    # Quick checks that multiprocessing's logger works; log output itself is
    # not inspected.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        # Lower the logger's level, emit messages below it, and always put
        # the level back to LOG_LEVEL so a failure cannot leak into later
        # tests.
        logger = multiprocessing.get_logger()
        try:
            logger.setLevel(util.SUBWARNING)
            self.assertTrue(logger is not None)
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Runs in the child: report the logger's effective level.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _child_effective_level(self, reader, writer):
        # Start a child that reports its effective log level through
        # *writer*, return the value read from *reader*, and reap the child.
        p = self.Process(target=self._test_level, args=(writer,))
        p.daemon = True
        p.start()
        try:
            return reader.recv()
        finally:
            p.join()

    def test_level(self):
        # The child must see the level set on the multiprocessing logger,
        # and, once that is reset to NOTSET, the level of the root logger.
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        try:
            logger.setLevel(LEVEL1)
            self.assertEqual(LEVEL1,
                             self._child_effective_level(reader, writer))

            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self.assertEqual(LEVEL2,
                             self._child_effective_level(reader, writer))
        finally:
            # Restore global logging state and release the pipe even when an
            # assertion above fails.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
2054
Jesse Noller814d02d2009-11-21 14:38:23 +00002055
Jesse Noller9a03f2f2009-11-24 14:17:29 +00002056# class _TestLoggingProcessName(BaseTestCase):
2057#
2058# def handle(self, record):
2059# assert record.processName == multiprocessing.current_process().name
2060# self.__handled = True
2061#
2062# def test_logging(self):
2063# handler = logging.Handler()
2064# handler.handle = self.handle
2065# self.__handled = False
2066# # Bypass getLogger() and side-effects
2067# logger = logging.getLoggerClass()(
2068# 'multiprocessing.test.TestLoggingProcessName')
2069# logger.addHandler(handler)
2070# logger.propagate = False
2071#
2072# logger.warn('foo')
2073# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00002074
Benjamin Petersondfd79492008-06-13 19:13:39 +00002075#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002076# Test to verify handle verification, see issue 3321
2077#
2078
class TestInvalidHandle(unittest.TestCase):
    # Handle verification for Connection objects, see issue 3321.

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # A connection wrapping an arbitrary handle number must fail on
        # poll(); a negative handle must be rejected at construction time.
        bogus = _multiprocessing.Connection(44977608)
        with self.assertRaises(IOError):
            bogus.poll()
        with self.assertRaises(IOError):
            _multiprocessing.Connection(-1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002086
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002087#
Benjamin Petersondfd79492008-06-13 19:13:39 +00002088# Functions used to create test cases from the base ones in this module
2089#
2090
def get_attributes(Source, names):
    """Return a dict mapping each of *names* to that attribute of *Source*.

    Attributes that are plain Python functions are wrapped in
    ``staticmethod`` so that they are not turned into bound methods when the
    resulting dict is injected into a class namespace.  All other attribute
    values are stored unchanged.
    """
    function_type = type(get_attributes)
    d = {}
    for name in names:
        obj = getattr(Source, name)
        if isinstance(obj, function_type):
            obj = staticmethod(obj)
        d[name] = obj
    return d
2099
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes from this module's ``_Test*`` bases.

    Every module global whose name starts with ``_Test`` and whose
    ``ALLOWED_TYPES`` contains *type* yields a class deriving from that
    base, ``unittest.TestCase`` and *Mixin*.  The class is named
    ``'With' + type.capitalize() + <base name without the underscore>`` and
    reports *Mixin*'s module as its own.  Returns a dict of name -> class.
    """
    Type = type.capitalize()
    module_globals = globals()
    generated = {}

    for name in module_globals.keys():
        if not name.startswith('_Test'):
            continue
        base = module_globals[name]
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        newname = 'With' + Type + name[1:]
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        generated[newname] = Temp

    return generated
2116
2117#
2118# Create test cases
2119#
2120
class ProcessesMixin(object):
    # Supplies the primitives under test from the top-level multiprocessing
    # package.
    TYPE = 'processes'
    Process = multiprocessing.Process
    locals().update(get_attributes(multiprocessing, (
        'Queue',
        'Lock',
        'RLock',
        'Semaphore',
        'BoundedSemaphore',
        'Condition',
        'Event',
        'Value',
        'Array',
        'RawValue',
        'RawArray',
        'current_process',
        'active_children',
        'Pipe',
        'connection',
        'JoinableQueue',
        'Pool'
        )))

# Generate the 'processes' test cases and publish them as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2133
2134
class ManagerMixin(object):
    # Supplies the primitives under test from a SyncManager.  The manager
    # object is only allocated here via object.__new__; its __init__ is not
    # run in this class body.
    TYPE = 'manager'
    Process = multiprocessing.Process
    manager = object.__new__(multiprocessing.managers.SyncManager)
    locals().update(get_attributes(manager, (
        'Queue',
        'Lock',
        'RLock',
        'Semaphore',
        'BoundedSemaphore',
        'Condition',
        'Event',
        'Value',
        'Array',
        'list',
        'dict',
        'Namespace',
        'JoinableQueue',
        'Pool'
        )))

# Generate the 'manager' test cases and publish them as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
2147
2148
class ThreadsMixin(object):
    # Supplies the primitives under test from multiprocessing.dummy.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue',
        'Lock',
        'RLock',
        'Semaphore',
        'BoundedSemaphore',
        'Condition',
        'Event',
        'Value',
        'Array',
        'current_process',
        'active_children',
        'Pipe',
        'connection',
        'dict',
        'list',
        'Namespace',
        'JoinableQueue',
        'Pool'
        )))

# Generate the 'threads' test cases and publish them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2161
class OtherTest(unittest.TestCase):
    # Authentication-handshake failures, driven through fake connections.
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        # The peer answers the challenge with garbage.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # The peer sends the challenge marker first, then garbage, then
        # empty messages.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0

            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
2190
Jesse Noller7152f6d2009-04-02 05:17:26 +00002191#
2192# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2193#
2194
def initializer(ns):
    """Bump ``ns.test`` by one; used as the Manager/Pool initializer."""
    ns.test += 1
2197
class TestInitializers(unittest.TestCase):
    # Manager.start() / Pool.__init__() initializer feature, see issue 5585.
    # A shared namespace counts how many times the initializer has run.

    def setUp(self):
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # a non-callable initializer must be rejected
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Shut the second manager down even if the assertion fails, so
            # its server process is not left running.
            m.shutdown()

    def test_pool_initializer(self):
        # a non-callable initializer must be rejected
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
2220
Jesse Noller1b90efb2009-06-30 17:11:52 +00002221#
2222# Issue 5155, 5313, 5331: Test process in processes
2223# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2224#
2225
2226def _ThisSubProcess(q):
2227 try:
2228 item = q.get(block=False)
2229 except Queue.Empty:
2230 pass
2231
def _TestProcess(q):
    """Start a daemonic grandchild that polls a fresh queue, and wait for it.

    The *q* argument is accepted but not used.
    """
    inner_queue = multiprocessing.Queue()
    grandchild = multiprocessing.Process(
        target=_ThisSubProcess, args=(inner_queue,))
    grandchild.daemon = True
    grandchild.start()
    grandchild.join()
2238
2239def _afunc(x):
2240 return x*x
2241
def pool_in_process():
    """Create a four-worker pool, run one map() through it, and shut it down.

    Intended to be used as the target of a child process.  The pool is
    closed and joined even if map() raises, so worker processes are not left
    for interpreter exit to clean up.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        pool.close()
        pool.join()
2245
2246class _file_like(object):
2247 def __init__(self, delegate):
2248 self._delegate = delegate
2249 self._pid = None
2250
2251 @property
2252 def cache(self):
2253 pid = os.getpid()
2254 # There are no race conditions since fork keeps only the running thread
2255 if pid != self._pid:
2256 self._pid = pid
2257 self._cache = []
2258 return self._cache
2259
2260 def write(self, data):
2261 self.cache.append(data)
2262
2263 def flush(self):
2264 self._delegate.write(''.join(self.cache))
2265 self._cache = []
2266
class TestStdinBadfiledescriptor(unittest.TestCase):
    # Issue 5155, 5313, 5331: using multiprocessing from inside a child
    # process.

    def test_queue_in_process(self):
        # A child that itself starts a process using a queue must finish.
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # A child that itself creates a Pool must finish.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        # Data written to the wrapper must reach the delegate on flush().
        sio = StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this Process is constructed but never started; it is
        # kept as in the original test -- confirm whether it was meant to run.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # assertEqual instead of a bare assert: it is not stripped under -O
        # and reports both values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
2287
# Stand-alone test cases that are not generated by create_test_cases().
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002290
Benjamin Petersondfd79492008-06-13 19:13:39 +00002291#
2292#
2293#
2294
def test_main(run=None):
    """Set up the shared pools and manager, run every test case, tear down.

    *run* is a callable that accepts the assembled ``unittest.TestSuite``;
    when omitted, ``test.test_support.run_unittest`` is used.

    Raises ``unittest.SkipTest`` on Linux when an RLock cannot be created
    (issue 3111).
    """
    if sys.platform.startswith("linux"):
        try:
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        by_name = lambda tc: tc.__name__
        testcases = (
            sorted(testcases_processes.values(), key=by_name) +
            sorted(testcases_threads.values(), key=by_name) +
            sorted(testcases_manager.values(), key=by_name) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
        # module during these tests is at least platform dependent and possibly
        # non-deterministic on any given platform. So we don't mind if the listed
        # warnings aren't actually raised.
        with test_support.check_py3k_warnings(
                (".+__(get|set)slice__ has been removed", DeprecationWarning),
                (r"sys.exc_clear\(\) not supported", DeprecationWarning),
                quiet=True):
            run(suite)
    finally:
        # Tear the shared helpers down even when the run is interrupted or
        # raises, so worker and manager processes are not left behind.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002342
def main():
    """Run the whole suite through a verbose text runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

if __name__ == '__main__':
    main()