blob: dc41e151bf30e64a1af1869b94f2def399bd27d1 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020014import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more relevant error
28# message: "No module named _multiprocessing". _multiprocessing is not compiled
29# without thread support.
30import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.dummy
33import multiprocessing.connection
34import multiprocessing.managers
35import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
38from multiprocessing import util
39
# Remember whether multiprocessing.sharedctypes could be imported, so that
# tests which need it can check HAS_SHAREDCTYPES and skip themselves.
try:
    from multiprocessing.sharedctypes import Value, copy
except ImportError:
    HAS_SHAREDCTYPES = False
else:
    HAS_SHAREDCTYPES = True
45
Benjamin Petersone711caf2008-06-11 16:44:04 +000046#
47#
48#
49
def latin(s):
    """Return the string *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000052
Benjamin Petersone711caf2008-06-11 16:44:04 +000053#
54# Constants
55#
56
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Short pause, in seconds; the tests pass it to time.sleep().
DELTA = 0.1

# Making this true makes tests take a lot longer and can sometimes cause
# some non-serious failures because some calls block a bit longer than
# expected.
CHECK_TIMINGS = False

# Longer, distinct timeouts are only worth paying for when timings are
# actually being checked.
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# True unless _multiprocessing sets HAVE_BROKEN_SEM_GETVALUE to a true value.
_broken_getvalue = getattr(_multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False)
HAVE_GETVALUE = not _broken_getvalue
del _broken_getvalue

WIN32 = sys.platform == "win32"
74
Benjamin Petersone711caf2008-06-11 16:44:04 +000075#
Florent Xiclunafd1b0932010-03-28 00:25:02 +000076# Some tests require ctypes
77#
78
try:
    from ctypes import Structure, c_int, c_double
except ImportError:
    # Stand-ins so this module still imports without ctypes; tests look at
    # ``c_int is None`` to decide whether they have to be skipped.
    c_int = None
    c_double = None
    Structure = object
84
85#
Benjamin Petersone711caf2008-06-11 16:44:04 +000086# Creates a wrapper for a function which records the time it takes to finish
87#
88
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* took.

    After a call, ``elapsed`` holds the wall-clock duration in seconds of the
    most recent call (also when that call raised); it is None before the
    first call.
    """

    def __init__(self, func):
        self.func, self.elapsed = func, None

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Runs on both the normal and the exceptional path.
            self.elapsed = time.time() - started
        return result
101
102#
103# Base class for test cases
104#
105
class BaseTestCase(object):
    """Mixin with assertion helpers shared by the test-case classes below.

    It calls ``self.assertEqual`` and ``self.assertAlmostEqual`` without
    defining them, so it must be combined with a class that provides them.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare *a* and *b* to one decimal place if CHECK_TIMINGS is set."""
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert ``func(*args) == value`` unless it raises NotImplementedError."""
        try:
            res = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
128
Benjamin Petersone711caf2008-06-11 16:44:04 +0000129#
130# Return the value of a semaphore
131#
132
def get_value(self):
    """Return the current counter value of the semaphore-like object *self*.

    Tries ``self.get_value()`` first, then the attributes
    ``_Semaphore__value`` and ``_value``, in that order.  Raises
    NotImplementedError when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
144
145#
146# Testcases
147#
148
class _TestProcess(BaseTestCase):
    """Checks of Process objects, current_process() and active_children().

    The methods read TYPE, Process, Queue, Pipe, current_process and
    active_children from ``self``/``cls``; none of them is defined here, so
    they are presumably supplied by the concrete classes this is mixed into.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        """The current process is alive, not a daemon and has an authkey."""
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertGreater(len(authkey), 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertIsNone(current.exitcode)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child side of test_process: report what the child sees via *q*."""
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        """Follow a Process through its not-started, running and joined states."""
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIs(type(self.active_children()), list)
        self.assertIsNone(p.exitcode)

        p.start()

        self.assertIsNone(p.exitcode)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child receives args without the leading queue argument.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        """Child side of test_terminate: sleep until terminated."""
        time.sleep(1000)

    def test_terminate(self):
        """terminate() stops a sleeping child and join() then returns."""
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertIsNone(p.exitcode)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertIsNone(join())
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        """cpu_count() returns an int >= 1 or raises NotImplementedError."""
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIs(type(cpus), int)
        self.assertGreaterEqual(cpus, 1)

    def test_active_children(self):
        """A process is listed by active_children() only while it runs."""
        self.assertIs(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Send *id* over *wconn*, then recurse in two children up to depth 2.

        Each child is joined before the next one is started, so the ids are
        written in depth-first order.
        """
        # No import of multiprocessing.forking here: nothing in this method
        # used it, and importing it fails where that module does not exist.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        """Processes started from within child processes all report back."""
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        # Give the last messages time to arrive before polling.
        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
297
298#
299#
300#
301
class _UpperCaser(multiprocessing.Process):
    """Process that replies to each string it receives with its upper case.

    The two ends of one pipe are stored as ``child_conn`` (used by run())
    and ``parent_conn`` (used by submit() and stop()).
    """

    def __init__(self):
        super().__init__()
        ends = multiprocessing.Pipe()
        self.child_conn = ends[0]
        self.parent_conn = ends[1]

    def run(self):
        """Serve requests from child_conn until None is received."""
        self.parent_conn.close()
        requests = iter(self.child_conn.recv, None)
        for text in requests:
            self.child_conn.send(text.upper())
        self.child_conn.close()

    def submit(self, s):
        """Send the str *s* through parent_conn and return the reply."""
        assert type(s) is str
        conn = self.parent_conn
        conn.send(s)
        return conn.recv()

    def stop(self):
        """Send the None sentinel, then close both connection objects."""
        self.parent_conn.send(None)
        for conn in (self.parent_conn, self.child_conn):
            conn.close()
323
class _TestSubclassingProcess(BaseTestCase):
    """Runs a Process subclass (_UpperCaser) through a request/reply cycle."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.start()
        for text, shouted in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), shouted)
        worker.stop()
        worker.join()
