blob: 768119c4c87ff6bfb6930f08d58edc0530b58bd0 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020014import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
30import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.dummy
33import multiprocessing.connection
34import multiprocessing.managers
35import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
38from multiprocessing import util
39
Brian Curtinafa88b52010-10-07 01:12:19 +000040try:
41 from multiprocessing.sharedctypes import Value, copy
42 HAS_SHAREDCTYPES = True
43except ImportError:
44 HAS_SHAREDCTYPES = False
45
Benjamin Petersone711caf2008-06-11 16:44:04 +000046#
47#
48#
49
def latin(s):
    """Return the string *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000052
Benjamin Petersone711caf2008-06-11 16:44:04 +000053#
54# Constants
55#
56
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Short pause used by the tests when they need to let other
# processes/threads make progress.
DELTA = 0.1

# Setting this to True makes the tests take a lot longer and can sometimes
# cause some non-serious failures, because some calls block a bit longer
# than expected.
CHECK_TIMINGS = False

# Longer, distinct timeouts are only used when timings are actually checked.
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# False when the C extension flags its semaphore get_value() as broken.
HAVE_GETVALUE = not getattr(
    _multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False
)

WIN32 = sys.platform == "win32"
74
Benjamin Petersone711caf2008-06-11 16:44:04 +000075#
Florent Xiclunafd1b0932010-03-28 00:25:02 +000076# Some tests require ctypes
77#
78
79try:
Florent Xiclunaaa171062010-08-14 15:56:42 +000080 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +000081except ImportError:
82 Structure = object
83 c_int = c_double = None
84
85#
Benjamin Petersone711caf2008-06-11 16:44:04 +000086# Creates a wrapper for a function which records the time it takes to finish
87#
88
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* took.

    After a call, ``elapsed`` holds the wall-clock duration (seconds, from
    ``time.time()``) of the most recent invocation; it is ``None`` until the
    wrapper has been called once.  The duration is recorded even when the
    wrapped callable raises.
    """

    def __init__(self, func):
        self.elapsed = None
        self.func = func

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            outcome = self.func(*args, **kwds)
        finally:
            # Runs on both the normal and the exceptional path.
            self.elapsed = time.time() - started
        return outcome
101
102#
103# Base class for test cases
104#
105
class BaseTestCase(object):
    """Shared helpers for the test classes in this module.

    The helpers call ``self.assertEqual`` / ``self.assertAlmostEqual``, which
    are not defined here; presumably they come from ``unittest.TestCase`` in
    the concrete classes built from these mixins -- verify against the rest
    of the file.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare two durations to one decimal place, if timing checks are on."""
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert ``func(*args) == value`` unless it raises NotImplementedError."""
        try:
            actual = func(*args)
        except NotImplementedError:
            # The operation is unavailable here; nothing to verify.
            return None
        return self.assertEqual(value, actual)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
128
Benjamin Petersone711caf2008-06-11 16:44:04 +0000129#
130# Return the value of a semaphore
131#
132
def get_value(self):
    """Return the current counter value of the semaphore-like object *self*.

    The object's own ``get_value()`` method is tried first, then the
    ``_Semaphore__value`` and ``_value`` attributes in that order.
    NotImplementedError is raised when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
