blob: 654cffb69252e29a0f94d4a2590ccaa33fe46653 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020014import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
30import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.dummy
33import multiprocessing.connection
34import multiprocessing.managers
35import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
38from multiprocessing import util
39
Brian Curtinafa88b52010-10-07 01:12:19 +000040try:
41 from multiprocessing.sharedctypes import Value, copy
42 HAS_SHAREDCTYPES = True
43except ImportError:
44 HAS_SHAREDCTYPES = False
45
Benjamin Petersone711caf2008-06-11 16:44:04 +000046#
47#
48#
49
def latin(s):
    """Return the string *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000052
Benjamin Petersone711caf2008-06-11 16:44:04 +000053#
54# Constants
55#
56
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

DELTA = 0.1

# Making CHECK_TIMINGS true makes tests take a lot longer and can sometimes
# cause some non-serious failures because some calls block a bit longer than
# expected.
CHECK_TIMINGS = False
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# True unless the _multiprocessing module reports HAVE_BROKEN_SEM_GETVALUE.
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = sys.platform == "win32"
74
Benjamin Petersone711caf2008-06-11 16:44:04 +000075#
Florent Xiclunafd1b0932010-03-28 00:25:02 +000076# Some tests require ctypes
77#
78
79try:
Florent Xiclunaaa171062010-08-14 15:56:42 +000080 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +000081except ImportError:
82 Structure = object
83 c_int = c_double = None
84
85#
Benjamin Petersone711caf2008-06-11 16:44:04 +000086# Creates a wrapper for a function which records the time it takes to finish
87#
88
class TimingWrapper(object):
    """Callable proxy that records how long each call to *func* took.

    After every call (whether it returned or raised) ``elapsed`` holds the
    wall-clock duration of that call in seconds; it is None before the
    first call.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Record the duration even when the wrapped call raises.
            self.elapsed = time.time() - started
        return result
101
102#
103# Base class for test cases
104#
105
class BaseTestCase(object):
    """Mixin with assertion helpers shared by the test case classes below."""

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timings are only compared (to one decimal place) when the
        # module-level CHECK_TIMINGS switch is on.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Compare func(*args) against value, unless func signals that it is
        # not implemented, in which case nothing is checked.
        try:
            res = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