335
336#
337#
338#
339
def queue_empty(q):
    """Report whether *q* is empty, via q.empty() or else q.qsize() == 0."""
    return q.empty() if hasattr(q, 'empty') else q.qsize() == 0
345
def queue_full(q, maxsize):
    """Report whether *q* is full, via q.full() or else q.qsize() == maxsize."""
    return q.full() if hasattr(q, 'full') else q.qsize() == maxsize
351
352
class _TestQueue(BaseTestCase):
    """Tests of put/get blocking behaviour, forking, qsize() and task_done().

    Queue, JoinableQueue, Event and Process are read from ``self``; they are
    not defined in this class.
    """

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        """Child side of test_put: take six items once the parent allows it."""
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        """put()/put_nowait() succeed until the queue is full, then raise Full."""
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        # Let the child drain the queue and wait until it has done so.
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        """Child side of test_get: put 2..5 once the parent allows it."""
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        """get()/get_nowait() return items in order, then raise Empty."""
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        """Child side of test_fork: put the integers 10..19 on *queue*."""
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        """qsize() follows puts and gets; silently passes if unimplemented."""
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        """Worker for test_task_done: acknowledge items until None arrives."""
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        """JoinableQueue.join() returns once every item has been acknowledged."""
        queue = self.JoinableQueue()

        # The former "sys.version_info < (2, 5)" skip could never trigger in
        # this Python 3 file, so it has been dropped.
        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # One None sentinel per worker makes each worker loop terminate.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
560
561#
562#
563#
564
class _TestLock(BaseTestCase):
    """Basic acquire/release checks for Lock and RLock objects."""

    def test_lock(self):
        """A Lock cannot be acquired twice nor released when not held."""
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        with self.assertRaises((ValueError, threading.ThreadError)):
            lock.release()

    def test_rlock(self):
        """An RLock can be acquired three times and released three times."""
        lock = self.RLock()
        for _ in range(3):
            self.assertEqual(lock.acquire(), True)
        for _ in range(3):
            self.assertEqual(lock.release(), None)
        with self.assertRaises((AssertionError, RuntimeError)):
            lock.release()

    def test_lock_context(self):
        """A Lock can be used as a context manager."""
        lock = self.Lock()
        with lock:
            pass
587
Benjamin Petersone711caf2008-06-11 16:44:04 +0000588
class _TestSemaphore(BaseTestCase):
    """Counter and timeout checks for Semaphore and BoundedSemaphore."""

    def _test_semaphore(self, sem):
        """Take *sem* (initial value 2) down to 0 and back up to 2."""
        self.assertReturnsIfImplemented(2, get_value, sem)
        for remaining in (1, 0):
            self.assertEqual(sem.acquire(), True)
            self.assertReturnsIfImplemented(remaining, get_value, sem)
        # Nothing is left, so a non-blocking acquire has to fail.
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        for available in (1, 2):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(available, get_value, sem)

    def test_semaphore(self):
        """A plain Semaphore may be released beyond its initial value."""
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        for available in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(available, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        """acquire() on an exhausted semaphore fails for every argument form."""
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected elapsed time)
        attempts = (
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
        )
        for args, kwds, expected in attempts:
            self.assertEqual(acquire(*args, **kwds), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected)
