blob: 45bf4549feeca0cc8e63a1e9b16b007242936529 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020014import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000027# import threading after _multiprocessing to raise a more revelant error
28# message: "No module named _multiprocessing". _multiprocessing is not compiled
29# without thread support.
30import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.dummy
33import multiprocessing.connection
34import multiprocessing.managers
35import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
Charles-François Natalie51c8da2011-09-21 18:48:21 +020038from multiprocessing import util
39
40try:
41 from multiprocessing import reduction
42 HAS_REDUCTION = True
43except ImportError:
44 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000045
Brian Curtinafa88b52010-10-07 01:12:19 +000046try:
47 from multiprocessing.sharedctypes import Value, copy
48 HAS_SHAREDCTYPES = True
49except ImportError:
50 HAS_SHAREDCTYPES = False
51
Antoine Pitroubcb39d42011-08-23 19:46:22 +020052try:
53 import msvcrt
54except ImportError:
55 msvcrt = None
56
Benjamin Petersone711caf2008-06-11 16:44:04 +000057#
58#
59#
60
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000061def latin(s):
62 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000063
Benjamin Petersone711caf2008-06-11 16:44:04 +000064#
65# Constants
66#
67
68LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000069#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000070
71DELTA = 0.1
72CHECK_TIMINGS = False # making true makes tests take a lot longer
73 # and can sometimes cause some non-serious
74 # failures because some calls block a bit
75 # longer than expected
76if CHECK_TIMINGS:
77 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
78else:
79 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
80
81HAVE_GETVALUE = not getattr(_multiprocessing,
82 'HAVE_BROKEN_SEM_GETVALUE', False)
83
Jesse Noller6214edd2009-01-19 16:23:53 +000084WIN32 = (sys.platform == "win32")
85
Antoine Pitroubcb39d42011-08-23 19:46:22 +020086try:
87 MAXFD = os.sysconf("SC_OPEN_MAX")
88except:
89 MAXFD = 256
90
Benjamin Petersone711caf2008-06-11 16:44:04 +000091#
Florent Xiclunafd1b0932010-03-28 00:25:02 +000092# Some tests require ctypes
93#
94
95try:
Florent Xiclunaaa171062010-08-14 15:56:42 +000096 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +000097except ImportError:
98 Structure = object
99 c_int = c_double = None
100
101#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000102# Creates a wrapper for a function which records the time it takes to finish
103#
104
105class TimingWrapper(object):
106
107 def __init__(self, func):
108 self.func = func
109 self.elapsed = None
110
111 def __call__(self, *args, **kwds):
112 t = time.time()
113 try:
114 return self.func(*args, **kwds)
115 finally:
116 self.elapsed = time.time() - t
117
118#
119# Base class for test cases
120#
121
122class BaseTestCase(object):
123
124 ALLOWED_TYPES = ('processes', 'manager', 'threads')
125
126 def assertTimingAlmostEqual(self, a, b):
127 if CHECK_TIMINGS:
128 self.assertAlmostEqual(a, b, 1)
129
130 def assertReturnsIfImplemented(self, value, func, *args):
131 try:
132 res = func(*args)
133 except NotImplementedError:
134 pass
135 else:
136 return self.assertEqual(value, res)
137
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000138 # For the sanity of Windows users, rather than crashing or freezing in
139 # multiple ways.
140 def __reduce__(self, *args):
141 raise NotImplementedError("shouldn't try to pickle a test case")
142
143 __reduce_ex__ = __reduce__
144
Benjamin Petersone711caf2008-06-11 16:44:04 +0000145#
146# Return the value of a semaphore
147#
148
149def get_value(self):
150 try:
151 return self.get_value()
152 except AttributeError:
153 try:
154 return self._Semaphore__value
155 except AttributeError:
156 try:
157 return self._value
158 except AttributeError:
159 raise NotImplementedError
160
161#
162# Testcases
163#
164
165class _TestProcess(BaseTestCase):
166
167 ALLOWED_TYPES = ('processes', 'threads')
168
169 def test_current(self):
170 if self.TYPE == 'threads':
171 return
172
173 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000174 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000175
176 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000177 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000178 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000179 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000180 self.assertEqual(current.ident, os.getpid())
181 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000182
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000183 @classmethod
184 def _test(cls, q, *args, **kwds):
185 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000186 q.put(args)
187 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000188 q.put(current.name)
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000189 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000190 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000191 q.put(current.pid)
192
193 def test_process(self):
194 q = self.Queue(1)
195 e = self.Event()
196 args = (q, 1, 2)
197 kwargs = {'hello':23, 'bye':2.54}
198 name = 'SomeProcess'
199 p = self.Process(
200 target=self._test, args=args, kwargs=kwargs, name=name
201 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000202 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000203 current = self.current_process()
204
205 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000206 self.assertEqual(p.authkey, current.authkey)
207 self.assertEqual(p.is_alive(), False)
208 self.assertEqual(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000209 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000210 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000211 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000212
213 p.start()
214
Ezio Melottib3aedd42010-11-20 19:04:17 +0000215 self.assertEqual(p.exitcode, None)
216 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000217 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000218
Ezio Melottib3aedd42010-11-20 19:04:17 +0000219 self.assertEqual(q.get(), args[1:])
220 self.assertEqual(q.get(), kwargs)
221 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000222 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000223 self.assertEqual(q.get(), current.authkey)
224 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000225
226 p.join()
227
Ezio Melottib3aedd42010-11-20 19:04:17 +0000228 self.assertEqual(p.exitcode, 0)
229 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000230 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000231
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000232 @classmethod
233 def _test_terminate(cls):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000234 time.sleep(1000)
235
236 def test_terminate(self):
237 if self.TYPE == 'threads':
238 return
239
240 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000241 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000242 p.start()
243
244 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000245 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000246 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000247
248 p.terminate()
249
250 join = TimingWrapper(p.join)
251 self.assertEqual(join(), None)
252 self.assertTimingAlmostEqual(join.elapsed, 0.0)
253
254 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000255 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000256
257 p.join()
258
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000259 # XXX sometimes get p.exitcode == 0 on Windows ...
260 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000261
262 def test_cpu_count(self):
263 try:
264 cpus = multiprocessing.cpu_count()
265 except NotImplementedError:
266 cpus = 1
267 self.assertTrue(type(cpus) is int)
268 self.assertTrue(cpus >= 1)
269
270 def test_active_children(self):
271 self.assertEqual(type(self.active_children()), list)
272
273 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000274 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000275
Jesus Cea94f964f2011-09-09 20:26:57 +0200276 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000277 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000278 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000279
280 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000281 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000282
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000283 @classmethod
284 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000285 from multiprocessing import forking
286 wconn.send(id)
287 if len(id) < 2:
288 for i in range(2):
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000289 p = cls.Process(
290 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000291 )
292 p.start()
293 p.join()
294
295 def test_recursion(self):
296 rconn, wconn = self.Pipe(duplex=False)
297 self._test_recursion(wconn, [])
298
299 time.sleep(DELTA)
300 result = []
301 while rconn.poll():
302 result.append(rconn.recv())
303
304 expected = [
305 [],
306 [0],
307 [0, 0],
308 [0, 1],
309 [1],
310 [1, 0],
311 [1, 1]
312 ]
313 self.assertEqual(result, expected)
314
315#
316#
317#
318
319class _UpperCaser(multiprocessing.Process):
320
321 def __init__(self):
322 multiprocessing.Process.__init__(self)
323 self.child_conn, self.parent_conn = multiprocessing.Pipe()
324
325 def run(self):
326 self.parent_conn.close()
327 for s in iter(self.child_conn.recv, None):
328 self.child_conn.send(s.upper())
329 self.child_conn.close()
330
331 def submit(self, s):
332 assert type(s) is str
333 self.parent_conn.send(s)
334 return self.parent_conn.recv()
335
336 def stop(self):
337 self.parent_conn.send(None)
338 self.parent_conn.close()
339 self.child_conn.close()
340
341class _TestSubclassingProcess(BaseTestCase):
342
343 ALLOWED_TYPES = ('processes',)
344
345 def test_subclassing(self):
346 uppercaser = _UpperCaser()
Jesus Cea94f964f2011-09-09 20:26:57 +0200347 uppercaser.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000348 uppercaser.start()
349 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
350 self.assertEqual(uppercaser.submit('world'), 'WORLD')
351 uppercaser.stop()
352 uppercaser.join()
353
354#
355#
356#
357
358def queue_empty(q):
359 if hasattr(q, 'empty'):
360 return q.empty()
361 else:
362 return q.qsize() == 0
363
364def queue_full(q, maxsize):
365 if hasattr(q, 'full'):
366 return q.full()
367 else:
368 return q.qsize() == maxsize
369
370
371class _TestQueue(BaseTestCase):
372
373
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000374 @classmethod
375 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000376 child_can_start.wait()
377 for i in range(6):
378 queue.get()
379 parent_can_continue.set()
380
381 def test_put(self):
382 MAXSIZE = 6
383 queue = self.Queue(maxsize=MAXSIZE)
384 child_can_start = self.Event()
385 parent_can_continue = self.Event()
386
387 proc = self.Process(
388 target=self._test_put,
389 args=(queue, child_can_start, parent_can_continue)
390 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000391 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000392 proc.start()
393
394 self.assertEqual(queue_empty(queue), True)
395 self.assertEqual(queue_full(queue, MAXSIZE), False)
396
397 queue.put(1)
398 queue.put(2, True)
399 queue.put(3, True, None)
400 queue.put(4, False)
401 queue.put(5, False, None)
402 queue.put_nowait(6)
403
404 # the values may be in buffer but not yet in pipe so sleep a bit
405 time.sleep(DELTA)
406
407 self.assertEqual(queue_empty(queue), False)
408 self.assertEqual(queue_full(queue, MAXSIZE), True)
409
410 put = TimingWrapper(queue.put)
411 put_nowait = TimingWrapper(queue.put_nowait)
412
413 self.assertRaises(pyqueue.Full, put, 7, False)
414 self.assertTimingAlmostEqual(put.elapsed, 0)
415
416 self.assertRaises(pyqueue.Full, put, 7, False, None)
417 self.assertTimingAlmostEqual(put.elapsed, 0)
418
419 self.assertRaises(pyqueue.Full, put_nowait, 7)
420 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
421
422 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
423 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
424
425 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
426 self.assertTimingAlmostEqual(put.elapsed, 0)
427
428 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
429 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
430
431 child_can_start.set()
432 parent_can_continue.wait()
433
434 self.assertEqual(queue_empty(queue), True)
435 self.assertEqual(queue_full(queue, MAXSIZE), False)
436
437 proc.join()
438
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000439 @classmethod
440 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000441 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000442 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000443 queue.put(2)
444 queue.put(3)
445 queue.put(4)
446 queue.put(5)
447 parent_can_continue.set()
448
449 def test_get(self):
450 queue = self.Queue()
451 child_can_start = self.Event()
452 parent_can_continue = self.Event()
453
454 proc = self.Process(
455 target=self._test_get,
456 args=(queue, child_can_start, parent_can_continue)
457 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000458 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000459 proc.start()
460
461 self.assertEqual(queue_empty(queue), True)
462
463 child_can_start.set()
464 parent_can_continue.wait()
465
466 time.sleep(DELTA)
467 self.assertEqual(queue_empty(queue), False)
468
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000469 # Hangs unexpectedly, remove for now
470 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000471 self.assertEqual(queue.get(True, None), 2)
472 self.assertEqual(queue.get(True), 3)
473 self.assertEqual(queue.get(timeout=1), 4)
474 self.assertEqual(queue.get_nowait(), 5)
475
476 self.assertEqual(queue_empty(queue), True)
477
478 get = TimingWrapper(queue.get)
479 get_nowait = TimingWrapper(queue.get_nowait)
480
481 self.assertRaises(pyqueue.Empty, get, False)
482 self.assertTimingAlmostEqual(get.elapsed, 0)
483
484 self.assertRaises(pyqueue.Empty, get, False, None)
485 self.assertTimingAlmostEqual(get.elapsed, 0)
486
487 self.assertRaises(pyqueue.Empty, get_nowait)
488 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
489
490 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
491 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
492
493 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
494 self.assertTimingAlmostEqual(get.elapsed, 0)
495
496 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
497 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
498
499 proc.join()
500
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000501 @classmethod
502 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000503 for i in range(10, 20):
504 queue.put(i)
505 # note that at this point the items may only be buffered, so the
506 # process cannot shutdown until the feeder thread has finished
507 # pushing items onto the pipe.
508
509 def test_fork(self):
510 # Old versions of Queue would fail to create a new feeder
511 # thread for a forked process if the original process had its
512 # own feeder thread. This test checks that this no longer
513 # happens.
514
515 queue = self.Queue()
516
517 # put items on queue so that main process starts a feeder thread
518 for i in range(10):
519 queue.put(i)
520
521 # wait to make sure thread starts before we fork a new process
522 time.sleep(DELTA)
523
524 # fork process
525 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200526 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000527 p.start()
528
529 # check that all expected items are in the queue
530 for i in range(20):
531 self.assertEqual(queue.get(), i)
532 self.assertRaises(pyqueue.Empty, queue.get, False)
533
534 p.join()
535
536 def test_qsize(self):
537 q = self.Queue()
538 try:
539 self.assertEqual(q.qsize(), 0)
540 except NotImplementedError:
541 return
542 q.put(1)
543 self.assertEqual(q.qsize(), 1)
544 q.put(5)
545 self.assertEqual(q.qsize(), 2)
546 q.get()
547 self.assertEqual(q.qsize(), 1)
548 q.get()
549 self.assertEqual(q.qsize(), 0)
550
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000551 @classmethod
552 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000553 for obj in iter(q.get, None):
554 time.sleep(DELTA)
555 q.task_done()
556
557 def test_task_done(self):
558 queue = self.JoinableQueue()
559
560 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000561 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000562
563 workers = [self.Process(target=self._test_task_done, args=(queue,))
564 for i in range(4)]
565
566 for p in workers:
Jesus Cea94f964f2011-09-09 20:26:57 +0200567 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000568 p.start()
569
570 for i in range(10):
571 queue.put(i)
572
573 queue.join()
574
575 for p in workers:
576 queue.put(None)
577
578 for p in workers:
579 p.join()
580
581#
582#
583#
584
585class _TestLock(BaseTestCase):
586
587 def test_lock(self):
588 lock = self.Lock()
589 self.assertEqual(lock.acquire(), True)
590 self.assertEqual(lock.acquire(False), False)
591 self.assertEqual(lock.release(), None)
592 self.assertRaises((ValueError, threading.ThreadError), lock.release)
593
594 def test_rlock(self):
595 lock = self.RLock()
596 self.assertEqual(lock.acquire(), True)
597 self.assertEqual(lock.acquire(), True)
598 self.assertEqual(lock.acquire(), True)
599 self.assertEqual(lock.release(), None)
600 self.assertEqual(lock.release(), None)
601 self.assertEqual(lock.release(), None)
602 self.assertRaises((AssertionError, RuntimeError), lock.release)
603
Jesse Nollerf8d00852009-03-31 03:25:07 +0000604 def test_lock_context(self):
605 with self.Lock():
606 pass
607
Benjamin Petersone711caf2008-06-11 16:44:04 +0000608
609class _TestSemaphore(BaseTestCase):
610
611 def _test_semaphore(self, sem):
612 self.assertReturnsIfImplemented(2, get_value, sem)
613 self.assertEqual(sem.acquire(), True)
614 self.assertReturnsIfImplemented(1, get_value, sem)
615 self.assertEqual(sem.acquire(), True)
616 self.assertReturnsIfImplemented(0, get_value, sem)
617 self.assertEqual(sem.acquire(False), False)
618 self.assertReturnsIfImplemented(0, get_value, sem)
619 self.assertEqual(sem.release(), None)
620 self.assertReturnsIfImplemented(1, get_value, sem)
621 self.assertEqual(sem.release(), None)
622 self.assertReturnsIfImplemented(2, get_value, sem)
623
624 def test_semaphore(self):
625 sem = self.Semaphore(2)
626 self._test_semaphore(sem)
627 self.assertEqual(sem.release(), None)
628 self.assertReturnsIfImplemented(3, get_value, sem)
629 self.assertEqual(sem.release(), None)
630 self.assertReturnsIfImplemented(4, get_value, sem)
631
632 def test_bounded_semaphore(self):
633 sem = self.BoundedSemaphore(2)
634 self._test_semaphore(sem)
635 # Currently fails on OS/X
636 #if HAVE_GETVALUE:
637 # self.assertRaises(ValueError, sem.release)
638 # self.assertReturnsIfImplemented(2, get_value, sem)
639
640 def test_timeout(self):
641 if self.TYPE != 'processes':
642 return
643
644 sem = self.Semaphore(0)
645 acquire = TimingWrapper(sem.acquire)
646
647 self.assertEqual(acquire(False), False)
648 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
649
650 self.assertEqual(acquire(False, None), False)
651 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
652
653 self.assertEqual(acquire(False, TIMEOUT1), False)
654 self.assertTimingAlmostEqual(acquire.elapsed, 0)
655
656 self.assertEqual(acquire(True, TIMEOUT2), False)
657 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
658
659 self.assertEqual(acquire(timeout=TIMEOUT3), False)
660 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
661
662
663class _TestCondition(BaseTestCase):
664
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000665 @classmethod
666 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000667 cond.acquire()
668 sleeping.release()
669 cond.wait(timeout)
670 woken.release()
671 cond.release()
672
673 def check_invariant(self, cond):
674 # this is only supposed to succeed when there are no sleepers
675 if self.TYPE == 'processes':
676 try:
677 sleepers = (cond._sleeping_count.get_value() -
678 cond._woken_count.get_value())
679 self.assertEqual(sleepers, 0)
680 self.assertEqual(cond._wait_semaphore.get_value(), 0)
681 except NotImplementedError:
682 pass
683
684 def test_notify(self):
685 cond = self.Condition()
686 sleeping = self.Semaphore(0)
687 woken = self.Semaphore(0)
688
689 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000690 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000691 p.start()
692
693 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000694 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000695 p.start()
696
697 # wait for both children to start sleeping
698 sleeping.acquire()
699 sleeping.acquire()
700
701 # check no process/thread has woken up
702 time.sleep(DELTA)
703 self.assertReturnsIfImplemented(0, get_value, woken)
704
705 # wake up one process/thread
706 cond.acquire()
707 cond.notify()
708 cond.release()
709
710 # check one process/thread has woken up
711 time.sleep(DELTA)
712 self.assertReturnsIfImplemented(1, get_value, woken)
713
714 # wake up another
715 cond.acquire()
716 cond.notify()
717 cond.release()
718
719 # check other has woken up
720 time.sleep(DELTA)
721 self.assertReturnsIfImplemented(2, get_value, woken)
722
723 # check state is not mucked up
724 self.check_invariant(cond)
725 p.join()
726
727 def test_notify_all(self):
728 cond = self.Condition()
729 sleeping = self.Semaphore(0)
730 woken = self.Semaphore(0)
731
732 # start some threads/processes which will timeout
733 for i in range(3):
734 p = self.Process(target=self.f,
735 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000736 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000737 p.start()
738
739 t = threading.Thread(target=self.f,
740 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000741 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000742 t.start()
743
744 # wait for them all to sleep
745 for i in range(6):
746 sleeping.acquire()
747
748 # check they have all timed out
749 for i in range(6):
750 woken.acquire()
751 self.assertReturnsIfImplemented(0, get_value, woken)
752
753 # check state is not mucked up
754 self.check_invariant(cond)
755
756 # start some more threads/processes
757 for i in range(3):
758 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000759 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000760 p.start()
761
762 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000763 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000764 t.start()
765
766 # wait for them to all sleep
767 for i in range(6):
768 sleeping.acquire()
769
770 # check no process/thread has woken up
771 time.sleep(DELTA)
772 self.assertReturnsIfImplemented(0, get_value, woken)
773
774 # wake them all up
775 cond.acquire()
776 cond.notify_all()
777 cond.release()
778
779 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200780 for i in range(10):
781 try:
782 if get_value(woken) == 6:
783 break
784 except NotImplementedError:
785 break
786 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000787 self.assertReturnsIfImplemented(6, get_value, woken)
788
789 # check state is not mucked up
790 self.check_invariant(cond)
791
792 def test_timeout(self):
793 cond = self.Condition()
794 wait = TimingWrapper(cond.wait)
795 cond.acquire()
796 res = wait(TIMEOUT1)
797 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000798 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000799 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
800
801
802class _TestEvent(BaseTestCase):
803
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000804 @classmethod
805 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000806 time.sleep(TIMEOUT2)
807 event.set()
808
809 def test_event(self):
810 event = self.Event()
811 wait = TimingWrapper(event.wait)
812
Ezio Melotti13925002011-03-16 11:05:33 +0200813 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +0000814 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +0000815 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000816
Benjamin Peterson965ce872009-04-05 21:24:58 +0000817 # Removed, threading.Event.wait() will return the value of the __flag
818 # instead of None. API Shear with the semaphore backed mp.Event
819 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000820 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000821 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000822 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
823
824 event.set()
825
826 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +0000827 self.assertEqual(event.is_set(), True)
828 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000829 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000830 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000831 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
832 # self.assertEqual(event.is_set(), True)
833
834 event.clear()
835
836 #self.assertEqual(event.is_set(), False)
837
Jesus Cea94f964f2011-09-09 20:26:57 +0200838 p = self.Process(target=self._test_event, args=(event,))
839 p.daemon = True
840 p.start()
Benjamin Peterson965ce872009-04-05 21:24:58 +0000841 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000842
843#
844#
845#
846
847class _TestValue(BaseTestCase):
848
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000849 ALLOWED_TYPES = ('processes',)
850
Benjamin Petersone711caf2008-06-11 16:44:04 +0000851 codes_values = [
852 ('i', 4343, 24234),
853 ('d', 3.625, -4.25),
854 ('h', -232, 234),
855 ('c', latin('x'), latin('y'))
856 ]
857
Antoine Pitrou7744e2a2010-11-22 16:26:21 +0000858 def setUp(self):
859 if not HAS_SHAREDCTYPES:
860 self.skipTest("requires multiprocessing.sharedctypes")
861
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000862 @classmethod
863 def _test(cls, values):
864 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000865 sv.value = cv[2]
866
867
868 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000869 if raw:
870 values = [self.RawValue(code, value)
871 for code, value, _ in self.codes_values]
872 else:
873 values = [self.Value(code, value)
874 for code, value, _ in self.codes_values]
875
876 for sv, cv in zip(values, self.codes_values):
877 self.assertEqual(sv.value, cv[1])
878
879 proc = self.Process(target=self._test, args=(values,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200880 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000881 proc.start()
882 proc.join()
883
884 for sv, cv in zip(values, self.codes_values):
885 self.assertEqual(sv.value, cv[2])
886
887 def test_rawvalue(self):
888 self.test_value(raw=True)
889
890 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000891 val1 = self.Value('i', 5)
892 lock1 = val1.get_lock()
893 obj1 = val1.get_obj()
894
895 val2 = self.Value('i', 5, lock=None)
896 lock2 = val2.get_lock()
897 obj2 = val2.get_obj()
898
899 lock = self.Lock()
900 val3 = self.Value('i', 5, lock=lock)
901 lock3 = val3.get_lock()
902 obj3 = val3.get_obj()
903 self.assertEqual(lock, lock3)
904
Jesse Nollerb0516a62009-01-18 03:11:38 +0000905 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000906 self.assertFalse(hasattr(arr4, 'get_lock'))
907 self.assertFalse(hasattr(arr4, 'get_obj'))
908
Jesse Nollerb0516a62009-01-18 03:11:38 +0000909 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
910
911 arr5 = self.RawValue('i', 5)
912 self.assertFalse(hasattr(arr5, 'get_lock'))
913 self.assertFalse(hasattr(arr5, 'get_obj'))
914
Benjamin Petersone711caf2008-06-11 16:44:04 +0000915
916class _TestArray(BaseTestCase):
917
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000918 ALLOWED_TYPES = ('processes',)
919
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000920 @classmethod
921 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000922 for i in range(1, len(seq)):
923 seq[i] += seq[i-1]
924
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000925 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000926 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000927 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
928 if raw:
929 arr = self.RawArray('i', seq)
930 else:
931 arr = self.Array('i', seq)
932
933 self.assertEqual(len(arr), len(seq))
934 self.assertEqual(arr[3], seq[3])
935 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
936
937 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
938
939 self.assertEqual(list(arr[:]), seq)
940
941 self.f(seq)
942
943 p = self.Process(target=self.f, args=(arr,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200944 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000945 p.start()
946 p.join()
947
948 self.assertEqual(list(arr[:]), seq)
949
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000950 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +0000951 def test_array_from_size(self):
952 size = 10
953 # Test for zeroing (see issue #11675).
954 # The repetition below strengthens the test by increasing the chances
955 # of previously allocated non-zero memory being used for the new array
956 # on the 2nd and 3rd loops.
957 for _ in range(3):
958 arr = self.Array('i', size)
959 self.assertEqual(len(arr), size)
960 self.assertEqual(list(arr), [0] * size)
961 arr[:] = range(10)
962 self.assertEqual(list(arr), list(range(10)))
963 del arr
964
965 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000966 def test_rawarray(self):
967 self.test_array(raw=True)
968
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000969 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000970 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000971 arr1 = self.Array('i', list(range(10)))
972 lock1 = arr1.get_lock()
973 obj1 = arr1.get_obj()
974
975 arr2 = self.Array('i', list(range(10)), lock=None)
976 lock2 = arr2.get_lock()
977 obj2 = arr2.get_obj()
978
979 lock = self.Lock()
980 arr3 = self.Array('i', list(range(10)), lock=lock)
981 lock3 = arr3.get_lock()
982 obj3 = arr3.get_obj()
983 self.assertEqual(lock, lock3)
984
Jesse Nollerb0516a62009-01-18 03:11:38 +0000985 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000986 self.assertFalse(hasattr(arr4, 'get_lock'))
987 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +0000988 self.assertRaises(AttributeError,
989 self.Array, 'i', range(10), lock='notalock')
990
991 arr5 = self.RawArray('i', range(10))
992 self.assertFalse(hasattr(arr5, 'get_lock'))
993 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000994
995#
996#
997#
998
999class _TestContainers(BaseTestCase):
1000
1001 ALLOWED_TYPES = ('manager',)
1002
1003 def test_list(self):
1004 a = self.list(list(range(10)))
1005 self.assertEqual(a[:], list(range(10)))
1006
1007 b = self.list()
1008 self.assertEqual(b[:], [])
1009
1010 b.extend(list(range(5)))
1011 self.assertEqual(b[:], list(range(5)))
1012
1013 self.assertEqual(b[2], 2)
1014 self.assertEqual(b[2:10], [2,3,4])
1015
1016 b *= 2
1017 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1018
1019 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1020
1021 self.assertEqual(a[:], list(range(10)))
1022
1023 d = [a, b]
1024 e = self.list(d)
1025 self.assertEqual(
1026 e[:],
1027 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1028 )
1029
1030 f = self.list([a])
1031 a.append('hello')
1032 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1033
1034 def test_dict(self):
1035 d = self.dict()
1036 indices = list(range(65, 70))
1037 for i in indices:
1038 d[i] = chr(i)
1039 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1040 self.assertEqual(sorted(d.keys()), indices)
1041 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1042 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1043
1044 def test_namespace(self):
1045 n = self.Namespace()
1046 n.name = 'Bob'
1047 n.job = 'Builder'
1048 n._hidden = 'hidden'
1049 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1050 del n.job
1051 self.assertEqual(str(n), "Namespace(name='Bob')")
1052 self.assertTrue(hasattr(n, 'name'))
1053 self.assertTrue(not hasattr(n, 'job'))
1054
1055#
1056#
1057#
1058
1059def sqr(x, wait=0.0):
1060 time.sleep(wait)
1061 return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00001062
Benjamin Petersone711caf2008-06-11 16:44:04 +00001063class _TestPool(BaseTestCase):
1064
1065 def test_apply(self):
1066 papply = self.pool.apply
1067 self.assertEqual(papply(sqr, (5,)), sqr(5))
1068 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1069
1070 def test_map(self):
1071 pmap = self.pool.map
1072 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1073 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1074 list(map(sqr, list(range(100)))))
1075
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001076 def test_map_chunksize(self):
1077 try:
1078 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1079 except multiprocessing.TimeoutError:
1080 self.fail("pool.map_async with chunksize stalled on null list")
1081
Benjamin Petersone711caf2008-06-11 16:44:04 +00001082 def test_async(self):
1083 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1084 get = TimingWrapper(res.get)
1085 self.assertEqual(get(), 49)
1086 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1087
1088 def test_async_timeout(self):
1089 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1090 get = TimingWrapper(res.get)
1091 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1092 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1093
1094 def test_imap(self):
1095 it = self.pool.imap(sqr, list(range(10)))
1096 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1097
1098 it = self.pool.imap(sqr, list(range(10)))
1099 for i in range(10):
1100 self.assertEqual(next(it), i*i)
1101 self.assertRaises(StopIteration, it.__next__)
1102
1103 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1104 for i in range(1000):
1105 self.assertEqual(next(it), i*i)
1106 self.assertRaises(StopIteration, it.__next__)
1107
1108 def test_imap_unordered(self):
1109 it = self.pool.imap_unordered(sqr, list(range(1000)))
1110 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1111
1112 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1113 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1114
1115 def test_make_pool(self):
Victor Stinner2fae27b2011-06-20 17:53:35 +02001116 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1117 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1118
Benjamin Petersone711caf2008-06-11 16:44:04 +00001119 p = multiprocessing.Pool(3)
1120 self.assertEqual(3, len(p._pool))
1121 p.close()
1122 p.join()
1123
1124 def test_terminate(self):
1125 if self.TYPE == 'manager':
1126 # On Unix a forked process increfs each shared object to
1127 # which its parent process held a reference. If the
1128 # forked process gets terminated then there is likely to
1129 # be a reference leak. So to prevent
1130 # _TestZZZNumberOfObjects from failing we skip this test
1131 # when using a manager.
1132 return
1133
1134 result = self.pool.map_async(
1135 time.sleep, [0.1 for i in range(10000)], chunksize=1
1136 )
1137 self.pool.terminate()
1138 join = TimingWrapper(self.pool.join)
1139 join()
Victor Stinner900189b2011-03-24 16:39:07 +01001140 self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001141
Ask Solem2afcbf22010-11-09 20:55:52 +00001142def raising():
1143 raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00001144
Ask Solem2afcbf22010-11-09 20:55:52 +00001145def unpickleable_result():
1146 return lambda: 42
1147
1148class _TestPoolWorkerErrors(BaseTestCase):
Jesse Noller1f0b6582010-01-27 03:36:01 +00001149 ALLOWED_TYPES = ('processes', )
Ask Solem2afcbf22010-11-09 20:55:52 +00001150
1151 def test_async_error_callback(self):
1152 p = multiprocessing.Pool(2)
1153
1154 scratchpad = [None]
1155 def errback(exc):
1156 scratchpad[0] = exc
1157
1158 res = p.apply_async(raising, error_callback=errback)
1159 self.assertRaises(KeyError, res.get)
1160 self.assertTrue(scratchpad[0])
1161 self.assertIsInstance(scratchpad[0], KeyError)
1162
1163 p.close()
1164 p.join()
1165
1166 def test_unpickleable_result(self):
1167 from multiprocessing.pool import MaybeEncodingError
1168 p = multiprocessing.Pool(2)
1169
1170 # Make sure we don't lose pool processes because of encoding errors.
1171 for iteration in range(20):
1172
1173 scratchpad = [None]
1174 def errback(exc):
1175 scratchpad[0] = exc
1176
1177 res = p.apply_async(unpickleable_result, error_callback=errback)
1178 self.assertRaises(MaybeEncodingError, res.get)
1179 wrapped = scratchpad[0]
1180 self.assertTrue(wrapped)
1181 self.assertIsInstance(scratchpad[0], MaybeEncodingError)
1182 self.assertIsNotNone(wrapped.exc)
1183 self.assertIsNotNone(wrapped.value)
1184
1185 p.close()
1186 p.join()
1187
1188class _TestPoolWorkerLifetime(BaseTestCase):
1189 ALLOWED_TYPES = ('processes', )
1190
Jesse Noller1f0b6582010-01-27 03:36:01 +00001191 def test_pool_worker_lifetime(self):
1192 p = multiprocessing.Pool(3, maxtasksperchild=10)
1193 self.assertEqual(3, len(p._pool))
1194 origworkerpids = [w.pid for w in p._pool]
1195 # Run many tasks so each worker gets replaced (hopefully)
1196 results = []
1197 for i in range(100):
1198 results.append(p.apply_async(sqr, (i, )))
1199 # Fetch the results and verify we got the right answers,
1200 # also ensuring all the tasks have completed.
1201 for (j, res) in enumerate(results):
1202 self.assertEqual(res.get(), sqr(j))
1203 # Refill the pool
1204 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001205 # Wait until all workers are alive
Antoine Pitrou540ab062011-04-06 22:51:17 +02001206 # (countdown * DELTA = 5 seconds max startup process time)
1207 countdown = 50
Florent Xiclunafb190f62010-03-04 16:10:10 +00001208 while countdown and not all(w.is_alive() for w in p._pool):
1209 countdown -= 1
1210 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001211 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001212 # All pids should be assigned. See issue #7805.
1213 self.assertNotIn(None, origworkerpids)
1214 self.assertNotIn(None, finalworkerpids)
1215 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001216 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1217 p.close()
1218 p.join()
1219
Charles-François Natalif8859e12011-10-24 18:45:29 +02001220 def test_pool_worker_lifetime_early_close(self):
1221 # Issue #10332: closing a pool whose workers have limited lifetimes
1222 # before all the tasks completed would make join() hang.
1223 p = multiprocessing.Pool(3, maxtasksperchild=1)
1224 results = []
1225 for i in range(6):
1226 results.append(p.apply_async(sqr, (i, 0.3)))
1227 p.close()
1228 p.join()
1229 # check the results
1230 for (j, res) in enumerate(results):
1231 self.assertEqual(res.get(), sqr(j))
1232
1233
Benjamin Petersone711caf2008-06-11 16:44:04 +00001234#
1235# Test that manager has expected number of shared objects left
1236#
1237
1238class _TestZZZNumberOfObjects(BaseTestCase):
1239 # Because test cases are sorted alphabetically, this one will get
1240 # run after all the other tests for the manager. It tests that
1241 # there have been no "reference leaks" for the manager's shared
1242 # objects. Note the comment in _TestPool.test_terminate().
1243 ALLOWED_TYPES = ('manager',)
1244
1245 def test_number_of_objects(self):
1246 EXPECTED_NUMBER = 1 # the pool object is still alive
1247 multiprocessing.active_children() # discard dead process objs
1248 gc.collect() # do garbage collection
1249 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001250 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001251 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001252 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001253 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001254
1255 self.assertEqual(refs, EXPECTED_NUMBER)
1256
1257#
1258# Test of creating a customized manager class
1259#
1260
1261from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1262
1263class FooBar(object):
1264 def f(self):
1265 return 'f()'
1266 def g(self):
1267 raise ValueError
1268 def _h(self):
1269 return '_h()'
1270
1271def baz():
1272 for i in range(10):
1273 yield i*i
1274
1275class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00001276 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001277 def __iter__(self):
1278 return self
1279 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001280 return self._callmethod('__next__')
1281
1282class MyManager(BaseManager):
1283 pass
1284
1285MyManager.register('Foo', callable=FooBar)
1286MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1287MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1288
1289
1290class _TestMyManager(BaseTestCase):
1291
1292 ALLOWED_TYPES = ('manager',)
1293
1294 def test_mymanager(self):
1295 manager = MyManager()
1296 manager.start()
1297
1298 foo = manager.Foo()
1299 bar = manager.Bar()
1300 baz = manager.baz()
1301
1302 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1303 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1304
1305 self.assertEqual(foo_methods, ['f', 'g'])
1306 self.assertEqual(bar_methods, ['f', '_h'])
1307
1308 self.assertEqual(foo.f(), 'f()')
1309 self.assertRaises(ValueError, foo.g)
1310 self.assertEqual(foo._callmethod('f'), 'f()')
1311 self.assertRaises(RemoteError, foo._callmethod, '_h')
1312
1313 self.assertEqual(bar.f(), 'f()')
1314 self.assertEqual(bar._h(), '_h()')
1315 self.assertEqual(bar._callmethod('f'), 'f()')
1316 self.assertEqual(bar._callmethod('_h'), '_h()')
1317
1318 self.assertEqual(list(baz), [i*i for i in range(10)])
1319
1320 manager.shutdown()
1321
1322#
1323# Test of connecting to a remote server and using xmlrpclib for serialization
1324#
1325
1326_queue = pyqueue.Queue()
1327def get_queue():
1328 return _queue
1329
1330class QueueManager(BaseManager):
1331 '''manager class used by server process'''
1332QueueManager.register('get_queue', callable=get_queue)
1333
1334class QueueManager2(BaseManager):
1335 '''manager class which specifies the same interface as QueueManager'''
1336QueueManager2.register('get_queue')
1337
1338
1339SERIALIZER = 'xmlrpclib'
1340
1341class _TestRemoteManager(BaseTestCase):
1342
1343 ALLOWED_TYPES = ('manager',)
1344
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001345 @classmethod
1346 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001347 manager = QueueManager2(
1348 address=address, authkey=authkey, serializer=SERIALIZER
1349 )
1350 manager.connect()
1351 queue = manager.get_queue()
1352 queue.put(('hello world', None, True, 2.25))
1353
1354 def test_remote(self):
1355 authkey = os.urandom(32)
1356
1357 manager = QueueManager(
1358 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1359 )
1360 manager.start()
1361
1362 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001363 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001364 p.start()
1365
1366 manager2 = QueueManager2(
1367 address=manager.address, authkey=authkey, serializer=SERIALIZER
1368 )
1369 manager2.connect()
1370 queue = manager2.get_queue()
1371
1372 # Note that xmlrpclib will deserialize object as a list not a tuple
1373 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1374
1375 # Because we are using xmlrpclib for serialization instead of
1376 # pickle this will cause a serialization error.
1377 self.assertRaises(Exception, queue.put, time.sleep)
1378
1379 # Make queue finalizer run before the server is stopped
1380 del queue
1381 manager.shutdown()
1382
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001383class _TestManagerRestart(BaseTestCase):
1384
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001385 @classmethod
1386 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001387 manager = QueueManager(
1388 address=address, authkey=authkey, serializer=SERIALIZER)
1389 manager.connect()
1390 queue = manager.get_queue()
1391 queue.put('hello world')
1392
1393 def test_rapid_restart(self):
1394 authkey = os.urandom(32)
1395 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001396 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin50be1ca2010-11-01 05:10:44 +00001397 srvr = manager.get_server()
1398 addr = srvr.address
1399 # Close the connection.Listener socket which gets opened as a part
1400 # of manager.get_server(). It's not needed for the test.
1401 srvr.listener.close()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001402 manager.start()
1403
1404 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001405 p.daemon = True
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001406 p.start()
1407 queue = manager.get_queue()
1408 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00001409 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001410 manager.shutdown()
1411 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001412 address=addr, authkey=authkey, serializer=SERIALIZER)
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02001413 try:
1414 manager.start()
1415 except IOError as e:
1416 if e.errno != errno.EADDRINUSE:
1417 raise
1418 # Retry after some time, in case the old socket was lingering
1419 # (sporadic failure on buildbots)
1420 time.sleep(1.0)
1421 manager = QueueManager(
1422 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller35d1f002009-03-30 22:59:27 +00001423 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001424
Benjamin Petersone711caf2008-06-11 16:44:04 +00001425#
1426#
1427#
1428
1429SENTINEL = latin('')
1430
1431class _TestConnection(BaseTestCase):
1432
1433 ALLOWED_TYPES = ('processes', 'threads')
1434
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001435 @classmethod
1436 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001437 for msg in iter(conn.recv_bytes, SENTINEL):
1438 conn.send_bytes(msg)
1439 conn.close()
1440
1441 def test_connection(self):
1442 conn, child_conn = self.Pipe()
1443
1444 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001445 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001446 p.start()
1447
1448 seq = [1, 2.25, None]
1449 msg = latin('hello world')
1450 longmsg = msg * 10
1451 arr = array.array('i', list(range(4)))
1452
1453 if self.TYPE == 'processes':
1454 self.assertEqual(type(conn.fileno()), int)
1455
1456 self.assertEqual(conn.send(seq), None)
1457 self.assertEqual(conn.recv(), seq)
1458
1459 self.assertEqual(conn.send_bytes(msg), None)
1460 self.assertEqual(conn.recv_bytes(), msg)
1461
1462 if self.TYPE == 'processes':
1463 buffer = array.array('i', [0]*10)
1464 expected = list(arr) + [0] * (10 - len(arr))
1465 self.assertEqual(conn.send_bytes(arr), None)
1466 self.assertEqual(conn.recv_bytes_into(buffer),
1467 len(arr) * buffer.itemsize)
1468 self.assertEqual(list(buffer), expected)
1469
1470 buffer = array.array('i', [0]*10)
1471 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1472 self.assertEqual(conn.send_bytes(arr), None)
1473 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1474 len(arr) * buffer.itemsize)
1475 self.assertEqual(list(buffer), expected)
1476
1477 buffer = bytearray(latin(' ' * 40))
1478 self.assertEqual(conn.send_bytes(longmsg), None)
1479 try:
1480 res = conn.recv_bytes_into(buffer)
1481 except multiprocessing.BufferTooShort as e:
1482 self.assertEqual(e.args, (longmsg,))
1483 else:
1484 self.fail('expected BufferTooShort, got %s' % res)
1485
1486 poll = TimingWrapper(conn.poll)
1487
1488 self.assertEqual(poll(), False)
1489 self.assertTimingAlmostEqual(poll.elapsed, 0)
1490
1491 self.assertEqual(poll(TIMEOUT1), False)
1492 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1493
1494 conn.send(None)
1495
1496 self.assertEqual(poll(TIMEOUT1), True)
1497 self.assertTimingAlmostEqual(poll.elapsed, 0)
1498
1499 self.assertEqual(conn.recv(), None)
1500
1501 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1502 conn.send_bytes(really_big_msg)
1503 self.assertEqual(conn.recv_bytes(), really_big_msg)
1504
1505 conn.send_bytes(SENTINEL) # tell child to quit
1506 child_conn.close()
1507
1508 if self.TYPE == 'processes':
1509 self.assertEqual(conn.readable, True)
1510 self.assertEqual(conn.writable, True)
1511 self.assertRaises(EOFError, conn.recv)
1512 self.assertRaises(EOFError, conn.recv_bytes)
1513
1514 p.join()
1515
1516 def test_duplex_false(self):
1517 reader, writer = self.Pipe(duplex=False)
1518 self.assertEqual(writer.send(1), None)
1519 self.assertEqual(reader.recv(), 1)
1520 if self.TYPE == 'processes':
1521 self.assertEqual(reader.readable, True)
1522 self.assertEqual(reader.writable, False)
1523 self.assertEqual(writer.readable, False)
1524 self.assertEqual(writer.writable, True)
1525 self.assertRaises(IOError, reader.send, 2)
1526 self.assertRaises(IOError, writer.recv)
1527 self.assertRaises(IOError, writer.poll)
1528
1529 def test_spawn_close(self):
1530 # We test that a pipe connection can be closed by parent
1531 # process immediately after child is spawned. On Windows this
1532 # would have sometimes failed on old versions because
1533 # child_conn would be closed before the child got a chance to
1534 # duplicate it.
1535 conn, child_conn = self.Pipe()
1536
1537 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001538 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001539 p.start()
1540 child_conn.close() # this might complete before child initializes
1541
1542 msg = latin('hello')
1543 conn.send_bytes(msg)
1544 self.assertEqual(conn.recv_bytes(), msg)
1545
1546 conn.send_bytes(SENTINEL)
1547 conn.close()
1548 p.join()
1549
1550 def test_sendbytes(self):
1551 if self.TYPE != 'processes':
1552 return
1553
1554 msg = latin('abcdefghijklmnopqrstuvwxyz')
1555 a, b = self.Pipe()
1556
1557 a.send_bytes(msg)
1558 self.assertEqual(b.recv_bytes(), msg)
1559
1560 a.send_bytes(msg, 5)
1561 self.assertEqual(b.recv_bytes(), msg[5:])
1562
1563 a.send_bytes(msg, 7, 8)
1564 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1565
1566 a.send_bytes(msg, 26)
1567 self.assertEqual(b.recv_bytes(), latin(''))
1568
1569 a.send_bytes(msg, 26, 0)
1570 self.assertEqual(b.recv_bytes(), latin(''))
1571
1572 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1573
1574 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1575
1576 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1577
1578 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1579
1580 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1581
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001582 @classmethod
1583 def _is_fd_assigned(cls, fd):
1584 try:
1585 os.fstat(fd)
1586 except OSError as e:
1587 if e.errno == errno.EBADF:
1588 return False
1589 raise
1590 else:
1591 return True
1592
1593 @classmethod
1594 def _writefd(cls, conn, data, create_dummy_fds=False):
1595 if create_dummy_fds:
1596 for i in range(0, 256):
1597 if not cls._is_fd_assigned(i):
1598 os.dup2(conn.fileno(), i)
1599 fd = reduction.recv_handle(conn)
1600 if msvcrt:
1601 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1602 os.write(fd, data)
1603 os.close(fd)
1604
Charles-François Natalie51c8da2011-09-21 18:48:21 +02001605 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001606 def test_fd_transfer(self):
1607 if self.TYPE != 'processes':
1608 self.skipTest("only makes sense with processes")
1609 conn, child_conn = self.Pipe(duplex=True)
1610
1611 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea94f964f2011-09-09 20:26:57 +02001612 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001613 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02001614 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001615 with open(test.support.TESTFN, "wb") as f:
1616 fd = f.fileno()
1617 if msvcrt:
1618 fd = msvcrt.get_osfhandle(fd)
1619 reduction.send_handle(conn, fd, p.pid)
1620 p.join()
1621 with open(test.support.TESTFN, "rb") as f:
1622 self.assertEqual(f.read(), b"foo")
1623
Charles-François Natalie51c8da2011-09-21 18:48:21 +02001624 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001625 @unittest.skipIf(sys.platform == "win32",
1626 "test semantics don't make sense on Windows")
1627 @unittest.skipIf(MAXFD <= 256,
1628 "largest assignable fd number is too small")
1629 @unittest.skipUnless(hasattr(os, "dup2"),
1630 "test needs os.dup2()")
1631 def test_large_fd_transfer(self):
1632 # With fd > 256 (issue #11657)
1633 if self.TYPE != 'processes':
1634 self.skipTest("only makes sense with processes")
1635 conn, child_conn = self.Pipe(duplex=True)
1636
1637 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea94f964f2011-09-09 20:26:57 +02001638 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001639 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02001640 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001641 with open(test.support.TESTFN, "wb") as f:
1642 fd = f.fileno()
1643 for newfd in range(256, MAXFD):
1644 if not self._is_fd_assigned(newfd):
1645 break
1646 else:
1647 self.fail("could not find an unassigned large file descriptor")
1648 os.dup2(fd, newfd)
1649 try:
1650 reduction.send_handle(conn, newfd, p.pid)
1651 finally:
1652 os.close(newfd)
1653 p.join()
1654 with open(test.support.TESTFN, "rb") as f:
1655 self.assertEqual(f.read(), b"bar")
1656
Jesus Cea4507e642011-09-21 03:53:25 +02001657 @classmethod
1658 def _send_data_without_fd(self, conn):
1659 os.write(conn.fileno(), b"\0")
1660
Charles-François Natalie51c8da2011-09-21 18:48:21 +02001661 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Cea4507e642011-09-21 03:53:25 +02001662 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
1663 def test_missing_fd_transfer(self):
1664 # Check that exception is raised when received data is not
1665 # accompanied by a file descriptor in ancillary data.
1666 if self.TYPE != 'processes':
1667 self.skipTest("only makes sense with processes")
1668 conn, child_conn = self.Pipe(duplex=True)
1669
1670 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
1671 p.daemon = True
1672 p.start()
1673 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
1674 p.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001675
Benjamin Petersone711caf2008-06-11 16:44:04 +00001676class _TestListenerClient(BaseTestCase):
1677
1678 ALLOWED_TYPES = ('processes', 'threads')
1679
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001680 @classmethod
1681 def _test(cls, address):
1682 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001683 conn.send('hello')
1684 conn.close()
1685
1686 def test_listener_client(self):
1687 for family in self.connection.families:
1688 l = self.connection.Listener(family=family)
1689 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001690 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001691 p.start()
1692 conn = l.accept()
1693 self.assertEqual(conn.recv(), 'hello')
1694 p.join()
1695 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001696#
1697# Test of sending connection and socket objects between processes
1698#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001699"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001700class _TestPicklingConnections(BaseTestCase):
1701
1702 ALLOWED_TYPES = ('processes',)
1703
1704 def _listener(self, conn, families):
1705 for fam in families:
1706 l = self.connection.Listener(family=fam)
1707 conn.send(l.address)
1708 new_conn = l.accept()
1709 conn.send(new_conn)
1710
1711 if self.TYPE == 'processes':
1712 l = socket.socket()
1713 l.bind(('localhost', 0))
1714 conn.send(l.getsockname())
1715 l.listen(1)
1716 new_conn, addr = l.accept()
1717 conn.send(new_conn)
1718
1719 conn.recv()
1720
1721 def _remote(self, conn):
1722 for (address, msg) in iter(conn.recv, None):
1723 client = self.connection.Client(address)
1724 client.send(msg.upper())
1725 client.close()
1726
1727 if self.TYPE == 'processes':
1728 address, msg = conn.recv()
1729 client = socket.socket()
1730 client.connect(address)
1731 client.sendall(msg.upper())
1732 client.close()
1733
1734 conn.close()
1735
1736 def test_pickling(self):
1737 try:
1738 multiprocessing.allow_connection_pickling()
1739 except ImportError:
1740 return
1741
1742 families = self.connection.families
1743
1744 lconn, lconn0 = self.Pipe()
1745 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02001746 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001747 lp.start()
1748 lconn0.close()
1749
1750 rconn, rconn0 = self.Pipe()
1751 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001752 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001753 rp.start()
1754 rconn0.close()
1755
1756 for fam in families:
1757 msg = ('This connection uses family %s' % fam).encode('ascii')
1758 address = lconn.recv()
1759 rconn.send((address, msg))
1760 new_conn = lconn.recv()
1761 self.assertEqual(new_conn.recv(), msg.upper())
1762
1763 rconn.send(None)
1764
1765 if self.TYPE == 'processes':
1766 msg = latin('This connection uses a normal socket')
1767 address = lconn.recv()
1768 rconn.send((address, msg))
1769 if hasattr(socket, 'fromfd'):
1770 new_conn = lconn.recv()
1771 self.assertEqual(new_conn.recv(100), msg.upper())
1772 else:
1773 # XXX On Windows with Py2.6 need to backport fromfd()
1774 discard = lconn.recv_bytes()
1775
1776 lconn.send(None)
1777
1778 rconn.close()
1779 lconn.close()
1780
1781 lp.join()
1782 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001783"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001784#
1785#
1786#
1787
1788class _TestHeap(BaseTestCase):
1789
1790 ALLOWED_TYPES = ('processes',)
1791
1792 def test_heap(self):
1793 iterations = 5000
1794 maxblocks = 50
1795 blocks = []
1796
1797 # create and destroy lots of blocks of different sizes
1798 for i in range(iterations):
1799 size = int(random.lognormvariate(0, 1) * 1000)
1800 b = multiprocessing.heap.BufferWrapper(size)
1801 blocks.append(b)
1802 if len(blocks) > maxblocks:
1803 i = random.randrange(maxblocks)
1804 del blocks[i]
1805
1806 # get the heap object
1807 heap = multiprocessing.heap.BufferWrapper._heap
1808
1809 # verify the state of the heap
1810 all = []
1811 occupied = 0
Charles-François Natali778db492011-07-02 14:35:49 +02001812 heap._lock.acquire()
1813 self.addCleanup(heap._lock.release)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001814 for L in list(heap._len_to_seq.values()):
1815 for arena, start, stop in L:
1816 all.append((heap._arenas.index(arena), start, stop,
1817 stop-start, 'free'))
1818 for arena, start, stop in heap._allocated_blocks:
1819 all.append((heap._arenas.index(arena), start, stop,
1820 stop-start, 'occupied'))
1821 occupied += (stop-start)
1822
1823 all.sort()
1824
1825 for i in range(len(all)-1):
1826 (arena, start, stop) = all[i][:3]
1827 (narena, nstart, nstop) = all[i+1][:3]
1828 self.assertTrue((arena != narena and nstart == 0) or
1829 (stop == nstart))
1830
Charles-François Natali778db492011-07-02 14:35:49 +02001831 def test_free_from_gc(self):
1832 # Check that freeing of blocks by the garbage collector doesn't deadlock
1833 # (issue #12352).
1834 # Make sure the GC is enabled, and set lower collection thresholds to
1835 # make collections more frequent (and increase the probability of
1836 # deadlock).
1837 if not gc.isenabled():
1838 gc.enable()
1839 self.addCleanup(gc.disable)
1840 thresholds = gc.get_threshold()
1841 self.addCleanup(gc.set_threshold, *thresholds)
1842 gc.set_threshold(10)
1843
1844 # perform numerous block allocations, with cyclic references to make
1845 # sure objects are collected asynchronously by the gc
1846 for i in range(5000):
1847 a = multiprocessing.heap.BufferWrapper(1)
1848 b = multiprocessing.heap.BufferWrapper(1)
1849 # circular references
1850 a.buddy = b
1851 b.buddy = a
1852
Benjamin Petersone711caf2008-06-11 16:44:04 +00001853#
1854#
1855#
1856
Benjamin Petersone711caf2008-06-11 16:44:04 +00001857class _Foo(Structure):
1858 _fields_ = [
1859 ('x', c_int),
1860 ('y', c_double)
1861 ]
1862
1863class _TestSharedCTypes(BaseTestCase):
1864
1865 ALLOWED_TYPES = ('processes',)
1866
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00001867 def setUp(self):
1868 if not HAS_SHAREDCTYPES:
1869 self.skipTest("requires multiprocessing.sharedctypes")
1870
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001871 @classmethod
1872 def _double(cls, x, y, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001873 x.value *= 2
1874 y.value *= 2
1875 foo.x *= 2
1876 foo.y *= 2
1877 string.value *= 2
1878 for i in range(len(arr)):
1879 arr[i] *= 2
1880
1881 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001882 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001883 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001884 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001885 arr = self.Array('d', list(range(10)), lock=lock)
1886 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00001887 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001888
1889 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea94f964f2011-09-09 20:26:57 +02001890 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001891 p.start()
1892 p.join()
1893
1894 self.assertEqual(x.value, 14)
1895 self.assertAlmostEqual(y.value, 2.0/3.0)
1896 self.assertEqual(foo.x, 6)
1897 self.assertAlmostEqual(foo.y, 4.0)
1898 for i in range(10):
1899 self.assertAlmostEqual(arr[i], i*2)
1900 self.assertEqual(string.value, latin('hellohello'))
1901
1902 def test_synchronize(self):
1903 self.test_sharedctypes(lock=True)
1904
1905 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001906 foo = _Foo(2, 5.0)
Brian Curtinafa88b52010-10-07 01:12:19 +00001907 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001908 foo.x = 0
1909 foo.y = 0
1910 self.assertEqual(bar.x, 2)
1911 self.assertAlmostEqual(bar.y, 5.0)
1912
1913#
1914#
1915#
1916
1917class _TestFinalize(BaseTestCase):
1918
1919 ALLOWED_TYPES = ('processes',)
1920
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001921 @classmethod
1922 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001923 class Foo(object):
1924 pass
1925
1926 a = Foo()
1927 util.Finalize(a, conn.send, args=('a',))
1928 del a # triggers callback for a
1929
1930 b = Foo()
1931 close_b = util.Finalize(b, conn.send, args=('b',))
1932 close_b() # triggers callback for b
1933 close_b() # does nothing because callback has already been called
1934 del b # does nothing because callback has already been called
1935
1936 c = Foo()
1937 util.Finalize(c, conn.send, args=('c',))
1938
1939 d10 = Foo()
1940 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1941
1942 d01 = Foo()
1943 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1944 d02 = Foo()
1945 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1946 d03 = Foo()
1947 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1948
1949 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1950
1951 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1952
Ezio Melotti13925002011-03-16 11:05:33 +02001953 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00001954 # garbage collecting locals
1955 util._exit_function()
1956 conn.close()
1957 os._exit(0)
1958
1959 def test_finalize(self):
1960 conn, child_conn = self.Pipe()
1961
1962 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001963 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001964 p.start()
1965 p.join()
1966
1967 result = [obj for obj in iter(conn.recv, 'STOP')]
1968 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1969
1970#
1971# Test that from ... import * works for each module
1972#
1973
1974class _TestImportStar(BaseTestCase):
1975
1976 ALLOWED_TYPES = ('processes',)
1977
1978 def test_import(self):
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001979 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00001980 'multiprocessing', 'multiprocessing.connection',
1981 'multiprocessing.heap', 'multiprocessing.managers',
1982 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001983 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001984 ]
1985
Charles-François Natalie51c8da2011-09-21 18:48:21 +02001986 if HAS_REDUCTION:
1987 modules.append('multiprocessing.reduction')
1988
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001989 if c_int is not None:
1990 # This module requires _ctypes
1991 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001992
1993 for name in modules:
1994 __import__(name)
1995 mod = sys.modules[name]
1996
1997 for attr in getattr(mod, '__all__', ()):
1998 self.assertTrue(
1999 hasattr(mod, attr),
2000 '%r does not have attribute %r' % (mod, attr)
2001 )
2002
2003#
2004# Quick test that logging works -- does not test logging output
2005#
2006
2007class _TestLogging(BaseTestCase):
2008
2009 ALLOWED_TYPES = ('processes',)
2010
2011 def test_enable_logging(self):
2012 logger = multiprocessing.get_logger()
2013 logger.setLevel(util.SUBWARNING)
2014 self.assertTrue(logger is not None)
2015 logger.debug('this will not be printed')
2016 logger.info('nor will this')
2017 logger.setLevel(LOG_LEVEL)
2018
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002019 @classmethod
2020 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002021 logger = multiprocessing.get_logger()
2022 conn.send(logger.getEffectiveLevel())
2023
2024 def test_level(self):
2025 LEVEL1 = 32
2026 LEVEL2 = 37
2027
2028 logger = multiprocessing.get_logger()
2029 root_logger = logging.getLogger()
2030 root_level = root_logger.level
2031
2032 reader, writer = multiprocessing.Pipe(duplex=False)
2033
2034 logger.setLevel(LEVEL1)
Jesus Cea94f964f2011-09-09 20:26:57 +02002035 p = self.Process(target=self._test_level, args=(writer,))
2036 p.daemon = True
2037 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002038 self.assertEqual(LEVEL1, reader.recv())
2039
2040 logger.setLevel(logging.NOTSET)
2041 root_logger.setLevel(LEVEL2)
Jesus Cea94f964f2011-09-09 20:26:57 +02002042 p = self.Process(target=self._test_level, args=(writer,))
2043 p.daemon = True
2044 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002045 self.assertEqual(LEVEL2, reader.recv())
2046
2047 root_logger.setLevel(root_level)
2048 logger.setLevel(level=LOG_LEVEL)
2049
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002050
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002051# class _TestLoggingProcessName(BaseTestCase):
2052#
2053# def handle(self, record):
2054# assert record.processName == multiprocessing.current_process().name
2055# self.__handled = True
2056#
2057# def test_logging(self):
2058# handler = logging.Handler()
2059# handler.handle = self.handle
2060# self.__handled = False
2061# # Bypass getLogger() and side-effects
2062# logger = logging.getLoggerClass()(
2063# 'multiprocessing.test.TestLoggingProcessName')
2064# logger.addHandler(handler)
2065# logger.propagate = False
2066#
2067# logger.warn('foo')
2068# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002069
Benjamin Petersone711caf2008-06-11 16:44:04 +00002070#
Jesse Noller6214edd2009-01-19 16:23:53 +00002071# Test to verify handle verification, see issue 3321
2072#
2073
2074class TestInvalidHandle(unittest.TestCase):
2075
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002076 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00002077 def test_invalid_handles(self):
Jesse Noller6214edd2009-01-19 16:23:53 +00002078 conn = _multiprocessing.Connection(44977608)
2079 self.assertRaises(IOError, conn.poll)
2080 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002081
Jesse Noller6214edd2009-01-19 16:23:53 +00002082#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002083# Functions used to create test cases from the base ones in this module
2084#
2085
2086def get_attributes(Source, names):
2087 d = {}
2088 for name in names:
2089 obj = getattr(Source, name)
2090 if type(obj) == type(get_attributes):
2091 obj = staticmethod(obj)
2092 d[name] = obj
2093 return d
2094
2095def create_test_cases(Mixin, type):
2096 result = {}
2097 glob = globals()
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002098 Type = type.capitalize()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002099
2100 for name in list(glob.keys()):
2101 if name.startswith('_Test'):
2102 base = glob[name]
2103 if type in base.ALLOWED_TYPES:
2104 newname = 'With' + Type + name[1:]
2105 class Temp(base, unittest.TestCase, Mixin):
2106 pass
2107 result[newname] = Temp
2108 Temp.__name__ = newname
2109 Temp.__module__ = Mixin.__module__
2110 return result
2111
2112#
2113# Create test cases
2114#
2115
2116class ProcessesMixin(object):
2117 TYPE = 'processes'
2118 Process = multiprocessing.Process
2119 locals().update(get_attributes(multiprocessing, (
2120 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2121 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2122 'RawArray', 'current_process', 'active_children', 'Pipe',
2123 'connection', 'JoinableQueue'
2124 )))
2125
2126testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2127globals().update(testcases_processes)
2128
2129
2130class ManagerMixin(object):
2131 TYPE = 'manager'
2132 Process = multiprocessing.Process
2133 manager = object.__new__(multiprocessing.managers.SyncManager)
2134 locals().update(get_attributes(manager, (
2135 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2136 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
2137 'Namespace', 'JoinableQueue'
2138 )))
2139
2140testcases_manager = create_test_cases(ManagerMixin, type='manager')
2141globals().update(testcases_manager)
2142
2143
2144class ThreadsMixin(object):
2145 TYPE = 'threads'
2146 Process = multiprocessing.dummy.Process
2147 locals().update(get_attributes(multiprocessing.dummy, (
2148 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2149 'Condition', 'Event', 'Value', 'Array', 'current_process',
2150 'active_children', 'Pipe', 'connection', 'dict', 'list',
2151 'Namespace', 'JoinableQueue'
2152 )))
2153
2154testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2155globals().update(testcases_threads)
2156
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002157class OtherTest(unittest.TestCase):
2158 # TODO: add more tests for deliver/answer challenge.
2159 def test_deliver_challenge_auth_failure(self):
2160 class _FakeConnection(object):
2161 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00002162 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002163 def send_bytes(self, data):
2164 pass
2165 self.assertRaises(multiprocessing.AuthenticationError,
2166 multiprocessing.connection.deliver_challenge,
2167 _FakeConnection(), b'abc')
2168
2169 def test_answer_challenge_auth_failure(self):
2170 class _FakeConnection(object):
2171 def __init__(self):
2172 self.count = 0
2173 def recv_bytes(self, size):
2174 self.count += 1
2175 if self.count == 1:
2176 return multiprocessing.connection.CHALLENGE
2177 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00002178 return b'something bogus'
2179 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002180 def send_bytes(self, data):
2181 pass
2182 self.assertRaises(multiprocessing.AuthenticationError,
2183 multiprocessing.connection.answer_challenge,
2184 _FakeConnection(), b'abc')
2185
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002186#
2187# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2188#
2189
2190def initializer(ns):
2191 ns.test += 1
2192
2193class TestInitializers(unittest.TestCase):
2194 def setUp(self):
2195 self.mgr = multiprocessing.Manager()
2196 self.ns = self.mgr.Namespace()
2197 self.ns.test = 0
2198
2199 def tearDown(self):
2200 self.mgr.shutdown()
2201
2202 def test_manager_initializer(self):
2203 m = multiprocessing.managers.SyncManager()
2204 self.assertRaises(TypeError, m.start, 1)
2205 m.start(initializer, (self.ns,))
2206 self.assertEqual(self.ns.test, 1)
2207 m.shutdown()
2208
2209 def test_pool_initializer(self):
2210 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2211 p = multiprocessing.Pool(1, initializer, (self.ns,))
2212 p.close()
2213 p.join()
2214 self.assertEqual(self.ns.test, 1)
2215
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002216#
2217# Issue 5155, 5313, 5331: Test process in processes
2218# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2219#
2220
2221def _ThisSubProcess(q):
2222 try:
2223 item = q.get(block=False)
2224 except pyqueue.Empty:
2225 pass
2226
2227def _TestProcess(q):
2228 queue = multiprocessing.Queue()
2229 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002230 subProc.daemon = True
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002231 subProc.start()
2232 subProc.join()
2233
2234def _afunc(x):
2235 return x*x
2236
2237def pool_in_process():
2238 pool = multiprocessing.Pool(processes=4)
2239 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2240
2241class _file_like(object):
2242 def __init__(self, delegate):
2243 self._delegate = delegate
2244 self._pid = None
2245
2246 @property
2247 def cache(self):
2248 pid = os.getpid()
2249 # There are no race conditions since fork keeps only the running thread
2250 if pid != self._pid:
2251 self._pid = pid
2252 self._cache = []
2253 return self._cache
2254
2255 def write(self, data):
2256 self.cache.append(data)
2257
2258 def flush(self):
2259 self._delegate.write(''.join(self.cache))
2260 self._cache = []
2261
2262class TestStdinBadfiledescriptor(unittest.TestCase):
2263
2264 def test_queue_in_process(self):
2265 queue = multiprocessing.Queue()
2266 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
2267 proc.start()
2268 proc.join()
2269
2270 def test_pool_in_process(self):
2271 p = multiprocessing.Process(target=pool_in_process)
2272 p.start()
2273 p.join()
2274
2275 def test_flushing(self):
2276 sio = io.StringIO()
2277 flike = _file_like(sio)
2278 flike.write('foo')
2279 proc = multiprocessing.Process(target=lambda: flike.flush())
2280 flike.flush()
2281 assert sio.getvalue() == 'foo'
2282
2283testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
2284 TestStdinBadfiledescriptor]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002285
Benjamin Petersone711caf2008-06-11 16:44:04 +00002286#
2287#
2288#
2289
2290def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00002291 if sys.platform.startswith("linux"):
2292 try:
2293 lock = multiprocessing.RLock()
2294 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00002295 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00002296
Benjamin Petersone711caf2008-06-11 16:44:04 +00002297 if run is None:
2298 from test.support import run_unittest as run
2299
2300 util.get_temp_dir() # creates temp directory for use by all processes
2301
2302 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2303
Benjamin Peterson41181742008-07-02 20:22:54 +00002304 ProcessesMixin.pool = multiprocessing.Pool(4)
2305 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2306 ManagerMixin.manager.__init__()
2307 ManagerMixin.manager.start()
2308 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002309
2310 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00002311 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2312 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002313 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2314 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00002315 )
2316
2317 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2318 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
2319 run(suite)
2320
Benjamin Peterson41181742008-07-02 20:22:54 +00002321 ThreadsMixin.pool.terminate()
2322 ProcessesMixin.pool.terminate()
2323 ManagerMixin.pool.terminate()
2324 ManagerMixin.manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002325
Benjamin Peterson41181742008-07-02 20:22:54 +00002326 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002327
2328def main():
2329 test_main(unittest.TextTestRunner(verbosity=2).run)
2330
2331if __name__ == '__main__':
2332 main()