blob: baaece8afd36958bb6b7177c764c951d3f477737 [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import Queue
10import time
11import sys
12import os
13import gc
14import signal
15import array
16import copy
17import socket
18import random
19import logging
R. David Murray3db8a342009-03-30 23:05:48 +000020import test_support
Benjamin Petersondfd79492008-06-13 19:13:39 +000021
Jesse Noller37040cd2008-09-30 00:15:45 +000022
R. David Murray3db8a342009-03-30 23:05:48 +000023_multiprocessing = test_support.import_module('_multiprocessing')
24
Jesse Noller37040cd2008-09-30 00:15:45 +000025# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000026test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000027
Benjamin Petersondfd79492008-06-13 19:13:39 +000028import multiprocessing.dummy
29import multiprocessing.connection
30import multiprocessing.managers
31import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000032import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000033
34from multiprocessing import util
35
36#
37#
38#
39
Benjamin Petersone79edf52008-07-13 18:34:58 +000040latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000041
Benjamin Petersondfd79492008-06-13 19:13:39 +000042#
43# Constants
44#
45
46LOG_LEVEL = util.SUBWARNING
47#LOG_LEVEL = logging.WARNING
48
49DELTA = 0.1
50CHECK_TIMINGS = False # making true makes tests take a lot longer
51 # and can sometimes cause some non-serious
52 # failures because some calls block a bit
53 # longer than expected
54if CHECK_TIMINGS:
55 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
56else:
57 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
58
59HAVE_GETVALUE = not getattr(_multiprocessing,
60 'HAVE_BROKEN_SEM_GETVALUE', False)
61
Jesse Noller9a5b2ad2009-01-19 15:12:22 +000062WIN32 = (sys.platform == "win32")
63
Benjamin Petersondfd79492008-06-13 19:13:39 +000064#
65# Creates a wrapper for a function which records the time it takes to finish
66#
67
68class TimingWrapper(object):
69
70 def __init__(self, func):
71 self.func = func
72 self.elapsed = None
73
74 def __call__(self, *args, **kwds):
75 t = time.time()
76 try:
77 return self.func(*args, **kwds)
78 finally:
79 self.elapsed = time.time() - t
80
81#
82# Base class for test cases
83#
84
85class BaseTestCase(object):
86
87 ALLOWED_TYPES = ('processes', 'manager', 'threads')
88
89 def assertTimingAlmostEqual(self, a, b):
90 if CHECK_TIMINGS:
91 self.assertAlmostEqual(a, b, 1)
92
93 def assertReturnsIfImplemented(self, value, func, *args):
94 try:
95 res = func(*args)
96 except NotImplementedError:
97 pass
98 else:
99 return self.assertEqual(value, res)
100
101#
102# Return the value of a semaphore
103#
104
105def get_value(self):
106 try:
107 return self.get_value()
108 except AttributeError:
109 try:
110 return self._Semaphore__value
111 except AttributeError:
112 try:
113 return self._value
114 except AttributeError:
115 raise NotImplementedError
116
117#
118# Testcases
119#
120
121class _TestProcess(BaseTestCase):
122
123 ALLOWED_TYPES = ('processes', 'threads')
124
125 def test_current(self):
126 if self.TYPE == 'threads':
127 return
128
129 current = self.current_process()
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000130 authkey = current.authkey
Benjamin Petersondfd79492008-06-13 19:13:39 +0000131
132 self.assertTrue(current.is_alive())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000133 self.assertTrue(not current.daemon)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000134 self.assertTrue(isinstance(authkey, bytes))
135 self.assertTrue(len(authkey) > 0)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000136 self.assertEqual(current.ident, os.getpid())
137 self.assertEqual(current.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000138
139 def _test(self, q, *args, **kwds):
140 current = self.current_process()
141 q.put(args)
142 q.put(kwds)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000143 q.put(current.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000144 if self.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000145 q.put(bytes(current.authkey))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000146 q.put(current.pid)
147
148 def test_process(self):
149 q = self.Queue(1)
150 e = self.Event()
151 args = (q, 1, 2)
152 kwargs = {'hello':23, 'bye':2.54}
153 name = 'SomeProcess'
154 p = self.Process(
155 target=self._test, args=args, kwargs=kwargs, name=name
156 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000157 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000158 current = self.current_process()
159
160 if self.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000161 self.assertEquals(p.authkey, current.authkey)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000162 self.assertEquals(p.is_alive(), False)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000163 self.assertEquals(p.daemon, True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000164 self.assertTrue(p not in self.active_children())
165 self.assertTrue(type(self.active_children()) is list)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000166 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000167
168 p.start()
169
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000170 self.assertEquals(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000171 self.assertEquals(p.is_alive(), True)
172 self.assertTrue(p in self.active_children())
173
174 self.assertEquals(q.get(), args[1:])
175 self.assertEquals(q.get(), kwargs)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000176 self.assertEquals(q.get(), p.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000177 if self.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000178 self.assertEquals(q.get(), current.authkey)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000179 self.assertEquals(q.get(), p.pid)
180
181 p.join()
182
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000183 self.assertEquals(p.exitcode, 0)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000184 self.assertEquals(p.is_alive(), False)
185 self.assertTrue(p not in self.active_children())
186
187 def _test_terminate(self):
188 time.sleep(1000)
189
190 def test_terminate(self):
191 if self.TYPE == 'threads':
192 return
193
194 p = self.Process(target=self._test_terminate)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000195 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000196 p.start()
197
198 self.assertEqual(p.is_alive(), True)
199 self.assertTrue(p in self.active_children())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000200 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000201
202 p.terminate()
203
204 join = TimingWrapper(p.join)
205 self.assertEqual(join(), None)
206 self.assertTimingAlmostEqual(join.elapsed, 0.0)
207
208 self.assertEqual(p.is_alive(), False)
209 self.assertTrue(p not in self.active_children())
210
211 p.join()
212
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000213 # XXX sometimes get p.exitcode == 0 on Windows ...
214 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000215
216 def test_cpu_count(self):
217 try:
218 cpus = multiprocessing.cpu_count()
219 except NotImplementedError:
220 cpus = 1
221 self.assertTrue(type(cpus) is int)
222 self.assertTrue(cpus >= 1)
223
224 def test_active_children(self):
225 self.assertEqual(type(self.active_children()), list)
226
227 p = self.Process(target=time.sleep, args=(DELTA,))
228 self.assertTrue(p not in self.active_children())
229
230 p.start()
231 self.assertTrue(p in self.active_children())
232
233 p.join()
234 self.assertTrue(p not in self.active_children())
235
236 def _test_recursion(self, wconn, id):
237 from multiprocessing import forking
238 wconn.send(id)
239 if len(id) < 2:
240 for i in range(2):
241 p = self.Process(
242 target=self._test_recursion, args=(wconn, id+[i])
243 )
244 p.start()
245 p.join()
246
247 def test_recursion(self):
248 rconn, wconn = self.Pipe(duplex=False)
249 self._test_recursion(wconn, [])
250
251 time.sleep(DELTA)
252 result = []
253 while rconn.poll():
254 result.append(rconn.recv())
255
256 expected = [
257 [],
258 [0],
259 [0, 0],
260 [0, 1],
261 [1],
262 [1, 0],
263 [1, 1]
264 ]
265 self.assertEqual(result, expected)
266
267#
268#
269#
270
271class _UpperCaser(multiprocessing.Process):
272
273 def __init__(self):
274 multiprocessing.Process.__init__(self)
275 self.child_conn, self.parent_conn = multiprocessing.Pipe()
276
277 def run(self):
278 self.parent_conn.close()
279 for s in iter(self.child_conn.recv, None):
280 self.child_conn.send(s.upper())
281 self.child_conn.close()
282
283 def submit(self, s):
284 assert type(s) is str
285 self.parent_conn.send(s)
286 return self.parent_conn.recv()
287
288 def stop(self):
289 self.parent_conn.send(None)
290 self.parent_conn.close()
291 self.child_conn.close()
292
293class _TestSubclassingProcess(BaseTestCase):
294
295 ALLOWED_TYPES = ('processes',)
296
297 def test_subclassing(self):
298 uppercaser = _UpperCaser()
299 uppercaser.start()
300 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
301 self.assertEqual(uppercaser.submit('world'), 'WORLD')
302 uppercaser.stop()
303 uppercaser.join()
304
305#
306#
307#
308
309def queue_empty(q):
310 if hasattr(q, 'empty'):
311 return q.empty()
312 else:
313 return q.qsize() == 0
314
315def queue_full(q, maxsize):
316 if hasattr(q, 'full'):
317 return q.full()
318 else:
319 return q.qsize() == maxsize
320
321
322class _TestQueue(BaseTestCase):
323
324
325 def _test_put(self, queue, child_can_start, parent_can_continue):
326 child_can_start.wait()
327 for i in range(6):
328 queue.get()
329 parent_can_continue.set()
330
331 def test_put(self):
332 MAXSIZE = 6
333 queue = self.Queue(maxsize=MAXSIZE)
334 child_can_start = self.Event()
335 parent_can_continue = self.Event()
336
337 proc = self.Process(
338 target=self._test_put,
339 args=(queue, child_can_start, parent_can_continue)
340 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000341 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000342 proc.start()
343
344 self.assertEqual(queue_empty(queue), True)
345 self.assertEqual(queue_full(queue, MAXSIZE), False)
346
347 queue.put(1)
348 queue.put(2, True)
349 queue.put(3, True, None)
350 queue.put(4, False)
351 queue.put(5, False, None)
352 queue.put_nowait(6)
353
354 # the values may be in buffer but not yet in pipe so sleep a bit
355 time.sleep(DELTA)
356
357 self.assertEqual(queue_empty(queue), False)
358 self.assertEqual(queue_full(queue, MAXSIZE), True)
359
360 put = TimingWrapper(queue.put)
361 put_nowait = TimingWrapper(queue.put_nowait)
362
363 self.assertRaises(Queue.Full, put, 7, False)
364 self.assertTimingAlmostEqual(put.elapsed, 0)
365
366 self.assertRaises(Queue.Full, put, 7, False, None)
367 self.assertTimingAlmostEqual(put.elapsed, 0)
368
369 self.assertRaises(Queue.Full, put_nowait, 7)
370 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
371
372 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
373 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
374
375 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
376 self.assertTimingAlmostEqual(put.elapsed, 0)
377
378 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
379 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
380
381 child_can_start.set()
382 parent_can_continue.wait()
383
384 self.assertEqual(queue_empty(queue), True)
385 self.assertEqual(queue_full(queue, MAXSIZE), False)
386
387 proc.join()
388
389 def _test_get(self, queue, child_can_start, parent_can_continue):
390 child_can_start.wait()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000391 #queue.put(1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000392 queue.put(2)
393 queue.put(3)
394 queue.put(4)
395 queue.put(5)
396 parent_can_continue.set()
397
398 def test_get(self):
399 queue = self.Queue()
400 child_can_start = self.Event()
401 parent_can_continue = self.Event()
402
403 proc = self.Process(
404 target=self._test_get,
405 args=(queue, child_can_start, parent_can_continue)
406 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000407 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000408 proc.start()
409
410 self.assertEqual(queue_empty(queue), True)
411
412 child_can_start.set()
413 parent_can_continue.wait()
414
415 time.sleep(DELTA)
416 self.assertEqual(queue_empty(queue), False)
417
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000418 # Hangs unexpectedly, remove for now
419 #self.assertEqual(queue.get(), 1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000420 self.assertEqual(queue.get(True, None), 2)
421 self.assertEqual(queue.get(True), 3)
422 self.assertEqual(queue.get(timeout=1), 4)
423 self.assertEqual(queue.get_nowait(), 5)
424
425 self.assertEqual(queue_empty(queue), True)
426
427 get = TimingWrapper(queue.get)
428 get_nowait = TimingWrapper(queue.get_nowait)
429
430 self.assertRaises(Queue.Empty, get, False)
431 self.assertTimingAlmostEqual(get.elapsed, 0)
432
433 self.assertRaises(Queue.Empty, get, False, None)
434 self.assertTimingAlmostEqual(get.elapsed, 0)
435
436 self.assertRaises(Queue.Empty, get_nowait)
437 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
438
439 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
440 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
441
442 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
443 self.assertTimingAlmostEqual(get.elapsed, 0)
444
445 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
446 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
447
448 proc.join()
449
450 def _test_fork(self, queue):
451 for i in range(10, 20):
452 queue.put(i)
453 # note that at this point the items may only be buffered, so the
454 # process cannot shutdown until the feeder thread has finished
455 # pushing items onto the pipe.
456
457 def test_fork(self):
458 # Old versions of Queue would fail to create a new feeder
459 # thread for a forked process if the original process had its
460 # own feeder thread. This test checks that this no longer
461 # happens.
462
463 queue = self.Queue()
464
465 # put items on queue so that main process starts a feeder thread
466 for i in range(10):
467 queue.put(i)
468
469 # wait to make sure thread starts before we fork a new process
470 time.sleep(DELTA)
471
472 # fork process
473 p = self.Process(target=self._test_fork, args=(queue,))
474 p.start()
475
476 # check that all expected items are in the queue
477 for i in range(20):
478 self.assertEqual(queue.get(), i)
479 self.assertRaises(Queue.Empty, queue.get, False)
480
481 p.join()
482
483 def test_qsize(self):
484 q = self.Queue()
485 try:
486 self.assertEqual(q.qsize(), 0)
487 except NotImplementedError:
488 return
489 q.put(1)
490 self.assertEqual(q.qsize(), 1)
491 q.put(5)
492 self.assertEqual(q.qsize(), 2)
493 q.get()
494 self.assertEqual(q.qsize(), 1)
495 q.get()
496 self.assertEqual(q.qsize(), 0)
497
498 def _test_task_done(self, q):
499 for obj in iter(q.get, None):
500 time.sleep(DELTA)
501 q.task_done()
502
503 def test_task_done(self):
504 queue = self.JoinableQueue()
505
506 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
507 return
508
509 workers = [self.Process(target=self._test_task_done, args=(queue,))
510 for i in xrange(4)]
511
512 for p in workers:
513 p.start()
514
515 for i in xrange(10):
516 queue.put(i)
517
518 queue.join()
519
520 for p in workers:
521 queue.put(None)
522
523 for p in workers:
524 p.join()
525
526#
527#
528#
529
530class _TestLock(BaseTestCase):
531
532 def test_lock(self):
533 lock = self.Lock()
534 self.assertEqual(lock.acquire(), True)
535 self.assertEqual(lock.acquire(False), False)
536 self.assertEqual(lock.release(), None)
537 self.assertRaises((ValueError, threading.ThreadError), lock.release)
538
539 def test_rlock(self):
540 lock = self.RLock()
541 self.assertEqual(lock.acquire(), True)
542 self.assertEqual(lock.acquire(), True)
543 self.assertEqual(lock.acquire(), True)
544 self.assertEqual(lock.release(), None)
545 self.assertEqual(lock.release(), None)
546 self.assertEqual(lock.release(), None)
547 self.assertRaises((AssertionError, RuntimeError), lock.release)
548
Jesse Noller82eb5902009-03-30 23:29:31 +0000549 def test_lock_context(self):
550 with self.Lock():
551 pass
552
Benjamin Petersondfd79492008-06-13 19:13:39 +0000553
554class _TestSemaphore(BaseTestCase):
555
556 def _test_semaphore(self, sem):
557 self.assertReturnsIfImplemented(2, get_value, sem)
558 self.assertEqual(sem.acquire(), True)
559 self.assertReturnsIfImplemented(1, get_value, sem)
560 self.assertEqual(sem.acquire(), True)
561 self.assertReturnsIfImplemented(0, get_value, sem)
562 self.assertEqual(sem.acquire(False), False)
563 self.assertReturnsIfImplemented(0, get_value, sem)
564 self.assertEqual(sem.release(), None)
565 self.assertReturnsIfImplemented(1, get_value, sem)
566 self.assertEqual(sem.release(), None)
567 self.assertReturnsIfImplemented(2, get_value, sem)
568
569 def test_semaphore(self):
570 sem = self.Semaphore(2)
571 self._test_semaphore(sem)
572 self.assertEqual(sem.release(), None)
573 self.assertReturnsIfImplemented(3, get_value, sem)
574 self.assertEqual(sem.release(), None)
575 self.assertReturnsIfImplemented(4, get_value, sem)
576
577 def test_bounded_semaphore(self):
578 sem = self.BoundedSemaphore(2)
579 self._test_semaphore(sem)
580 # Currently fails on OS/X
581 #if HAVE_GETVALUE:
582 # self.assertRaises(ValueError, sem.release)
583 # self.assertReturnsIfImplemented(2, get_value, sem)
584
585 def test_timeout(self):
586 if self.TYPE != 'processes':
587 return
588
589 sem = self.Semaphore(0)
590 acquire = TimingWrapper(sem.acquire)
591
592 self.assertEqual(acquire(False), False)
593 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
594
595 self.assertEqual(acquire(False, None), False)
596 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
597
598 self.assertEqual(acquire(False, TIMEOUT1), False)
599 self.assertTimingAlmostEqual(acquire.elapsed, 0)
600
601 self.assertEqual(acquire(True, TIMEOUT2), False)
602 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
603
604 self.assertEqual(acquire(timeout=TIMEOUT3), False)
605 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
606
607
608class _TestCondition(BaseTestCase):
609
610 def f(self, cond, sleeping, woken, timeout=None):
611 cond.acquire()
612 sleeping.release()
613 cond.wait(timeout)
614 woken.release()
615 cond.release()
616
617 def check_invariant(self, cond):
618 # this is only supposed to succeed when there are no sleepers
619 if self.TYPE == 'processes':
620 try:
621 sleepers = (cond._sleeping_count.get_value() -
622 cond._woken_count.get_value())
623 self.assertEqual(sleepers, 0)
624 self.assertEqual(cond._wait_semaphore.get_value(), 0)
625 except NotImplementedError:
626 pass
627
628 def test_notify(self):
629 cond = self.Condition()
630 sleeping = self.Semaphore(0)
631 woken = self.Semaphore(0)
632
633 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000634 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000635 p.start()
636
637 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000638 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000639 p.start()
640
641 # wait for both children to start sleeping
642 sleeping.acquire()
643 sleeping.acquire()
644
645 # check no process/thread has woken up
646 time.sleep(DELTA)
647 self.assertReturnsIfImplemented(0, get_value, woken)
648
649 # wake up one process/thread
650 cond.acquire()
651 cond.notify()
652 cond.release()
653
654 # check one process/thread has woken up
655 time.sleep(DELTA)
656 self.assertReturnsIfImplemented(1, get_value, woken)
657
658 # wake up another
659 cond.acquire()
660 cond.notify()
661 cond.release()
662
663 # check other has woken up
664 time.sleep(DELTA)
665 self.assertReturnsIfImplemented(2, get_value, woken)
666
667 # check state is not mucked up
668 self.check_invariant(cond)
669 p.join()
670
671 def test_notify_all(self):
672 cond = self.Condition()
673 sleeping = self.Semaphore(0)
674 woken = self.Semaphore(0)
675
676 # start some threads/processes which will timeout
677 for i in range(3):
678 p = self.Process(target=self.f,
679 args=(cond, sleeping, woken, TIMEOUT1))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000680 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000681 p.start()
682
683 t = threading.Thread(target=self.f,
684 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000685 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000686 t.start()
687
688 # wait for them all to sleep
689 for i in xrange(6):
690 sleeping.acquire()
691
692 # check they have all timed out
693 for i in xrange(6):
694 woken.acquire()
695 self.assertReturnsIfImplemented(0, get_value, woken)
696
697 # check state is not mucked up
698 self.check_invariant(cond)
699
700 # start some more threads/processes
701 for i in range(3):
702 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000703 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000704 p.start()
705
706 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000707 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000708 t.start()
709
710 # wait for them to all sleep
711 for i in xrange(6):
712 sleeping.acquire()
713
714 # check no process/thread has woken up
715 time.sleep(DELTA)
716 self.assertReturnsIfImplemented(0, get_value, woken)
717
718 # wake them all up
719 cond.acquire()
720 cond.notify_all()
721 cond.release()
722
723 # check they have all woken
724 time.sleep(DELTA)
725 self.assertReturnsIfImplemented(6, get_value, woken)
726
727 # check state is not mucked up
728 self.check_invariant(cond)
729
730 def test_timeout(self):
731 cond = self.Condition()
732 wait = TimingWrapper(cond.wait)
733 cond.acquire()
734 res = wait(TIMEOUT1)
735 cond.release()
736 self.assertEqual(res, None)
737 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
738
739
740class _TestEvent(BaseTestCase):
741
742 def _test_event(self, event):
743 time.sleep(TIMEOUT2)
744 event.set()
745
746 def test_event(self):
747 event = self.Event()
748 wait = TimingWrapper(event.wait)
749
750 # Removed temporaily, due to API shear, this does not
751 # work with threading._Event objects. is_set == isSet
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000752 self.assertEqual(event.is_set(), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000753
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000754 # Removed, threading.Event.wait() will return the value of the __flag
755 # instead of None. API Shear with the semaphore backed mp.Event
756 self.assertEqual(wait(0.0), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000757 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000758 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000759 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
760
761 event.set()
762
763 # See note above on the API differences
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000764 self.assertEqual(event.is_set(), True)
765 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000766 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000767 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000768 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
769 # self.assertEqual(event.is_set(), True)
770
771 event.clear()
772
773 #self.assertEqual(event.is_set(), False)
774
775 self.Process(target=self._test_event, args=(event,)).start()
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000776 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000777
778#
779#
780#
781
782class _TestValue(BaseTestCase):
783
784 codes_values = [
785 ('i', 4343, 24234),
786 ('d', 3.625, -4.25),
787 ('h', -232, 234),
788 ('c', latin('x'), latin('y'))
789 ]
790
791 def _test(self, values):
792 for sv, cv in zip(values, self.codes_values):
793 sv.value = cv[2]
794
795
796 def test_value(self, raw=False):
797 if self.TYPE != 'processes':
798 return
799
800 if raw:
801 values = [self.RawValue(code, value)
802 for code, value, _ in self.codes_values]
803 else:
804 values = [self.Value(code, value)
805 for code, value, _ in self.codes_values]
806
807 for sv, cv in zip(values, self.codes_values):
808 self.assertEqual(sv.value, cv[1])
809
810 proc = self.Process(target=self._test, args=(values,))
811 proc.start()
812 proc.join()
813
814 for sv, cv in zip(values, self.codes_values):
815 self.assertEqual(sv.value, cv[2])
816
817 def test_rawvalue(self):
818 self.test_value(raw=True)
819
820 def test_getobj_getlock(self):
821 if self.TYPE != 'processes':
822 return
823
824 val1 = self.Value('i', 5)
825 lock1 = val1.get_lock()
826 obj1 = val1.get_obj()
827
828 val2 = self.Value('i', 5, lock=None)
829 lock2 = val2.get_lock()
830 obj2 = val2.get_obj()
831
832 lock = self.Lock()
833 val3 = self.Value('i', 5, lock=lock)
834 lock3 = val3.get_lock()
835 obj3 = val3.get_obj()
836 self.assertEqual(lock, lock3)
837
Jesse Noller6ab22152009-01-18 02:45:38 +0000838 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000839 self.assertFalse(hasattr(arr4, 'get_lock'))
840 self.assertFalse(hasattr(arr4, 'get_obj'))
841
Jesse Noller6ab22152009-01-18 02:45:38 +0000842 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
843
844 arr5 = self.RawValue('i', 5)
845 self.assertFalse(hasattr(arr5, 'get_lock'))
846 self.assertFalse(hasattr(arr5, 'get_obj'))
847
Benjamin Petersondfd79492008-06-13 19:13:39 +0000848
849class _TestArray(BaseTestCase):
850
851 def f(self, seq):
852 for i in range(1, len(seq)):
853 seq[i] += seq[i-1]
854
855 def test_array(self, raw=False):
856 if self.TYPE != 'processes':
857 return
858
859 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
860 if raw:
861 arr = self.RawArray('i', seq)
862 else:
863 arr = self.Array('i', seq)
864
865 self.assertEqual(len(arr), len(seq))
866 self.assertEqual(arr[3], seq[3])
867 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
868
869 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
870
871 self.assertEqual(list(arr[:]), seq)
872
873 self.f(seq)
874
875 p = self.Process(target=self.f, args=(arr,))
876 p.start()
877 p.join()
878
879 self.assertEqual(list(arr[:]), seq)
880
881 def test_rawarray(self):
882 self.test_array(raw=True)
883
884 def test_getobj_getlock_obj(self):
885 if self.TYPE != 'processes':
886 return
887
888 arr1 = self.Array('i', range(10))
889 lock1 = arr1.get_lock()
890 obj1 = arr1.get_obj()
891
892 arr2 = self.Array('i', range(10), lock=None)
893 lock2 = arr2.get_lock()
894 obj2 = arr2.get_obj()
895
896 lock = self.Lock()
897 arr3 = self.Array('i', range(10), lock=lock)
898 lock3 = arr3.get_lock()
899 obj3 = arr3.get_obj()
900 self.assertEqual(lock, lock3)
901
Jesse Noller6ab22152009-01-18 02:45:38 +0000902 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000903 self.assertFalse(hasattr(arr4, 'get_lock'))
904 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Noller6ab22152009-01-18 02:45:38 +0000905 self.assertRaises(AttributeError,
906 self.Array, 'i', range(10), lock='notalock')
907
908 arr5 = self.RawArray('i', range(10))
909 self.assertFalse(hasattr(arr5, 'get_lock'))
910 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000911
912#
913#
914#
915
916class _TestContainers(BaseTestCase):
917
918 ALLOWED_TYPES = ('manager',)
919
920 def test_list(self):
921 a = self.list(range(10))
922 self.assertEqual(a[:], range(10))
923
924 b = self.list()
925 self.assertEqual(b[:], [])
926
927 b.extend(range(5))
928 self.assertEqual(b[:], range(5))
929
930 self.assertEqual(b[2], 2)
931 self.assertEqual(b[2:10], [2,3,4])
932
933 b *= 2
934 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
935
936 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
937
938 self.assertEqual(a[:], range(10))
939
940 d = [a, b]
941 e = self.list(d)
942 self.assertEqual(
943 e[:],
944 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
945 )
946
947 f = self.list([a])
948 a.append('hello')
949 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
950
951 def test_dict(self):
952 d = self.dict()
953 indices = range(65, 70)
954 for i in indices:
955 d[i] = chr(i)
956 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
957 self.assertEqual(sorted(d.keys()), indices)
958 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
959 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
960
961 def test_namespace(self):
962 n = self.Namespace()
963 n.name = 'Bob'
964 n.job = 'Builder'
965 n._hidden = 'hidden'
966 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
967 del n.job
968 self.assertEqual(str(n), "Namespace(name='Bob')")
969 self.assertTrue(hasattr(n, 'name'))
970 self.assertTrue(not hasattr(n, 'job'))
971
972#
973#
974#
975
976def sqr(x, wait=0.0):
977 time.sleep(wait)
978 return x*x
Benjamin Petersondfd79492008-06-13 19:13:39 +0000979class _TestPool(BaseTestCase):
980
981 def test_apply(self):
982 papply = self.pool.apply
983 self.assertEqual(papply(sqr, (5,)), sqr(5))
984 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
985
986 def test_map(self):
987 pmap = self.pool.map
988 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
989 self.assertEqual(pmap(sqr, range(100), chunksize=20),
990 map(sqr, range(100)))
991
992 def test_async(self):
993 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
994 get = TimingWrapper(res.get)
995 self.assertEqual(get(), 49)
996 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
997
998 def test_async_timeout(self):
999 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1000 get = TimingWrapper(res.get)
1001 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1002 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1003
1004 def test_imap(self):
1005 it = self.pool.imap(sqr, range(10))
1006 self.assertEqual(list(it), map(sqr, range(10)))
1007
1008 it = self.pool.imap(sqr, range(10))
1009 for i in range(10):
1010 self.assertEqual(it.next(), i*i)
1011 self.assertRaises(StopIteration, it.next)
1012
1013 it = self.pool.imap(sqr, range(1000), chunksize=100)
1014 for i in range(1000):
1015 self.assertEqual(it.next(), i*i)
1016 self.assertRaises(StopIteration, it.next)
1017
1018 def test_imap_unordered(self):
1019 it = self.pool.imap_unordered(sqr, range(1000))
1020 self.assertEqual(sorted(it), map(sqr, range(1000)))
1021
1022 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1023 self.assertEqual(sorted(it), map(sqr, range(1000)))
1024
1025 def test_make_pool(self):
1026 p = multiprocessing.Pool(3)
1027 self.assertEqual(3, len(p._pool))
1028 p.close()
1029 p.join()
1030
1031 def test_terminate(self):
1032 if self.TYPE == 'manager':
1033 # On Unix a forked process increfs each shared object to
1034 # which its parent process held a reference. If the
1035 # forked process gets terminated then there is likely to
1036 # be a reference leak. So to prevent
1037 # _TestZZZNumberOfObjects from failing we skip this test
1038 # when using a manager.
1039 return
1040
1041 result = self.pool.map_async(
1042 time.sleep, [0.1 for i in range(10000)], chunksize=1
1043 )
1044 self.pool.terminate()
1045 join = TimingWrapper(self.pool.join)
1046 join()
1047 self.assertTrue(join.elapsed < 0.2)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001048#
1049# Test that manager has expected number of shared objects left
1050#
1051
1052class _TestZZZNumberOfObjects(BaseTestCase):
1053 # Because test cases are sorted alphabetically, this one will get
1054 # run after all the other tests for the manager. It tests that
1055 # there have been no "reference leaks" for the manager's shared
1056 # objects. Note the comment in _TestPool.test_terminate().
1057 ALLOWED_TYPES = ('manager',)
1058
1059 def test_number_of_objects(self):
1060 EXPECTED_NUMBER = 1 # the pool object is still alive
1061 multiprocessing.active_children() # discard dead process objs
1062 gc.collect() # do garbage collection
1063 refs = self.manager._number_of_objects()
Jesse Noller7314b382009-01-21 02:08:17 +00001064 debug_info = self.manager._debug_info()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001065 if refs != EXPECTED_NUMBER:
Jesse Noller7fb96402008-07-17 21:01:05 +00001066 print self.manager._debug_info()
Jesse Noller7314b382009-01-21 02:08:17 +00001067 print debug_info
Benjamin Petersondfd79492008-06-13 19:13:39 +00001068
1069 self.assertEqual(refs, EXPECTED_NUMBER)
1070
1071#
1072# Test of creating a customized manager class
1073#
1074
1075from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1076
1077class FooBar(object):
1078 def f(self):
1079 return 'f()'
1080 def g(self):
1081 raise ValueError
1082 def _h(self):
1083 return '_h()'
1084
1085def baz():
1086 for i in xrange(10):
1087 yield i*i
1088
1089class IteratorProxy(BaseProxy):
1090 _exposed_ = ('next', '__next__')
1091 def __iter__(self):
1092 return self
1093 def next(self):
1094 return self._callmethod('next')
1095 def __next__(self):
1096 return self._callmethod('__next__')
1097
1098class MyManager(BaseManager):
1099 pass
1100
1101MyManager.register('Foo', callable=FooBar)
1102MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1103MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1104
1105
1106class _TestMyManager(BaseTestCase):
1107
1108 ALLOWED_TYPES = ('manager',)
1109
1110 def test_mymanager(self):
1111 manager = MyManager()
1112 manager.start()
1113
1114 foo = manager.Foo()
1115 bar = manager.Bar()
1116 baz = manager.baz()
1117
1118 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1119 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1120
1121 self.assertEqual(foo_methods, ['f', 'g'])
1122 self.assertEqual(bar_methods, ['f', '_h'])
1123
1124 self.assertEqual(foo.f(), 'f()')
1125 self.assertRaises(ValueError, foo.g)
1126 self.assertEqual(foo._callmethod('f'), 'f()')
1127 self.assertRaises(RemoteError, foo._callmethod, '_h')
1128
1129 self.assertEqual(bar.f(), 'f()')
1130 self.assertEqual(bar._h(), '_h()')
1131 self.assertEqual(bar._callmethod('f'), 'f()')
1132 self.assertEqual(bar._callmethod('_h'), '_h()')
1133
1134 self.assertEqual(list(baz), [i*i for i in range(10)])
1135
1136 manager.shutdown()
1137
1138#
1139# Test of connecting to a remote server and using xmlrpclib for serialization
1140#
1141
1142_queue = Queue.Queue()
1143def get_queue():
1144 return _queue
1145
1146class QueueManager(BaseManager):
1147 '''manager class used by server process'''
1148QueueManager.register('get_queue', callable=get_queue)
1149
1150class QueueManager2(BaseManager):
1151 '''manager class which specifies the same interface as QueueManager'''
1152QueueManager2.register('get_queue')
1153
1154
1155SERIALIZER = 'xmlrpclib'
1156
1157class _TestRemoteManager(BaseTestCase):
1158
1159 ALLOWED_TYPES = ('manager',)
1160
1161 def _putter(self, address, authkey):
1162 manager = QueueManager2(
1163 address=address, authkey=authkey, serializer=SERIALIZER
1164 )
1165 manager.connect()
1166 queue = manager.get_queue()
1167 queue.put(('hello world', None, True, 2.25))
1168
1169 def test_remote(self):
1170 authkey = os.urandom(32)
1171
1172 manager = QueueManager(
1173 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1174 )
1175 manager.start()
1176
1177 p = self.Process(target=self._putter, args=(manager.address, authkey))
1178 p.start()
1179
1180 manager2 = QueueManager2(
1181 address=manager.address, authkey=authkey, serializer=SERIALIZER
1182 )
1183 manager2.connect()
1184 queue = manager2.get_queue()
1185
1186 # Note that xmlrpclib will deserialize object as a list not a tuple
1187 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1188
1189 # Because we are using xmlrpclib for serialization instead of
1190 # pickle this will cause a serialization error.
1191 self.assertRaises(Exception, queue.put, time.sleep)
1192
1193 # Make queue finalizer run before the server is stopped
1194 del queue
1195 manager.shutdown()
1196
Jesse Noller459a6482009-03-30 15:50:42 +00001197class _TestManagerRestart(BaseTestCase):
1198
1199 def _putter(self, address, authkey):
1200 manager = QueueManager(
1201 address=address, authkey=authkey, serializer=SERIALIZER)
1202 manager.connect()
1203 queue = manager.get_queue()
1204 queue.put('hello world')
1205
1206 def test_rapid_restart(self):
1207 authkey = os.urandom(32)
1208 manager = QueueManager(
1209 address=('localhost', 9999), authkey=authkey, serializer=SERIALIZER)
1210 manager.start()
1211
1212 p = self.Process(target=self._putter, args=(manager.address, authkey))
1213 p.start()
1214 queue = manager.get_queue()
1215 self.assertEqual(queue.get(), 'hello world')
Jesse Noller019ce772009-03-30 21:53:29 +00001216 del queue
Jesse Noller459a6482009-03-30 15:50:42 +00001217 manager.shutdown()
1218 manager = QueueManager(
1219 address=('localhost', 9999), authkey=authkey, serializer=SERIALIZER)
1220 manager.start()
Jesse Noller019ce772009-03-30 21:53:29 +00001221 manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001222
Benjamin Petersondfd79492008-06-13 19:13:39 +00001223#
1224#
1225#
1226
1227SENTINEL = latin('')
1228
1229class _TestConnection(BaseTestCase):
1230
1231 ALLOWED_TYPES = ('processes', 'threads')
1232
1233 def _echo(self, conn):
1234 for msg in iter(conn.recv_bytes, SENTINEL):
1235 conn.send_bytes(msg)
1236 conn.close()
1237
1238 def test_connection(self):
1239 conn, child_conn = self.Pipe()
1240
1241 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001242 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001243 p.start()
1244
1245 seq = [1, 2.25, None]
1246 msg = latin('hello world')
1247 longmsg = msg * 10
1248 arr = array.array('i', range(4))
1249
1250 if self.TYPE == 'processes':
1251 self.assertEqual(type(conn.fileno()), int)
1252
1253 self.assertEqual(conn.send(seq), None)
1254 self.assertEqual(conn.recv(), seq)
1255
1256 self.assertEqual(conn.send_bytes(msg), None)
1257 self.assertEqual(conn.recv_bytes(), msg)
1258
1259 if self.TYPE == 'processes':
1260 buffer = array.array('i', [0]*10)
1261 expected = list(arr) + [0] * (10 - len(arr))
1262 self.assertEqual(conn.send_bytes(arr), None)
1263 self.assertEqual(conn.recv_bytes_into(buffer),
1264 len(arr) * buffer.itemsize)
1265 self.assertEqual(list(buffer), expected)
1266
1267 buffer = array.array('i', [0]*10)
1268 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1269 self.assertEqual(conn.send_bytes(arr), None)
1270 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1271 len(arr) * buffer.itemsize)
1272 self.assertEqual(list(buffer), expected)
1273
1274 buffer = bytearray(latin(' ' * 40))
1275 self.assertEqual(conn.send_bytes(longmsg), None)
1276 try:
1277 res = conn.recv_bytes_into(buffer)
1278 except multiprocessing.BufferTooShort, e:
1279 self.assertEqual(e.args, (longmsg,))
1280 else:
1281 self.fail('expected BufferTooShort, got %s' % res)
1282
1283 poll = TimingWrapper(conn.poll)
1284
1285 self.assertEqual(poll(), False)
1286 self.assertTimingAlmostEqual(poll.elapsed, 0)
1287
1288 self.assertEqual(poll(TIMEOUT1), False)
1289 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1290
1291 conn.send(None)
1292
1293 self.assertEqual(poll(TIMEOUT1), True)
1294 self.assertTimingAlmostEqual(poll.elapsed, 0)
1295
1296 self.assertEqual(conn.recv(), None)
1297
1298 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1299 conn.send_bytes(really_big_msg)
1300 self.assertEqual(conn.recv_bytes(), really_big_msg)
1301
1302 conn.send_bytes(SENTINEL) # tell child to quit
1303 child_conn.close()
1304
1305 if self.TYPE == 'processes':
1306 self.assertEqual(conn.readable, True)
1307 self.assertEqual(conn.writable, True)
1308 self.assertRaises(EOFError, conn.recv)
1309 self.assertRaises(EOFError, conn.recv_bytes)
1310
1311 p.join()
1312
1313 def test_duplex_false(self):
1314 reader, writer = self.Pipe(duplex=False)
1315 self.assertEqual(writer.send(1), None)
1316 self.assertEqual(reader.recv(), 1)
1317 if self.TYPE == 'processes':
1318 self.assertEqual(reader.readable, True)
1319 self.assertEqual(reader.writable, False)
1320 self.assertEqual(writer.readable, False)
1321 self.assertEqual(writer.writable, True)
1322 self.assertRaises(IOError, reader.send, 2)
1323 self.assertRaises(IOError, writer.recv)
1324 self.assertRaises(IOError, writer.poll)
1325
1326 def test_spawn_close(self):
1327 # We test that a pipe connection can be closed by parent
1328 # process immediately after child is spawned. On Windows this
1329 # would have sometimes failed on old versions because
1330 # child_conn would be closed before the child got a chance to
1331 # duplicate it.
1332 conn, child_conn = self.Pipe()
1333
1334 p = self.Process(target=self._echo, args=(child_conn,))
1335 p.start()
1336 child_conn.close() # this might complete before child initializes
1337
1338 msg = latin('hello')
1339 conn.send_bytes(msg)
1340 self.assertEqual(conn.recv_bytes(), msg)
1341
1342 conn.send_bytes(SENTINEL)
1343 conn.close()
1344 p.join()
1345
1346 def test_sendbytes(self):
1347 if self.TYPE != 'processes':
1348 return
1349
1350 msg = latin('abcdefghijklmnopqrstuvwxyz')
1351 a, b = self.Pipe()
1352
1353 a.send_bytes(msg)
1354 self.assertEqual(b.recv_bytes(), msg)
1355
1356 a.send_bytes(msg, 5)
1357 self.assertEqual(b.recv_bytes(), msg[5:])
1358
1359 a.send_bytes(msg, 7, 8)
1360 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1361
1362 a.send_bytes(msg, 26)
1363 self.assertEqual(b.recv_bytes(), latin(''))
1364
1365 a.send_bytes(msg, 26, 0)
1366 self.assertEqual(b.recv_bytes(), latin(''))
1367
1368 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1369
1370 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1371
1372 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1373
1374 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1375
1376 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1377
Benjamin Petersondfd79492008-06-13 19:13:39 +00001378class _TestListenerClient(BaseTestCase):
1379
1380 ALLOWED_TYPES = ('processes', 'threads')
1381
1382 def _test(self, address):
1383 conn = self.connection.Client(address)
1384 conn.send('hello')
1385 conn.close()
1386
1387 def test_listener_client(self):
1388 for family in self.connection.families:
1389 l = self.connection.Listener(family=family)
1390 p = self.Process(target=self._test, args=(l.address,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001391 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001392 p.start()
1393 conn = l.accept()
1394 self.assertEqual(conn.recv(), 'hello')
1395 p.join()
1396 l.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001397#
1398# Test of sending connection and socket objects between processes
1399#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001400"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001401class _TestPicklingConnections(BaseTestCase):
1402
1403 ALLOWED_TYPES = ('processes',)
1404
1405 def _listener(self, conn, families):
1406 for fam in families:
1407 l = self.connection.Listener(family=fam)
1408 conn.send(l.address)
1409 new_conn = l.accept()
1410 conn.send(new_conn)
1411
1412 if self.TYPE == 'processes':
1413 l = socket.socket()
1414 l.bind(('localhost', 0))
1415 conn.send(l.getsockname())
1416 l.listen(1)
1417 new_conn, addr = l.accept()
1418 conn.send(new_conn)
1419
1420 conn.recv()
1421
1422 def _remote(self, conn):
1423 for (address, msg) in iter(conn.recv, None):
1424 client = self.connection.Client(address)
1425 client.send(msg.upper())
1426 client.close()
1427
1428 if self.TYPE == 'processes':
1429 address, msg = conn.recv()
1430 client = socket.socket()
1431 client.connect(address)
1432 client.sendall(msg.upper())
1433 client.close()
1434
1435 conn.close()
1436
1437 def test_pickling(self):
1438 try:
1439 multiprocessing.allow_connection_pickling()
1440 except ImportError:
1441 return
1442
1443 families = self.connection.families
1444
1445 lconn, lconn0 = self.Pipe()
1446 lp = self.Process(target=self._listener, args=(lconn0, families))
1447 lp.start()
1448 lconn0.close()
1449
1450 rconn, rconn0 = self.Pipe()
1451 rp = self.Process(target=self._remote, args=(rconn0,))
1452 rp.start()
1453 rconn0.close()
1454
1455 for fam in families:
1456 msg = ('This connection uses family %s' % fam).encode('ascii')
1457 address = lconn.recv()
1458 rconn.send((address, msg))
1459 new_conn = lconn.recv()
1460 self.assertEqual(new_conn.recv(), msg.upper())
1461
1462 rconn.send(None)
1463
1464 if self.TYPE == 'processes':
1465 msg = latin('This connection uses a normal socket')
1466 address = lconn.recv()
1467 rconn.send((address, msg))
1468 if hasattr(socket, 'fromfd'):
1469 new_conn = lconn.recv()
1470 self.assertEqual(new_conn.recv(100), msg.upper())
1471 else:
1472 # XXX On Windows with Py2.6 need to backport fromfd()
1473 discard = lconn.recv_bytes()
1474
1475 lconn.send(None)
1476
1477 rconn.close()
1478 lconn.close()
1479
1480 lp.join()
1481 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001482"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001483#
1484#
1485#
1486
1487class _TestHeap(BaseTestCase):
1488
1489 ALLOWED_TYPES = ('processes',)
1490
1491 def test_heap(self):
1492 iterations = 5000
1493 maxblocks = 50
1494 blocks = []
1495
1496 # create and destroy lots of blocks of different sizes
1497 for i in xrange(iterations):
1498 size = int(random.lognormvariate(0, 1) * 1000)
1499 b = multiprocessing.heap.BufferWrapper(size)
1500 blocks.append(b)
1501 if len(blocks) > maxblocks:
1502 i = random.randrange(maxblocks)
1503 del blocks[i]
1504
1505 # get the heap object
1506 heap = multiprocessing.heap.BufferWrapper._heap
1507
1508 # verify the state of the heap
1509 all = []
1510 occupied = 0
1511 for L in heap._len_to_seq.values():
1512 for arena, start, stop in L:
1513 all.append((heap._arenas.index(arena), start, stop,
1514 stop-start, 'free'))
1515 for arena, start, stop in heap._allocated_blocks:
1516 all.append((heap._arenas.index(arena), start, stop,
1517 stop-start, 'occupied'))
1518 occupied += (stop-start)
1519
1520 all.sort()
1521
1522 for i in range(len(all)-1):
1523 (arena, start, stop) = all[i][:3]
1524 (narena, nstart, nstop) = all[i+1][:3]
1525 self.assertTrue((arena != narena and nstart == 0) or
1526 (stop == nstart))
1527
1528#
1529#
1530#
1531
1532try:
1533 from ctypes import Structure, Value, copy, c_int, c_double
1534except ImportError:
1535 Structure = object
1536 c_int = c_double = None
1537
1538class _Foo(Structure):
1539 _fields_ = [
1540 ('x', c_int),
1541 ('y', c_double)
1542 ]
1543
1544class _TestSharedCTypes(BaseTestCase):
1545
1546 ALLOWED_TYPES = ('processes',)
1547
1548 def _double(self, x, y, foo, arr, string):
1549 x.value *= 2
1550 y.value *= 2
1551 foo.x *= 2
1552 foo.y *= 2
1553 string.value *= 2
1554 for i in range(len(arr)):
1555 arr[i] *= 2
1556
1557 def test_sharedctypes(self, lock=False):
1558 if c_int is None:
1559 return
1560
1561 x = Value('i', 7, lock=lock)
1562 y = Value(ctypes.c_double, 1.0/3.0, lock=lock)
1563 foo = Value(_Foo, 3, 2, lock=lock)
1564 arr = Array('d', range(10), lock=lock)
1565 string = Array('c', 20, lock=lock)
1566 string.value = 'hello'
1567
1568 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1569 p.start()
1570 p.join()
1571
1572 self.assertEqual(x.value, 14)
1573 self.assertAlmostEqual(y.value, 2.0/3.0)
1574 self.assertEqual(foo.x, 6)
1575 self.assertAlmostEqual(foo.y, 4.0)
1576 for i in range(10):
1577 self.assertAlmostEqual(arr[i], i*2)
1578 self.assertEqual(string.value, latin('hellohello'))
1579
1580 def test_synchronize(self):
1581 self.test_sharedctypes(lock=True)
1582
1583 def test_copy(self):
1584 if c_int is None:
1585 return
1586
1587 foo = _Foo(2, 5.0)
1588 bar = copy(foo)
1589 foo.x = 0
1590 foo.y = 0
1591 self.assertEqual(bar.x, 2)
1592 self.assertAlmostEqual(bar.y, 5.0)
1593
1594#
1595#
1596#
1597
1598class _TestFinalize(BaseTestCase):
1599
1600 ALLOWED_TYPES = ('processes',)
1601
1602 def _test_finalize(self, conn):
1603 class Foo(object):
1604 pass
1605
1606 a = Foo()
1607 util.Finalize(a, conn.send, args=('a',))
1608 del a # triggers callback for a
1609
1610 b = Foo()
1611 close_b = util.Finalize(b, conn.send, args=('b',))
1612 close_b() # triggers callback for b
1613 close_b() # does nothing because callback has already been called
1614 del b # does nothing because callback has already been called
1615
1616 c = Foo()
1617 util.Finalize(c, conn.send, args=('c',))
1618
1619 d10 = Foo()
1620 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1621
1622 d01 = Foo()
1623 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1624 d02 = Foo()
1625 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1626 d03 = Foo()
1627 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1628
1629 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1630
1631 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1632
1633 # call mutliprocessing's cleanup function then exit process without
1634 # garbage collecting locals
1635 util._exit_function()
1636 conn.close()
1637 os._exit(0)
1638
1639 def test_finalize(self):
1640 conn, child_conn = self.Pipe()
1641
1642 p = self.Process(target=self._test_finalize, args=(child_conn,))
1643 p.start()
1644 p.join()
1645
1646 result = [obj for obj in iter(conn.recv, 'STOP')]
1647 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1648
1649#
1650# Test that from ... import * works for each module
1651#
1652
1653class _TestImportStar(BaseTestCase):
1654
1655 ALLOWED_TYPES = ('processes',)
1656
1657 def test_import(self):
1658 modules = (
1659 'multiprocessing', 'multiprocessing.connection',
1660 'multiprocessing.heap', 'multiprocessing.managers',
1661 'multiprocessing.pool', 'multiprocessing.process',
1662 'multiprocessing.reduction', 'multiprocessing.sharedctypes',
1663 'multiprocessing.synchronize', 'multiprocessing.util'
1664 )
1665
1666 for name in modules:
1667 __import__(name)
1668 mod = sys.modules[name]
1669
1670 for attr in getattr(mod, '__all__', ()):
1671 self.assertTrue(
1672 hasattr(mod, attr),
1673 '%r does not have attribute %r' % (mod, attr)
1674 )
1675
1676#
1677# Quick test that logging works -- does not test logging output
1678#
1679
1680class _TestLogging(BaseTestCase):
1681
1682 ALLOWED_TYPES = ('processes',)
1683
1684 def test_enable_logging(self):
1685 logger = multiprocessing.get_logger()
1686 logger.setLevel(util.SUBWARNING)
1687 self.assertTrue(logger is not None)
1688 logger.debug('this will not be printed')
1689 logger.info('nor will this')
1690 logger.setLevel(LOG_LEVEL)
1691
1692 def _test_level(self, conn):
1693 logger = multiprocessing.get_logger()
1694 conn.send(logger.getEffectiveLevel())
1695
1696 def test_level(self):
1697 LEVEL1 = 32
1698 LEVEL2 = 37
1699
1700 logger = multiprocessing.get_logger()
1701 root_logger = logging.getLogger()
1702 root_level = root_logger.level
1703
1704 reader, writer = multiprocessing.Pipe(duplex=False)
1705
1706 logger.setLevel(LEVEL1)
1707 self.Process(target=self._test_level, args=(writer,)).start()
1708 self.assertEqual(LEVEL1, reader.recv())
1709
1710 logger.setLevel(logging.NOTSET)
1711 root_logger.setLevel(LEVEL2)
1712 self.Process(target=self._test_level, args=(writer,)).start()
1713 self.assertEqual(LEVEL2, reader.recv())
1714
1715 root_logger.setLevel(root_level)
1716 logger.setLevel(level=LOG_LEVEL)
1717
1718#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001719# Test to verify handle verification, see issue 3321
1720#
1721
1722class TestInvalidHandle(unittest.TestCase):
1723
1724 def test_invalid_handles(self):
1725 if WIN32:
1726 return
1727 conn = _multiprocessing.Connection(44977608)
1728 self.assertRaises(IOError, conn.poll)
1729 self.assertRaises(IOError, _multiprocessing.Connection, -1)
1730#
Benjamin Petersondfd79492008-06-13 19:13:39 +00001731# Functions used to create test cases from the base ones in this module
1732#
1733
1734def get_attributes(Source, names):
1735 d = {}
1736 for name in names:
1737 obj = getattr(Source, name)
1738 if type(obj) == type(get_attributes):
1739 obj = staticmethod(obj)
1740 d[name] = obj
1741 return d
1742
1743def create_test_cases(Mixin, type):
1744 result = {}
1745 glob = globals()
1746 Type = type[0].upper() + type[1:]
1747
1748 for name in glob.keys():
1749 if name.startswith('_Test'):
1750 base = glob[name]
1751 if type in base.ALLOWED_TYPES:
1752 newname = 'With' + Type + name[1:]
1753 class Temp(base, unittest.TestCase, Mixin):
1754 pass
1755 result[newname] = Temp
1756 Temp.__name__ = newname
1757 Temp.__module__ = Mixin.__module__
1758 return result
1759
1760#
1761# Create test cases
1762#
1763
1764class ProcessesMixin(object):
1765 TYPE = 'processes'
1766 Process = multiprocessing.Process
1767 locals().update(get_attributes(multiprocessing, (
1768 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1769 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1770 'RawArray', 'current_process', 'active_children', 'Pipe',
1771 'connection', 'JoinableQueue'
1772 )))
1773
1774testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1775globals().update(testcases_processes)
1776
1777
1778class ManagerMixin(object):
1779 TYPE = 'manager'
1780 Process = multiprocessing.Process
1781 manager = object.__new__(multiprocessing.managers.SyncManager)
1782 locals().update(get_attributes(manager, (
1783 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1784 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1785 'Namespace', 'JoinableQueue'
1786 )))
1787
1788testcases_manager = create_test_cases(ManagerMixin, type='manager')
1789globals().update(testcases_manager)
1790
1791
1792class ThreadsMixin(object):
1793 TYPE = 'threads'
1794 Process = multiprocessing.dummy.Process
1795 locals().update(get_attributes(multiprocessing.dummy, (
1796 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1797 'Condition', 'Event', 'Value', 'Array', 'current_process',
1798 'active_children', 'Pipe', 'connection', 'dict', 'list',
1799 'Namespace', 'JoinableQueue'
1800 )))
1801
1802testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1803globals().update(testcases_threads)
1804
Neal Norwitz0c519b32008-08-25 01:50:24 +00001805class OtherTest(unittest.TestCase):
1806 # TODO: add more tests for deliver/answer challenge.
1807 def test_deliver_challenge_auth_failure(self):
1808 class _FakeConnection(object):
1809 def recv_bytes(self, size):
Neal Norwitz2a7767a2008-08-25 03:03:25 +00001810 return b'something bogus'
Neal Norwitz0c519b32008-08-25 01:50:24 +00001811 def send_bytes(self, data):
1812 pass
1813 self.assertRaises(multiprocessing.AuthenticationError,
1814 multiprocessing.connection.deliver_challenge,
1815 _FakeConnection(), b'abc')
1816
1817 def test_answer_challenge_auth_failure(self):
1818 class _FakeConnection(object):
1819 def __init__(self):
1820 self.count = 0
1821 def recv_bytes(self, size):
1822 self.count += 1
1823 if self.count == 1:
1824 return multiprocessing.connection.CHALLENGE
1825 elif self.count == 2:
Neal Norwitz2a7767a2008-08-25 03:03:25 +00001826 return b'something bogus'
1827 return b''
Neal Norwitz0c519b32008-08-25 01:50:24 +00001828 def send_bytes(self, data):
1829 pass
1830 self.assertRaises(multiprocessing.AuthenticationError,
1831 multiprocessing.connection.answer_challenge,
1832 _FakeConnection(), b'abc')
1833
Jesse Noller7152f6d2009-04-02 05:17:26 +00001834#
1835# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1836#
1837
1838def initializer(ns):
1839 ns.test += 1
1840
1841class TestInitializers(unittest.TestCase):
1842 def setUp(self):
1843 self.mgr = multiprocessing.Manager()
1844 self.ns = self.mgr.Namespace()
1845 self.ns.test = 0
1846
1847 def tearDown(self):
1848 self.mgr.shutdown()
1849
1850 def test_manager_initializer(self):
1851 m = multiprocessing.managers.SyncManager()
1852 self.assertRaises(TypeError, m.start, 1)
1853 m.start(initializer, (self.ns,))
1854 self.assertEqual(self.ns.test, 1)
1855 m.shutdown()
1856
1857 def test_pool_initializer(self):
1858 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
1859 p = multiprocessing.Pool(1, initializer, (self.ns,))
1860 p.close()
1861 p.join()
1862 self.assertEqual(self.ns.test, 1)
1863
1864testcases_other = [OtherTest, TestInvalidHandle, TestInitializers]
Neal Norwitz0c519b32008-08-25 01:50:24 +00001865
Benjamin Petersondfd79492008-06-13 19:13:39 +00001866#
1867#
1868#
1869
1870def test_main(run=None):
Jesse Noller18623822008-06-18 13:29:52 +00001871 if sys.platform.startswith("linux"):
1872 try:
1873 lock = multiprocessing.RLock()
1874 except OSError:
Benjamin Petersonbec087f2009-03-26 21:10:30 +00001875 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Petersoned77f2e2008-06-17 22:40:44 +00001876
Benjamin Petersondfd79492008-06-13 19:13:39 +00001877 if run is None:
1878 from test.test_support import run_unittest as run
1879
1880 util.get_temp_dir() # creates temp directory for use by all processes
1881
1882 multiprocessing.get_logger().setLevel(LOG_LEVEL)
1883
Jesse Noller146b7ab2008-07-02 16:44:09 +00001884 ProcessesMixin.pool = multiprocessing.Pool(4)
1885 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
1886 ManagerMixin.manager.__init__()
1887 ManagerMixin.manager.start()
1888 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001889
1890 testcases = (
Jesse Noller146b7ab2008-07-02 16:44:09 +00001891 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
1892 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz0c519b32008-08-25 01:50:24 +00001893 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
1894 testcases_other
Benjamin Petersondfd79492008-06-13 19:13:39 +00001895 )
1896
1897 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
1898 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
1899 run(suite)
1900
Jesse Noller146b7ab2008-07-02 16:44:09 +00001901 ThreadsMixin.pool.terminate()
1902 ProcessesMixin.pool.terminate()
1903 ManagerMixin.pool.terminate()
1904 ManagerMixin.manager.shutdown()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001905
Jesse Noller146b7ab2008-07-02 16:44:09 +00001906 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00001907
1908def main():
1909 test_main(unittest.TextTestRunner(verbosity=2).run)
1910
1911if __name__ == '__main__':
1912 main()