641
642
class _TestCondition(BaseTestCase):
    """Tests of Condition.notify(), notify_all() and wait() timeouts.

    Condition, Semaphore and Process are read from ``self``; they are not
    defined in this class.
    """

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        """Wait on *cond*, releasing *sleeping* before and *woken* after."""
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        """Check the internal counters of *cond* ('processes' type only)."""
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        """Each notify() wakes exactly one of the two waiters."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Both waiters have been notified by now, so wait for both of them.
        # Using separate names keeps the process handle available for
        # joining as well as the thread.
        proc.join()
        thread.join()

    def test_notify_all(self):
        """Waiters time out on their own, and notify_all() wakes all waiters."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken; poll up to ten times instead of relying
        # on a single fixed sleep
        for i in range(10):
            try:
                if get_value(woken) == 6:
                    break
            except NotImplementedError:
                break
            time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        """wait() with a timeout and no notifier returns False."""
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
780
781
class _TestEvent(BaseTestCase):
    """Tests of Event.is_set(), set(), clear() and wait()."""

    @classmethod
    def _test_event(cls, event):
        """Child side of test_event: set *event* after TIMEOUT2 seconds."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        """wait() returns the flag: False on timeout, True once it is set."""
        event = self.Event()
        wait = TimingWrapper(event.wait)

        self.assertEqual(event.is_set(), False)

        # wait() returns the value of the flag, so a timeout on an unset
        # event gives False rather than None.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once the event is set, wait() returns True without blocking.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # Keep a handle on the child so it can be joined, and make it a
        # daemon like the other children started in this file.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000820
821#
822#
823#
824
class _TestValue(BaseTestCase):
    """Tests of the shared Value and RawValue objects."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value assigned by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        """Child side of test_value: store the third field in each value."""
        for shared, (_, _, final) in zip(values, cls.codes_values):
            shared.value = final

    def test_value(self, raw=False):
        """Assignments made in a child process are visible in the parent."""
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, (_, initial, _) in zip(values, self.codes_values):
            self.assertEqual(shared.value, initial)

        proc = self.Process(target=self._test, args=(values,))
        proc.start()
        proc.join()

        for shared, (_, _, final) in zip(values, self.codes_values):
            self.assertEqual(shared.value, final)

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        """get_lock()/get_obj() exist exactly when the value has a lock."""
        # Default lock and lock=None: the accessors only have to be callable.
        default_locked = self.Value('i', 5)
        default_locked.get_lock()
        default_locked.get_obj()

        none_locked = self.Value('i', 5, lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # An explicitly supplied lock is the one handed back by get_lock().
        lock = self.Lock()
        explicit = self.Value('i', 5, lock=lock)
        returned_lock = explicit.get_lock()
        explicit.get_obj()
        self.assertEqual(lock, returned_lock)

        unlocked = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))

        with self.assertRaises(AttributeError):
            self.Value('i', 5, lock='navalue')

        raw = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, accessor))
891
Benjamin Petersone711caf2008-06-11 16:44:04 +0000892
class _TestArray(BaseTestCase):
    """Tests of the shared Array and RawArray objects."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        """Replace *seq* in place by its running totals."""
        for pos in range(1, len(seq)):
            seq[pos] += seq[pos - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        """Indexing, slicing and in-place updates made by a child process."""
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        factory = self.RawArray if raw else self.Array
        arr = factory('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Apply the same slice assignment to the shared array and the list.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Compute the expected result locally, then let a child process do
        # the same to the shared array.
        self.f(seq)

        worker = self.Process(target=self.f, args=(arr,))
        worker.start()
        worker.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(10)
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        """get_lock()/get_obj() exist exactly when the array has a lock."""
        # Default lock and lock=None: the accessors only have to be callable.
        default_locked = self.Array('i', list(range(10)))
        default_locked.get_lock()
        default_locked.get_obj()

        none_locked = self.Array('i', list(range(10)), lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # An explicitly supplied lock is the one handed back by get_lock().
        lock = self.Lock()
        explicit = self.Array('i', list(range(10)), lock=lock)
        returned_lock = explicit.get_lock()
        explicit.get_obj()
        self.assertEqual(lock, returned_lock)

        unlocked = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))
        with self.assertRaises(AttributeError):
            self.Array('i', range(10), lock='notalock')

        raw = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, accessor))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000970
971#
972#
973#
974
class _TestContainers(BaseTestCase):
    """Tests for manager-backed list, dict and Namespace objects."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        digits = list(range(10))

        a = self.list(list(range(10)))
        self.assertEqual(a[:], digits)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], [0, 1, 2, 3, 4])

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2, 3, 4])

        b *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        # None of the operations on b may have touched a.
        self.assertEqual(a[:], digits)

        nested = self.list([a, b])
        self.assertEqual(nested[:], [digits, doubled])

        # A list holding a shared list sees later changes to it.
        holder = self.list([a])
        a.append('hello')
        self.assertEqual(holder[:], [digits + ['hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            d[code] = chr(code)
        letters = [chr(code) for code in indices]
        self.assertEqual(d.copy(), dict(zip(indices, letters)))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), list(zip(indices, letters)))

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1030
1031#
1032#
1033#
1034
def sqr(x, wait=0.0):
    """Sleep for ``wait`` seconds, then return ``x`` multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00001038
class _TestPool(BaseTestCase):
    """Tests for the pool API exposed as ``self.pool``."""

    def test_apply(self):
        apply_sync = self.pool.apply
        self.assertEqual(apply_sync(sqr, (5,)), sqr(5))
        self.assertEqual(apply_sync(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        pool_map = self.pool.map
        self.assertEqual(pool_map(sqr, list(range(10))),
                         [sqr(i) for i in range(10)])
        self.assertEqual(pool_map(sqr, list(range(100)), chunksize=20),
                         [sqr(i) for i in range(100)])

    def test_map_chunksize(self):
        pending = self.pool.map_async(sqr, [], chunksize=1)
        try:
            pending.get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        pending = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        timed_get = TimingWrapper(pending.get)
        self.assertEqual(timed_get(), 49)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps slightly longer than the timeout handed to get().
        pending = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        timed_get = TimingWrapper(pending.get)
        self.assertRaises(multiprocessing.TimeoutError,
                          timed_get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT2)

    def test_imap(self):
        results = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(results), [sqr(i) for i in range(10)])

        results = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(results), i*i)
        self.assertRaises(StopIteration, results.__next__)

        results = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(results), i*i)
        self.assertRaises(StopIteration, results.__next__)

    def test_imap_unordered(self):
        expected = [sqr(i) for i in range(1000)]

        results = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(results), expected)

        results = self.pool.imap_unordered(sqr, list(range(1000)),
                                           chunksize=53)
        self.assertEqual(sorted(results), expected)

    def test_make_pool(self):
        for bad_size in (-1, 0):
            self.assertRaises(ValueError, multiprocessing.Pool, bad_size)

        pool = multiprocessing.Pool(3)
        self.assertEqual(3, len(pool._pool))
        pool.close()
        pool.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # Queue far more work than can finish, then terminate at once.
        result = self.pool.map_async(time.sleep, [0.1] * 10000, chunksize=1)
        self.pool.terminate()
        timed_join = TimingWrapper(self.pool.join)
        timed_join()
        self.assertLess(timed_join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001117
def raising():
    """Always raise ``KeyError("key")``."""
    error = KeyError("key")
    raise error
Jesse Noller1f0b6582010-01-27 03:36:01 +00001120
def unpickleable_result():
    """Return a lambda (which the pickle module cannot serialize)."""
    answer = lambda: 42
    return answer
1123
class _TestPoolWorkerErrors(BaseTestCase):
    """Tests for how a pool reports errors raised by its workers."""

    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        pool = multiprocessing.Pool(2)

        # One-slot holder that the callback writes the exception into.
        seen = [None]
        def errback(exc):
            seen[0] = exc

        pending = pool.apply_async(raising, error_callback=errback)
        self.assertRaises(KeyError, pending.get)
        self.assertTrue(seen[0])
        self.assertIsInstance(seen[0], KeyError)

        pool.close()
        pool.join()

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        pool = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        for _round in range(20):

            seen = [None]
            def errback(exc):
                seen[0] = exc

            pending = pool.apply_async(unpickleable_result,
                                       error_callback=errback)
            self.assertRaises(MaybeEncodingError, pending.get)
            wrapped = seen[0]
            self.assertTrue(wrapped)
            self.assertIsInstance(wrapped, MaybeEncodingError)
            self.assertIsNotNone(wrapped.exc)
            self.assertIsNotNone(wrapped.value)

        pool.close()
        pool.join()
1163
class _TestPoolWorkerLifetime(BaseTestCase):
    """Tests for worker replacement under ``maxtasksperchild``."""

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        origworkerpids = [worker.pid for worker in pool._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        pending = [pool.apply_async(sqr, (i, )) for i in range(100)]
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for arg, result in enumerate(pending):
            self.assertEqual(result.get(), sqr(arg))
        # Refill the pool
        pool._repopulate_pool()
        # Wait until all workers are alive
        # (50 polls * DELTA = 5 seconds max startup process time)
        for _poll in range(50):
            if all(worker.is_alive() for worker in pool._pool):
                break
            time.sleep(DELTA)
        finalworkerpids = [worker.pid for worker in pool._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        pool.close()
        pool.join()
1195
Benjamin Petersone711caf2008-06-11 16:44:04 +00001196#
1197# Test that manager has expected number of shared objects left
1198#
1199
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Take the debug snapshot right after the count so that what gets
        # printed on failure corresponds to the number being asserted.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the snapshot once; a second _debug_info() call would
            # only repeat the output (possibly for a later state).
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1218
1219#
1220# Test of creating a customized manager class
1221#
1222
1223from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1224
class FooBar(object):
    """Small class with two public methods and one underscore-prefixed one."""

    def f(self):
        """Return the string ``'f()'``."""
        return 'f()'

    def g(self):
        """Always raise ``ValueError``."""
        raise ValueError()

    def _h(self):
        """Return the string ``'_h()'``."""
        return '_h()'
1232
def baz():
    """Generate the squares of 0 through 9 in increasing order."""
    n = 0
    while n < 10:
        yield n * n
        n += 1
1236
class IteratorProxy(BaseProxy):
    """Proxy that is its own iterator and forwards ``__next__`` calls."""

    _exposed_ = ('__next__',)

    def __iter__(self):
        return self

    def __next__(self):
        # Delegate to the proxy's _callmethod with the method name.
        method_name = '__next__'
        return self._callmethod(method_name)
1243
class MyManager(BaseManager):
    """Customized manager; its types are registered right below."""


# 'Foo' uses the default exposure, 'Bar' exposes exactly f and _h, and
# 'baz' hands its result out through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1250
1251
class _TestMyManager(BaseTestCase):
    """Tests for the customized manager class ``MyManager``."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        squares = manager.baz()

        candidates = ('f', 'g', '_h')
        foo_methods = [name for name in candidates if hasattr(foo, name)]
        bar_methods = [name for name in candidates if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        # foo: f and g are reachable, _h is not.
        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        # bar: both f and _h are reachable, directly and via _callmethod.
        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(squares), [i*i for i in range(10)])

        manager.shutdown()
1283
1284#
1285# Test of connecting to a remote server and using xmlrpclib for serialization
1286#
1287
# Single module-level queue; get_queue() always hands out this object.
_queue = pyqueue.Queue()


def get_queue():
    """Return the module-level ``_queue`` object."""
    return _queue
1291
class QueueManager(BaseManager):
    '''manager class used by server process'''


# Registered with a callable, so this manager can create the queue itself.
QueueManager.register('get_queue', callable=get_queue)
1295
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''


# Registered by name only (no callable is supplied here).
QueueManager2.register('get_queue')
1299
1300
1301SERIALIZER = 'xmlrpclib'
1302
class _TestRemoteManager(BaseTestCase):
    """Tests connecting to a running manager using SERIALIZER."""

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        """Connect to the manager at ``address`` and put one tuple."""
        client = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        remote_queue = client.get_queue()
        remote_queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        producer = self.Process(target=self._putter,
                                args=(manager.address, authkey))
        producer.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        remote_queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        received = remote_queue.get()
        self.assertEqual(received, ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, remote_queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del remote_queue
        manager.shutdown()
1343
class _TestManagerRestart(BaseTestCase):
    """Tests restarting a manager on the address of one just shut down."""

    @classmethod
    def _putter(cls, address, authkey):
        """Connect to the manager at ``address`` and put one string."""
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # The item has been received, so the child has done its work;
        # reap it instead of leaving it behind.
        p.join()
        del queue
        manager.shutdown()

        # Start a second manager on the address the first one used.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except IOError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # The fresh manager has to be started too: otherwise nothing
            # is retried and the shutdown() below is called on a manager
            # that never ran.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001384
Benjamin Petersone711caf2008-06-11 16:44:04 +00001385#
1386#
1387#
1388
1389SENTINEL = latin('')
1390
class _TestConnection(BaseTestCase):
    # Tests for the connection objects returned by self.Pipe().

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Send every received byte message straight back until SENTINEL
        # arrives, then close the connection.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        # The peer runs _echo, so everything sent below comes back.
        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        # Object round trip.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        # Raw bytes round trip.
        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zero-filled buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items into the buffer.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A 40-byte buffer is too small for longmsg; the exception is
            # expected to carry the complete message.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing is pending: poll() reports False without waiting ...
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        # ... and with a timeout it waits for the whole timeout.
        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # Once the echo is on its way, poll() reports True promptly.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            # After the peer has quit, reading is expected to hit EOF.
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            # Each end works in one direction only; using it the other
            # way round is expected to raise IOError.
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        # Only exercised for the 'processes' flavour.
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # Second argument: offset into msg.
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        # Third argument: number of bytes to send from that offset.
        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to len(msg) sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Offsets/sizes outside msg, or negative ones, are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1540
class _TestListenerClient(BaseTestCase):
    """Tests a Listener/Client pair for every available family."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        """Connect to ``address``, send ``'hello'`` and close."""
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        connection = self.connection
        for family in connection.families:
            listener = connection.Listener(family=family)
            child = self.Process(target=self._test, args=(listener.address,))
            child.daemon = True
            child.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            child.join()
            listener.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001561#
1562# Test of sending connection and socket objects between processes
1563#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001564"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001565class _TestPicklingConnections(BaseTestCase):
1566
1567 ALLOWED_TYPES = ('processes',)
1568
1569 def _listener(self, conn, families):
1570 for fam in families:
1571 l = self.connection.Listener(family=fam)
1572 conn.send(l.address)
1573 new_conn = l.accept()
1574 conn.send(new_conn)
1575
1576 if self.TYPE == 'processes':
1577 l = socket.socket()
1578 l.bind(('localhost', 0))
1579 conn.send(l.getsockname())
1580 l.listen(1)
1581 new_conn, addr = l.accept()
1582 conn.send(new_conn)
1583
1584 conn.recv()
1585
1586 def _remote(self, conn):
1587 for (address, msg) in iter(conn.recv, None):
1588 client = self.connection.Client(address)
1589 client.send(msg.upper())
1590 client.close()
1591
1592 if self.TYPE == 'processes':
1593 address, msg = conn.recv()
1594 client = socket.socket()
1595 client.connect(address)
1596 client.sendall(msg.upper())
1597 client.close()
1598
1599 conn.close()
1600
1601 def test_pickling(self):
1602 try:
1603 multiprocessing.allow_connection_pickling()
1604 except ImportError:
1605 return
1606
1607 families = self.connection.families
1608
1609 lconn, lconn0 = self.Pipe()
1610 lp = self.Process(target=self._listener, args=(lconn0, families))
1611 lp.start()
1612 lconn0.close()
1613
1614 rconn, rconn0 = self.Pipe()
1615 rp = self.Process(target=self._remote, args=(rconn0,))
1616 rp.start()
1617 rconn0.close()
1618
1619 for fam in families:
1620 msg = ('This connection uses family %s' % fam).encode('ascii')
1621 address = lconn.recv()
1622 rconn.send((address, msg))
1623 new_conn = lconn.recv()
1624 self.assertEqual(new_conn.recv(), msg.upper())
1625
1626 rconn.send(None)
1627
1628 if self.TYPE == 'processes':
1629 msg = latin('This connection uses a normal socket')
1630 address = lconn.recv()
1631 rconn.send((address, msg))
1632 if hasattr(socket, 'fromfd'):
1633 new_conn = lconn.recv()
1634 self.assertEqual(new_conn.recv(100), msg.upper())
1635 else:
1636 # XXX On Windows with Py2.6 need to backport fromfd()
1637 discard = lconn.recv_bytes()
1638
1639 lconn.send(None)
1640
1641 rconn.close()
1642 lconn.close()
1643
1644 lp.join()
1645 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001646"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001647#
1648#
1649#
1650
class _TestHeap(BaseTestCase):
    """Consistency test for the heap behind ``BufferWrapper``."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                # Drop a random block; a dedicated name keeps the loop
                # variable from being overwritten.
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap: collect every free and every
        # occupied block as (arena index, start, stop, length, state)
        segments = []
        for free_list in list(heap._len_to_seq.values()):
            for arena, start, stop in free_list:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Consecutive blocks must either touch, or the second one must
        # be the first block of a different arena.
        for current, following in zip(segments, segments[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart) = following[:2]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1691
1692#
1693#
1694#
1695
class _Foo(Structure):
    """Structure with a ``c_int`` field ``x`` and a ``c_double`` field ``y``."""

    _fields_ = [('x', c_int), ('y', c_double)]
1701
class _TestSharedCTypes(BaseTestCase):
    """Tests for shared ctypes values, arrays and structures."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        """Double every argument in place."""
        for scalar in (x, y):
            scalar.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        # Let a child double everything, then check from the parent.
        child = self.Process(target=self._double,
                             args=(x, y, foo, arr, string))
        child.start()
        child.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        # Changing the original afterwards must not affect the copy.
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1750
1751#
1752#
1753#
1754
class _TestFinalize(BaseTestCase):
    # Tests the order in which util.Finalize callbacks run.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Runs in the child: registers finalizers that each send a tag
        # over conn, then calls multiprocessing's exit function.  The
        # order of the statements and the lifetime of the locals matter.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # No exitpriority is given for c; 'c' is not among the values
        # that test_finalize expects to receive.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing one priority; test_finalize expects
        # them back in reverse order of registration.
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        # Finalizers that are not tied to any object.
        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        # Lowest priority of all: test_finalize stops reading at 'STOP'.
        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.start()
        p.join()

        # Read the tags sent by the child up to (excluding) 'STOP'.
        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1806
1807#
1808# Test that from ... import * works for each module
1809#
1810
class _TestImportStar(BaseTestCase):
    """Checks that every name listed in a module's ``__all__`` exists."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = [
            'multiprocessing',
            'multiprocessing.connection',
            'multiprocessing.heap',
            'multiprocessing.managers',
            'multiprocessing.pool',
            'multiprocessing.process',
            'multiprocessing.reduction',
            'multiprocessing.synchronize',
            'multiprocessing.util',
            ]

        if c_int is not None:
            # This module requires _ctypes
            modules += ['multiprocessing.sharedctypes']

        for module_name in modules:
            __import__(module_name)
            mod = sys.modules[module_name]

            exported = getattr(mod, '__all__', ())
            for attr in exported:
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
1837
1838#
1839# Quick test that logging works -- does not test logging output
1840#
1841
class _TestLogging(BaseTestCase):
    """Quick checks of the multiprocessing logger (output is not tested)."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        try:
            self.assertTrue(logger is not None)
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Put the level back even if something above failed.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        """Send the effective level of the multiprocessing logger."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _child_level(self, reader, writer):
        # Run _test_level in a child, reap the child, and return the
        # level it reported.
        p = self.Process(target=self._test_level, args=(writer,))
        p.start()
        try:
            return reader.recv()
        finally:
            p.join()

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            # A level set on the multiprocessing logger is seen by a child.
            logger.setLevel(LEVEL1)
            self.assertEqual(LEVEL1, self._child_level(reader, writer))

            # With NOTSET on the multiprocessing logger, the child
            # reports the root logger's level instead.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self.assertEqual(LEVEL2, self._child_level(reader, writer))
        finally:
            # Restore both levels and release the pipe even on failure,
            # so that later tests are not affected.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
1880
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001881
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00001882# class _TestLoggingProcessName(BaseTestCase):
1883#
1884# def handle(self, record):
1885# assert record.processName == multiprocessing.current_process().name
1886# self.__handled = True
1887#
1888# def test_logging(self):
1889# handler = logging.Handler()
1890# handler.handle = self.handle
1891# self.__handled = False
1892# # Bypass getLogger() and side-effects
1893# logger = logging.getLoggerClass()(
1894# 'multiprocessing.test.TestLoggingProcessName')
1895# logger.addHandler(handler)
1896# logger.propagate = False
1897#
1898# logger.warn('foo')
1899# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001900
Benjamin Petersone711caf2008-06-11 16:44:04 +00001901#
Jesse Noller6214edd2009-01-19 16:23:53 +00001902# Test to verify handle verification, see issue 3321
1903#
1904
class TestInvalidHandle(unittest.TestCase):
    """Handle verification of low-level Connection objects (issue 3321)."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # NOTE(review): 44977608 is presumably just a handle number that is
        # not open in the test process -- confirm if this ever misfires.
        bogus_conn = _multiprocessing.Connection(44977608)
        with self.assertRaises(IOError):
            bogus_conn.poll()
        # A negative handle must be rejected by the constructor itself.
        with self.assertRaises(IOError):
            _multiprocessing.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001912
Jesse Noller6214edd2009-01-19 16:23:53 +00001913#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001914# Functions used to create test cases from the base ones in this module
1915#
1916
def get_attributes(Source, names):
    """Collect the attributes *names* of *Source* into a dictionary.

    Objects whose type is exactly the plain Python function type are
    wrapped in ``staticmethod``; every other object is stored unchanged.
    The result maps each requested name to the (possibly wrapped) object.
    """
    function_type = type(get_attributes)

    def _as_class_attribute(value):
        # Only plain functions get wrapped; anything else passes through.
        if type(value) == function_type:
            return staticmethod(value)
        return value

    return {name: _as_class_attribute(getattr(Source, name))
            for name in names}
1925
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes from the module's ``_Test*`` globals.

    Every module global whose name starts with ``_Test`` and whose
    ``ALLOWED_TYPES`` contains *type* yields a new class deriving from that
    base, ``unittest.TestCase`` and *Mixin*.  The new class is named
    ``'With' + type.capitalize() + <base name without the leading '_'>``.

    Returns a dict mapping the generated class names to the classes.
    """
    result = {}
    glob = globals()
    Type = type.capitalize()

    # Iterate over a snapshot of the keys: callers typically push the
    # returned classes back into globals() afterwards.
    for name in list(glob.keys()):
        if not name.startswith('_Test'):
            continue
        base = glob[name]
        if type not in base.ALLOWED_TYPES:
            continue

        newname = 'With' + Type + name[1:]

        class Temp(base, unittest.TestCase, Mixin):
            pass

        # Set __qualname__ as well as __name__: unittest builds test ids
        # from the qualified name, which would otherwise read
        # 'create_test_cases.<locals>.Temp' for every generated class.
        Temp.__name__ = Temp.__qualname__ = newname
        Temp.__module__ = Mixin.__module__
        result[newname] = Temp
    return result
1942
1943#
1944# Create test cases
1945#
1946
class ProcessesMixin(object):
    """Mixin that runs the generated test cases against multiprocessing."""
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed multiprocessing attributes into the class namespace.
    # get_attributes() wraps plain functions in staticmethod before they
    # are stored here.
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

# Generate the 'processes' flavour of every eligible _Test* base and publish
# the resulting classes as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1959
1960
class ManagerMixin(object):
    """Mixin that runs the generated test cases through a SyncManager."""
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocated without running __init__; test_main() calls __init__() and
    # start() on this instance before the tests run.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed attributes of the manager instance into the class
    # namespace (plain functions are wrapped in staticmethod by
    # get_attributes()).
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))

