blob: 9d58cdc0eb7007b5b53e52111c76fa6d09f421e7 [file] [log] [blame]
Georg Brandl86b2fb92008-07-16 03:43:04 +00001#!/usr/bin/env python
2
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import queue as pyqueue
10import time
11import sys
12import os
13import gc
14import signal
15import array
16import copy
17import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
Benjamin Petersone5384b02008-10-04 22:00:42 +000027
Benjamin Petersone711caf2008-06-11 16:44:04 +000028import multiprocessing.dummy
29import multiprocessing.connection
30import multiprocessing.managers
31import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000033
34from multiprocessing import util
35
36#
37#
38#
39
def latin(s):
    """Return the text *s* encoded to bytes using the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000042
Benjamin Petersone711caf2008-06-11 16:44:04 +000043#
44# Constants
45#
46
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.WARNING

# Length (in seconds) of the short pauses the tests sleep for.
DELTA = 0.1

# Making CHECK_TIMINGS true makes tests take a lot longer and can sometimes
# cause some non-serious failures because some calls block a bit longer than
# expected.
CHECK_TIMINGS = False

# Longer, distinct timeouts are only worth using when elapsed times are
# actually being checked.
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# False when _multiprocessing reports HAVE_BROKEN_SEM_GETVALUE as true.
HAVE_GETVALUE = not getattr(_multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = sys.platform == "win32"
64
Benjamin Petersone711caf2008-06-11 16:44:04 +000065#
66# Creates a wrapper for a function which records the time it takes to finish
67#
68
class TimingWrapper(object):
    """Callable wrapper around ``func`` that records how long a call took.

    ``elapsed`` is None until the wrapper has been called; afterwards it holds
    the ``time.time()`` difference (seconds) measured around the most recent
    call, whether that call returned or raised.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Runs on both the return path and the exception path.
            self.elapsed = time.time() - started
        return result
81
82#
83# Base class for test cases
84#
85
class BaseTestCase(object):
    """Base class providing extra assertion helpers for the test cases."""

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Elapsed times are only compared when timing checks are switched on.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # A NotImplementedError from func means there is nothing to compare.
        try:
            res = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, res)
101
102#
103# Return the value of a semaphore
104#
105
def get_value(self):
    """Return the current value of the semaphore-like object *self*.

    Tries ``self.get_value()`` first, then the ``_Semaphore__value`` and
    ``_value`` attributes in that order.  Raises NotImplementedError when
    none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
117
118#
119# Testcases
120#
121
class _TestProcess(BaseTestCase):
    """Tests for Process objects, current_process() and active_children()."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # Check the attributes of the object returned by current_process().
        # Not applicable when the "processes" are really threads.
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertTrue(isinstance(authkey, bytes))
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def _test(self, q, *args, **kwds):
        # Child side of test_process: report what the child sees back to the
        # parent through the queue, in the order the parent reads it.
        current = self.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if self.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        # Follow a process through its life cycle: created, started, joined.
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # state before start()
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertTrue(p not in self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # state while running
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())

        # values reported by the child (see _test)
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # state after join()
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

    def _test_terminate(self):
        # Sleep long enough that the parent has to terminate us.
        time.sleep(1000)

    def test_terminate(self):
        # terminate() must stop a sleeping child so that join() returns.
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        # cpu_count() may be unimplemented; otherwise it is a positive int.
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        # A process is listed by active_children() only between start()
        # and join().
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertTrue(p not in self.active_children())

        p.start()
        self.assertTrue(p in self.active_children())

        p.join()
        self.assertTrue(p not in self.active_children())

    def _test_recursion(self, wconn, id):
        # Send our own id, then start (and wait for) two children whose ids
        # extend ours, until the ids are two elements long.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = self.Process(
                    target=self._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        # Processes started from within processes must all report back, in
        # depth-first order because each child is joined before the next
        # one is started.
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
267
268#
269#
270#
271
272class _UpperCaser(multiprocessing.Process):
273
274 def __init__(self):
275 multiprocessing.Process.__init__(self)
276 self.child_conn, self.parent_conn = multiprocessing.Pipe()
277
278 def run(self):
279 self.parent_conn.close()
280 for s in iter(self.child_conn.recv, None):
281 self.child_conn.send(s.upper())
282 self.child_conn.close()
283
284 def submit(self, s):
285 assert type(s) is str
286 self.parent_conn.send(s)
287 return self.parent_conn.recv()
288
289 def stop(self):
290 self.parent_conn.send(None)
291 self.parent_conn.close()
292 self.child_conn.close()
293
class _TestSubclassingProcess(BaseTestCase):
    """Test a Process subclass that overrides run()."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        # Round-trip two strings through an _UpperCaser process.
        worker = _UpperCaser()
        worker.start()
        for text, expected in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), expected)
        worker.stop()
        worker.join()
305
306#
307#
308#
309
def queue_empty(q):
    """Return whether *q* is empty.

    Uses ``q.empty()`` when the queue has that method and falls back to
    comparing ``q.qsize()`` with zero otherwise.
    """
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
315
def queue_full(q, maxsize):
    """Return whether *q* is full.

    Uses ``q.full()`` when the queue has that method and falls back to
    comparing ``q.qsize()`` with *maxsize* otherwise.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
