blob: 2614689bca94cd5a1eb6e2537ff31ca972deb50f [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020014import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000027# import threading after _multiprocessing to raise a more revelant error
28# message: "No module named _multiprocessing". _multiprocessing is not compiled
29# without thread support.
30import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.dummy
33import multiprocessing.connection
34import multiprocessing.managers
35import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
38from multiprocessing import util
39
Brian Curtinafa88b52010-10-07 01:12:19 +000040try:
41 from multiprocessing.sharedctypes import Value, copy
42 HAS_SHAREDCTYPES = True
43except ImportError:
44 HAS_SHAREDCTYPES = False
45
Benjamin Petersone711caf2008-06-11 16:44:04 +000046#
47#
48#
49
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000050def latin(s):
51 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000052
Benjamin Petersone711caf2008-06-11 16:44:04 +000053#
54# Constants
55#
56
57LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000058#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000059
60DELTA = 0.1
61CHECK_TIMINGS = False # making true makes tests take a lot longer
62 # and can sometimes cause some non-serious
63 # failures because some calls block a bit
64 # longer than expected
65if CHECK_TIMINGS:
66 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
67else:
68 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
69
70HAVE_GETVALUE = not getattr(_multiprocessing,
71 'HAVE_BROKEN_SEM_GETVALUE', False)
72
Jesse Noller6214edd2009-01-19 16:23:53 +000073WIN32 = (sys.platform == "win32")
74
Benjamin Petersone711caf2008-06-11 16:44:04 +000075#
Florent Xiclunafd1b0932010-03-28 00:25:02 +000076# Some tests require ctypes
77#
78
79try:
Florent Xiclunaaa171062010-08-14 15:56:42 +000080 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +000081except ImportError:
82 Structure = object
83 c_int = c_double = None
84
85#
Benjamin Petersone711caf2008-06-11 16:44:04 +000086# Creates a wrapper for a function which records the time it takes to finish
87#
88
89class TimingWrapper(object):
90
91 def __init__(self, func):
92 self.func = func
93 self.elapsed = None
94
95 def __call__(self, *args, **kwds):
96 t = time.time()
97 try:
98 return self.func(*args, **kwds)
99 finally:
100 self.elapsed = time.time() - t
101
102#
103# Base class for test cases
104#
105
106class BaseTestCase(object):
107
108 ALLOWED_TYPES = ('processes', 'manager', 'threads')
109
110 def assertTimingAlmostEqual(self, a, b):
111 if CHECK_TIMINGS:
112 self.assertAlmostEqual(a, b, 1)
113
114 def assertReturnsIfImplemented(self, value, func, *args):
115 try:
116 res = func(*args)
117 except NotImplementedError:
118 pass
119 else:
120 return self.assertEqual(value, res)
121
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000122 # For the sanity of Windows users, rather than crashing or freezing in
123 # multiple ways.
124 def __reduce__(self, *args):
125 raise NotImplementedError("shouldn't try to pickle a test case")
126
127 __reduce_ex__ = __reduce__
128
Benjamin Petersone711caf2008-06-11 16:44:04 +0000129#
130# Return the value of a semaphore
131#
132
133def get_value(self):
134 try:
135 return self.get_value()
136 except AttributeError:
137 try:
138 return self._Semaphore__value
139 except AttributeError:
140 try:
141 return self._value
142 except AttributeError:
143 raise NotImplementedError
144
145#
146# Testcases
147#
148
149class _TestProcess(BaseTestCase):
150
151 ALLOWED_TYPES = ('processes', 'threads')
152
153 def test_current(self):
154 if self.TYPE == 'threads':
155 return
156
157 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000158 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000159
160 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000161 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000162 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000163 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000164 self.assertEqual(current.ident, os.getpid())
165 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000166
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000167 @classmethod
168 def _test(cls, q, *args, **kwds):
169 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000170 q.put(args)
171 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000172 q.put(current.name)
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000173 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000174 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000175 q.put(current.pid)
176
177 def test_process(self):
178 q = self.Queue(1)
179 e = self.Event()
180 args = (q, 1, 2)
181 kwargs = {'hello':23, 'bye':2.54}
182 name = 'SomeProcess'
183 p = self.Process(
184 target=self._test, args=args, kwargs=kwargs, name=name
185 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000186 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000187 current = self.current_process()
188
189 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000190 self.assertEqual(p.authkey, current.authkey)
191 self.assertEqual(p.is_alive(), False)
192 self.assertEqual(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000193 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000194 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000195 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000196
197 p.start()
198
Ezio Melottib3aedd42010-11-20 19:04:17 +0000199 self.assertEqual(p.exitcode, None)
200 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000201 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000202
Ezio Melottib3aedd42010-11-20 19:04:17 +0000203 self.assertEqual(q.get(), args[1:])
204 self.assertEqual(q.get(), kwargs)
205 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000206 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000207 self.assertEqual(q.get(), current.authkey)
208 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000209
210 p.join()
211
Ezio Melottib3aedd42010-11-20 19:04:17 +0000212 self.assertEqual(p.exitcode, 0)
213 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000214 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000215
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000216 @classmethod
217 def _test_terminate(cls):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000218 time.sleep(1000)
219
220 def test_terminate(self):
221 if self.TYPE == 'threads':
222 return
223
224 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000225 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000226 p.start()
227
228 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000229 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000230 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000231
232 p.terminate()
233
234 join = TimingWrapper(p.join)
235 self.assertEqual(join(), None)
236 self.assertTimingAlmostEqual(join.elapsed, 0.0)
237
238 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000239 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000240
241 p.join()
242
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000243 # XXX sometimes get p.exitcode == 0 on Windows ...
244 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000245
246 def test_cpu_count(self):
247 try:
248 cpus = multiprocessing.cpu_count()
249 except NotImplementedError:
250 cpus = 1
251 self.assertTrue(type(cpus) is int)
252 self.assertTrue(cpus >= 1)
253
254 def test_active_children(self):
255 self.assertEqual(type(self.active_children()), list)
256
257 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000258 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000259
260 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000261 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000262
263 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000264 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000265
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000266 @classmethod
267 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000268 from multiprocessing import forking
269 wconn.send(id)
270 if len(id) < 2:
271 for i in range(2):
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000272 p = cls.Process(
273 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000274 )
275 p.start()
276 p.join()
277
278 def test_recursion(self):
279 rconn, wconn = self.Pipe(duplex=False)
280 self._test_recursion(wconn, [])
281
282 time.sleep(DELTA)
283 result = []
284 while rconn.poll():
285 result.append(rconn.recv())
286
287 expected = [
288 [],
289 [0],
290 [0, 0],
291 [0, 1],
292 [1],
293 [1, 0],
294 [1, 1]
295 ]
296 self.assertEqual(result, expected)
297
298#
299#
300#
301
302class _UpperCaser(multiprocessing.Process):
303
304 def __init__(self):
305 multiprocessing.Process.__init__(self)
306 self.child_conn, self.parent_conn = multiprocessing.Pipe()
307
308 def run(self):
309 self.parent_conn.close()
310 for s in iter(self.child_conn.recv, None):
311 self.child_conn.send(s.upper())
312 self.child_conn.close()
313
314 def submit(self, s):
315 assert type(s) is str
316 self.parent_conn.send(s)
317 return self.parent_conn.recv()
318
319 def stop(self):
320 self.parent_conn.send(None)
321 self.parent_conn.close()
322 self.child_conn.close()
323
324class _TestSubclassingProcess(BaseTestCase):
325
326 ALLOWED_TYPES = ('processes',)
327
328 def test_subclassing(self):
329 uppercaser = _UpperCaser()
330 uppercaser.start()
331 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
332 self.assertEqual(uppercaser.submit('world'), 'WORLD')
333 uppercaser.stop()
334 uppercaser.join()
335
336#
337#
338#
339
340def queue_empty(q):
341 if hasattr(q, 'empty'):
342 return q.empty()
343 else:
344 return q.qsize() == 0
345
346def queue_full(q, maxsize):
347 if hasattr(q, 'full'):
348 return q.full()
349 else:
350 return q.qsize() == maxsize
351
352
353class _TestQueue(BaseTestCase):
354
355
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000356 @classmethod
357 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000358 child_can_start.wait()
359 for i in range(6):
360 queue.get()
361 parent_can_continue.set()
362
363 def test_put(self):
364 MAXSIZE = 6
365 queue = self.Queue(maxsize=MAXSIZE)
366 child_can_start = self.Event()
367 parent_can_continue = self.Event()
368
369 proc = self.Process(
370 target=self._test_put,
371 args=(queue, child_can_start, parent_can_continue)
372 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000373 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000374 proc.start()
375
376 self.assertEqual(queue_empty(queue), True)
377 self.assertEqual(queue_full(queue, MAXSIZE), False)
378
379 queue.put(1)
380 queue.put(2, True)
381 queue.put(3, True, None)
382 queue.put(4, False)
383 queue.put(5, False, None)
384 queue.put_nowait(6)
385
386 # the values may be in buffer but not yet in pipe so sleep a bit
387 time.sleep(DELTA)
388
389 self.assertEqual(queue_empty(queue), False)
390 self.assertEqual(queue_full(queue, MAXSIZE), True)
391
392 put = TimingWrapper(queue.put)
393 put_nowait = TimingWrapper(queue.put_nowait)
394
395 self.assertRaises(pyqueue.Full, put, 7, False)
396 self.assertTimingAlmostEqual(put.elapsed, 0)
397
398 self.assertRaises(pyqueue.Full, put, 7, False, None)
399 self.assertTimingAlmostEqual(put.elapsed, 0)
400
401 self.assertRaises(pyqueue.Full, put_nowait, 7)
402 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
403
404 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
405 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
406
407 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
408 self.assertTimingAlmostEqual(put.elapsed, 0)
409
410 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
411 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
412
413 child_can_start.set()
414 parent_can_continue.wait()
415
416 self.assertEqual(queue_empty(queue), True)
417 self.assertEqual(queue_full(queue, MAXSIZE), False)
418
419 proc.join()
420
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000421 @classmethod
422 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000423 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000424 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000425 queue.put(2)
426 queue.put(3)
427 queue.put(4)
428 queue.put(5)
429 parent_can_continue.set()
430
431 def test_get(self):
432 queue = self.Queue()
433 child_can_start = self.Event()
434 parent_can_continue = self.Event()
435
436 proc = self.Process(
437 target=self._test_get,
438 args=(queue, child_can_start, parent_can_continue)
439 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000440 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000441 proc.start()
442
443 self.assertEqual(queue_empty(queue), True)
444
445 child_can_start.set()
446 parent_can_continue.wait()
447
448 time.sleep(DELTA)
449 self.assertEqual(queue_empty(queue), False)
450
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000451 # Hangs unexpectedly, remove for now
452 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000453 self.assertEqual(queue.get(True, None), 2)
454 self.assertEqual(queue.get(True), 3)
455 self.assertEqual(queue.get(timeout=1), 4)
456 self.assertEqual(queue.get_nowait(), 5)
457
458 self.assertEqual(queue_empty(queue), True)
459
460 get = TimingWrapper(queue.get)
461 get_nowait = TimingWrapper(queue.get_nowait)
462
463 self.assertRaises(pyqueue.Empty, get, False)
464 self.assertTimingAlmostEqual(get.elapsed, 0)
465
466 self.assertRaises(pyqueue.Empty, get, False, None)
467 self.assertTimingAlmostEqual(get.elapsed, 0)
468
469 self.assertRaises(pyqueue.Empty, get_nowait)
470 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
471
472 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
473 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
474
475 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
476 self.assertTimingAlmostEqual(get.elapsed, 0)
477
478 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
479 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
480
481 proc.join()
482
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000483 @classmethod
484 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000485 for i in range(10, 20):
486 queue.put(i)
487 # note that at this point the items may only be buffered, so the
488 # process cannot shutdown until the feeder thread has finished
489 # pushing items onto the pipe.
490
491 def test_fork(self):
492 # Old versions of Queue would fail to create a new feeder
493 # thread for a forked process if the original process had its
494 # own feeder thread. This test checks that this no longer
495 # happens.
496
497 queue = self.Queue()
498
499 # put items on queue so that main process starts a feeder thread
500 for i in range(10):
501 queue.put(i)
502
503 # wait to make sure thread starts before we fork a new process
504 time.sleep(DELTA)
505
506 # fork process
507 p = self.Process(target=self._test_fork, args=(queue,))
508 p.start()
509
510 # check that all expected items are in the queue
511 for i in range(20):
512 self.assertEqual(queue.get(), i)
513 self.assertRaises(pyqueue.Empty, queue.get, False)
514
515 p.join()
516
517 def test_qsize(self):
518 q = self.Queue()
519 try:
520 self.assertEqual(q.qsize(), 0)
521 except NotImplementedError:
522 return
523 q.put(1)
524 self.assertEqual(q.qsize(), 1)
525 q.put(5)
526 self.assertEqual(q.qsize(), 2)
527 q.get()
528 self.assertEqual(q.qsize(), 1)
529 q.get()
530 self.assertEqual(q.qsize(), 0)
531
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000532 @classmethod
533 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000534 for obj in iter(q.get, None):
535 time.sleep(DELTA)
536 q.task_done()
537
538 def test_task_done(self):
539 queue = self.JoinableQueue()
540
541 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000542 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000543
544 workers = [self.Process(target=self._test_task_done, args=(queue,))
545 for i in range(4)]
546
547 for p in workers:
548 p.start()
549
550 for i in range(10):
551 queue.put(i)
552
553 queue.join()
554
555 for p in workers:
556 queue.put(None)
557
558 for p in workers:
559 p.join()
560
561#
562#
563#
564
565class _TestLock(BaseTestCase):
566
567 def test_lock(self):
568 lock = self.Lock()
569 self.assertEqual(lock.acquire(), True)
570 self.assertEqual(lock.acquire(False), False)
571 self.assertEqual(lock.release(), None)
572 self.assertRaises((ValueError, threading.ThreadError), lock.release)
573
574 def test_rlock(self):
575 lock = self.RLock()
576 self.assertEqual(lock.acquire(), True)
577 self.assertEqual(lock.acquire(), True)
578 self.assertEqual(lock.acquire(), True)
579 self.assertEqual(lock.release(), None)
580 self.assertEqual(lock.release(), None)
581 self.assertEqual(lock.release(), None)
582 self.assertRaises((AssertionError, RuntimeError), lock.release)
583
Jesse Nollerf8d00852009-03-31 03:25:07 +0000584 def test_lock_context(self):
585 with self.Lock():
586 pass
587
Benjamin Petersone711caf2008-06-11 16:44:04 +0000588
589class _TestSemaphore(BaseTestCase):
590
591 def _test_semaphore(self, sem):
592 self.assertReturnsIfImplemented(2, get_value, sem)
593 self.assertEqual(sem.acquire(), True)
594 self.assertReturnsIfImplemented(1, get_value, sem)
595 self.assertEqual(sem.acquire(), True)
596 self.assertReturnsIfImplemented(0, get_value, sem)
597 self.assertEqual(sem.acquire(False), False)
598 self.assertReturnsIfImplemented(0, get_value, sem)
599 self.assertEqual(sem.release(), None)
600 self.assertReturnsIfImplemented(1, get_value, sem)
601 self.assertEqual(sem.release(), None)
602 self.assertReturnsIfImplemented(2, get_value, sem)
603
604 def test_semaphore(self):
605 sem = self.Semaphore(2)
606 self._test_semaphore(sem)
607 self.assertEqual(sem.release(), None)
608 self.assertReturnsIfImplemented(3, get_value, sem)
609 self.assertEqual(sem.release(), None)
610 self.assertReturnsIfImplemented(4, get_value, sem)
611
612 def test_bounded_semaphore(self):
613 sem = self.BoundedSemaphore(2)
614 self._test_semaphore(sem)
615 # Currently fails on OS/X
616 #if HAVE_GETVALUE:
617 # self.assertRaises(ValueError, sem.release)
618 # self.assertReturnsIfImplemented(2, get_value, sem)
619
620 def test_timeout(self):
621 if self.TYPE != 'processes':
622 return
623
624 sem = self.Semaphore(0)
625 acquire = TimingWrapper(sem.acquire)
626
627 self.assertEqual(acquire(False), False)
628 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
629
630 self.assertEqual(acquire(False, None), False)
631 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
632
633 self.assertEqual(acquire(False, TIMEOUT1), False)
634 self.assertTimingAlmostEqual(acquire.elapsed, 0)
635
636 self.assertEqual(acquire(True, TIMEOUT2), False)
637 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
638
639 self.assertEqual(acquire(timeout=TIMEOUT3), False)
640 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
641
642
643class _TestCondition(BaseTestCase):
644
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000645 @classmethod
646 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000647 cond.acquire()
648 sleeping.release()
649 cond.wait(timeout)
650 woken.release()
651 cond.release()
652
653 def check_invariant(self, cond):
654 # this is only supposed to succeed when there are no sleepers
655 if self.TYPE == 'processes':
656 try:
657 sleepers = (cond._sleeping_count.get_value() -
658 cond._woken_count.get_value())
659 self.assertEqual(sleepers, 0)
660 self.assertEqual(cond._wait_semaphore.get_value(), 0)
661 except NotImplementedError:
662 pass
663
664 def test_notify(self):
665 cond = self.Condition()
666 sleeping = self.Semaphore(0)
667 woken = self.Semaphore(0)
668
669 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000670 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000671 p.start()
672
673 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000674 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000675 p.start()
676
677 # wait for both children to start sleeping
678 sleeping.acquire()
679 sleeping.acquire()
680
681 # check no process/thread has woken up
682 time.sleep(DELTA)
683 self.assertReturnsIfImplemented(0, get_value, woken)
684
685 # wake up one process/thread
686 cond.acquire()
687 cond.notify()
688 cond.release()
689
690 # check one process/thread has woken up
691 time.sleep(DELTA)
692 self.assertReturnsIfImplemented(1, get_value, woken)
693
694 # wake up another
695 cond.acquire()
696 cond.notify()
697 cond.release()
698
699 # check other has woken up
700 time.sleep(DELTA)
701 self.assertReturnsIfImplemented(2, get_value, woken)
702
703 # check state is not mucked up
704 self.check_invariant(cond)
705 p.join()
706
707 def test_notify_all(self):
708 cond = self.Condition()
709 sleeping = self.Semaphore(0)
710 woken = self.Semaphore(0)
711
712 # start some threads/processes which will timeout
713 for i in range(3):
714 p = self.Process(target=self.f,
715 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000716 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000717 p.start()
718
719 t = threading.Thread(target=self.f,
720 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000721 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000722 t.start()
723
724 # wait for them all to sleep
725 for i in range(6):
726 sleeping.acquire()
727
728 # check they have all timed out
729 for i in range(6):
730 woken.acquire()
731 self.assertReturnsIfImplemented(0, get_value, woken)
732
733 # check state is not mucked up
734 self.check_invariant(cond)
735
736 # start some more threads/processes
737 for i in range(3):
738 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000739 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000740 p.start()
741
742 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000743 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000744 t.start()
745
746 # wait for them to all sleep
747 for i in range(6):
748 sleeping.acquire()
749
750 # check no process/thread has woken up
751 time.sleep(DELTA)
752 self.assertReturnsIfImplemented(0, get_value, woken)
753
754 # wake them all up
755 cond.acquire()
756 cond.notify_all()
757 cond.release()
758
759 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200760 for i in range(10):
761 try:
762 if get_value(woken) == 6:
763 break
764 except NotImplementedError:
765 break
766 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000767 self.assertReturnsIfImplemented(6, get_value, woken)
768
769 # check state is not mucked up
770 self.check_invariant(cond)
771
772 def test_timeout(self):
773 cond = self.Condition()
774 wait = TimingWrapper(cond.wait)
775 cond.acquire()
776 res = wait(TIMEOUT1)
777 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000778 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000779 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
780
781
782class _TestEvent(BaseTestCase):
783
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000784 @classmethod
785 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000786 time.sleep(TIMEOUT2)
787 event.set()
788
789 def test_event(self):
790 event = self.Event()
791 wait = TimingWrapper(event.wait)
792
Ezio Melotti13925002011-03-16 11:05:33 +0200793 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +0000794 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +0000795 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000796
Benjamin Peterson965ce872009-04-05 21:24:58 +0000797 # Removed, threading.Event.wait() will return the value of the __flag
798 # instead of None. API Shear with the semaphore backed mp.Event
799 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000800 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000801 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000802 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
803
804 event.set()
805
806 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +0000807 self.assertEqual(event.is_set(), True)
808 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000809 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000810 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000811 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
812 # self.assertEqual(event.is_set(), True)
813
814 event.clear()
815
816 #self.assertEqual(event.is_set(), False)
817
818 self.Process(target=self._test_event, args=(event,)).start()
Benjamin Peterson965ce872009-04-05 21:24:58 +0000819 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000820
821#
822#
823#
824
825class _TestValue(BaseTestCase):
826
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000827 ALLOWED_TYPES = ('processes',)
828
Benjamin Petersone711caf2008-06-11 16:44:04 +0000829 codes_values = [
830 ('i', 4343, 24234),
831 ('d', 3.625, -4.25),
832 ('h', -232, 234),
833 ('c', latin('x'), latin('y'))
834 ]
835
Antoine Pitrou7744e2a2010-11-22 16:26:21 +0000836 def setUp(self):
837 if not HAS_SHAREDCTYPES:
838 self.skipTest("requires multiprocessing.sharedctypes")
839
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000840 @classmethod
841 def _test(cls, values):
842 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000843 sv.value = cv[2]
844
845
846 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000847 if raw:
848 values = [self.RawValue(code, value)
849 for code, value, _ in self.codes_values]
850 else:
851 values = [self.Value(code, value)
852 for code, value, _ in self.codes_values]
853
854 for sv, cv in zip(values, self.codes_values):
855 self.assertEqual(sv.value, cv[1])
856
857 proc = self.Process(target=self._test, args=(values,))
858 proc.start()
859 proc.join()
860
861 for sv, cv in zip(values, self.codes_values):
862 self.assertEqual(sv.value, cv[2])
863
864 def test_rawvalue(self):
865 self.test_value(raw=True)
866
867 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000868 val1 = self.Value('i', 5)
869 lock1 = val1.get_lock()
870 obj1 = val1.get_obj()
871
872 val2 = self.Value('i', 5, lock=None)
873 lock2 = val2.get_lock()
874 obj2 = val2.get_obj()
875
876 lock = self.Lock()
877 val3 = self.Value('i', 5, lock=lock)
878 lock3 = val3.get_lock()
879 obj3 = val3.get_obj()
880 self.assertEqual(lock, lock3)
881
Jesse Nollerb0516a62009-01-18 03:11:38 +0000882 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000883 self.assertFalse(hasattr(arr4, 'get_lock'))
884 self.assertFalse(hasattr(arr4, 'get_obj'))
885
Jesse Nollerb0516a62009-01-18 03:11:38 +0000886 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
887
888 arr5 = self.RawValue('i', 5)
889 self.assertFalse(hasattr(arr5, 'get_lock'))
890 self.assertFalse(hasattr(arr5, 'get_obj'))
891
Benjamin Petersone711caf2008-06-11 16:44:04 +0000892
893class _TestArray(BaseTestCase):
894
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000895 ALLOWED_TYPES = ('processes',)
896
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000897 @classmethod
898 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000899 for i in range(1, len(seq)):
900 seq[i] += seq[i-1]
901
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000902 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000903 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000904 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
905 if raw:
906 arr = self.RawArray('i', seq)
907 else:
908 arr = self.Array('i', seq)
909
910 self.assertEqual(len(arr), len(seq))
911 self.assertEqual(arr[3], seq[3])
912 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
913
914 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
915
916 self.assertEqual(list(arr[:]), seq)
917
918 self.f(seq)
919
920 p = self.Process(target=self.f, args=(arr,))
921 p.start()
922 p.join()
923
924 self.assertEqual(list(arr[:]), seq)
925
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000926 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +0000927 def test_array_from_size(self):
928 size = 10
929 # Test for zeroing (see issue #11675).
930 # The repetition below strengthens the test by increasing the chances
931 # of previously allocated non-zero memory being used for the new array
932 # on the 2nd and 3rd loops.
933 for _ in range(3):
934 arr = self.Array('i', size)
935 self.assertEqual(len(arr), size)
936 self.assertEqual(list(arr), [0] * size)
937 arr[:] = range(10)
938 self.assertEqual(list(arr), list(range(10)))
939 del arr
940
941 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000942 def test_rawarray(self):
943 self.test_array(raw=True)
944
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000945 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000946 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000947 arr1 = self.Array('i', list(range(10)))
948 lock1 = arr1.get_lock()
949 obj1 = arr1.get_obj()
950
951 arr2 = self.Array('i', list(range(10)), lock=None)
952 lock2 = arr2.get_lock()
953 obj2 = arr2.get_obj()
954
955 lock = self.Lock()
956 arr3 = self.Array('i', list(range(10)), lock=lock)
957 lock3 = arr3.get_lock()
958 obj3 = arr3.get_obj()
959 self.assertEqual(lock, lock3)
960
Jesse Nollerb0516a62009-01-18 03:11:38 +0000961 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000962 self.assertFalse(hasattr(arr4, 'get_lock'))
963 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +0000964 self.assertRaises(AttributeError,
965 self.Array, 'i', range(10), lock='notalock')
966
967 arr5 = self.RawArray('i', range(10))
968 self.assertFalse(hasattr(arr5, 'get_lock'))
969 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000970
971#
972#
973#
974
975class _TestContainers(BaseTestCase):
976
977 ALLOWED_TYPES = ('manager',)
978
979 def test_list(self):
980 a = self.list(list(range(10)))
981 self.assertEqual(a[:], list(range(10)))
982
983 b = self.list()
984 self.assertEqual(b[:], [])
985
986 b.extend(list(range(5)))
987 self.assertEqual(b[:], list(range(5)))
988
989 self.assertEqual(b[2], 2)
990 self.assertEqual(b[2:10], [2,3,4])
991
992 b *= 2
993 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
994
995 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
996
997 self.assertEqual(a[:], list(range(10)))
998
999 d = [a, b]
1000 e = self.list(d)
1001 self.assertEqual(
1002 e[:],
1003 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1004 )
1005
1006 f = self.list([a])
1007 a.append('hello')
1008 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1009
1010 def test_dict(self):
1011 d = self.dict()
1012 indices = list(range(65, 70))
1013 for i in indices:
1014 d[i] = chr(i)
1015 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1016 self.assertEqual(sorted(d.keys()), indices)
1017 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1018 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1019
1020 def test_namespace(self):
1021 n = self.Namespace()
1022 n.name = 'Bob'
1023 n.job = 'Builder'
1024 n._hidden = 'hidden'
1025 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1026 del n.job
1027 self.assertEqual(str(n), "Namespace(name='Bob')")
1028 self.assertTrue(hasattr(n, 'name'))
1029 self.assertTrue(not hasattr(n, 'job'))
1030
1031#
1032#
1033#
1034
1035def sqr(x, wait=0.0):
1036 time.sleep(wait)
1037 return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00001038
Benjamin Petersone711caf2008-06-11 16:44:04 +00001039class _TestPool(BaseTestCase):
1040
1041 def test_apply(self):
1042 papply = self.pool.apply
1043 self.assertEqual(papply(sqr, (5,)), sqr(5))
1044 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1045
1046 def test_map(self):
1047 pmap = self.pool.map
1048 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1049 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1050 list(map(sqr, list(range(100)))))
1051
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001052 def test_map_chunksize(self):
1053 try:
1054 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1055 except multiprocessing.TimeoutError:
1056 self.fail("pool.map_async with chunksize stalled on null list")
1057
Benjamin Petersone711caf2008-06-11 16:44:04 +00001058 def test_async(self):
1059 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1060 get = TimingWrapper(res.get)
1061 self.assertEqual(get(), 49)
1062 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1063
1064 def test_async_timeout(self):
1065 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1066 get = TimingWrapper(res.get)
1067 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1068 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1069
1070 def test_imap(self):
1071 it = self.pool.imap(sqr, list(range(10)))
1072 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1073
1074 it = self.pool.imap(sqr, list(range(10)))
1075 for i in range(10):
1076 self.assertEqual(next(it), i*i)
1077 self.assertRaises(StopIteration, it.__next__)
1078
1079 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1080 for i in range(1000):
1081 self.assertEqual(next(it), i*i)
1082 self.assertRaises(StopIteration, it.__next__)
1083
1084 def test_imap_unordered(self):
1085 it = self.pool.imap_unordered(sqr, list(range(1000)))
1086 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1087
1088 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1089 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1090
1091 def test_make_pool(self):
1092 p = multiprocessing.Pool(3)
1093 self.assertEqual(3, len(p._pool))
1094 p.close()
1095 p.join()
1096
1097 def test_terminate(self):
1098 if self.TYPE == 'manager':
1099 # On Unix a forked process increfs each shared object to
1100 # which its parent process held a reference. If the
1101 # forked process gets terminated then there is likely to
1102 # be a reference leak. So to prevent
1103 # _TestZZZNumberOfObjects from failing we skip this test
1104 # when using a manager.
1105 return
1106
1107 result = self.pool.map_async(
1108 time.sleep, [0.1 for i in range(10000)], chunksize=1
1109 )
1110 self.pool.terminate()
1111 join = TimingWrapper(self.pool.join)
1112 join()
Victor Stinner900189b2011-03-24 16:39:07 +01001113 self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001114
Ask Solem2afcbf22010-11-09 20:55:52 +00001115def raising():
1116 raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00001117
Ask Solem2afcbf22010-11-09 20:55:52 +00001118def unpickleable_result():
1119 return lambda: 42
1120
1121class _TestPoolWorkerErrors(BaseTestCase):
Jesse Noller1f0b6582010-01-27 03:36:01 +00001122 ALLOWED_TYPES = ('processes', )
Ask Solem2afcbf22010-11-09 20:55:52 +00001123
1124 def test_async_error_callback(self):
1125 p = multiprocessing.Pool(2)
1126
1127 scratchpad = [None]
1128 def errback(exc):
1129 scratchpad[0] = exc
1130
1131 res = p.apply_async(raising, error_callback=errback)
1132 self.assertRaises(KeyError, res.get)
1133 self.assertTrue(scratchpad[0])
1134 self.assertIsInstance(scratchpad[0], KeyError)
1135
1136 p.close()
1137 p.join()
1138
1139 def test_unpickleable_result(self):
1140 from multiprocessing.pool import MaybeEncodingError
1141 p = multiprocessing.Pool(2)
1142
1143 # Make sure we don't lose pool processes because of encoding errors.
1144 for iteration in range(20):
1145
1146 scratchpad = [None]
1147 def errback(exc):
1148 scratchpad[0] = exc
1149
1150 res = p.apply_async(unpickleable_result, error_callback=errback)
1151 self.assertRaises(MaybeEncodingError, res.get)
1152 wrapped = scratchpad[0]
1153 self.assertTrue(wrapped)
1154 self.assertIsInstance(scratchpad[0], MaybeEncodingError)
1155 self.assertIsNotNone(wrapped.exc)
1156 self.assertIsNotNone(wrapped.value)
1157
1158 p.close()
1159 p.join()
1160
1161class _TestPoolWorkerLifetime(BaseTestCase):
1162 ALLOWED_TYPES = ('processes', )
1163
Jesse Noller1f0b6582010-01-27 03:36:01 +00001164 def test_pool_worker_lifetime(self):
1165 p = multiprocessing.Pool(3, maxtasksperchild=10)
1166 self.assertEqual(3, len(p._pool))
1167 origworkerpids = [w.pid for w in p._pool]
1168 # Run many tasks so each worker gets replaced (hopefully)
1169 results = []
1170 for i in range(100):
1171 results.append(p.apply_async(sqr, (i, )))
1172 # Fetch the results and verify we got the right answers,
1173 # also ensuring all the tasks have completed.
1174 for (j, res) in enumerate(results):
1175 self.assertEqual(res.get(), sqr(j))
1176 # Refill the pool
1177 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001178 # Wait until all workers are alive
Antoine Pitrou540ab062011-04-06 22:51:17 +02001179 # (countdown * DELTA = 5 seconds max startup process time)
1180 countdown = 50
Florent Xiclunafb190f62010-03-04 16:10:10 +00001181 while countdown and not all(w.is_alive() for w in p._pool):
1182 countdown -= 1
1183 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001184 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001185 # All pids should be assigned. See issue #7805.
1186 self.assertNotIn(None, origworkerpids)
1187 self.assertNotIn(None, finalworkerpids)
1188 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001189 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1190 p.close()
1191 p.join()
1192
Benjamin Petersone711caf2008-06-11 16:44:04 +00001193#
1194# Test that manager has expected number of shared objects left
1195#
1196
1197class _TestZZZNumberOfObjects(BaseTestCase):
1198 # Because test cases are sorted alphabetically, this one will get
1199 # run after all the other tests for the manager. It tests that
1200 # there have been no "reference leaks" for the manager's shared
1201 # objects. Note the comment in _TestPool.test_terminate().
1202 ALLOWED_TYPES = ('manager',)
1203
1204 def test_number_of_objects(self):
1205 EXPECTED_NUMBER = 1 # the pool object is still alive
1206 multiprocessing.active_children() # discard dead process objs
1207 gc.collect() # do garbage collection
1208 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001209 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001210 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001211 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001212 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001213
1214 self.assertEqual(refs, EXPECTED_NUMBER)
1215
1216#
1217# Test of creating a customized manager class
1218#
1219
1220from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1221
1222class FooBar(object):
1223 def f(self):
1224 return 'f()'
1225 def g(self):
1226 raise ValueError
1227 def _h(self):
1228 return '_h()'
1229
1230def baz():
1231 for i in range(10):
1232 yield i*i
1233
1234class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00001235 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001236 def __iter__(self):
1237 return self
1238 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001239 return self._callmethod('__next__')
1240
1241class MyManager(BaseManager):
1242 pass
1243
1244MyManager.register('Foo', callable=FooBar)
1245MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1246MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1247
1248
1249class _TestMyManager(BaseTestCase):
1250
1251 ALLOWED_TYPES = ('manager',)
1252
1253 def test_mymanager(self):
1254 manager = MyManager()
1255 manager.start()
1256
1257 foo = manager.Foo()
1258 bar = manager.Bar()
1259 baz = manager.baz()
1260
1261 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1262 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1263
1264 self.assertEqual(foo_methods, ['f', 'g'])
1265 self.assertEqual(bar_methods, ['f', '_h'])
1266
1267 self.assertEqual(foo.f(), 'f()')
1268 self.assertRaises(ValueError, foo.g)
1269 self.assertEqual(foo._callmethod('f'), 'f()')
1270 self.assertRaises(RemoteError, foo._callmethod, '_h')
1271
1272 self.assertEqual(bar.f(), 'f()')
1273 self.assertEqual(bar._h(), '_h()')
1274 self.assertEqual(bar._callmethod('f'), 'f()')
1275 self.assertEqual(bar._callmethod('_h'), '_h()')
1276
1277 self.assertEqual(list(baz), [i*i for i in range(10)])
1278
1279 manager.shutdown()
1280
1281#
1282# Test of connecting to a remote server and using xmlrpclib for serialization
1283#
1284
1285_queue = pyqueue.Queue()
1286def get_queue():
1287 return _queue
1288
1289class QueueManager(BaseManager):
1290 '''manager class used by server process'''
1291QueueManager.register('get_queue', callable=get_queue)
1292
1293class QueueManager2(BaseManager):
1294 '''manager class which specifies the same interface as QueueManager'''
1295QueueManager2.register('get_queue')
1296
1297
1298SERIALIZER = 'xmlrpclib'
1299
1300class _TestRemoteManager(BaseTestCase):
1301
1302 ALLOWED_TYPES = ('manager',)
1303
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001304 @classmethod
1305 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001306 manager = QueueManager2(
1307 address=address, authkey=authkey, serializer=SERIALIZER
1308 )
1309 manager.connect()
1310 queue = manager.get_queue()
1311 queue.put(('hello world', None, True, 2.25))
1312
1313 def test_remote(self):
1314 authkey = os.urandom(32)
1315
1316 manager = QueueManager(
1317 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1318 )
1319 manager.start()
1320
1321 p = self.Process(target=self._putter, args=(manager.address, authkey))
1322 p.start()
1323
1324 manager2 = QueueManager2(
1325 address=manager.address, authkey=authkey, serializer=SERIALIZER
1326 )
1327 manager2.connect()
1328 queue = manager2.get_queue()
1329
1330 # Note that xmlrpclib will deserialize object as a list not a tuple
1331 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1332
1333 # Because we are using xmlrpclib for serialization instead of
1334 # pickle this will cause a serialization error.
1335 self.assertRaises(Exception, queue.put, time.sleep)
1336
1337 # Make queue finalizer run before the server is stopped
1338 del queue
1339 manager.shutdown()
1340
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001341class _TestManagerRestart(BaseTestCase):
1342
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001343 @classmethod
1344 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001345 manager = QueueManager(
1346 address=address, authkey=authkey, serializer=SERIALIZER)
1347 manager.connect()
1348 queue = manager.get_queue()
1349 queue.put('hello world')
1350
1351 def test_rapid_restart(self):
1352 authkey = os.urandom(32)
1353 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001354 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin50be1ca2010-11-01 05:10:44 +00001355 srvr = manager.get_server()
1356 addr = srvr.address
1357 # Close the connection.Listener socket which gets opened as a part
1358 # of manager.get_server(). It's not needed for the test.
1359 srvr.listener.close()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001360 manager.start()
1361
1362 p = self.Process(target=self._putter, args=(manager.address, authkey))
1363 p.start()
1364 queue = manager.get_queue()
1365 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00001366 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001367 manager.shutdown()
1368 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001369 address=addr, authkey=authkey, serializer=SERIALIZER)
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02001370 try:
1371 manager.start()
1372 except IOError as e:
1373 if e.errno != errno.EADDRINUSE:
1374 raise
1375 # Retry after some time, in case the old socket was lingering
1376 # (sporadic failure on buildbots)
1377 time.sleep(1.0)
1378 manager = QueueManager(
1379 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller35d1f002009-03-30 22:59:27 +00001380 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001381
Benjamin Petersone711caf2008-06-11 16:44:04 +00001382#
1383#
1384#
1385
1386SENTINEL = latin('')
1387
1388class _TestConnection(BaseTestCase):
1389
1390 ALLOWED_TYPES = ('processes', 'threads')
1391
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001392 @classmethod
1393 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001394 for msg in iter(conn.recv_bytes, SENTINEL):
1395 conn.send_bytes(msg)
1396 conn.close()
1397
1398 def test_connection(self):
1399 conn, child_conn = self.Pipe()
1400
1401 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001402 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001403 p.start()
1404
1405 seq = [1, 2.25, None]
1406 msg = latin('hello world')
1407 longmsg = msg * 10
1408 arr = array.array('i', list(range(4)))
1409
1410 if self.TYPE == 'processes':
1411 self.assertEqual(type(conn.fileno()), int)
1412
1413 self.assertEqual(conn.send(seq), None)
1414 self.assertEqual(conn.recv(), seq)
1415
1416 self.assertEqual(conn.send_bytes(msg), None)
1417 self.assertEqual(conn.recv_bytes(), msg)
1418
1419 if self.TYPE == 'processes':
1420 buffer = array.array('i', [0]*10)
1421 expected = list(arr) + [0] * (10 - len(arr))
1422 self.assertEqual(conn.send_bytes(arr), None)
1423 self.assertEqual(conn.recv_bytes_into(buffer),
1424 len(arr) * buffer.itemsize)
1425 self.assertEqual(list(buffer), expected)
1426
1427 buffer = array.array('i', [0]*10)
1428 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1429 self.assertEqual(conn.send_bytes(arr), None)
1430 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1431 len(arr) * buffer.itemsize)
1432 self.assertEqual(list(buffer), expected)
1433
1434 buffer = bytearray(latin(' ' * 40))
1435 self.assertEqual(conn.send_bytes(longmsg), None)
1436 try:
1437 res = conn.recv_bytes_into(buffer)
1438 except multiprocessing.BufferTooShort as e:
1439 self.assertEqual(e.args, (longmsg,))
1440 else:
1441 self.fail('expected BufferTooShort, got %s' % res)
1442
1443 poll = TimingWrapper(conn.poll)
1444
1445 self.assertEqual(poll(), False)
1446 self.assertTimingAlmostEqual(poll.elapsed, 0)
1447
1448 self.assertEqual(poll(TIMEOUT1), False)
1449 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1450
1451 conn.send(None)
1452
1453 self.assertEqual(poll(TIMEOUT1), True)
1454 self.assertTimingAlmostEqual(poll.elapsed, 0)
1455
1456 self.assertEqual(conn.recv(), None)
1457
1458 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1459 conn.send_bytes(really_big_msg)
1460 self.assertEqual(conn.recv_bytes(), really_big_msg)
1461
1462 conn.send_bytes(SENTINEL) # tell child to quit
1463 child_conn.close()
1464
1465 if self.TYPE == 'processes':
1466 self.assertEqual(conn.readable, True)
1467 self.assertEqual(conn.writable, True)
1468 self.assertRaises(EOFError, conn.recv)
1469 self.assertRaises(EOFError, conn.recv_bytes)
1470
1471 p.join()
1472
1473 def test_duplex_false(self):
1474 reader, writer = self.Pipe(duplex=False)
1475 self.assertEqual(writer.send(1), None)
1476 self.assertEqual(reader.recv(), 1)
1477 if self.TYPE == 'processes':
1478 self.assertEqual(reader.readable, True)
1479 self.assertEqual(reader.writable, False)
1480 self.assertEqual(writer.readable, False)
1481 self.assertEqual(writer.writable, True)
1482 self.assertRaises(IOError, reader.send, 2)
1483 self.assertRaises(IOError, writer.recv)
1484 self.assertRaises(IOError, writer.poll)
1485
1486 def test_spawn_close(self):
1487 # We test that a pipe connection can be closed by parent
1488 # process immediately after child is spawned. On Windows this
1489 # would have sometimes failed on old versions because
1490 # child_conn would be closed before the child got a chance to
1491 # duplicate it.
1492 conn, child_conn = self.Pipe()
1493
1494 p = self.Process(target=self._echo, args=(child_conn,))
1495 p.start()
1496 child_conn.close() # this might complete before child initializes
1497
1498 msg = latin('hello')
1499 conn.send_bytes(msg)
1500 self.assertEqual(conn.recv_bytes(), msg)
1501
1502 conn.send_bytes(SENTINEL)
1503 conn.close()
1504 p.join()
1505
1506 def test_sendbytes(self):
1507 if self.TYPE != 'processes':
1508 return
1509
1510 msg = latin('abcdefghijklmnopqrstuvwxyz')
1511 a, b = self.Pipe()
1512
1513 a.send_bytes(msg)
1514 self.assertEqual(b.recv_bytes(), msg)
1515
1516 a.send_bytes(msg, 5)
1517 self.assertEqual(b.recv_bytes(), msg[5:])
1518
1519 a.send_bytes(msg, 7, 8)
1520 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1521
1522 a.send_bytes(msg, 26)
1523 self.assertEqual(b.recv_bytes(), latin(''))
1524
1525 a.send_bytes(msg, 26, 0)
1526 self.assertEqual(b.recv_bytes(), latin(''))
1527
1528 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1529
1530 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1531
1532 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1533
1534 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1535
1536 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1537
Benjamin Petersone711caf2008-06-11 16:44:04 +00001538class _TestListenerClient(BaseTestCase):
1539
1540 ALLOWED_TYPES = ('processes', 'threads')
1541
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001542 @classmethod
1543 def _test(cls, address):
1544 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001545 conn.send('hello')
1546 conn.close()
1547
1548 def test_listener_client(self):
1549 for family in self.connection.families:
1550 l = self.connection.Listener(family=family)
1551 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001552 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001553 p.start()
1554 conn = l.accept()
1555 self.assertEqual(conn.recv(), 'hello')
1556 p.join()
1557 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001558#
1559# Test of sending connection and socket objects between processes
1560#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001561"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001562class _TestPicklingConnections(BaseTestCase):
1563
1564 ALLOWED_TYPES = ('processes',)
1565
1566 def _listener(self, conn, families):
1567 for fam in families:
1568 l = self.connection.Listener(family=fam)
1569 conn.send(l.address)
1570 new_conn = l.accept()
1571 conn.send(new_conn)
1572
1573 if self.TYPE == 'processes':
1574 l = socket.socket()
1575 l.bind(('localhost', 0))
1576 conn.send(l.getsockname())
1577 l.listen(1)
1578 new_conn, addr = l.accept()
1579 conn.send(new_conn)
1580
1581 conn.recv()
1582
1583 def _remote(self, conn):
1584 for (address, msg) in iter(conn.recv, None):
1585 client = self.connection.Client(address)
1586 client.send(msg.upper())
1587 client.close()
1588
1589 if self.TYPE == 'processes':
1590 address, msg = conn.recv()
1591 client = socket.socket()
1592 client.connect(address)
1593 client.sendall(msg.upper())
1594 client.close()
1595
1596 conn.close()
1597
1598 def test_pickling(self):
1599 try:
1600 multiprocessing.allow_connection_pickling()
1601 except ImportError:
1602 return
1603
1604 families = self.connection.families
1605
1606 lconn, lconn0 = self.Pipe()
1607 lp = self.Process(target=self._listener, args=(lconn0, families))
1608 lp.start()
1609 lconn0.close()
1610
1611 rconn, rconn0 = self.Pipe()
1612 rp = self.Process(target=self._remote, args=(rconn0,))
1613 rp.start()
1614 rconn0.close()
1615
1616 for fam in families:
1617 msg = ('This connection uses family %s' % fam).encode('ascii')
1618 address = lconn.recv()
1619 rconn.send((address, msg))
1620 new_conn = lconn.recv()
1621 self.assertEqual(new_conn.recv(), msg.upper())
1622
1623 rconn.send(None)
1624
1625 if self.TYPE == 'processes':
1626 msg = latin('This connection uses a normal socket')
1627 address = lconn.recv()
1628 rconn.send((address, msg))
1629 if hasattr(socket, 'fromfd'):
1630 new_conn = lconn.recv()
1631 self.assertEqual(new_conn.recv(100), msg.upper())
1632 else:
1633 # XXX On Windows with Py2.6 need to backport fromfd()
1634 discard = lconn.recv_bytes()
1635
1636 lconn.send(None)
1637
1638 rconn.close()
1639 lconn.close()
1640
1641 lp.join()
1642 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001643"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001644#
1645#
1646#
1647
1648class _TestHeap(BaseTestCase):
1649
1650 ALLOWED_TYPES = ('processes',)
1651
1652 def test_heap(self):
1653 iterations = 5000
1654 maxblocks = 50
1655 blocks = []
1656
1657 # create and destroy lots of blocks of different sizes
1658 for i in range(iterations):
1659 size = int(random.lognormvariate(0, 1) * 1000)
1660 b = multiprocessing.heap.BufferWrapper(size)
1661 blocks.append(b)
1662 if len(blocks) > maxblocks:
1663 i = random.randrange(maxblocks)
1664 del blocks[i]
1665
1666 # get the heap object
1667 heap = multiprocessing.heap.BufferWrapper._heap
1668
1669 # verify the state of the heap
1670 all = []
1671 occupied = 0
1672 for L in list(heap._len_to_seq.values()):
1673 for arena, start, stop in L:
1674 all.append((heap._arenas.index(arena), start, stop,
1675 stop-start, 'free'))
1676 for arena, start, stop in heap._allocated_blocks:
1677 all.append((heap._arenas.index(arena), start, stop,
1678 stop-start, 'occupied'))
1679 occupied += (stop-start)
1680
1681 all.sort()
1682
1683 for i in range(len(all)-1):
1684 (arena, start, stop) = all[i][:3]
1685 (narena, nstart, nstop) = all[i+1][:3]
1686 self.assertTrue((arena != narena and nstart == 0) or
1687 (stop == nstart))
1688
1689#
1690#
1691#
1692
Benjamin Petersone711caf2008-06-11 16:44:04 +00001693class _Foo(Structure):
1694 _fields_ = [
1695 ('x', c_int),
1696 ('y', c_double)
1697 ]
1698
1699class _TestSharedCTypes(BaseTestCase):
1700
1701 ALLOWED_TYPES = ('processes',)
1702
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00001703 def setUp(self):
1704 if not HAS_SHAREDCTYPES:
1705 self.skipTest("requires multiprocessing.sharedctypes")
1706
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001707 @classmethod
1708 def _double(cls, x, y, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001709 x.value *= 2
1710 y.value *= 2
1711 foo.x *= 2
1712 foo.y *= 2
1713 string.value *= 2
1714 for i in range(len(arr)):
1715 arr[i] *= 2
1716
1717 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001718 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001719 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001720 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001721 arr = self.Array('d', list(range(10)), lock=lock)
1722 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00001723 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001724
1725 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1726 p.start()
1727 p.join()
1728
1729 self.assertEqual(x.value, 14)
1730 self.assertAlmostEqual(y.value, 2.0/3.0)
1731 self.assertEqual(foo.x, 6)
1732 self.assertAlmostEqual(foo.y, 4.0)
1733 for i in range(10):
1734 self.assertAlmostEqual(arr[i], i*2)
1735 self.assertEqual(string.value, latin('hellohello'))
1736
1737 def test_synchronize(self):
1738 self.test_sharedctypes(lock=True)
1739
1740 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001741 foo = _Foo(2, 5.0)
Brian Curtinafa88b52010-10-07 01:12:19 +00001742 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001743 foo.x = 0
1744 foo.y = 0
1745 self.assertEqual(bar.x, 2)
1746 self.assertAlmostEqual(bar.y, 5.0)
1747
1748#
1749#
1750#
1751
1752class _TestFinalize(BaseTestCase):
1753
1754 ALLOWED_TYPES = ('processes',)
1755
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001756 @classmethod
1757 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001758 class Foo(object):
1759 pass
1760
1761 a = Foo()
1762 util.Finalize(a, conn.send, args=('a',))
1763 del a # triggers callback for a
1764
1765 b = Foo()
1766 close_b = util.Finalize(b, conn.send, args=('b',))
1767 close_b() # triggers callback for b
1768 close_b() # does nothing because callback has already been called
1769 del b # does nothing because callback has already been called
1770
1771 c = Foo()
1772 util.Finalize(c, conn.send, args=('c',))
1773
1774 d10 = Foo()
1775 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1776
1777 d01 = Foo()
1778 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1779 d02 = Foo()
1780 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1781 d03 = Foo()
1782 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1783
1784 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1785
1786 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1787
Ezio Melotti13925002011-03-16 11:05:33 +02001788 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00001789 # garbage collecting locals
1790 util._exit_function()
1791 conn.close()
1792 os._exit(0)
1793
1794 def test_finalize(self):
1795 conn, child_conn = self.Pipe()
1796
1797 p = self.Process(target=self._test_finalize, args=(child_conn,))
1798 p.start()
1799 p.join()
1800
1801 result = [obj for obj in iter(conn.recv, 'STOP')]
1802 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1803
1804#
1805# Test that from ... import * works for each module
1806#
1807
1808class _TestImportStar(BaseTestCase):
1809
1810 ALLOWED_TYPES = ('processes',)
1811
1812 def test_import(self):
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001813 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00001814 'multiprocessing', 'multiprocessing.connection',
1815 'multiprocessing.heap', 'multiprocessing.managers',
1816 'multiprocessing.pool', 'multiprocessing.process',
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001817 'multiprocessing.reduction',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001818 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001819 ]
1820
1821 if c_int is not None:
1822 # This module requires _ctypes
1823 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001824
1825 for name in modules:
1826 __import__(name)
1827 mod = sys.modules[name]
1828
1829 for attr in getattr(mod, '__all__', ()):
1830 self.assertTrue(
1831 hasattr(mod, attr),
1832 '%r does not have attribute %r' % (mod, attr)
1833 )
1834
1835#
1836# Quick test that logging works -- does not test logging output
1837#
1838
1839class _TestLogging(BaseTestCase):
1840
1841 ALLOWED_TYPES = ('processes',)
1842
1843 def test_enable_logging(self):
1844 logger = multiprocessing.get_logger()
1845 logger.setLevel(util.SUBWARNING)
1846 self.assertTrue(logger is not None)
1847 logger.debug('this will not be printed')
1848 logger.info('nor will this')
1849 logger.setLevel(LOG_LEVEL)
1850
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001851 @classmethod
1852 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001853 logger = multiprocessing.get_logger()
1854 conn.send(logger.getEffectiveLevel())
1855
1856 def test_level(self):
1857 LEVEL1 = 32
1858 LEVEL2 = 37
1859
1860 logger = multiprocessing.get_logger()
1861 root_logger = logging.getLogger()
1862 root_level = root_logger.level
1863
1864 reader, writer = multiprocessing.Pipe(duplex=False)
1865
1866 logger.setLevel(LEVEL1)
1867 self.Process(target=self._test_level, args=(writer,)).start()
1868 self.assertEqual(LEVEL1, reader.recv())
1869
1870 logger.setLevel(logging.NOTSET)
1871 root_logger.setLevel(LEVEL2)
1872 self.Process(target=self._test_level, args=(writer,)).start()
1873 self.assertEqual(LEVEL2, reader.recv())
1874
1875 root_logger.setLevel(root_level)
1876 logger.setLevel(level=LOG_LEVEL)
1877
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001878
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00001879# class _TestLoggingProcessName(BaseTestCase):
1880#
1881# def handle(self, record):
1882# assert record.processName == multiprocessing.current_process().name
1883# self.__handled = True
1884#
1885# def test_logging(self):
1886# handler = logging.Handler()
1887# handler.handle = self.handle
1888# self.__handled = False
1889# # Bypass getLogger() and side-effects
1890# logger = logging.getLoggerClass()(
1891# 'multiprocessing.test.TestLoggingProcessName')
1892# logger.addHandler(handler)
1893# logger.propagate = False
1894#
1895# logger.warn('foo')
1896# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001897
Benjamin Petersone711caf2008-06-11 16:44:04 +00001898#
Jesse Noller6214edd2009-01-19 16:23:53 +00001899# Test to verify handle verification, see issue 3321
1900#
1901
1902class TestInvalidHandle(unittest.TestCase):
1903
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001904 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00001905 def test_invalid_handles(self):
Jesse Noller6214edd2009-01-19 16:23:53 +00001906 conn = _multiprocessing.Connection(44977608)
1907 self.assertRaises(IOError, conn.poll)
1908 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001909
Jesse Noller6214edd2009-01-19 16:23:53 +00001910#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001911# Functions used to create test cases from the base ones in this module
1912#
1913
1914def get_attributes(Source, names):
1915 d = {}
1916 for name in names:
1917 obj = getattr(Source, name)
1918 if type(obj) == type(get_attributes):
1919 obj = staticmethod(obj)
1920 d[name] = obj
1921 return d
1922
1923def create_test_cases(Mixin, type):
1924 result = {}
1925 glob = globals()
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001926 Type = type.capitalize()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001927
1928 for name in list(glob.keys()):
1929 if name.startswith('_Test'):
1930 base = glob[name]
1931 if type in base.ALLOWED_TYPES:
1932 newname = 'With' + Type + name[1:]
1933 class Temp(base, unittest.TestCase, Mixin):
1934 pass
1935 result[newname] = Temp
1936 Temp.__name__ = newname
1937 Temp.__module__ = Mixin.__module__
1938 return result
1939
1940#
1941# Create test cases
1942#
1943
1944class ProcessesMixin(object):
1945 TYPE = 'processes'
1946 Process = multiprocessing.Process
1947 locals().update(get_attributes(multiprocessing, (
1948 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1949 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1950 'RawArray', 'current_process', 'active_children', 'Pipe',
1951 'connection', 'JoinableQueue'
1952 )))
1953
1954testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1955globals().update(testcases_processes)
1956
1957
1958class ManagerMixin(object):
1959 TYPE = 'manager'
1960 Process = multiprocessing.Process
1961 manager = object.__new__(multiprocessing.managers.SyncManager)
1962 locals().update(get_attributes(manager, (
1963 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1964 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1965 'Namespace', 'JoinableQueue'
1966 )))
1967
1968testcases_manager = create_test_cases(ManagerMixin, type='manager')
1969globals().update(testcases_manager)
1970
1971
1972class ThreadsMixin(object):
1973 TYPE = 'threads'
1974 Process = multiprocessing.dummy.Process
1975 locals().update(get_attributes(multiprocessing.dummy, (
1976 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1977 'Condition', 'Event', 'Value', 'Array', 'current_process',
1978 'active_children', 'Pipe', 'connection', 'dict', 'list',
1979 'Namespace', 'JoinableQueue'
1980 )))
1981
1982testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1983globals().update(testcases_threads)
1984
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001985class OtherTest(unittest.TestCase):
1986 # TODO: add more tests for deliver/answer challenge.
1987 def test_deliver_challenge_auth_failure(self):
1988 class _FakeConnection(object):
1989 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00001990 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001991 def send_bytes(self, data):
1992 pass
1993 self.assertRaises(multiprocessing.AuthenticationError,
1994 multiprocessing.connection.deliver_challenge,
1995 _FakeConnection(), b'abc')
1996
1997 def test_answer_challenge_auth_failure(self):
1998 class _FakeConnection(object):
1999 def __init__(self):
2000 self.count = 0
2001 def recv_bytes(self, size):
2002 self.count += 1
2003 if self.count == 1:
2004 return multiprocessing.connection.CHALLENGE
2005 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00002006 return b'something bogus'
2007 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002008 def send_bytes(self, data):
2009 pass
2010 self.assertRaises(multiprocessing.AuthenticationError,
2011 multiprocessing.connection.answer_challenge,
2012 _FakeConnection(), b'abc')
2013
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002014#
2015# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2016#
2017
2018def initializer(ns):
2019 ns.test += 1
2020
2021class TestInitializers(unittest.TestCase):
2022 def setUp(self):
2023 self.mgr = multiprocessing.Manager()
2024 self.ns = self.mgr.Namespace()
2025 self.ns.test = 0
2026
2027 def tearDown(self):
2028 self.mgr.shutdown()
2029
2030 def test_manager_initializer(self):
2031 m = multiprocessing.managers.SyncManager()
2032 self.assertRaises(TypeError, m.start, 1)
2033 m.start(initializer, (self.ns,))
2034 self.assertEqual(self.ns.test, 1)
2035 m.shutdown()
2036
2037 def test_pool_initializer(self):
2038 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2039 p = multiprocessing.Pool(1, initializer, (self.ns,))
2040 p.close()
2041 p.join()
2042 self.assertEqual(self.ns.test, 1)
2043
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002044#
2045# Issue 5155, 5313, 5331: Test process in processes
2046# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2047#
2048
2049def _ThisSubProcess(q):
2050 try:
2051 item = q.get(block=False)
2052 except pyqueue.Empty:
2053 pass
2054
2055def _TestProcess(q):
2056 queue = multiprocessing.Queue()
2057 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
2058 subProc.start()
2059 subProc.join()
2060
2061def _afunc(x):
2062 return x*x
2063
2064def pool_in_process():
2065 pool = multiprocessing.Pool(processes=4)
2066 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2067
2068class _file_like(object):
2069 def __init__(self, delegate):
2070 self._delegate = delegate
2071 self._pid = None
2072
2073 @property
2074 def cache(self):
2075 pid = os.getpid()
2076 # There are no race conditions since fork keeps only the running thread
2077 if pid != self._pid:
2078 self._pid = pid
2079 self._cache = []
2080 return self._cache
2081
2082 def write(self, data):
2083 self.cache.append(data)
2084
2085 def flush(self):
2086 self._delegate.write(''.join(self.cache))
2087 self._cache = []
2088
2089class TestStdinBadfiledescriptor(unittest.TestCase):
2090
2091 def test_queue_in_process(self):
2092 queue = multiprocessing.Queue()
2093 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
2094 proc.start()
2095 proc.join()
2096
2097 def test_pool_in_process(self):
2098 p = multiprocessing.Process(target=pool_in_process)
2099 p.start()
2100 p.join()
2101
2102 def test_flushing(self):
2103 sio = io.StringIO()
2104 flike = _file_like(sio)
2105 flike.write('foo')
2106 proc = multiprocessing.Process(target=lambda: flike.flush())
2107 flike.flush()
2108 assert sio.getvalue() == 'foo'
2109
2110testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
2111 TestStdinBadfiledescriptor]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002112
Benjamin Petersone711caf2008-06-11 16:44:04 +00002113#
2114#
2115#
2116
2117def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00002118 if sys.platform.startswith("linux"):
2119 try:
2120 lock = multiprocessing.RLock()
2121 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00002122 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00002123
Benjamin Petersone711caf2008-06-11 16:44:04 +00002124 if run is None:
2125 from test.support import run_unittest as run
2126
2127 util.get_temp_dir() # creates temp directory for use by all processes
2128
2129 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2130
Benjamin Peterson41181742008-07-02 20:22:54 +00002131 ProcessesMixin.pool = multiprocessing.Pool(4)
2132 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2133 ManagerMixin.manager.__init__()
2134 ManagerMixin.manager.start()
2135 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002136
2137 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00002138 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2139 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002140 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2141 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00002142 )
2143
2144 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2145 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
2146 run(suite)
2147
Benjamin Peterson41181742008-07-02 20:22:54 +00002148 ThreadsMixin.pool.terminate()
2149 ProcessesMixin.pool.terminate()
2150 ManagerMixin.pool.terminate()
2151 ManagerMixin.manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002152
Benjamin Peterson41181742008-07-02 20:22:54 +00002153 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002154
2155def main():
2156 test_main(unittest.TextTestRunner(verbosity=2).run)
2157
2158if __name__ == '__main__':
2159 main()