128
Benjamin Petersone711caf2008-06-11 16:44:04 +0000129#
130# Return the value of a semaphore
131#
132
def get_value(self):
    """Return the counter of the semaphore-like object *self*.

    Tries, in order, the ``get_value()`` method, the ``_Semaphore__value``
    attribute and the ``_value`` attribute.  Raises NotImplementedError when
    none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
144
145#
146# Testcases
147#
148
class _TestProcess(BaseTestCase):
    # Tests for the Process API (attributes, lifecycle, active_children and
    # nested process creation), run for the process and thread flavours.

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # The object returned by current_process() must describe a live,
        # non-daemonic process with a non-empty bytes authkey.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Child side of test_process: report what the child received and
        # what it sees about itself, in the order the parent reads it back.
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        # Follow a process through its whole life: before start, while
        # running and after join.
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # args[0] is the queue itself, so the child echoes back args[1:].
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        # Sleep far longer than the test runs; the parent terminates us.
        time.sleep(1000)

    def test_terminate(self):
        # terminate() must stop a running child so that join() returns
        # promptly.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        # cpu_count() may be unimplemented; otherwise it is a positive int.
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        # A process is listed by active_children() only between start()
        # and join().
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # Send our own id, then spawn (and wait for) two children one level
        # deeper until the id is two elements long.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        # Processes started from within child processes must all be able to
        # write to the same connection, in depth-first order.
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
297
298#
299#
300#
301
302class _UpperCaser(multiprocessing.Process):
303
304 def __init__(self):
305 multiprocessing.Process.__init__(self)
306 self.child_conn, self.parent_conn = multiprocessing.Pipe()
307
308 def run(self):
309 self.parent_conn.close()
310 for s in iter(self.child_conn.recv, None):
311 self.child_conn.send(s.upper())
312 self.child_conn.close()
313
314 def submit(self, s):
315 assert type(s) is str
316 self.parent_conn.send(s)
317 return self.parent_conn.recv()
318
319 def stop(self):
320 self.parent_conn.send(None)
321 self.parent_conn.close()
322 self.child_conn.close()
323
class _TestSubclassingProcess(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        # Round-trip two strings through a Process subclass, then ask it to
        # stop and wait for it to exit.
        worker = _UpperCaser()
        worker.start()
        for text, expected in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), expected)
        worker.stop()
        worker.join()
335
336#
337#
338#
339
def queue_empty(q):
    """Return whether *q* is empty, using qsize() if it has no empty()."""
    return q.empty() if hasattr(q, 'empty') else q.qsize() == 0
345
def queue_full(q, maxsize):
    """Return whether *q* is full, comparing qsize() to *maxsize* if it has
    no full() method."""
    return q.full() if hasattr(q, 'full') else q.qsize() == maxsize
351
352
class _TestQueue(BaseTestCase):
    # Tests for Queue / JoinableQueue: blocking and non-blocking put/get,
    # use across a newly started process, qsize() and task_done()/join().

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_put: once released, drain the six items the
        # parent queued and tell the parent it may go on.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue using every calling convention of put().
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Every further put must raise Full: at once when non-blocking,
        # after the timeout when blocking.
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_get: once released, queue the items the parent
        # expects and tell the parent it may go on.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Every further get must raise Empty: at once when non-blocking,
        # after the timeout when blocking.
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        # qsize() must track puts and gets; it may be unimplemented, in
        # which case there is nothing to check.
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        # Worker: acknowledge each item after a short delay; None ends the
        # loop.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        # The guard used to be combined with "sys.version_info < (2, 5)",
        # which can never be true on Python 3 and so made the skip
        # unreachable.
        if not hasattr(queue, 'task_done'):
            self.skipTest("requires 'queue.task_done()' method")

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.start()

        for i in range(10):
            queue.put(i)

        # Returns only once every item has been acknowledged by a worker.
        queue.join()

        # One sentinel per worker so that each of them exits.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
560
561#
562#
563#
564
class _TestLock(BaseTestCase):

    def test_lock(self):
        # A plain lock cannot be re-acquired without blocking, and
        # releasing it while unlocked is an error.
        mutex = self.Lock()
        self.assertEqual(mutex.acquire(), True)
        self.assertEqual(mutex.acquire(False), False)
        self.assertEqual(mutex.release(), None)
        self.assertRaises((ValueError, threading.ThreadError), mutex.release)

    def test_rlock(self):
        # A recursive lock can be taken repeatedly by its owner and must be
        # released the same number of times; one release too many is an
        # error.
        rlock = self.RLock()
        depth = 3
        for _ in range(depth):
            self.assertEqual(rlock.acquire(), True)
        for _ in range(depth):
            self.assertEqual(rlock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), rlock.release)

    def test_lock_context(self):
        # Locks must be usable as context managers.
        with self.Lock():
            pass
587
Benjamin Petersone711caf2008-06-11 16:44:04 +0000588
class _TestSemaphore(BaseTestCase):
    # Tests for Semaphore and BoundedSemaphore counters and acquire
    # timeouts.

    def _test_semaphore(self, sem):
        # Walk a semaphore created with value 2 down to 0 and back up to 2,
        # checking the counter (where it can be read) after every step.
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        # An unbounded semaphore may be released past its initial value.
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        # acquire() on an exhausted semaphore must fail at once when
        # non-blocking and after the timeout when blocking.
        if self.TYPE != 'processes':
            # Report a skip instead of a silent pass for other flavours.
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
641
642
class _TestCondition(BaseTestCase):
    # Tests for Condition: notify(), notify_all() and wait() timeouts, with
    # waiters that are both processes (of the tested flavour) and threads.

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Waiter: signal "sleeping" just before waiting on the condition and
        # "woken" once wait() has returned (notified or timed out).
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Keep separate handles for the two waiters so that both can be
        # joined at the end; reusing one name left the process unjoined.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # both waiters have been notified, so both can be waited for
        thread.join()
        proc.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken; poll for up to 10 * DELTA instead of
        # relying on a single fixed sleep
        for i in range(10):
            try:
                if get_value(woken) == 6:
                    break
            except NotImplementedError:
                break
            time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        # wait() with nobody notifying must return False after the timeout.
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
780
781
class _TestEvent(BaseTestCase):
    # Tests for Event: is_set(), wait() with and without timeout, and being
    # set from another process.

    @classmethod
    def _test_event(cls, event):
        # Child: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A new event starts out unset.
        self.assertEqual(event.is_set(), False)

        # While the event is unset, wait() returns False once the timeout
        # has expired.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once the event is set, wait() returns True without blocking.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # Keep a handle on the child so that it can be joined; it is a
        # daemon so that a failing assertion cannot leave it behind.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000820
821#
822#
823#
824
class _TestValue(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Child side of test_value: overwrite every shared value with the
        # third field of its codes_values row.
        for shared, row in zip(values, cls.codes_values):
            shared.value = row[2]

    def test_value(self, raw=False):
        # Shared values must start with their initial value and reflect the
        # writes made by a child process.
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, row in zip(values, self.codes_values):
            self.assertEqual(shared.value, row[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.start()
        proc.join()

        for shared, row in zip(values, self.codes_values):
            self.assertEqual(shared.value, row[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # Default lock: both accessors must be callable.
        val1 = self.Value('i', 5)
        lock1 = val1.get_lock()
        obj1 = val1.get_obj()

        # lock=None: both accessors must be callable as well.
        val2 = self.Value('i', 5, lock=None)
        lock2 = val2.get_lock()
        obj2 = val2.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        obj3 = val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False and RawValue give objects without the accessors.
        unlocked = self.Value('i', 5, lock=False)
        for attr in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, attr))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        raw = self.RawValue('i', 5)
        for attr in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, attr))
891
Benjamin Petersone711caf2008-06-11 16:44:04 +0000892
class _TestArray(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # Turn seq, in place, into its running sum.
        for idx in range(1, len(seq)):
            seq[idx] += seq[idx - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        # A shared array must behave like the list it was built from, both
        # locally and after being modified by a child process.
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make_array = self.RawArray if raw else self.Array
        arr = make_array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        # Apply the same transformation to the list here and to the shared
        # array in a child process, then compare.
        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(10)
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default lock: both accessors must be callable.
        arr1 = self.Array('i', list(range(10)))
        lock1 = arr1.get_lock()
        obj1 = arr1.get_obj()

        # lock=None: both accessors must be callable as well.
        arr2 = self.Array('i', list(range(10)), lock=None)
        lock2 = arr2.get_lock()
        obj2 = arr2.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        arr3 = self.Array('i', list(range(10)), lock=lock)
        lock3 = arr3.get_lock()
        obj3 = arr3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False and RawArray give objects without the accessors.
        unlocked = self.Array('i', range(10), lock=False)
        for attr in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, attr))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        raw = self.RawArray('i', range(10))
        for attr in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, attr))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000970
971#
972#
973#
974
class _TestContainers(BaseTestCase):
    # Tests for the list, dict and Namespace proxies handed out by a manager.

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        digits = list(range(10))

        full = self.list(list(range(10)))
        self.assertEqual(full[:], digits)

        grown = self.list()
        self.assertEqual(grown[:], [])

        grown.extend(list(range(5)))
        self.assertEqual(grown[:], list(range(5)))

        self.assertEqual(grown[2], 2)
        self.assertEqual(grown[2:10], [2,3,4])

        # In-place repetition is performed on the shared list itself.
        grown *= 2
        self.assertEqual(grown[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(grown + [5, 6],
                         [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        # None of the above may have touched the first list.
        self.assertEqual(full[:], digits)

        # A shared list built from proxies compares equal to nested lists.
        nested = self.list([full, grown])
        self.assertEqual(
            nested[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
        )

        # A later change to the inner list shows up through the outer one.
        outer = self.list([full])
        full.append('hello')
        self.assertEqual(outer[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        shared = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            shared[code] = chr(code)
        letters = [chr(code) for code in indices]

        self.assertEqual(shared.copy(), {code: chr(code) for code in indices})
        self.assertEqual(sorted(shared.keys()), indices)
        self.assertEqual(sorted(shared.values()), letters)
        self.assertEqual(sorted(shared.items()), list(zip(indices, letters)))

    def test_namespace(self):
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        # The expected string lists neither the deleted attribute nor the
        # underscore-prefixed one.
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertFalse(hasattr(ns, 'job'))
1030
1031#
1032#
1033#
1034
def sqr(x, wait=0.0):
    """Sleep for `wait` seconds, then return x multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00001038