# Generate the 'manager' flavour of every eligible _Test* base and publish
# the resulting classes as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1973
1974
class ThreadsMixin(object):
    """Mixin that runs the generated test cases against multiprocessing.dummy."""
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed multiprocessing.dummy attributes into the class
    # namespace (plain functions are wrapped in staticmethod by
    # get_attributes()).
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))

# Generate the 'threads' flavour of every eligible _Test* base and publish
# the resulting classes as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1987
class OtherTest(unittest.TestCase):
    """Authentication-failure paths of the connection challenge helpers."""

    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        # The peer answers the challenge with bytes that cannot be a valid
        # digest, so delivering the challenge must fail.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # The fake peer first sends a challenge, then a bogus reply to our
        # answer, and empty bytes for any further read.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0

            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
2016
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002017#
2018# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2019#
2020
def initializer(ns):
    """Increment ``ns.test`` by one.

    Passed as the initializer callable to SyncManager.start() and Pool()
    by TestInitializers.
    """
    ns.test += 1
2023
class TestInitializers(unittest.TestCase):
    """Manager.start() and Pool() must run their initializer (issue 5585).

    A manager-hosted namespace carries a counter that the module-level
    ``initializer`` function increments, so the tests can observe from the
    parent process that the initializer ran.
    """

    def setUp(self):
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Shut the second manager down even when the assertion fails,
            # otherwise its server process is left running.
            m.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