321
322
class _TestQueue(BaseTestCase):
    """Tests for Queue and JoinableQueue objects."""

    def _test_put(self, queue, child_can_start, parent_can_continue):
        # Child side of test_put: once allowed to start, drain the six
        # items the parent has queued and tell the parent to carry on.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        # Fill a bounded queue, check that every flavour of put() on a full
        # queue raises Full, then let the child empty it again.
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    def _test_get(self, queue, child_can_start, parent_can_continue):
        # Child side of test_get: once allowed to start, queue the items
        # the parent expects and tell the parent to carry on.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        # Read back the items queued by the child, then check that every
        # flavour of get() on an empty queue raises Empty.
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    def _test_fork(self, queue):
        # Child side of test_fork: queue the second half of the items.
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        # qsize() may be unimplemented; otherwise it tracks puts and gets.
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    def _test_task_done(self, q):
        # Worker side of test_task_done: acknowledge every item until the
        # None sentinel arrives.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        # queue.join() must return once the workers have called
        # task_done() for every item that was put.
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # one sentinel per worker so that each of them leaves its loop
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
526
527#
528#
529#
530
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock objects."""

    def test_lock(self):
        # A plain lock cannot be acquired twice, nor released when unlocked.
        mutex = self.Lock()
        self.assertEqual(mutex.acquire(), True)
        self.assertEqual(mutex.acquire(False), False)
        self.assertEqual(mutex.release(), None)
        self.assertRaises((ValueError, threading.ThreadError), mutex.release)

    def test_rlock(self):
        # A recursive lock can be taken repeatedly by its owner, but cannot
        # be released more often than it was acquired.
        depth = 3
        rlock = self.RLock()
        for _ in range(depth):
            self.assertEqual(rlock.acquire(), True)
        for _ in range(depth):
            self.assertEqual(rlock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), rlock.release)

    def test_lock_context(self):
        # Locks can be used as context managers.
        mutex = self.Lock()
        with mutex:
            pass
553
Benjamin Petersone711caf2008-06-11 16:44:04 +0000554
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore objects."""

    def _test_semaphore(self, sem):
        # Take a semaphore of initial value 2 down to 0 and back up again,
        # checking its value (where that is implemented) after every step.
        def expect(count):
            self.assertReturnsIfImplemented(count, get_value, sem)

        expect(2)
        self.assertEqual(sem.acquire(), True)
        expect(1)
        self.assertEqual(sem.acquire(), True)
        expect(0)
        self.assertEqual(sem.acquire(False), False)
        expect(0)
        self.assertEqual(sem.release(), None)
        expect(1)
        self.assertEqual(sem.release(), None)
        expect(2)

    def test_semaphore(self):
        # A plain semaphore can be released above its initial value.
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        # acquire() on an exhausted semaphore returns False, immediately
        # when non-blocking and after the timeout when blocking.
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected elapsed time)
        cases = (
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
            )
        for args, kwds, expected_elapsed in cases:
            self.assertEqual(acquire(*args, **kwds), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected_elapsed)