class _TestPool(BaseTestCase):
    # Tests for the Pool API; the pool under test is reached as self.pool.

    def test_apply(self):
        apply_sync = self.pool.apply
        self.assertEqual(apply_sync(sqr, (5,)), sqr(5))
        self.assertEqual(apply_sync(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        small = list(range(10))
        large = list(range(100))
        self.assertEqual(pmap(sqr, small), [sqr(v) for v in small])
        self.assertEqual(pmap(sqr, large, chunksize=20),
                         [sqr(v) for v in large])

    def test_map_chunksize(self):
        # Mapping over an empty list with an explicit chunksize must finish
        # instead of stalling until the timeout.
        pending = self.pool.map_async(sqr, [], chunksize=1)
        try:
            pending.get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        # The task sleeps for TIMEOUT1, so get() should block about as long.
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        timed_get = TimingWrapper(res.get)
        self.assertEqual(timed_get(), 49)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task outlasts the timeout, so get() must give up after TIMEOUT2.
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        timed_get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError,
                          timed_get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT2)

    def test_imap(self):
        results = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(results), [sqr(v) for v in range(10)])

        # Step through the iterator by hand, without and with a chunksize.
        for count, options in ((10, {}), (1000, {'chunksize': 100})):
            it = self.pool.imap(sqr, list(range(count)), **options)
            for i in range(count):
                self.assertEqual(next(it), i*i)
            self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        expected = [sqr(v) for v in range(1000)]

        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), expected)

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), expected)

    def test_make_pool(self):
        # A pool needs at least one worker.
        for bad_size in (-1, 0):
            self.assertRaises(ValueError, multiprocessing.Pool, bad_size)

        pool = multiprocessing.Pool(3)
        self.assertEqual(3, len(pool._pool))
        pool.close()
        pool.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # Queue far more work than could finish, then terminate: joining
        # the pool must not wait for the outstanding tasks.
        result = self.pool.map_async(time.sleep, [0.1] * 10000, chunksize=1)
        self.pool.terminate()
        timed_join = TimingWrapper(self.pool.join)
        timed_join()
        self.assertLess(timed_join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001117
def raising():
    """Always raise KeyError("key")."""
    error = KeyError("key")
    raise error
Jesse Noller1f0b6582010-01-27 03:36:01 +00001120
def unpickleable_result():
    """Return a lambda (which evaluates to 42 when called)."""
    # Pool tests use this as a task whose return value is a lambda.
    result = lambda: 42
    return result
1123
class _TestPoolWorkerErrors(BaseTestCase):
    # Checks how errors raised by (or on behalf of) a task are reported.
    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        pool = multiprocessing.Pool(2)

        # One-slot container the callback writes the exception into.
        seen = [None]
        def errback(exc):
            seen[0] = exc

        res = pool.apply_async(raising, error_callback=errback)
        # The error must reach both get() and the error callback.
        self.assertRaises(KeyError, res.get)
        self.assertTrue(seen[0])
        self.assertIsInstance(seen[0], KeyError)

        pool.close()
        pool.join()

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        pool = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        for _round in range(20):

            seen = [None]
            def errback(exc):
                seen[0] = exc

            res = pool.apply_async(unpickleable_result,
                                   error_callback=errback)
            self.assertRaises(MaybeEncodingError, res.get)
            wrapped = seen[0]
            self.assertTrue(wrapped)
            self.assertIsInstance(seen[0], MaybeEncodingError)
            # The wrapper must carry both the error and the offending value.
            self.assertIsNotNone(wrapped.exc)
            self.assertIsNotNone(wrapped.value)

        pool.close()
        pool.join()
1163
class _TestPoolWorkerLifetime(BaseTestCase):
    # Checks that workers are replaced once they hit maxtasksperchild.
    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        origworkerpids = [worker.pid for worker in pool._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        results = [pool.apply_async(sqr, (i, )) for i in range(100)]
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
        # Refill the pool
        pool._repopulate_pool()
        # Wait until all workers are alive
        # (countdown * DELTA = 5 seconds max startup process time)
        for _tick in range(50):
            if all(worker.is_alive() for worker in pool._pool):
                break
            time.sleep(DELTA)
        finalworkerpids = [worker.pid for worker in pool._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        pool.close()
        pool.join()
1195
Benjamin Petersone711caf2008-06-11 16:44:04 +00001196#
1197# Test that manager has expected number of shared objects left
1198#
1199
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Snapshot taken right next to the count it is meant to explain.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the snapshot once; a second fetch only duplicated it.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1218
1219#
1220# Test of creating a customized manager class
1221#
1222
1223from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1224
class FooBar(object):
    """Sample class registered with MyManager under two typeids.

    It offers one ordinary method, one method that always fails and one
    underscore-prefixed method.
    """

    def f(self):
        return 'f()'

    def g(self):
        # Always fails.
        raise ValueError

    def _h(self):
        return '_h()'
1232
def baz():
    """Generate the squares of 0 through 9, in order."""
    for n in range(10):
        yield n * n
1236
class IteratorProxy(BaseProxy):
    """Proxy that supports iteration by forwarding __next__ to its referent."""

    # Only __next__ is forwarded to the referent.
    _exposed_ = ('__next__',)

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def __next__(self):
        return self._callmethod('__next__')
1243
class MyManager(BaseManager):
    """Customized manager; its types are registered right after the class."""

# 'Foo' uses the default exposure, 'Bar' names its exposed methods
# explicitly (including the underscore-prefixed one), and 'baz' is served
# through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1250
1251
class _TestMyManager(BaseTestCase):
    # Checks that a customized manager exposes exactly the methods its
    # registrations ask for, both as attributes and through _callmethod().

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        # Registered as a cleanup so the server process is stopped even when
        # one of the assertions below fails.
        self.addCleanup(manager.shutdown)

        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        candidates = ('f', 'g', '_h')
        foo_methods = [name for name in candidates if hasattr(foo, name)]
        bar_methods = [name for name in candidates if hasattr(bar, name)]

        # 'Foo' was registered without an explicit exposed list, 'Bar' with
        # exposed=('f', '_h').
        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        # _h is not exposed on 'Foo', so the server refuses the call.
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(baz), [i*i for i in range(10)])
1283
1284#
1285# Test of connecting to a remote server and using xmlrpclib for serialization
1286#
1287
# Single module-level queue; get_queue() always hands back this object.
_queue = pyqueue.Queue()

def get_queue():
    """Return the module-level queue."""
    return _queue
1291
class QueueManager(BaseManager):
    '''Manager used on the server side; it owns the get_queue callable.'''

# Registered with a callable, so this manager can create the queue itself.
QueueManager.register('get_queue', callable=get_queue)
1295
class QueueManager2(BaseManager):
    '''Manager declaring the same interface as QueueManager.'''

# Registered without a callable: only the typeid is declared here.
QueueManager2.register('get_queue')
1299
1300
# Serializer name passed to every manager created by the remote-manager and
# manager-restart tests.
SERIALIZER = 'xmlrpclib'
1302
class _TestRemoteManager(BaseTestCase):
    # Talks to a running manager server through separate client-side
    # manager objects, using the xmlrpclib serializer.

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        # Child side: connect to the server as a client and put one tuple on
        # its queue.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()
        # Registered as a cleanup so the server is stopped even when one of
        # the assertions below fails.
        self.addCleanup(manager.shutdown)

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        received = queue.get()
        # The child has nothing left to do once its item has arrived; join
        # it so that no process outlives the test.
        p.join()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(received, ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
1343
class _TestManagerRestart(BaseTestCase):
    # Checks that a manager can be started on an address right after a
    # previous manager has been shut down.

    @classmethod
    def _putter(cls, address, authkey):
        # Child side: connect to the running manager and put one string on
        # its queue.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()
        queue = manager.get_queue()
        received = queue.get()
        # The child is finished once its item has arrived; join it so that
        # it does not outlive the test.
        p.join()
        self.assertEqual(received, 'hello world')
        del queue
        manager.shutdown()

        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except IOError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # The replacement must be started as well: a manager only gets
            # its shutdown attribute from start(), so the call below would
            # otherwise fail with AttributeError.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001384
Benjamin Petersone711caf2008-06-11 16:44:04 +00001385#
1386#
1387#
1388
# Empty message; _TestConnection._echo stops echoing when it receives it.
SENTINEL = latin('')
1390
class _TestConnection(BaseTestCase):
    # Tests for the connection objects returned by Pipe().

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Send every received message straight back until SENTINEL arrives,
        # then close the connection.
        while True:
            data = conn.recv_bytes()
            if data == SENTINEL:
                break
            conn.send_bytes(data)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        echo = self.Process(target=self._echo, args=(child_conn,))
        echo.daemon = True
        echo.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        # Objects and raw bytes both survive the round trip.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            nbytes = len(arr) * arr.itemsize

            # Receive into the start of a zeroed buffer.
            target = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(target), nbytes)
            self.assertEqual(list(target), expected)

            # Receive at an offset of three items.
            target = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(
                conn.recv_bytes_into(target, 3 * target.itemsize), nbytes)
            self.assertEqual(list(target), expected)

            # A buffer that is too small gets the whole message back inside
            # the exception.
            target = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(target)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing is pending: poll() answers at once, poll(timeout) waits.
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # Once the echo is on its way poll() succeeds without waiting.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            # Both ends of the peer are closed by now.
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        echo.join()

    def test_duplex_false(self):
        # A one-way pipe: the first connection only reads, the second only
        # writes.
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            # Using an end in the wrong direction is an error.
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        echo = self.Process(target=self._echo, args=(child_conn,))
        echo.start()
        child_conn.close()    # this might complete before child initializes

        greeting = latin('hello')
        conn.send_bytes(greeting)
        self.assertEqual(conn.recv_bytes(), greeting)

        conn.send_bytes(SENTINEL)
        conn.close()
        echo.join()

    def test_sendbytes(self):
        # Exercises the optional offset and size arguments of send_bytes().
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the message length sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Offset/size combinations that must be rejected.
        for bad_args in ((27,), (22, 5), (26, 1), (-1,), (4, -1)):
            self.assertRaises(ValueError, a.send_bytes, msg, *bad_args)
1540
class _TestListenerClient(BaseTestCase):
    # Round trip between a Listener and a Client for every address family
    # the connection module lists.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child side: connect, send one greeting, hang up.
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test,
                                 args=(listener.address,))
            child.daemon = True
            child.start()
            conn = listener.accept()
            self.assertEqual(conn.recv(), 'hello')
            child.join()
            listener.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001561#
1562# Test of sending connection and socket objects between processes
1563#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001564"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001565class _TestPicklingConnections(BaseTestCase):
1566
1567 ALLOWED_TYPES = ('processes',)
1568
1569 def _listener(self, conn, families):
1570 for fam in families:
1571 l = self.connection.Listener(family=fam)
1572 conn.send(l.address)
1573 new_conn = l.accept()
1574 conn.send(new_conn)
1575
1576 if self.TYPE == 'processes':
1577 l = socket.socket()
1578 l.bind(('localhost', 0))
1579 conn.send(l.getsockname())
1580 l.listen(1)
1581 new_conn, addr = l.accept()
1582 conn.send(new_conn)
1583
1584 conn.recv()
1585
1586 def _remote(self, conn):
1587 for (address, msg) in iter(conn.recv, None):
1588 client = self.connection.Client(address)
1589 client.send(msg.upper())
1590 client.close()
1591
1592 if self.TYPE == 'processes':
1593 address, msg = conn.recv()
1594 client = socket.socket()
1595 client.connect(address)
1596 client.sendall(msg.upper())
1597 client.close()
1598
1599 conn.close()
1600
1601 def test_pickling(self):
1602 try:
1603 multiprocessing.allow_connection_pickling()
1604 except ImportError:
1605 return
1606
1607 families = self.connection.families
1608
1609 lconn, lconn0 = self.Pipe()
1610 lp = self.Process(target=self._listener, args=(lconn0, families))
1611 lp.start()
1612 lconn0.close()
1613
1614 rconn, rconn0 = self.Pipe()
1615 rp = self.Process(target=self._remote, args=(rconn0,))
1616 rp.start()
1617 rconn0.close()
1618
1619 for fam in families:
1620 msg = ('This connection uses family %s' % fam).encode('ascii')
1621 address = lconn.recv()
1622 rconn.send((address, msg))
1623 new_conn = lconn.recv()
1624 self.assertEqual(new_conn.recv(), msg.upper())
1625
1626 rconn.send(None)
1627
1628 if self.TYPE == 'processes':
1629 msg = latin('This connection uses a normal socket')
1630 address = lconn.recv()
1631 rconn.send((address, msg))
1632 if hasattr(socket, 'fromfd'):
1633 new_conn = lconn.recv()
1634 self.assertEqual(new_conn.recv(100), msg.upper())
1635 else:
1636 # XXX On Windows with Py2.6 need to backport fromfd()
1637 discard = lconn.recv_bytes()
1638
1639 lconn.send(None)
1640
1641 rconn.close()
1642 lconn.close()
1643
1644 lp.join()
1645 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001646"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001647#
1648#
1649#
1650
class _TestHeap(BaseTestCase):
    # Tests for the allocator behind multiprocessing.heap.BufferWrapper.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _round in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                # drop one of the older blocks, picked at random
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap
        layout = []
        occupied = 0    # running total of allocated bytes; not asserted on
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for free_blocks in list(heap._len_to_seq.values()):
            layout.extend(
                (heap._arenas.index(arena), start, stop, stop-start, 'free')
                for arena, start, stop in free_blocks)
        for arena, start, stop in heap._allocated_blocks:
            layout.append((heap._arenas.index(arena), start, stop,
                           stop-start, 'occupied'))
            occupied += (stop-start)

        layout.sort()

        # Consecutive entries must either touch each other, or the later
        # one must start at offset 0 of a different arena.
        for current, following in zip(layout, layout[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, nstop) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        saved_thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *saved_thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for _round in range(5000):
            first = multiprocessing.heap.BufferWrapper(1)
            second = multiprocessing.heap.BufferWrapper(1)
            # circular references
            first.buddy = second
            second.buddy = first
1715
Benjamin Petersone711caf2008-06-11 16:44:04 +00001716#
1717#
1718#
1719
class _Foo(Structure):
    # Two-field structure (a c_int and a c_double) shared between processes
    # by the sharedctypes tests.
    _fields_ = [
        ('x', c_int),
        ('y', c_double),
    ]
1725
class _TestSharedCTypes(BaseTestCase):
    # Tests for multiprocessing.sharedctypes values, arrays and structures.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Child side: double every shared object in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for idx in range(len(arr)):
            arr[idx] *= 2

    def test_sharedctypes(self, lock=False):
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        doubler = self.Process(target=self._double,
                               args=(x, y, foo, arr, string))
        doubler.start()
        doubler.join()

        # Every change made by the child must be visible in the parent.
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for idx in range(10):
            self.assertAlmostEqual(arr[idx], idx*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario, with the shared objects created using lock=True.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # copy() must produce an independent object: changing the original
        # afterwards leaves the copy untouched.
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1774
1775#
1776#
1777#
1778
class _TestFinalize(BaseTestCase):
    # Tests for util.Finalize callbacks and the order in which they run.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Child side: register finalizers that report their tag through
        # conn.  The statement order matters, do not rearrange.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        finalize_b = util.Finalize(b, conn.send, args=('b',))
        finalize_b()    # triggers callback for b
        finalize_b()    # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # No exitpriority is given here; the parent expects no 'c'.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.start()
        child.join()

        # Read tags until the 'STOP' marker.
        reported = list(iter(conn.recv, 'STOP'))
        self.assertEqual(reported,
                         ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1830
1831#
1832# Test that from ... import * works for each module
1833#
1834
class _TestImportStar(BaseTestCase):
    # Every name a multiprocessing module lists in __all__ must exist.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.reduction',
            'multiprocessing.synchronize', 'multiprocessing.util',
        ]

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for modname in modules:
            __import__(modname)
            mod = sys.modules[modname]

            # A module without __all__ has nothing to check.
            exported = getattr(mod, '__all__', ())
            for attr in exported:
                self.assertTrue(
                    hasattr(mod, attr),
                    '%r does not have attribute %r' % (mod, attr)
                )
1861
1862#
1863# Quick test that logging works -- does not test logging output
1864#
1865
class _TestLogging(BaseTestCase):
    # Quick checks that the multiprocessing logger works and that its
    # effective level is what a child process sees.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        # Check the logger before calling anything on it.
        self.assertIsNotNone(logger)
        # Registered as a cleanup so the level is restored even if one of
        # the calls below raises.
        self.addCleanup(logger.setLevel, LOG_LEVEL)
        logger.setLevel(util.SUBWARNING)
        logger.debug('this will not be printed')
        logger.info('nor will this')

    @classmethod
    def _test_level(cls, conn):
        # Child side: report the effective level of the multiprocessing
        # logger back to the parent.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        # Cleanups also run when an assertion fails, so neither the pipe
        # nor the modified logger levels leak into later tests.
        self.addCleanup(reader.close)
        self.addCleanup(writer.close)
        self.addCleanup(logger.setLevel, level=LOG_LEVEL)
        self.addCleanup(root_logger.setLevel, root_level)

        # A level set on the multiprocessing logger is seen by the child.
        logger.setLevel(LEVEL1)
        p = self.Process(target=self._test_level, args=(writer,))
        p.start()
        reported = reader.recv()
        p.join()
        self.assertEqual(LEVEL1, reported)

        # With NOTSET the child sees the level of the root logger instead.
        logger.setLevel(logging.NOTSET)
        root_logger.setLevel(LEVEL2)
        p = self.Process(target=self._test_level, args=(writer,))
        p.start()
        reported = reader.recv()
        p.join()
        self.assertEqual(LEVEL2, reported)
1904
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001905
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00001906# class _TestLoggingProcessName(BaseTestCase):
1907#
1908# def handle(self, record):
1909# assert record.processName == multiprocessing.current_process().name
1910# self.__handled = True
1911#
1912# def test_logging(self):
1913# handler = logging.Handler()
1914# handler.handle = self.handle
1915# self.__handled = False
1916# # Bypass getLogger() and side-effects
1917# logger = logging.getLoggerClass()(
1918# 'multiprocessing.test.TestLoggingProcessName')
1919# logger.addHandler(handler)
1920# logger.propagate = False
1921#
1922# logger.warn('foo')
1923# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001924
Benjamin Petersone711caf2008-06-11 16:44:04 +00001925#
Jesse Noller6214edd2009-01-19 16:23:53 +00001926# Test to verify handle verification, see issue 3321
1927#
1928
class TestInvalidHandle(unittest.TestCase):

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # Polling a Connection built on an arbitrary handle number must
        # raise IOError, and so must constructing one from -1.
        bogus = _multiprocessing.Connection(44977608)
        with self.assertRaises(IOError):
            bogus.poll()
        with self.assertRaises(IOError):
            _multiprocessing.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001936
Jesse Noller6214edd2009-01-19 16:23:53 +00001937#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001938# Functions used to create test cases from the base ones in this module
1939#
1940
def get_attributes(Source, names):
    """Return a dict mapping each entry of *names* to ``Source.<name>``.

    Plain Python functions are wrapped in ``staticmethod`` so that they can
    be placed in a class namespace without being turned into methods; every
    other object is stored unchanged.
    """
    function_type = type(get_attributes)

    def _prepare(obj):
        if isinstance(obj, function_type):
            return staticmethod(obj)
        return obj

    return {name: _prepare(getattr(Source, name)) for name in names}
1949
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes from the module's ``_Test*`` bases.

    Every module global whose name starts with ``_Test`` and whose
    ``ALLOWED_TYPES`` contains *type* yields a class deriving from that
    base, ``unittest.TestCase`` and *Mixin*.  The class is named
    ``With<Type><base name without the leading underscore>`` and reports
    *Mixin*'s module as its own.  Returns a dict mapping names to classes.
    """
    # NB: the ``type`` parameter hides the builtin inside this function, so
    # the classes are created with a ``class`` statement.
    namespace = globals()
    prefix = 'With' + type.capitalize()
    generated = {}

    for base_name in [key for key in namespace if key.startswith('_Test')]:
        base = namespace[base_name]
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        Temp.__name__ = prefix + base_name[1:]
        Temp.__module__ = Mixin.__module__
        generated[Temp.__name__] = Temp

    return generated
1966
1967#
1968# Create test cases
1969#
1970
class ProcessesMixin(object):
    # Binds the shared test cases to the primitives exported by the
    # multiprocessing package itself.
    TYPE = 'processes'
    Process = multiprocessing.Process

    # Copy the listed attributes into the class namespace; the helper name
    # is removed again so that it does not become a class attribute.
    _names = (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue',
    )
    locals().update(get_attributes(multiprocessing, _names))
    del _names

# Generate the "WithProcesses..." test cases and publish them as globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1983
1984
class ManagerMixin(object):
    # Binds the shared test cases to the proxies handed out by a
    # SyncManager instance.
    TYPE = 'manager'
    Process = multiprocessing.Process

    # The manager is allocated without running __init__; test_main()
    # initialises and starts it before the tests run.
    manager = object.__new__(multiprocessing.managers.SyncManager)

    # Copy the listed manager attributes into the class namespace; the
    # helper name is removed again so it does not become a class attribute.
    _names = (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue',
    )
    locals().update(get_attributes(manager, _names))
    del _names

# Generate the "WithManager..." test cases and publish them as globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1997
1998
class ThreadsMixin(object):
    # Binds the shared test cases to the equivalents provided by
    # multiprocessing.dummy.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process

    # Copy the listed attributes into the class namespace; the helper name
    # is removed again so that it does not become a class attribute.
    _names = (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue',
    )
    locals().update(get_attributes(multiprocessing.dummy, _names))
    del _names

# Generate the "WithThreads..." test cases and publish them as globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2011
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.

    class _FakeConnection(object):
        # Minimal stand-in for a connection: recv_bytes() hands out the
        # canned *replies* in order and then *fallback* forever, while
        # send_bytes() discards its argument.
        def __init__(self, *replies, fallback=b''):
            self._replies = list(replies)
            self._fallback = fallback

        def recv_bytes(self, size):
            if self._replies:
                return self._replies.pop(0)
            return self._fallback

        def send_bytes(self, data):
            pass

    @staticmethod
    def _challenge_prefix():
        # Newer CPython releases expose this constant as the private name
        # _CHALLENGE; accept either spelling.
        connection = multiprocessing.connection
        try:
            return connection.CHALLENGE
        except AttributeError:
            return connection._CHALLENGE

    def test_deliver_challenge_auth_failure(self):
        # A peer that always answers with garbage must be rejected.
        conn = self._FakeConnection(fallback=b'something bogus')
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.deliver_challenge,
                          conn, b'abc')

    def test_answer_challenge_auth_failure(self):
        # The peer sends a challenge, then garbage instead of the welcome
        # message, then empty strings.
        conn = self._FakeConnection(self._challenge_prefix(),
                                    b'something bogus')
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.answer_challenge,
                          conn, b'abc')
2040
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002041#
2042# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2043#
2044
def initializer(ns):
    """Increase the ``test`` attribute of *ns* by one."""
    current = ns.test
    ns.test = current + 1
2047
class TestInitializers(unittest.TestCase):
    # Checks the initializer/initargs support of SyncManager.start() and
    # Pool(): the initializer must run once and bump a shared counter.

    def setUp(self):
        # A separate manager hosts the namespace whose counter the
        # initializer increments.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer is rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Shut the second manager down even when the assertion fails,
            # so that its server process is not leaked.
            m.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer is rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
2070
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002071#
2072# Issue 5155, 5313, 5331: Test process in processes
2073# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2074#
2075
def _ThisSubProcess(q):
    """Try a non-blocking get on *q*, ignoring an empty queue."""
    try:
        q.get_nowait()
    except pyqueue.Empty:
        return
2081
def _TestProcess(q):
    """Start a child running ``_ThisSubProcess`` on a new queue and join it.

    The *q* argument is accepted but not used.
    """
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(
        target=_ThisSubProcess,
        args=(inner_queue,),
    )
    child.start()
    child.join()
2087
def _afunc(x):
    """Return *x* multiplied by itself."""
    product = x * x
    return product
2090
def pool_in_process():
    """Map ``_afunc`` over a few integers using a pool of four workers.

    The pool is closed and joined before returning, so the worker
    processes do not outlive the call.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        pool.close()
        pool.join()
2094
class _file_like(object):
    """Buffer written data and hand it to *delegate* on ``flush()``.

    The buffer is discarded whenever it is accessed from a process whose
    pid differs from the one that last accessed it.
    """

    def __init__(self, delegate):
        self._pid = None
        self._delegate = delegate

    @property
    def cache(self):
        current_pid = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid != current_pid:
            self._cache = []
            self._pid = current_pid
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
2115
class TestStdinBadfiledescriptor(unittest.TestCase):
    # Issue 5155, 5313, 5331: starting processes (and pools) from within a
    # child process must work.

    def test_queue_in_process(self):
        # The child itself creates a queue and a grandchild process.
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # The child creates a worker pool of its own.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this process object is created but never started;
        # kept as in the original -- confirm whether starting it was
        # intended.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # assertEqual instead of a bare assert: it is not stripped under -O
        # and reports both values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
2136
# Test cases defined directly in this module (not generated by
# create_test_cases()).
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002139
Benjamin Petersone711caf2008-06-11 16:44:04 +00002140#
2141#
2142#
2143
2144def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00002145 if sys.platform.startswith("linux"):
2146 try:
2147 lock = multiprocessing.RLock()
2148 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00002149 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00002150
Benjamin Petersone711caf2008-06-11 16:44:04 +00002151 if run is None:
2152 from test.support import run_unittest as run
2153
2154 util.get_temp_dir() # creates temp directory for use by all processes
2155
2156 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2157
Benjamin Peterson41181742008-07-02 20:22:54 +00002158 ProcessesMixin.pool = multiprocessing.Pool(4)
2159 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2160 ManagerMixin.manager.__init__()
2161 ManagerMixin.manager.start()
2162 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002163
2164 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00002165 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2166 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002167 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2168 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00002169 )
2170
2171 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2172 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
2173 run(suite)
2174
Benjamin Peterson41181742008-07-02 20:22:54 +00002175 ThreadsMixin.pool.terminate()
2176 ProcessesMixin.pool.terminate()
2177 ManagerMixin.pool.terminate()
2178 ManagerMixin.manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002179
Benjamin Peterson41181742008-07-02 20:22:54 +00002180 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002181
def main():
    """Run the whole suite through a verbose text test runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

if __name__ == '__main__':
    main()