2046
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002047#
2048# Issue 5155, 5313, 5331: Test process in processes
2049# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2050#
2051
def _ThisSubProcess(q):
    """Attempt a single non-blocking ``get`` on *q*.

    The fetched item is discarded, and an empty queue is deliberately not
    treated as an error.
    """
    try:
        q.get(block=False)
    except pyqueue.Empty:
        pass
2057
def _TestProcess(q):
    """Run _ThisSubProcess in a child process on a fresh queue and wait.

    The *q* argument is accepted but not used by this function.
    """
    child_queue = multiprocessing.Queue()
    child = multiprocessing.Process(
        target=_ThisSubProcess,
        args=(child_queue,),
    )
    child.start()
    child.join()
2063
def _afunc(x):
    """Return *x* multiplied by itself; mapped over a pool by pool_in_process()."""
    return x*x
2066
def pool_in_process():
    """Map _afunc over a short list using a four-worker pool.

    The mapped result is not used; the function only exercises creating
    and using a pool from inside the calling process.  The pool is closed
    and joined before returning so that no worker processes are left
    behind, even when ``map`` raises.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        pool.close()
        pool.join()
2070
class _file_like(object):
    """Write buffer in front of *delegate* that is kept per process id.

    Data passed to write() is collected in a list; flush() joins the list,
    hands the text to the delegate's write() and starts a new list.
    """

    def __init__(self, delegate):
        self._delegate = delegate
        self._pid = None

    @property
    def cache(self):
        """Return the pending chunks, starting afresh when the pid changed."""
        current_pid = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid != current_pid:
            self._pid, self._cache = current_pid, []
        return self._cache

    def write(self, data):
        pending = self.cache
        pending.append(data)

    def flush(self):
        text = ''.join(self.cache)
        self._delegate.write(text)
        self._cache = []
2091
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Processes started from within processes (issues 5155, 5313, 5331)."""

    def test_queue_in_process(self):
        # The child itself creates a queue and starts a grandchild.
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # The child creates and uses a Pool of its own.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this Process object is created but never started;
        # it is kept as in the original -- confirm whether it was meant to
        # be run.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # Use a real unittest assertion: a bare ``assert`` is stripped when
        # Python runs with -O, which would turn this test into a no-op.
        self.assertEqual(sio.getvalue(), 'foo')
2112
# Stand-alone test cases that are not generated from the _Test* bases;
# test_main() runs them after the generated ones.
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
                   TestStdinBadfiledescriptor]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002115
Benjamin Petersone711caf2008-06-11 16:44:04 +00002116#
2117#
2118#
2119
def test_main(run=None):
    """Create the shared pools and manager, run every test case, clean up.

    *run* is a callable that accepts a ``unittest.TestSuite``; when it is
    None, ``test.support.run_unittest`` is used.

    Raises ``unittest.SkipTest`` on Linux when an RLock cannot be created
    (see issue 3111).
    """
    if sys.platform.startswith("linux"):
        try:
            # Only the ability to create the lock matters here.
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    # ManagerMixin.manager was allocated without __init__; finish it now.
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        by_name = lambda tc: tc.__name__
        testcases = (
            sorted(testcases_processes.values(), key=by_name) +
            sorted(testcases_threads.values(), key=by_name) +
            sorted(testcases_manager.values(), key=by_name) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        # The runner may raise (for example when tests fail); the pools and
        # the manager must be torn down in that case too, otherwise their
        # worker processes outlive the test run.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002157
def main():
    """Run the whole suite through a verbose text runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)


if __name__ == '__main__':
    main()