607
608
class _TestCondition(BaseTestCase):
    """Tests for Condition objects."""

    def f(self, cond, sleeping, woken, timeout=None):
        # Worker: announce through `sleeping` that we are about to wait on
        # the condition, wait (optionally with a timeout), then announce
        # through `woken` that the wait is over.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        # notify() must wake exactly one waiter per call.
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # One waiter of the type under test and one plain thread.  Both
        # handles are kept so that both workers can be joined at the end.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # both waiters have been notified by now, so neither join blocks
        proc.join()
        thread.join()

    def test_notify_all(self):
        # Waiters with a timeout wake up by themselves; waiters without
        # one are all woken by a single notify_all().
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        # wait() with a timeout and nobody notifying must return None
        # after roughly that timeout.
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
739
740
class _TestEvent(BaseTestCase):
    """Tests for Event objects."""

    def _test_event(self, event):
        # Child side of test_event: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # This check was once removed because of API differences with
        # threading._Event objects (is_set vs. isSet); it is active again.
        self.assertEqual(event.is_set(), False)

        # wait() is expected to return the value of the flag rather than
        # None (these checks were also once removed for API differences
        # between threading.Event and the semaphore backed mp.Event).
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # Keep a handle on the child so that it can be joined below instead
        # of being left running when the test returns.
        p = self.Process(target=self._test_event, args=(event,))
        p.start()
        self.assertEqual(wait(), True)

        # the child sets the event as its last action, so this returns
        # promptly
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000778
779#
780#
781#
782
class _TestValue(BaseTestCase):
    """Tests for Value and RawValue shared objects."""

    # (typecode, initial value, value assigned by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def _test(self, values):
        # Child side of test_value: overwrite every shared value with the
        # third entry of its row in codes_values.
        for shared, (_, _, final) in zip(values, self.codes_values):
            shared.value = final

    def test_value(self, raw=False):
        # A value assigned in a child process must be visible in the parent.
        if self.TYPE != 'processes':
            return

        make = self.RawValue if raw else self.Value
        values = [make(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, (_, initial, _) in zip(values, self.codes_values):
            self.assertEqual(shared.value, initial)

        proc = self.Process(target=self._test, args=(values,))
        proc.start()
        proc.join()

        for shared, (_, _, final) in zip(values, self.codes_values):
            self.assertEqual(shared.value, final)

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # get_lock()/get_obj() exist unless the value was created with
        # lock=False or as a RawValue.
        if self.TYPE != 'processes':
            return

        # default lock
        default_locked = self.Value('i', 5)
        default_locked.get_lock()
        default_locked.get_obj()

        # lock=None
        none_locked = self.Value('i', 5, lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # a lock supplied by the caller is the one handed back
        lock = self.Lock()
        caller_locked = self.Value('i', 5, lock=lock)
        returned_lock = caller_locked.get_lock()
        caller_locked.get_obj()
        self.assertEqual(lock, returned_lock)

        unlocked = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        raw_value = self.RawValue('i', 5)
        self.assertFalse(hasattr(raw_value, 'get_lock'))
        self.assertFalse(hasattr(raw_value, 'get_obj'))
848
Benjamin Petersone711caf2008-06-11 16:44:04 +0000849
class _TestArray(BaseTestCase):
    """Tests for Array and RawArray shared objects."""

    def f(self, seq):
        # Replace seq in place by its running totals.
        for i in range(1, len(seq)):
            seq[i] += seq[i-1]

    def test_array(self, raw=False):
        # Item/slice access must behave like a list, and changes made in a
        # child process must be visible in the parent.
        if self.TYPE != 'processes':
            return

        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make = self.RawArray if raw else self.Array
        arr = make('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # apply the same slice assignment to both sequences
        replacement = array.array('i', [1, 2, 3, 4])
        arr[4:8] = replacement
        seq[4:8] = replacement

        self.assertEqual(list(arr[:]), seq)

        # run f on the list locally and on the shared array in a child
        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    def test_rawarray(self):
        self.test_array(raw=True)

    def test_getobj_getlock_obj(self):
        # get_lock()/get_obj() exist unless the array was created with
        # lock=False or as a RawArray.
        if self.TYPE != 'processes':
            return

        # default lock
        default_locked = self.Array('i', list(range(10)))
        default_locked.get_lock()
        default_locked.get_obj()

        # lock=None
        none_locked = self.Array('i', list(range(10)), lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # a lock supplied by the caller is the one handed back
        lock = self.Lock()
        caller_locked = self.Array('i', list(range(10)), lock=lock)
        returned_lock = caller_locked.get_lock()
        caller_locked.get_obj()
        self.assertEqual(lock, returned_lock)

        unlocked = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        raw_array = self.RawArray('i', range(10))
        self.assertFalse(hasattr(raw_array, 'get_lock'))
        self.assertFalse(hasattr(raw_array, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000912
913#
914#
915#
916
class _TestContainers(BaseTestCase):
    """Tests for the list, dict and Namespace objects created via self."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        digits = list(range(10))
        a = self.list(list(range(10)))
        self.assertEqual(a[:], digits)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        b *= 2
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        # a must be unaffected by the operations on b
        self.assertEqual(a[:], digits)

        # a shared list built from two shared lists
        nested = self.list([a, b])
        self.assertEqual(nested[:], [digits, doubled])

        # a change to an inner list shows through the outer one
        outer = self.list([a])
        a.append('hello')
        self.assertEqual(outer[:], [digits + ['hello']])

    def test_dict(self):
        d = self.dict()
        codes = list(range(65, 70))
        for code in codes:
            d[code] = chr(code)
        letters = [chr(code) for code in codes]
        self.assertEqual(d.copy(), dict(zip(codes, letters)))
        self.assertEqual(sorted(d.keys()), codes)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), list(zip(codes, letters)))

    def test_namespace(self):
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        # the underscore-prefixed attribute is expected to be left out
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertTrue(not hasattr(ns, 'job'))
971 self.assertTrue(not hasattr(n, 'job'))
972
973#
974#
975#
976
def sqr(x, wait=0.0):
    """Sleep for *wait* seconds, then return the square of *x*."""
    time.sleep(wait)
    squared = x * x
    return squared

class _TestPool(BaseTestCase):
    """Tests for the pool object available as self.pool."""

    def test_apply(self):
        apply = self.pool.apply
        self.assertEqual(apply(sqr, (5,)), sqr(5))
        self.assertEqual(apply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pool_map = self.pool.map
        self.assertEqual(pool_map(sqr, list(range(10))),
                         [sqr(i) for i in range(10)])
        self.assertEqual(pool_map(sqr, list(range(100)), chunksize=20),
                         [sqr(i) for i in range(100)])

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # the task sleeps for longer than the timeout given to get()
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        # consume the whole iterator at once
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), [sqr(i) for i in range(10)])

        # step through the iterator by hand, including exhaustion
        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        # the same with an explicit chunksize
        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        squares = [sqr(i) for i in range(1000)]

        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), squares)

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), squares)

    def test_make_pool(self):
        pool = multiprocessing.Pool(3)
        self.assertEqual(3, len(pool._pool))
        pool.close()
        pool.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # queue far more work than could finish before the terminate() call
        pending = self.pool.map_async(
            time.sleep, [0.1] * 10000, chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertTrue(join.elapsed < 0.2)

#
# Test that manager has expected number of shared objects left
#
1052
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Captured right after the count so that the diagnostic output
        # describes the same moment as `refs`; it is printed exactly once
        # (no second _debug_info() round trip) when the count is wrong.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1071
1072#
1073# Test of creating a customized manager class
1074#
1075
1076from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1077
class FooBar(object):
    # Plain class with two public methods (one of which always raises)
    # and one underscore-prefixed method.

    def f(self):
        return 'f()'

    def g(self):
        raise ValueError()

    def _h(self):
        return '_h()'
1085
def baz():
    '''generator yielding the squares of 0 through 9'''
    n = 0
    while n < 10:
        yield n*n
        n += 1
1089
class IteratorProxy(BaseProxy):
    # Proxy type which makes a remote iterator usable in a local for-loop
    # by forwarding every __next__() call to the referent.
    #
    # The class used to define __next__ twice (the first definition,
    # forwarding to a method called 'next', was silently overridden by the
    # second) and exposed the Python 2 method name 'next'; only the
    # Python 3 iterator protocol is kept.
    _exposed_ = ('__next__',)

    def __iter__(self):
        return self

    def __next__(self):
        return self._callmethod('__next__')
1098
# Manager subclass used to hold the custom registrations below.
class MyManager(BaseManager):
    pass

# 'Foo' is registered without an explicit `exposed` tuple, whereas 'Bar'
# names f() and _h() explicitly.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
# 'baz' is registered with IteratorProxy as its proxy type.
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1105
1106
class _TestMyManager(BaseTestCase):
    # Exercises the typeids registered on MyManager.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        # The manager started here is private to this test, so it is shut
        # down even when one of the assertions fails.
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            baz = manager.baz()

            foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
            bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]

            # Foo gets proxy methods for f and g but not _h; Bar gets
            # exactly the methods listed in its `exposed` tuple.
            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(baz), [i*i for i in range(10)])
        finally:
            manager.shutdown()
1138
1139#
1140# Test of connecting to a remote server and using xmlrpclib for serialization
1141#
1142
# Module-level queue handed out by get_queue().
_queue = pyqueue.Queue()
def get_queue():
    # Always returns the same module-level queue object.
    return _queue
1146
class QueueManager(BaseManager):
    '''manager class used by server process'''
# The 'get_queue' typeid is backed by the get_queue() function above.
QueueManager.register('get_queue', callable=get_queue)
1150
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''
# Registered by name only: no callable is supplied for this typeid.
QueueManager2.register('get_queue')


# Serializer name passed to the managers created by the tests below.
SERIALIZER = 'xmlrpclib'
1157
class _TestRemoteManager(BaseTestCase):
    # A server manager is started, a child process and the test itself
    # connect to it as clients, and one item travels through the queue.

    ALLOWED_TYPES = ('manager',)

    def _putter(self, address, authkey):
        # Target of the child process: connect as a client and enqueue
        # a single item.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        # The server started above is private to this test, so it is shut
        # down even when one of the checks below fails.
        try:
            p = self.Process(target=self._putter, args=(manager.address, authkey))
            p.start()

            manager2 = QueueManager2(
                address=manager.address, authkey=authkey, serializer=SERIALIZER
                )
            manager2.connect()
            queue = manager2.get_queue()

            # Note that xmlrpclib will deserialize object as a list not a tuple
            self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

            # Because we are using xmlrpclib for serialization instead of
            # pickle this will cause a serialization error.
            self.assertRaises(Exception, queue.put, time.sleep)

            # Make queue finalizer run before the server is stopped
            del queue

            # The item has been received, so the child has done its work;
            # reap it instead of leaving it behind.
            p.join()
        finally:
            manager.shutdown()
1197
class _TestManagerRestart(BaseTestCase):
    # Checks that a manager can be started again on an address that
    # another manager has only just released.

    def _putter(self, address, authkey):
        # Target of the child process: connect as a client and enqueue
        # a single item.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        # Port 0 asks the OS for a free port instead of relying on a
        # hard-coded one, which fails whenever that port is already taken.
        # The address actually bound is read back from the started manager
        # and reused for the restart below.
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        manager.start()
        address = manager.address

        try:
            p = self.Process(target=self._putter, args=(address, authkey))
            p.start()
            queue = manager.get_queue()
            self.assertEqual(queue.get(), 'hello world')
            del queue
            # The item has arrived, so the child has finished; reap it.
            p.join()
        finally:
            manager.shutdown()

        # Restart immediately on the very same address.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Noller35d1f002009-03-30 22:59:27 +00001222 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001223
Benjamin Petersone711caf2008-06-11 16:44:04 +00001224#
1225#
1226#
1227
# Empty message; _TestConnection._echo() stops echoing when it receives it.
SENTINEL = latin('')

class _TestConnection(BaseTestCase):
    # Tests of the connection objects returned by self.Pipe().

    ALLOWED_TYPES = ('processes', 'threads')

    def _echo(self, conn):
        # Send every received message straight back until SENTINEL arrives,
        # then close the connection.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        # Everything sent on `conn` is echoed back by a child running
        # _echo(), so each send below is paired with a matching receive.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        # Object round trip.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        # Raw bytes round trip.
        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zero-filled buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items into the buffer.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A message longer than the buffer must raise BufferTooShort
            # carrying the complete message as its only argument.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing is pending: poll() is expected to report False at once...
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        # ...and to report False after the full timeout when one is given.
        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # Once the echo is on its way poll() is expected to report True
        # without using up the timeout.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16 MiB
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            # Receiving is expected to fail with EOFError from here on.
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        # With duplex=False the first connection is used for receiving and
        # the second one for sending.
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            # Using either end in the wrong direction must raise IOError.
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        # Checks the optional second and third arguments of send_bytes()
        # (a start position and a length, judging by the slices compared
        # against below).
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # Starting exactly at the end of the message sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Ranges reaching beyond the message, and negative values, are
        # rejected with ValueError.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

1377 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1378
class _TestListenerClient(BaseTestCase):
    # For every supported address family, a child connects to a listener
    # owned by the test and sends one message.

    ALLOWED_TYPES = ('processes', 'threads')

    def _test(self, address):
        # Target of the child: connect, send a greeting and disconnect.
        conn = self.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            # Close the listener, and the accepted connection, even when a
            # check fails, so that no endpoint is left open for the
            # following families or tests.
            try:
                p = self.Process(target=self._test, args=(listener.address,))
                p.daemon = True
                p.start()
                conn = listener.accept()
                try:
                    self.assertEqual(conn.recv(), 'hello')
                finally:
                    conn.close()
                p.join()
            finally:
                listener.close()
#
# Test of sending connection and socket objects between processes
#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001401"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001402class _TestPicklingConnections(BaseTestCase):
1403
1404 ALLOWED_TYPES = ('processes',)
1405
1406 def _listener(self, conn, families):
1407 for fam in families:
1408 l = self.connection.Listener(family=fam)
1409 conn.send(l.address)
1410 new_conn = l.accept()
1411 conn.send(new_conn)
1412
1413 if self.TYPE == 'processes':
1414 l = socket.socket()
1415 l.bind(('localhost', 0))
1416 conn.send(l.getsockname())
1417 l.listen(1)
1418 new_conn, addr = l.accept()
1419 conn.send(new_conn)
1420
1421 conn.recv()
1422
1423 def _remote(self, conn):
1424 for (address, msg) in iter(conn.recv, None):
1425 client = self.connection.Client(address)
1426 client.send(msg.upper())
1427 client.close()
1428
1429 if self.TYPE == 'processes':
1430 address, msg = conn.recv()
1431 client = socket.socket()
1432 client.connect(address)
1433 client.sendall(msg.upper())
1434 client.close()
1435
1436 conn.close()
1437
1438 def test_pickling(self):
1439 try:
1440 multiprocessing.allow_connection_pickling()
1441 except ImportError:
1442 return
1443
1444 families = self.connection.families
1445
1446 lconn, lconn0 = self.Pipe()
1447 lp = self.Process(target=self._listener, args=(lconn0, families))
1448 lp.start()
1449 lconn0.close()
1450
1451 rconn, rconn0 = self.Pipe()
1452 rp = self.Process(target=self._remote, args=(rconn0,))
1453 rp.start()
1454 rconn0.close()
1455
1456 for fam in families:
1457 msg = ('This connection uses family %s' % fam).encode('ascii')
1458 address = lconn.recv()
1459 rconn.send((address, msg))
1460 new_conn = lconn.recv()
1461 self.assertEqual(new_conn.recv(), msg.upper())
1462
1463 rconn.send(None)
1464
1465 if self.TYPE == 'processes':
1466 msg = latin('This connection uses a normal socket')
1467 address = lconn.recv()
1468 rconn.send((address, msg))
1469 if hasattr(socket, 'fromfd'):
1470 new_conn = lconn.recv()
1471 self.assertEqual(new_conn.recv(100), msg.upper())
1472 else:
1473 # XXX On Windows with Py2.6 need to backport fromfd()
1474 discard = lconn.recv_bytes()
1475
1476 lconn.send(None)
1477
1478 rconn.close()
1479 lconn.close()
1480
1481 lp.join()
1482 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001483"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001484#
1485#
1486#
1487
class _TestHeap(BaseTestCase):
    # Consistency check of the bookkeeping kept by multiprocessing.heap.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            b = multiprocessing.heap.BufferWrapper(size)
            blocks.append(b)
            if len(blocks) > maxblocks:
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap: gather every free and every
        # occupied segment as (arena index, start, stop, length, state)
        segments = []
        for free_list in heap._len_to_seq.values():
            for arena, start, stop in free_list:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Once sorted, each segment must either end exactly where the next
        # one starts, or be followed by the first segment of another arena.
        for current, following in zip(segments, segments[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart) = following[:2]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1528
1529#
1530#
1531#
1532
try:
    from ctypes import Structure, c_int, c_double
    # copy() for ctypes objects is provided by multiprocessing.sharedctypes,
    # not by ctypes itself: asking ctypes for it (and for Value) made this
    # import fail unconditionally, which silently disabled every test
    # below.  It is imported under an alias so that the standard `copy`
    # module imported at the top of the file is not rebound.
    from multiprocessing.sharedctypes import copy as ctypes_copy
except ImportError:
    Structure = object
    c_int = c_double = None
    ctypes_copy = None

class _Foo(Structure):
    # Structure with one integer and one double field.
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]

class _TestSharedCTypes(BaseTestCase):
    # Shared ctypes objects are modified by a child process and the
    # changes must be visible in the parent.

    ALLOWED_TYPES = ('processes',)

    def _double(self, x, y, foo, arr, string):
        # Target of the child process: double every value in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for i in range(len(arr)):
            arr[i] *= 2

    def test_sharedctypes(self, lock=False):
        if c_int is None:
            return

        # Value and Array are taken from the mixin (ProcessesMixin copies
        # them from the multiprocessing package); the bare names used
        # before were never defined in this module.
        x = self.Value('i', 7, lock=lock)
        y = self.Value(c_double, 1.0/3.0, lock=lock)
        foo = self.Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        # A char array holds bytes, matching the bytes value which is
        # compared against below.
        string.value = latin('hello')

        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
        p.start()
        p.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same checks with lock=True instead of lock=False.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        if c_int is None:
            return

        # The copy must not be affected by later changes to the original.
        foo = _Foo(2, 5.0)
        bar = ctypes_copy(foo)
        foo.x = 0
        foo.y = 0
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)
1594
1595#
1596#
1597#
1598
class _TestFinalize(BaseTestCase):
    # Checks when, and in which order, util.Finalize callbacks are run.

    ALLOWED_TYPES = ('processes',)

    def _test_finalize(self, conn):
        # Target of the child process.  Every finalizer sends its tag
        # through `conn`, so the parent sees the order in which the
        # callbacks ran.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # No exitpriority is given for c; 'c' is absent from the result
        # expected by test_finalize().
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing the same priority; test_finalize()
        # expects them in reverse order of registration.
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        # Lowest priority of all: 'STOP' is what the parent reads up to.
        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.start()
        p.join()

        # Read the tags sent by the child up to, and excluding, 'STOP'.
        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1649
1650#
1651# Test that from ... import * works for each module
1652#
1653
class _TestImportStar(BaseTestCase):
    # Every name which a module lists in __all__ must exist in the module.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = (
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.reduction', 'multiprocessing.sharedctypes',
            'multiprocessing.synchronize', 'multiprocessing.util'
            )

        for name in modules:
            __import__(name)
            mod = sys.modules[name]

            # A module without __all__ has nothing to check.
            exported = getattr(mod, '__all__', ())
            for attr in exported:
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
1676
1677#
1678# Quick test that logging works -- does not test logging output
1679#
1680
class _TestLogging(BaseTestCase):
    # Quick checks of the logger returned by multiprocessing.get_logger().

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        # Check the logger before using it, and restore the module-wide
        # level even if one of the calls below fails.
        self.assertTrue(logger is not None)
        logger.setLevel(util.SUBWARNING)
        try:
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            logger.setLevel(LOG_LEVEL)

    def _test_level(self, conn):
        # Target of the child process: report the effective log level.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        # The logger levels are global state shared with every other test,
        # so they are restored (and the pipe is closed) even when a check
        # fails.  Each child is joined once its answer has been received.
        try:
            # A level set on the multiprocessing logger is seen by the child.
            logger.setLevel(LEVEL1)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            self.assertEqual(LEVEL1, reader.recv())
            p.join()

            # With NOTSET the effective level comes from the root logger.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            self.assertEqual(LEVEL2, reader.recv())
            p.join()
        finally:
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
1718
#
# Test that invalid connection handles are rejected, see issue 3321
#
1722
class TestInvalidHandle(unittest.TestCase):
    # Connection objects built from bogus handles must raise IOError.

    def test_invalid_handles(self):
        # Nothing is checked when WIN32 is true.
        if not WIN32:
            bogus = _multiprocessing.Connection(44977608)
            self.assertRaises(IOError, bogus.poll)
            self.assertRaises(IOError, _multiprocessing.Connection, -1)
#
# Functions used to create test cases from the base ones in this module
#
1734
def get_attributes(Source, names):
    '''Return a dict mapping each of `names` to that attribute of `Source`.

    Attributes which are plain Python functions are wrapped in
    staticmethod() so that they do not turn into bound methods once the
    dict is merged into a class namespace.
    '''
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        value = getattr(Source, name)
        attributes[name] = (
            staticmethod(value) if isinstance(value, function_type) else value
            )
    return attributes
1743
def create_test_cases(Mixin, type):
    '''Build concrete TestCase classes from the `_Test*` bases of this module.

    Every module global whose name starts with `_Test` and whose
    ALLOWED_TYPES contains `type` is combined with unittest.TestCase and
    `Mixin`.  Returns a dict mapping the new class names (for example
    `WithProcessesTestFoo` for `_TestFoo`) to the new classes.
    '''
    Type = type[0].upper() + type[1:]
    cases = {}

    # Iterate over a snapshot of the module namespace.
    for name, base in list(globals().items()):
        if not name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        # Drop the leading underscore of the base name.
        newname = 'With%s%s' % (Type, name[1:])
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        cases[newname] = Temp

    return cases
1759 return result
1760
1761#
1762# Create test cases
1763#
1764
# Mixin for the test cases run against the multiprocessing package itself.
class ProcessesMixin(object):
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed names from the multiprocessing package into the class
    # namespace (get_attributes() wraps plain functions in staticmethod).
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

# Create the WithProcesses* test cases and publish them as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1777
1778
# Mixin for the test cases run against a SyncManager.
class ManagerMixin(object):
    TYPE = 'manager'
    Process = multiprocessing.Process
    # The manager is allocated without running __init__() here; test_main()
    # calls __init__() and start() on it before the tests are run.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed attributes of the manager into the class namespace.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
       'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))

# Create the WithManager* test cases and publish them as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1791
1792
# Mixin for the test cases run against multiprocessing.dummy.
class ThreadsMixin(object):
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed names from multiprocessing.dummy into the class
    # namespace (get_attributes() wraps plain functions in staticmethod).
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))

# Create the WithThreads* test cases and publish them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1805
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        class _BogusPeer(object):
            # Answers every read with the same meaningless message and
            # discards whatever is written to it.
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(_BogusPeer(), b'abc')

    def test_answer_challenge_auth_failure(self):
        class _ScriptedPeer(object):
            # The first read returns CHALLENGE, the second a meaningless
            # message and any later read an empty message; writes are
            # discarded.
            def __init__(self):
                self.calls = 0
            def recv_bytes(self, size):
                self.calls += 1
                if self.calls > 2:
                    return b''
                if self.calls == 2:
                    return b'something bogus'
                return multiprocessing.connection.CHALLENGE
            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(_ScriptedPeer(), b'abc')

1834
#
# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
#
1838
def initializer(ns):
    '''add one to the `test` attribute of `ns`'''
    ns.test = ns.test + 1
1841
class TestInitializers(unittest.TestCase):
    # The initializer passed to a manager or a pool increments a counter
    # kept on a shared namespace, which shows that it has been called.

    def setUp(self):
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        sync_manager = multiprocessing.managers.SyncManager()
        # Something which is not callable is rejected.
        with self.assertRaises(TypeError):
            sync_manager.start(1)
        sync_manager.start(initializer, (self.ns,))
        self.assertEqual(self.ns.test, 1)
        sync_manager.shutdown()

    def test_pool_initializer(self):
        # Something which is not callable is rejected.
        with self.assertRaises(TypeError):
            multiprocessing.Pool(initializer=1)
        pool = multiprocessing.Pool(1, initializer, (self.ns,))
        pool.close()
        pool.join()
        self.assertEqual(self.ns.test, 1)

# Test cases which are not generated by create_test_cases().
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001866
Benjamin Petersone711caf2008-06-11 16:44:04 +00001867#
1868#
1869#
1870
def test_main(run=None):
    '''Run every test case of this module.

    `run` is called with the assembled unittest.TestSuite; when it is
    None, test.support.run_unittest is used.  The pools and the manager
    shared by the generated test cases are created before the run and
    torn down after it, even when `run` raises.

    Raises unittest.SkipTest on Linux when an RLock cannot be created.
    '''
    if sys.platform.startswith("linux"):
        try:
            # Only checks that a lock can be created; the lock itself is
            # not needed afterwards.
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    # Without the try/finally an exception raised by run() would leave the
    # worker processes and the manager process behind.
    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Peterson41181742008-07-02 20:22:54 +00001907 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00001908
def main():
    '''run the whole module with a verbose text test runner'''
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

if __name__ == '__main__':
    main()