144
145#
146# Testcases
147#
148
class _TestProcess(BaseTestCase):
    """Checks of process creation, introspection, termination and nesting.

    The methods rely on attributes that are not defined in this class
    (``self.TYPE``, ``self.Process``, ``self.current_process``,
    ``self.active_children``, ``self.Queue``, ``self.Pipe``) and on the
    unittest assertion API; presumably both are supplied when the class is
    mixed into a concrete test case -- verify against the rest of the file.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        """Inspect the object returned by ``current_process()``."""
        if self.TYPE == 'threads':
            # Report a skip rather than a silent pass so the test run shows
            # that nothing was checked for this flavour.
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertGreater(len(authkey), 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child target: report the received arguments and identity on *q*."""
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        """Follow one child through its not-started/running/finished states."""
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIs(type(self.active_children()), list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child echoes everything after the queue itself.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        """Child target: sleep long enough that the parent must terminate it."""
        time.sleep(1000)

    def test_terminate(self):
        """Terminate a sleeping child and check it is reaped promptly."""
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        """``cpu_count()`` yields a positive int or raises NotImplementedError."""
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIs(type(cpus), int)
        self.assertGreaterEqual(cpus, 1)

    def test_active_children(self):
        """A child is listed by ``active_children()`` only while it runs."""
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Send *id* on *wconn*, then recurse through two children per level.

        Each child is joined before the next is started, so the ids arrive
        in depth-first order.  Recursion stops once *id* has two elements.
        """
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        """Processes started from within child processes run in order."""
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
297
298#
299#
300#
301
302class _UpperCaser(multiprocessing.Process):
303
304 def __init__(self):
305 multiprocessing.Process.__init__(self)
306 self.child_conn, self.parent_conn = multiprocessing.Pipe()
307
308 def run(self):
309 self.parent_conn.close()
310 for s in iter(self.child_conn.recv, None):
311 self.child_conn.send(s.upper())
312 self.child_conn.close()
313
314 def submit(self, s):
315 assert type(s) is str
316 self.parent_conn.send(s)
317 return self.parent_conn.recv()
318
319 def stop(self):
320 self.parent_conn.send(None)
321 self.parent_conn.close()
322 self.child_conn.close()
323
class _TestSubclassingProcess(BaseTestCase):
    """Exercise a user-defined Process subclass (``_UpperCaser``)."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        """Round-trip two strings through the worker, then shut it down."""
        worker = _UpperCaser()
        worker.start()
        for request, reply in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(request), reply)
        worker.stop()
        worker.join()
335
336#
337#
338#
339
def queue_empty(q):
    """Return whether *q* is empty.

    Uses ``q.empty()`` when the queue has such a method and otherwise
    compares ``q.qsize()`` with zero.
    """
    return q.empty() if hasattr(q, 'empty') else (q.qsize() == 0)
345
def queue_full(q, maxsize):
    """Return whether *q* is full.

    Uses ``q.full()`` when the queue has such a method and otherwise
    compares ``q.qsize()`` with *maxsize*.
    """
    return q.full() if hasattr(q, 'full') else (q.qsize() == maxsize)
351
352
class _TestQueue(BaseTestCase):
    """Checks of put/get semantics, forking, qsize() and task_done().

    ``self.Queue``, ``self.JoinableQueue``, ``self.Event`` and
    ``self.Process`` are not defined here; presumably they are supplied by
    the concrete test case this class is mixed into -- verify against the
    rest of the file.
    """

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        """Child target: drain six items once the parent gives the go-ahead."""
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        """Fill a bounded queue and check every way put() can report Full."""
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        # Let the child drain the queue, then check it is empty again.
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        """Child target: enqueue 2..5 once the parent gives the go-ahead."""
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        """Read back the child's items and check every way get() reports Empty."""
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        """Child target: enqueue the integers 10..19."""
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        """qsize() follows puts and gets; skipped where it is unimplemented."""
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # Report a skip instead of a silent pass.
            self.skipTest('qsize method not implemented')
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        """Worker target: acknowledge each item until a None marker arrives."""
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        """join() on a JoinableQueue returns once every item is acknowledged."""
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # One stop marker per worker.
        for _ in workers:
            queue.put(None)

        for p in workers:
            p.join()
560
561#
562#
563#
564
class _TestLock(BaseTestCase):
    """Basic acquire/release checks for Lock and RLock objects.

    ``self.Lock`` and ``self.RLock`` are not defined here; presumably they
    come from the concrete test case this class is mixed into.
    """

    def test_lock(self):
        """A lock cannot be taken twice, nor released when it is not held."""
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        with self.assertRaises((ValueError, threading.ThreadError)):
            lock.release()

    def test_rlock(self):
        """An RLock can be re-acquired, but not released more often than taken."""
        lock = self.RLock()
        depth = 3
        for _ in range(depth):
            self.assertEqual(lock.acquire(), True)
        for _ in range(depth):
            self.assertEqual(lock.release(), None)
        with self.assertRaises((AssertionError, RuntimeError)):
            lock.release()

    def test_lock_context(self):
        """A lock works as a context manager."""
        lock = self.Lock()
        with lock:
            pass
587
Benjamin Petersone711caf2008-06-11 16:44:04 +0000588
class _TestSemaphore(BaseTestCase):
    """Counter and timeout checks for Semaphore and BoundedSemaphore.

    ``self.Semaphore``, ``self.BoundedSemaphore`` and ``self.TYPE`` are not
    defined here; presumably they come from the concrete test case this
    class is mixed into.
    """

    def _test_semaphore(self, sem):
        """Drive *sem* (initial value 2) down to zero and back up to two.

        The counter checks are silently omitted where reading the value
        raises NotImplementedError.
        """
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        """A plain semaphore may be released beyond its initial value."""
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        """A bounded semaphore passes the common acquire/release sequence."""
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        """acquire() on an exhausted semaphore honours block/timeout."""
        if self.TYPE != 'processes':
            # Report a skip rather than a silent pass so the test run shows
            # that nothing was checked for this flavour.
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
641
642
class _TestCondition(BaseTestCase):
    """Checks of Condition.wait(), notify() and notify_all().

    ``self.Condition``, ``self.Semaphore``, ``self.Process`` and
    ``self.TYPE`` are not defined here; presumably they come from the
    concrete test case this class is mixed into.
    """

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        """Worker target: wait on *cond*, signalling before and after.

        *sleeping* is released just before waiting and *woken* once the wait
        has returned (by notification or by *timeout*).
        """
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        """Check the internal counters of *cond* (process flavour only)."""
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        """Each notify() wakes exactly one of two waiters."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Keep a separate handle for each waiter so both can be joined.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Both waiters have been notified, so neither join should block long.
        thread.join()
        proc.join()

    def test_notify_all(self):
        """Waiters time out on their own, and notify_all() wakes all of them."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)
        # Every started process/thread, so that all can be joined at the end.
        workers = []

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # The first batch timed out and the second was notified, so every
        # worker is finishing; join them rather than leaving them dangling.
        for worker in workers:
            worker.join()

    def test_timeout(self):
        """wait() with a timeout returns False when nobody notifies."""
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
774
775
class _TestEvent(BaseTestCase):
    """Checks of Event.is_set(), wait(), set() and clear().

    ``self.Event`` and ``self.Process`` are not defined here; presumably
    they come from the concrete test case this class is mixed into.
    """

    @classmethod
    def _test_event(cls, event):
        """Child target: set *event* after a TIMEOUT2 pause."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        """wait() reflects the flag, both locally and when set by a child."""
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A fresh event starts out cleared.
        self.assertEqual(event.is_set(), False)

        # While the flag is clear, wait() with a timeout is expected to
        # return False (the value of the flag) rather than None.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once the flag is set, wait() returns True without blocking.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # Keep a handle on the child so it can be joined instead of being
        # left dangling; daemon matches the other tests in this file.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000814
815#
816#
817#
818
class _TestValue(BaseTestCase):
    """Checks of shared ``Value`` / ``RawValue`` objects.

    ``self.Value``, ``self.RawValue``, ``self.Lock`` and ``self.Process``
    are not defined here; presumably they come from the concrete test case
    this class is mixed into.
    """

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value the child process writes)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        """Child target: overwrite each shared value with its final value."""
        for shared, spec in zip(values, cls.codes_values):
            final = spec[2]
            shared.value = final

    def test_value(self, raw=False):
        """Values written by a child process are visible in the parent."""
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        child = self.Process(target=self._test, args=(values,))
        child.start()
        child.join()

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        """Same round trip as test_value(), using RawValue objects."""
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        """get_lock()/get_obj() exist exactly when the value has a lock."""
        # Default lock.
        default_val = self.Value('i', 5)
        default_lock = default_val.get_lock()
        default_obj = default_val.get_obj()

        # lock=None also yields a value exposing get_lock()/get_obj().
        none_val = self.Value('i', 5, lock=None)
        none_lock = none_val.get_lock()
        none_obj = none_val.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        locked_val = self.Value('i', 5, lock=lock)
        returned_lock = locked_val.get_lock()
        locked_obj = locked_val.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False: no accessor methods at all.
        unlocked_val = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked_val, accessor))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        raw_val = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw_val, accessor))
885
Benjamin Petersone711caf2008-06-11 16:44:04 +0000886
class _TestArray(BaseTestCase):
    """Checks of shared ``Array`` / ``RawArray`` objects.

    ``self.Array``, ``self.RawArray``, ``self.Lock`` and ``self.Process``
    are not defined here; presumably they come from the concrete test case
    this class is mixed into.
    """

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        """Replace *seq* in place by its running (prefix) sums."""
        for idx in range(1, len(seq)):
            seq[idx] += seq[idx - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        """Indexing, slicing and child-process writes on a shared array."""
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        factory = self.RawArray if raw else self.Array
        arr = factory('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Apply the same slice assignment to the shared array and the list.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Compute the expected result locally, then have a child process
        # apply the same transformation to the shared array.
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            zeros = [0] * size
            self.assertEqual(list(arr), zeros)
            arr[:] = range(10)
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        """Same checks as test_array(), using a RawArray."""
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        """get_lock()/get_obj() exist exactly when the array has a lock."""
        # Default lock.
        default_arr = self.Array('i', list(range(10)))
        default_lock = default_arr.get_lock()
        default_obj = default_arr.get_obj()

        # lock=None also yields an array exposing get_lock()/get_obj().
        none_arr = self.Array('i', list(range(10)), lock=None)
        none_lock = none_arr.get_lock()
        none_obj = none_arr.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        locked_arr = self.Array('i', list(range(10)), lock=lock)
        returned_lock = locked_arr.get_lock()
        locked_obj = locked_arr.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False: no accessor methods at all.
        unlocked_arr = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked_arr, accessor))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        raw_arr = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw_arr, accessor))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000964
965#
966#
967#
968
class _TestContainers(BaseTestCase):
    """Tests for the list, dict and Namespace objects created via the mixin."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        """Exercise slicing, extend, ``*=``, ``+`` and nesting of lists."""
        digits = list(range(10))
        first_five = list(range(5))
        doubled = first_five * 2

        a = self.list(list(digits))
        self.assertEqual(a[:], digits)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(first_five))
        self.assertEqual(b[:], first_five)

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2, 3, 4])

        b *= 2
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        # None of the operations on b may have touched a.
        self.assertEqual(a[:], digits)

        e = self.list([a, b])
        self.assertEqual(e[:], [digits, doubled])

        # f is built before a is changed, yet must show the appended item.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [digits + ['hello']])

    def test_dict(self):
        """Check copy(), keys(), values() and items() of a dict."""
        d = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            d[code] = chr(code)
        letters = [chr(code) for code in indices]
        self.assertEqual(d.copy(), dict(zip(indices, letters)))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), list(zip(indices, letters)))

    def test_namespace(self):
        """Check attribute set/delete and the str() form of a Namespace."""
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The expected string lists neither the deleted 'job' nor '_hidden'.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1024
1025#
1026#
1027#
1028
def sqr(x, wait=0.0):
    """Sleep for *wait* seconds, then return *x* multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00001032
class _TestPool(BaseTestCase):
    """Tests for the pool provided by the mixin as ``self.pool``."""

    def test_apply(self):
        """apply() must accept positional and keyword arguments."""
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        """map() must match the builtin map(), with and without chunksize."""
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))),
                         [sqr(x) for x in range(10)])
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         [sqr(x) for x in range(100)])

    def test_map_chunksize(self):
        """map_async() with a chunksize must not stall on an empty list."""
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        """get() on an async result blocks until the task has finished."""
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        """get(timeout=...) raises TimeoutError if the task is too slow."""
        # The task sleeps slightly longer than the timeout given to get().
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        """imap() yields results in order and then raises StopIteration."""
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), [sqr(x) for x in range(10)])

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        """imap_unordered() yields every result, in any order."""
        expected = [sqr(x) for x in range(1000)]

        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), expected)

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), expected)

    def test_make_pool(self):
        """Pool(3) must create exactly three workers."""
        p = multiprocessing.Pool(3)
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            # Always reap the workers, even if the assertion failed.
            p.close()
            p.join()

    def test_terminate(self):
        """terminate() must let join() return quickly despite queued work."""
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            self.skipTest("terminating workers may leak manager references")

        # Queue far more work than can finish; the result is never fetched.
        self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001108
def raising():
    """Always raise ``KeyError("key")``; used as a failing pool task."""
    error = KeyError("key")
    raise error
Jesse Noller1f0b6582010-01-27 03:36:01 +00001111
def unpickleable_result():
    """Return a lambda (which returns 42) as a result that cannot be pickled."""
    answer = lambda: 42
    return answer
1114
class _TestPoolWorkerErrors(BaseTestCase):
    """Tests for how a Pool reports errors raised by (or for) its tasks."""

    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        """error_callback must receive the exception raised by the task."""
        p = multiprocessing.Pool(2)
        try:
            scratchpad = [None]
            def errback(exc):
                scratchpad[0] = exc

            res = p.apply_async(raising, error_callback=errback)
            self.assertRaises(KeyError, res.get)
            self.assertTrue(scratchpad[0])
            self.assertIsInstance(scratchpad[0], KeyError)
        finally:
            # Reap the workers even when an assertion above failed.
            p.close()
            p.join()

    def test_unpickleable_result(self):
        """An unpicklable result must surface as MaybeEncodingError."""
        from multiprocessing.pool import MaybeEncodingError
        p = multiprocessing.Pool(2)
        try:
            # Make sure we don't lose pool processes because of encoding
            # errors.
            for iteration in range(20):

                scratchpad = [None]
                def errback(exc):
                    scratchpad[0] = exc

                res = p.apply_async(unpickleable_result,
                                    error_callback=errback)
                self.assertRaises(MaybeEncodingError, res.get)
                wrapped = scratchpad[0]
                self.assertTrue(wrapped)
                self.assertIsInstance(wrapped, MaybeEncodingError)
                self.assertIsNotNone(wrapped.exc)
                self.assertIsNotNone(wrapped.value)
        finally:
            # Reap the workers even when an assertion above failed.
            p.close()
            p.join()
1154
class _TestPoolWorkerLifetime(BaseTestCase):
    """Test that ``maxtasksperchild`` causes pool workers to be replaced."""

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        """After 100 tasks, the three workers must have different pids."""
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(p._pool))
        origworkerpids = [worker.pid for worker in p._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        pending = [p.apply_async(sqr, (number, )) for number in range(100)]
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for number, async_result in enumerate(pending):
            self.assertEqual(async_result.get(), sqr(number))
        # Refill the pool
        p._repopulate_pool()
        # Wait until all workers are alive
        # (50 polls * DELTA = 5 seconds max startup process time)
        for _ in range(50):
            if all(worker.is_alive() for worker in p._pool):
                break
            time.sleep(DELTA)
        finalworkerpids = [worker.pid for worker in p._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        p.close()
        p.join()
1186
Benjamin Petersone711caf2008-06-11 16:44:04 +00001187#
1188# Test that manager has expected number of shared objects left
1189#
1190
class _TestZZZNumberOfObjects(BaseTestCase):
    """Check that the manager is left with the expected number of objects."""

    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        """Exactly one shared object must remain in the manager."""
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Taken right after the count so that both describe the same state.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the manager's debug info once to help diagnose the leak.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1209
1210#
1211# Test of creating a customized manager class
1212#
1213
1214from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1215
class FooBar(object):
    """Plain class registered with MyManager as the 'Foo' and 'Bar' types."""

    def f(self):
        """Return the string 'f()'."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError()

    def _h(self):
        """Return the string '_h()'."""
        return '_h()'
1223
def baz():
    """Generate the squares of 0 through 9."""
    number = 0
    while number < 10:
        yield number * number
        number += 1
1227
class IteratorProxy(BaseProxy):
    """Proxy that is its own iterator and forwards ``__next__`` calls."""

    # The only method name this proxy exposes.
    _exposed_ = ('__next__',)

    def __iter__(self):
        """Return the proxy itself."""
        return self

    def __next__(self):
        """Return the result of calling ``__next__`` via ``_callmethod``."""
        method_name = '__next__'
        return self._callmethod(method_name)
1234
class MyManager(BaseManager):
    """Customized manager; its types are registered just below."""


# 'Foo' is registered without an explicit ``exposed`` list.
MyManager.register('Foo', callable=FooBar)
# 'Bar' explicitly exposes 'f' and '_h' (and therefore not 'g').
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
# 'baz' hands out the generator through the custom IteratorProxy.
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1241
1242
class _TestMyManager(BaseTestCase):
    """Test the types registered on the customized ``MyManager``."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        """Check which methods the Foo, Bar and baz proxies make available."""
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        squares = manager.baz()

        candidates = ('f', 'g', '_h')

        def available(proxy):
            # Names from ``candidates`` that exist as attributes of proxy.
            return [name for name in candidates if hasattr(proxy, name)]

        self.assertEqual(available(foo), ['f', 'g'])
        self.assertEqual(available(bar), ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        with self.assertRaises(ValueError):
            foo.g()
        self.assertEqual(foo._callmethod('f'), 'f()')
        # '_h' cannot be called through foo; the call fails with RemoteError.
        with self.assertRaises(RemoteError):
            foo._callmethod('_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(squares), [n*n for n in range(10)])

        manager.shutdown()
1274
1275#
1276# Test of connecting to a remote server and using xmlrpclib for serialization
1277#
1278
# Single module-level queue handed out by get_queue().
_queue = pyqueue.Queue()


def get_queue():
    """Return the module-level queue (the same object on every call)."""
    shared = _queue
    return shared
1282
class QueueManager(BaseManager):
    '''manager class used by server process'''


# The server side registers the callable that supplies the queue.
QueueManager.register('get_queue', callable=get_queue)
1286
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''


# Registered by name only, i.e. without a callable.
QueueManager2.register('get_queue')
1290
1291
# Serializer name passed to every QueueManager/QueueManager2 created below.
# The remote-manager test relies on it being xmlrpclib rather than pickle.
SERIALIZER = 'xmlrpclib'
1293
class _TestRemoteManager(BaseTestCase):
    """Test connecting to a manager server using SERIALIZER."""

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        """Child task: connect to the manager at *address* and put one tuple."""
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        """An item put by a child process must be readable by a second client."""
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

        # The item has arrived, so the putter has done its work: wait for
        # it here so that no child process outlives the test.
        p.join()

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
1334
class _TestManagerRestart(BaseTestCase):
    """Test that a manager can be restarted right away on the same address."""

    @classmethod
    def _putter(cls, address, authkey):
        """Child task: connect to the manager at *address* and put one string."""
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        """Shut a manager down and start a new one on the same address."""
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # The item has arrived, so the putter has done its work: wait for
        # it so that no child process outlives the test.
        p.join()
        del queue
        manager.shutdown()
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except IOError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # The replacement manager has to be started as well: the
            # shutdown() call below needs a started manager.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001375
Benjamin Petersone711caf2008-06-11 16:44:04 +00001376#
1377#
1378#
1379
# Stop marker for _TestConnection._echo: the echo loop ends when it
# receives this value.  latin() is defined elsewhere in this file;
# presumably this is an empty bytes object -- TODO confirm.
SENTINEL = latin('')
1381
class _TestConnection(BaseTestCase):
    """Tests for the connection objects returned by ``self.Pipe()``."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        """Send back every byte message received until SENTINEL arrives."""
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        """Round-trip objects and byte messages through an echoing child."""
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zeroed buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A buffer shorter than the message: the exception must carry
            # the complete message.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        """A non-duplex pipe gives a read-only and a write-only end."""
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        """The parent may close the child's end right after spawning."""
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        """Check the offset and size arguments of send_bytes()."""
        if self.TYPE != 'processes':
            # Report the test as skipped instead of letting it pass silently.
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the length gives an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Offsets/sizes outside the message, or negative, are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1531
class _TestListenerClient(BaseTestCase):
    """Test a Listener/Client pair for every available address family."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        """Child task: connect to *address*, send 'hello' and close."""
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        """Each listener must receive 'hello' from the child it spawned."""
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test, args=(listener.address,))
            child.daemon = True
            child.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            child.join()
            listener.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001552#
1553# Test of sending connection and socket objects between processes
1554#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001555"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001556class _TestPicklingConnections(BaseTestCase):
1557
1558 ALLOWED_TYPES = ('processes',)
1559
1560 def _listener(self, conn, families):
1561 for fam in families:
1562 l = self.connection.Listener(family=fam)
1563 conn.send(l.address)
1564 new_conn = l.accept()
1565 conn.send(new_conn)
1566
1567 if self.TYPE == 'processes':
1568 l = socket.socket()
1569 l.bind(('localhost', 0))
1570 conn.send(l.getsockname())
1571 l.listen(1)
1572 new_conn, addr = l.accept()
1573 conn.send(new_conn)
1574
1575 conn.recv()
1576
1577 def _remote(self, conn):
1578 for (address, msg) in iter(conn.recv, None):
1579 client = self.connection.Client(address)
1580 client.send(msg.upper())
1581 client.close()
1582
1583 if self.TYPE == 'processes':
1584 address, msg = conn.recv()
1585 client = socket.socket()
1586 client.connect(address)
1587 client.sendall(msg.upper())
1588 client.close()
1589
1590 conn.close()
1591
1592 def test_pickling(self):
1593 try:
1594 multiprocessing.allow_connection_pickling()
1595 except ImportError:
1596 return
1597
1598 families = self.connection.families
1599
1600 lconn, lconn0 = self.Pipe()
1601 lp = self.Process(target=self._listener, args=(lconn0, families))
1602 lp.start()
1603 lconn0.close()
1604
1605 rconn, rconn0 = self.Pipe()
1606 rp = self.Process(target=self._remote, args=(rconn0,))
1607 rp.start()
1608 rconn0.close()
1609
1610 for fam in families:
1611 msg = ('This connection uses family %s' % fam).encode('ascii')
1612 address = lconn.recv()
1613 rconn.send((address, msg))
1614 new_conn = lconn.recv()
1615 self.assertEqual(new_conn.recv(), msg.upper())
1616
1617 rconn.send(None)
1618
1619 if self.TYPE == 'processes':
1620 msg = latin('This connection uses a normal socket')
1621 address = lconn.recv()
1622 rconn.send((address, msg))
1623 if hasattr(socket, 'fromfd'):
1624 new_conn = lconn.recv()
1625 self.assertEqual(new_conn.recv(100), msg.upper())
1626 else:
1627 # XXX On Windows with Py2.6 need to backport fromfd()
1628 discard = lconn.recv_bytes()
1629
1630 lconn.send(None)
1631
1632 rconn.close()
1633 lconn.close()
1634
1635 lp.join()
1636 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001637"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001638#
1639#
1640#
1641
class _TestHeap(BaseTestCase):
    """Consistency test for the heap used by ``BufferWrapper``."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        """Free and occupied blocks must tile each arena without gaps."""
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            b = multiprocessing.heap.BufferWrapper(size)
            blocks.append(b)
            if len(blocks) > maxblocks:
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap: collect every free and every
        # occupied block as (arena index, start, stop, length, state)
        segments = []
        # list() takes a snapshot of the free lists before walking them.
        for free_list in list(heap._len_to_seq.values()):
            for arena, start, stop in free_list:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Each block must either end where the next one starts, or be
        # followed by a block that starts at offset 0 of another arena.
        for current, following in zip(segments, segments[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, nstop) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1682
1683#
1684#
1685#
1686
class _Foo(Structure):
    """Structure with a ``c_int`` field 'x' and a ``c_double`` field 'y'."""

    _fields_ = [('x', c_int), ('y', c_double)]
1692
class _TestSharedCTypes(BaseTestCase):
    """Tests for ``Value``, ``copy`` and ``self.Array`` with ctypes objects."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        """Skip every test here when sharedctypes is not available."""
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        """Child task: double every argument in place."""
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        """Values doubled by a child process must be visible in the parent.

        *lock* is passed on as the ``lock`` argument of every shared object.
        """
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        child = self.Process(target=self._double,
                             args=(x, y, foo, arr, string))
        child.start()
        child.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        """Run test_sharedctypes() with ``lock=True``."""
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        """copy() must return an object independent of the original."""
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        # Changing the original afterwards must not affect the copy.
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1741
1742#
1743#
1744#
1745
class _TestFinalize(BaseTestCase):
    """Test the order in which ``util.Finalize`` callbacks are run."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        """Child task: register finalizers that each send a tag over *conn*."""
        class Foo(object):
            pass

        def register(obj, tag, **options):
            # Finalizer for obj whose callback sends tag over conn.
            return util.Finalize(obj, conn.send, args=(tag,), **options)

        a = Foo()
        register(a, 'a')
        del a           # triggers callback for a

        b = Foo()
        close_b = register(b, 'b')
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        c = Foo()
        register(c, 'c')

        d10 = Foo()
        register(d10, 'd10', exitpriority=1)

        d01 = Foo()
        register(d01, 'd01', exitpriority=0)
        d02 = Foo()
        register(d02, 'd02', exitpriority=0)
        d03 = Foo()
        register(d03, 'd03', exitpriority=0)

        register(None, 'e', exitpriority=-10)

        register(None, 'STOP', exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        """The tags sent by the child must arrive in the expected order."""
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.start()
        child.join()

        # Read tags until the 'STOP' marker; note that 'c' is not expected.
        received = list(iter(conn.recv, 'STOP'))
        self.assertEqual(received,
                         ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1797
1798#
1799# Test that from ... import * works for each module
1800#
1801
class _TestImportStar(BaseTestCase):
    """Test that every name listed in a module's ``__all__`` exists."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        """Import each multiprocessing module and check its ``__all__``."""
        modules = [
            'multiprocessing',
            'multiprocessing.connection',
            'multiprocessing.heap',
            'multiprocessing.managers',
            'multiprocessing.pool',
            'multiprocessing.process',
            'multiprocessing.reduction',
            'multiprocessing.synchronize',
            'multiprocessing.util',
            ]

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for module_name in modules:
            __import__(module_name)
            mod = sys.modules[module_name]
            # A module without __all__ has nothing to check.
            exported = getattr(mod, '__all__', ())

            for attr in exported:
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
1828
1829#
1830# Quick test that logging works -- does not test logging output
1831#
1832
class _TestLogging(BaseTestCase):
    """Quick test that logging works -- does not test logging output."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        """The multiprocessing logger must exist and accept log calls."""
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertIsNotNone(logger)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        """Child task: send the logger's effective level over *conn*."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _level_seen_by_child(self, reader, writer):
        """Run _test_level() in a child; return the level it reports."""
        p = self.Process(target=self._test_level, args=(writer,))
        p.start()
        try:
            return reader.recv()
        finally:
            # Always reap the child so that it does not outlive the test.
            p.join()

    def test_level(self):
        """A child process must see the parent's effective log level."""
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        try:
            # A level set directly on the multiprocessing logger.
            logger.setLevel(LEVEL1)
            self.assertEqual(LEVEL1,
                             self._level_seen_by_child(reader, writer))

            # With NOTSET, the root logger's level is the effective one.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self.assertEqual(LEVEL2,
                             self._level_seen_by_child(reader, writer))
        finally:
            # Restore both loggers even if an assertion failed, so that
            # later tests are not affected.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
1871
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001872
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00001873# class _TestLoggingProcessName(BaseTestCase):
1874#
1875# def handle(self, record):
1876# assert record.processName == multiprocessing.current_process().name
1877# self.__handled = True
1878#
1879# def test_logging(self):
1880# handler = logging.Handler()
1881# handler.handle = self.handle
1882# self.__handled = False
1883# # Bypass getLogger() and side-effects
1884# logger = logging.getLoggerClass()(
1885# 'multiprocessing.test.TestLoggingProcessName')
1886# logger.addHandler(handler)
1887# logger.propagate = False
1888#
1889# logger.warn('foo')
1890# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001891
Benjamin Petersone711caf2008-06-11 16:44:04 +00001892#
Jesse Noller6214edd2009-01-19 16:23:53 +00001893# Test to verify handle verification, see issue 3321
1894#
1895
class TestInvalidHandle(unittest.TestCase):
    # Handle verification test (issue 3321): a Connection built on a bogus
    # handle must raise IOError.

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # An arbitrary number that is not an open handle: polling must fail.
        bogus = _multiprocessing.Connection(44977608)
        with self.assertRaises(IOError):
            bogus.poll()
        # A negative handle must be rejected at construction time.
        with self.assertRaises(IOError):
            _multiprocessing.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001903
Jesse Noller6214edd2009-01-19 16:23:53 +00001904#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001905# Functions used to create test cases from the base ones in this module
1906#
1907
def get_attributes(Source, names):
    """Return a dict mapping each name in *names* to ``getattr(Source, name)``.

    Values that are plain Python functions are wrapped in ``staticmethod``;
    every other value is stored unchanged.  The callers in this module merge
    the result into a class namespace.
    """
    function_type = type(get_attributes)

    def wrap(value):
        # Only objects whose exact type is the plain function type are wrapped.
        if type(value) == function_type:
            return staticmethod(value)
        return value

    return {name: wrap(getattr(Source, name)) for name in names}
1916
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes for one flavour of the test suite.

    Every module-level name starting with ``_Test`` whose ``ALLOWED_TYPES``
    contains *type* is combined with ``unittest.TestCase`` and *Mixin* into
    a new class named ``'With' + type.capitalize() + <name without the
    leading underscore>``.

    Returns a dict mapping the new class names to the new classes.
    """
    module_globals = globals()
    flavour = type.capitalize()
    cases = {}

    # Iterate over a snapshot of the module namespace.
    for name, base in list(module_globals.items()):
        if not name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        newname = 'With' + flavour + name[1:]
        Temp.__name__ = newname
        # Report the class as living in the mixin's module.
        Temp.__module__ = Mixin.__module__
        cases[newname] = Temp

    return cases
1933
1934#
1935# Create test cases
1936#
1937
class ProcessesMixin(object):
    # Mixin that points the generic _Test* cases at the real
    # multiprocessing primitives.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed multiprocessing attributes into this class namespace.
    # get_attributes() wraps plain functions in staticmethod before they are
    # stored here.
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

# Generate the 'processes' flavour of every applicable _Test* class and
# publish the generated classes as module-level names.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1950
1951
class ManagerMixin(object):
    # Mixin that points the generic _Test* cases at objects served by a
    # SyncManager.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocate the manager without running __init__; test_main() calls
    # __init__() and start() on it before the tests run.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed attributes of the manager instance into this class
    # namespace.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))

# Generate the 'manager' flavour of every applicable _Test* class and
# publish the generated classes as module-level names.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1964
1965
class ThreadsMixin(object):
    # Mixin that points the generic _Test* cases at the multiprocessing.dummy
    # equivalents of the multiprocessing API.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed multiprocessing.dummy attributes into this class
    # namespace.  get_attributes() wraps plain functions in staticmethod
    # before they are stored here.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))

# Generate the 'threads' flavour of every applicable _Test* class and
# publish the generated classes as module-level names.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1978
class OtherTest(unittest.TestCase):
    # Authentication handshake failures in multiprocessing.connection.
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        # The peer answers the challenge with garbage, so delivering the
        # challenge must end in AuthenticationError.
        class _FakeConnection(object):
            def send_bytes(self, data):
                pass
            def recv_bytes(self, size):
                return b'something bogus'

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # The peer sends the challenge marker first and garbage second, so
        # answering the challenge must end in AuthenticationError.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0
            def send_bytes(self, data):
                pass
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
2007
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002008#
2009# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2010#
2011
def initializer(ns):
    """Increment ``ns.test`` by one.

    Passed as the initializer callable to ``SyncManager.start()`` and
    ``multiprocessing.Pool()`` by TestInitializers, which then checks the
    counter.
    """
    ns.test += 1
2014
class TestInitializers(unittest.TestCase):
    # Manager.start() and Pool() accept an initializer callable (issue 5585).
    # Each test passes initializer(), which bumps a counter stored in a
    # manager Namespace, and then checks the counter.

    def setUp(self):
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Shut the second manager down even when the assertion fails so
            # that its server process is not leaked.
            m.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
2037
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002038#
2039# Issue 5155, 5313, 5331: Test process in processes
2040# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2041#
2042
def _ThisSubProcess(q):
    """Try a non-blocking get on *q*, ignoring an empty queue.

    Any item obtained is discarded; the function always returns None.
    """
    try:
        q.get_nowait()
    except pyqueue.Empty:
        # Nothing queued, which is fine for this smoke test.
        return
2048
def _TestProcess(q):
    """Start a child process that polls a fresh queue, then wait for it.

    The *q* argument is accepted but not used; a new queue is created here
    and handed to the child.
    """
    child_queue = multiprocessing.Queue()
    child = multiprocessing.Process(
        target=_ThisSubProcess,
        args=(child_queue,),
    )
    child.start()
    child.join()
2054
def _afunc(x):
    """Return *x* multiplied by itself."""
    squared = x * x
    return squared
2057
def pool_in_process():
    """Create a four-worker pool, map ``_afunc`` over a short list, clean up.

    Used as the target of a child process by TestStdinBadfiledescriptor; the
    mapped result is not needed, only that the pool works inside a child.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Close and join the pool so its workers are reaped before this
        # process exits.
        pool.close()
        pool.join()
2061
class _file_like(object):
    """Write buffer in front of *delegate* that is kept per process id.

    Data passed to write() is collected in a list and only handed to the
    delegate's write() when flush() is called.  The list is replaced by an
    empty one whenever the current pid differs from the pid that last
    accessed it.
    """

    def __init__(self, delegate):
        self._pid = None
        self._delegate = delegate

    @property
    def cache(self):
        current_pid = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid != current_pid:
            # First access in this process: start from an empty buffer.
            self._cache = []
            self._pid = current_pid
        return self._cache

    def write(self, data):
        buffered = self.cache
        buffered.append(data)

    def flush(self):
        emit = self._delegate.write
        emit(''.join(self.cache))
        # Drop what has just been written.
        self._cache = []
2082
class TestStdinBadfiledescriptor(unittest.TestCase):
    # Issues 5155, 5313, 5331: multiprocessing objects must be usable from
    # within a child process.

    def test_queue_in_process(self):
        # A child process that itself starts a process using a queue.
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # A child process that creates and uses its own pool.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        # Buffered data must reach the delegate on flush().  assertEqual is
        # used so the check is not stripped when running under -O.
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        flike.flush()
        self.assertEqual(sio.getvalue(), 'foo')
2103
# Stand-alone TestCase classes that are not generated from a _Test* base;
# test_main() appends them after the generated test cases.
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
                   TestStdinBadfiledescriptor]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002106
Benjamin Petersone711caf2008-06-11 16:44:04 +00002107#
2108#
2109#
2110
def test_main(run=None):
    """Build the complete multiprocessing test suite and execute it.

    *run* is a callable that accepts a ``unittest.TestSuite``; when it is
    None, ``test.support.run_unittest`` is used.

    The shared pools and the shared manager used by the mixin classes are
    created before the suite runs and are torn down afterwards, even if
    *run* raises.

    Raises ``unittest.SkipTest`` on Linux when an RLock cannot be created
    (issue 3111).
    """
    if sys.platform.startswith("linux"):
        try:
            # Only checking that creation works; the lock itself is not used.
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    # ManagerMixin.manager was allocated without __init__; finish its
    # construction now and start its server process.
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        # Generated cases are sorted by name within each flavour.
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        # Always release the shared resources so that no worker or manager
        # process is left behind when the run is interrupted by an exception.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002148
def main():
    """Run the whole suite through a verbose ``unittest.TextTestRunner``."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

if __name__ == '__main__':
    main()