blob: b0746e9f3b5dd9e6da3937e8ce4e678c9e93bfd7 [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import Queue
10import time
11import sys
12import os
13import gc
14import signal
15import array
16import copy
17import socket
18import random
19import logging
20
Jesse Noller37040cd2008-09-30 00:15:45 +000021
22# Work around broken sem_open implementations
23try:
24 import multiprocessing.synchronize
25except ImportError, e:
26 from test.test_support import TestSkipped
27 raise TestSkipped(e)
28
Benjamin Petersondfd79492008-06-13 19:13:39 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000033import multiprocessing.pool
34import _multiprocessing
35
36from multiprocessing import util
37
38#
39#
40#
41
Benjamin Petersone79edf52008-07-13 18:34:58 +000042latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000043
Benjamin Petersondfd79492008-06-13 19:13:39 +000044#
45# Constants
46#
47
48LOG_LEVEL = util.SUBWARNING
49#LOG_LEVEL = logging.WARNING
50
51DELTA = 0.1
52CHECK_TIMINGS = False # making true makes tests take a lot longer
53 # and can sometimes cause some non-serious
54 # failures because some calls block a bit
55 # longer than expected
56if CHECK_TIMINGS:
57 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
58else:
59 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
60
61HAVE_GETVALUE = not getattr(_multiprocessing,
62 'HAVE_BROKEN_SEM_GETVALUE', False)
63
64#
65# Creates a wrapper for a function which records the time it takes to finish
66#
67
68class TimingWrapper(object):
69
70 def __init__(self, func):
71 self.func = func
72 self.elapsed = None
73
74 def __call__(self, *args, **kwds):
75 t = time.time()
76 try:
77 return self.func(*args, **kwds)
78 finally:
79 self.elapsed = time.time() - t
80
81#
82# Base class for test cases
83#
84
85class BaseTestCase(object):
86
87 ALLOWED_TYPES = ('processes', 'manager', 'threads')
88
89 def assertTimingAlmostEqual(self, a, b):
90 if CHECK_TIMINGS:
91 self.assertAlmostEqual(a, b, 1)
92
93 def assertReturnsIfImplemented(self, value, func, *args):
94 try:
95 res = func(*args)
96 except NotImplementedError:
97 pass
98 else:
99 return self.assertEqual(value, res)
100
101#
102# Return the value of a semaphore
103#
104
105def get_value(self):
106 try:
107 return self.get_value()
108 except AttributeError:
109 try:
110 return self._Semaphore__value
111 except AttributeError:
112 try:
113 return self._value
114 except AttributeError:
115 raise NotImplementedError
116
117#
118# Testcases
119#
120
121class _TestProcess(BaseTestCase):
122
123 ALLOWED_TYPES = ('processes', 'threads')
124
125 def test_current(self):
126 if self.TYPE == 'threads':
127 return
128
129 current = self.current_process()
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000130 authkey = current.authkey
Benjamin Petersondfd79492008-06-13 19:13:39 +0000131
132 self.assertTrue(current.is_alive())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000133 self.assertTrue(not current.daemon)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000134 self.assertTrue(isinstance(authkey, bytes))
135 self.assertTrue(len(authkey) > 0)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000136 self.assertEqual(current.ident, os.getpid())
137 self.assertEqual(current.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000138
139 def _test(self, q, *args, **kwds):
140 current = self.current_process()
141 q.put(args)
142 q.put(kwds)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000143 q.put(current.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000144 if self.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000145 q.put(bytes(current.authkey))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000146 q.put(current.pid)
147
148 def test_process(self):
149 q = self.Queue(1)
150 e = self.Event()
151 args = (q, 1, 2)
152 kwargs = {'hello':23, 'bye':2.54}
153 name = 'SomeProcess'
154 p = self.Process(
155 target=self._test, args=args, kwargs=kwargs, name=name
156 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000157 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000158 current = self.current_process()
159
160 if self.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000161 self.assertEquals(p.authkey, current.authkey)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000162 self.assertEquals(p.is_alive(), False)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000163 self.assertEquals(p.daemon, True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000164 self.assertTrue(p not in self.active_children())
165 self.assertTrue(type(self.active_children()) is list)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000166 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000167
168 p.start()
169
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000170 self.assertEquals(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000171 self.assertEquals(p.is_alive(), True)
172 self.assertTrue(p in self.active_children())
173
174 self.assertEquals(q.get(), args[1:])
175 self.assertEquals(q.get(), kwargs)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000176 self.assertEquals(q.get(), p.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000177 if self.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000178 self.assertEquals(q.get(), current.authkey)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000179 self.assertEquals(q.get(), p.pid)
180
181 p.join()
182
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000183 self.assertEquals(p.exitcode, 0)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000184 self.assertEquals(p.is_alive(), False)
185 self.assertTrue(p not in self.active_children())
186
187 def _test_terminate(self):
188 time.sleep(1000)
189
190 def test_terminate(self):
191 if self.TYPE == 'threads':
192 return
193
194 p = self.Process(target=self._test_terminate)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000195 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000196 p.start()
197
198 self.assertEqual(p.is_alive(), True)
199 self.assertTrue(p in self.active_children())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000200 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000201
202 p.terminate()
203
204 join = TimingWrapper(p.join)
205 self.assertEqual(join(), None)
206 self.assertTimingAlmostEqual(join.elapsed, 0.0)
207
208 self.assertEqual(p.is_alive(), False)
209 self.assertTrue(p not in self.active_children())
210
211 p.join()
212
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000213 # XXX sometimes get p.exitcode == 0 on Windows ...
214 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000215
216 def test_cpu_count(self):
217 try:
218 cpus = multiprocessing.cpu_count()
219 except NotImplementedError:
220 cpus = 1
221 self.assertTrue(type(cpus) is int)
222 self.assertTrue(cpus >= 1)
223
224 def test_active_children(self):
225 self.assertEqual(type(self.active_children()), list)
226
227 p = self.Process(target=time.sleep, args=(DELTA,))
228 self.assertTrue(p not in self.active_children())
229
230 p.start()
231 self.assertTrue(p in self.active_children())
232
233 p.join()
234 self.assertTrue(p not in self.active_children())
235
236 def _test_recursion(self, wconn, id):
237 from multiprocessing import forking
238 wconn.send(id)
239 if len(id) < 2:
240 for i in range(2):
241 p = self.Process(
242 target=self._test_recursion, args=(wconn, id+[i])
243 )
244 p.start()
245 p.join()
246
247 def test_recursion(self):
248 rconn, wconn = self.Pipe(duplex=False)
249 self._test_recursion(wconn, [])
250
251 time.sleep(DELTA)
252 result = []
253 while rconn.poll():
254 result.append(rconn.recv())
255
256 expected = [
257 [],
258 [0],
259 [0, 0],
260 [0, 1],
261 [1],
262 [1, 0],
263 [1, 1]
264 ]
265 self.assertEqual(result, expected)
266
267#
268#
269#
270
271class _UpperCaser(multiprocessing.Process):
272
273 def __init__(self):
274 multiprocessing.Process.__init__(self)
275 self.child_conn, self.parent_conn = multiprocessing.Pipe()
276
277 def run(self):
278 self.parent_conn.close()
279 for s in iter(self.child_conn.recv, None):
280 self.child_conn.send(s.upper())
281 self.child_conn.close()
282
283 def submit(self, s):
284 assert type(s) is str
285 self.parent_conn.send(s)
286 return self.parent_conn.recv()
287
288 def stop(self):
289 self.parent_conn.send(None)
290 self.parent_conn.close()
291 self.child_conn.close()
292
293class _TestSubclassingProcess(BaseTestCase):
294
295 ALLOWED_TYPES = ('processes',)
296
297 def test_subclassing(self):
298 uppercaser = _UpperCaser()
299 uppercaser.start()
300 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
301 self.assertEqual(uppercaser.submit('world'), 'WORLD')
302 uppercaser.stop()
303 uppercaser.join()
304
305#
306#
307#
308
309def queue_empty(q):
310 if hasattr(q, 'empty'):
311 return q.empty()
312 else:
313 return q.qsize() == 0
314
315def queue_full(q, maxsize):
316 if hasattr(q, 'full'):
317 return q.full()
318 else:
319 return q.qsize() == maxsize
320
321
322class _TestQueue(BaseTestCase):
323
324
325 def _test_put(self, queue, child_can_start, parent_can_continue):
326 child_can_start.wait()
327 for i in range(6):
328 queue.get()
329 parent_can_continue.set()
330
331 def test_put(self):
332 MAXSIZE = 6
333 queue = self.Queue(maxsize=MAXSIZE)
334 child_can_start = self.Event()
335 parent_can_continue = self.Event()
336
337 proc = self.Process(
338 target=self._test_put,
339 args=(queue, child_can_start, parent_can_continue)
340 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000341 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000342 proc.start()
343
344 self.assertEqual(queue_empty(queue), True)
345 self.assertEqual(queue_full(queue, MAXSIZE), False)
346
347 queue.put(1)
348 queue.put(2, True)
349 queue.put(3, True, None)
350 queue.put(4, False)
351 queue.put(5, False, None)
352 queue.put_nowait(6)
353
354 # the values may be in buffer but not yet in pipe so sleep a bit
355 time.sleep(DELTA)
356
357 self.assertEqual(queue_empty(queue), False)
358 self.assertEqual(queue_full(queue, MAXSIZE), True)
359
360 put = TimingWrapper(queue.put)
361 put_nowait = TimingWrapper(queue.put_nowait)
362
363 self.assertRaises(Queue.Full, put, 7, False)
364 self.assertTimingAlmostEqual(put.elapsed, 0)
365
366 self.assertRaises(Queue.Full, put, 7, False, None)
367 self.assertTimingAlmostEqual(put.elapsed, 0)
368
369 self.assertRaises(Queue.Full, put_nowait, 7)
370 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
371
372 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
373 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
374
375 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
376 self.assertTimingAlmostEqual(put.elapsed, 0)
377
378 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
379 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
380
381 child_can_start.set()
382 parent_can_continue.wait()
383
384 self.assertEqual(queue_empty(queue), True)
385 self.assertEqual(queue_full(queue, MAXSIZE), False)
386
387 proc.join()
388
389 def _test_get(self, queue, child_can_start, parent_can_continue):
390 child_can_start.wait()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000391 #queue.put(1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000392 queue.put(2)
393 queue.put(3)
394 queue.put(4)
395 queue.put(5)
396 parent_can_continue.set()
397
398 def test_get(self):
399 queue = self.Queue()
400 child_can_start = self.Event()
401 parent_can_continue = self.Event()
402
403 proc = self.Process(
404 target=self._test_get,
405 args=(queue, child_can_start, parent_can_continue)
406 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000407 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000408 proc.start()
409
410 self.assertEqual(queue_empty(queue), True)
411
412 child_can_start.set()
413 parent_can_continue.wait()
414
415 time.sleep(DELTA)
416 self.assertEqual(queue_empty(queue), False)
417
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000418 # Hangs unexpectedly, remove for now
419 #self.assertEqual(queue.get(), 1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000420 self.assertEqual(queue.get(True, None), 2)
421 self.assertEqual(queue.get(True), 3)
422 self.assertEqual(queue.get(timeout=1), 4)
423 self.assertEqual(queue.get_nowait(), 5)
424
425 self.assertEqual(queue_empty(queue), True)
426
427 get = TimingWrapper(queue.get)
428 get_nowait = TimingWrapper(queue.get_nowait)
429
430 self.assertRaises(Queue.Empty, get, False)
431 self.assertTimingAlmostEqual(get.elapsed, 0)
432
433 self.assertRaises(Queue.Empty, get, False, None)
434 self.assertTimingAlmostEqual(get.elapsed, 0)
435
436 self.assertRaises(Queue.Empty, get_nowait)
437 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
438
439 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
440 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
441
442 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
443 self.assertTimingAlmostEqual(get.elapsed, 0)
444
445 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
446 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
447
448 proc.join()
449
450 def _test_fork(self, queue):
451 for i in range(10, 20):
452 queue.put(i)
453 # note that at this point the items may only be buffered, so the
454 # process cannot shutdown until the feeder thread has finished
455 # pushing items onto the pipe.
456
457 def test_fork(self):
458 # Old versions of Queue would fail to create a new feeder
459 # thread for a forked process if the original process had its
460 # own feeder thread. This test checks that this no longer
461 # happens.
462
463 queue = self.Queue()
464
465 # put items on queue so that main process starts a feeder thread
466 for i in range(10):
467 queue.put(i)
468
469 # wait to make sure thread starts before we fork a new process
470 time.sleep(DELTA)
471
472 # fork process
473 p = self.Process(target=self._test_fork, args=(queue,))
474 p.start()
475
476 # check that all expected items are in the queue
477 for i in range(20):
478 self.assertEqual(queue.get(), i)
479 self.assertRaises(Queue.Empty, queue.get, False)
480
481 p.join()
482
483 def test_qsize(self):
484 q = self.Queue()
485 try:
486 self.assertEqual(q.qsize(), 0)
487 except NotImplementedError:
488 return
489 q.put(1)
490 self.assertEqual(q.qsize(), 1)
491 q.put(5)
492 self.assertEqual(q.qsize(), 2)
493 q.get()
494 self.assertEqual(q.qsize(), 1)
495 q.get()
496 self.assertEqual(q.qsize(), 0)
497
498 def _test_task_done(self, q):
499 for obj in iter(q.get, None):
500 time.sleep(DELTA)
501 q.task_done()
502
503 def test_task_done(self):
504 queue = self.JoinableQueue()
505
506 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
507 return
508
509 workers = [self.Process(target=self._test_task_done, args=(queue,))
510 for i in xrange(4)]
511
512 for p in workers:
513 p.start()
514
515 for i in xrange(10):
516 queue.put(i)
517
518 queue.join()
519
520 for p in workers:
521 queue.put(None)
522
523 for p in workers:
524 p.join()
525
526#
527#
528#
529
530class _TestLock(BaseTestCase):
531
532 def test_lock(self):
533 lock = self.Lock()
534 self.assertEqual(lock.acquire(), True)
535 self.assertEqual(lock.acquire(False), False)
536 self.assertEqual(lock.release(), None)
537 self.assertRaises((ValueError, threading.ThreadError), lock.release)
538
539 def test_rlock(self):
540 lock = self.RLock()
541 self.assertEqual(lock.acquire(), True)
542 self.assertEqual(lock.acquire(), True)
543 self.assertEqual(lock.acquire(), True)
544 self.assertEqual(lock.release(), None)
545 self.assertEqual(lock.release(), None)
546 self.assertEqual(lock.release(), None)
547 self.assertRaises((AssertionError, RuntimeError), lock.release)
548
549
550class _TestSemaphore(BaseTestCase):
551
552 def _test_semaphore(self, sem):
553 self.assertReturnsIfImplemented(2, get_value, sem)
554 self.assertEqual(sem.acquire(), True)
555 self.assertReturnsIfImplemented(1, get_value, sem)
556 self.assertEqual(sem.acquire(), True)
557 self.assertReturnsIfImplemented(0, get_value, sem)
558 self.assertEqual(sem.acquire(False), False)
559 self.assertReturnsIfImplemented(0, get_value, sem)
560 self.assertEqual(sem.release(), None)
561 self.assertReturnsIfImplemented(1, get_value, sem)
562 self.assertEqual(sem.release(), None)
563 self.assertReturnsIfImplemented(2, get_value, sem)
564
565 def test_semaphore(self):
566 sem = self.Semaphore(2)
567 self._test_semaphore(sem)
568 self.assertEqual(sem.release(), None)
569 self.assertReturnsIfImplemented(3, get_value, sem)
570 self.assertEqual(sem.release(), None)
571 self.assertReturnsIfImplemented(4, get_value, sem)
572
573 def test_bounded_semaphore(self):
574 sem = self.BoundedSemaphore(2)
575 self._test_semaphore(sem)
576 # Currently fails on OS/X
577 #if HAVE_GETVALUE:
578 # self.assertRaises(ValueError, sem.release)
579 # self.assertReturnsIfImplemented(2, get_value, sem)
580
581 def test_timeout(self):
582 if self.TYPE != 'processes':
583 return
584
585 sem = self.Semaphore(0)
586 acquire = TimingWrapper(sem.acquire)
587
588 self.assertEqual(acquire(False), False)
589 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
590
591 self.assertEqual(acquire(False, None), False)
592 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
593
594 self.assertEqual(acquire(False, TIMEOUT1), False)
595 self.assertTimingAlmostEqual(acquire.elapsed, 0)
596
597 self.assertEqual(acquire(True, TIMEOUT2), False)
598 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
599
600 self.assertEqual(acquire(timeout=TIMEOUT3), False)
601 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
602
603
604class _TestCondition(BaseTestCase):
605
606 def f(self, cond, sleeping, woken, timeout=None):
607 cond.acquire()
608 sleeping.release()
609 cond.wait(timeout)
610 woken.release()
611 cond.release()
612
613 def check_invariant(self, cond):
614 # this is only supposed to succeed when there are no sleepers
615 if self.TYPE == 'processes':
616 try:
617 sleepers = (cond._sleeping_count.get_value() -
618 cond._woken_count.get_value())
619 self.assertEqual(sleepers, 0)
620 self.assertEqual(cond._wait_semaphore.get_value(), 0)
621 except NotImplementedError:
622 pass
623
624 def test_notify(self):
625 cond = self.Condition()
626 sleeping = self.Semaphore(0)
627 woken = self.Semaphore(0)
628
629 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000630 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000631 p.start()
632
633 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000634 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000635 p.start()
636
637 # wait for both children to start sleeping
638 sleeping.acquire()
639 sleeping.acquire()
640
641 # check no process/thread has woken up
642 time.sleep(DELTA)
643 self.assertReturnsIfImplemented(0, get_value, woken)
644
645 # wake up one process/thread
646 cond.acquire()
647 cond.notify()
648 cond.release()
649
650 # check one process/thread has woken up
651 time.sleep(DELTA)
652 self.assertReturnsIfImplemented(1, get_value, woken)
653
654 # wake up another
655 cond.acquire()
656 cond.notify()
657 cond.release()
658
659 # check other has woken up
660 time.sleep(DELTA)
661 self.assertReturnsIfImplemented(2, get_value, woken)
662
663 # check state is not mucked up
664 self.check_invariant(cond)
665 p.join()
666
667 def test_notify_all(self):
668 cond = self.Condition()
669 sleeping = self.Semaphore(0)
670 woken = self.Semaphore(0)
671
672 # start some threads/processes which will timeout
673 for i in range(3):
674 p = self.Process(target=self.f,
675 args=(cond, sleeping, woken, TIMEOUT1))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000676 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000677 p.start()
678
679 t = threading.Thread(target=self.f,
680 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000681 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000682 t.start()
683
684 # wait for them all to sleep
685 for i in xrange(6):
686 sleeping.acquire()
687
688 # check they have all timed out
689 for i in xrange(6):
690 woken.acquire()
691 self.assertReturnsIfImplemented(0, get_value, woken)
692
693 # check state is not mucked up
694 self.check_invariant(cond)
695
696 # start some more threads/processes
697 for i in range(3):
698 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000699 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000700 p.start()
701
702 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000703 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000704 t.start()
705
706 # wait for them to all sleep
707 for i in xrange(6):
708 sleeping.acquire()
709
710 # check no process/thread has woken up
711 time.sleep(DELTA)
712 self.assertReturnsIfImplemented(0, get_value, woken)
713
714 # wake them all up
715 cond.acquire()
716 cond.notify_all()
717 cond.release()
718
719 # check they have all woken
720 time.sleep(DELTA)
721 self.assertReturnsIfImplemented(6, get_value, woken)
722
723 # check state is not mucked up
724 self.check_invariant(cond)
725
726 def test_timeout(self):
727 cond = self.Condition()
728 wait = TimingWrapper(cond.wait)
729 cond.acquire()
730 res = wait(TIMEOUT1)
731 cond.release()
732 self.assertEqual(res, None)
733 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
734
735
736class _TestEvent(BaseTestCase):
737
738 def _test_event(self, event):
739 time.sleep(TIMEOUT2)
740 event.set()
741
742 def test_event(self):
743 event = self.Event()
744 wait = TimingWrapper(event.wait)
745
746 # Removed temporaily, due to API shear, this does not
747 # work with threading._Event objects. is_set == isSet
748 #self.assertEqual(event.is_set(), False)
749
750 self.assertEqual(wait(0.0), None)
751 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
752 self.assertEqual(wait(TIMEOUT1), None)
753 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
754
755 event.set()
756
757 # See note above on the API differences
758 # self.assertEqual(event.is_set(), True)
759 self.assertEqual(wait(), None)
760 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
761 self.assertEqual(wait(TIMEOUT1), None)
762 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
763 # self.assertEqual(event.is_set(), True)
764
765 event.clear()
766
767 #self.assertEqual(event.is_set(), False)
768
769 self.Process(target=self._test_event, args=(event,)).start()
770 self.assertEqual(wait(), None)
771
772#
773#
774#
775
776class _TestValue(BaseTestCase):
777
778 codes_values = [
779 ('i', 4343, 24234),
780 ('d', 3.625, -4.25),
781 ('h', -232, 234),
782 ('c', latin('x'), latin('y'))
783 ]
784
785 def _test(self, values):
786 for sv, cv in zip(values, self.codes_values):
787 sv.value = cv[2]
788
789
790 def test_value(self, raw=False):
791 if self.TYPE != 'processes':
792 return
793
794 if raw:
795 values = [self.RawValue(code, value)
796 for code, value, _ in self.codes_values]
797 else:
798 values = [self.Value(code, value)
799 for code, value, _ in self.codes_values]
800
801 for sv, cv in zip(values, self.codes_values):
802 self.assertEqual(sv.value, cv[1])
803
804 proc = self.Process(target=self._test, args=(values,))
805 proc.start()
806 proc.join()
807
808 for sv, cv in zip(values, self.codes_values):
809 self.assertEqual(sv.value, cv[2])
810
811 def test_rawvalue(self):
812 self.test_value(raw=True)
813
814 def test_getobj_getlock(self):
815 if self.TYPE != 'processes':
816 return
817
818 val1 = self.Value('i', 5)
819 lock1 = val1.get_lock()
820 obj1 = val1.get_obj()
821
822 val2 = self.Value('i', 5, lock=None)
823 lock2 = val2.get_lock()
824 obj2 = val2.get_obj()
825
826 lock = self.Lock()
827 val3 = self.Value('i', 5, lock=lock)
828 lock3 = val3.get_lock()
829 obj3 = val3.get_obj()
830 self.assertEqual(lock, lock3)
831
832 arr4 = self.RawValue('i', 5)
833 self.assertFalse(hasattr(arr4, 'get_lock'))
834 self.assertFalse(hasattr(arr4, 'get_obj'))
835
836
837class _TestArray(BaseTestCase):
838
839 def f(self, seq):
840 for i in range(1, len(seq)):
841 seq[i] += seq[i-1]
842
843 def test_array(self, raw=False):
844 if self.TYPE != 'processes':
845 return
846
847 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
848 if raw:
849 arr = self.RawArray('i', seq)
850 else:
851 arr = self.Array('i', seq)
852
853 self.assertEqual(len(arr), len(seq))
854 self.assertEqual(arr[3], seq[3])
855 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
856
857 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
858
859 self.assertEqual(list(arr[:]), seq)
860
861 self.f(seq)
862
863 p = self.Process(target=self.f, args=(arr,))
864 p.start()
865 p.join()
866
867 self.assertEqual(list(arr[:]), seq)
868
869 def test_rawarray(self):
870 self.test_array(raw=True)
871
872 def test_getobj_getlock_obj(self):
873 if self.TYPE != 'processes':
874 return
875
876 arr1 = self.Array('i', range(10))
877 lock1 = arr1.get_lock()
878 obj1 = arr1.get_obj()
879
880 arr2 = self.Array('i', range(10), lock=None)
881 lock2 = arr2.get_lock()
882 obj2 = arr2.get_obj()
883
884 lock = self.Lock()
885 arr3 = self.Array('i', range(10), lock=lock)
886 lock3 = arr3.get_lock()
887 obj3 = arr3.get_obj()
888 self.assertEqual(lock, lock3)
889
890 arr4 = self.RawArray('i', range(10))
891 self.assertFalse(hasattr(arr4, 'get_lock'))
892 self.assertFalse(hasattr(arr4, 'get_obj'))
893
894#
895#
896#
897
898class _TestContainers(BaseTestCase):
899
900 ALLOWED_TYPES = ('manager',)
901
902 def test_list(self):
903 a = self.list(range(10))
904 self.assertEqual(a[:], range(10))
905
906 b = self.list()
907 self.assertEqual(b[:], [])
908
909 b.extend(range(5))
910 self.assertEqual(b[:], range(5))
911
912 self.assertEqual(b[2], 2)
913 self.assertEqual(b[2:10], [2,3,4])
914
915 b *= 2
916 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
917
918 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
919
920 self.assertEqual(a[:], range(10))
921
922 d = [a, b]
923 e = self.list(d)
924 self.assertEqual(
925 e[:],
926 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
927 )
928
929 f = self.list([a])
930 a.append('hello')
931 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
932
933 def test_dict(self):
934 d = self.dict()
935 indices = range(65, 70)
936 for i in indices:
937 d[i] = chr(i)
938 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
939 self.assertEqual(sorted(d.keys()), indices)
940 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
941 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
942
943 def test_namespace(self):
944 n = self.Namespace()
945 n.name = 'Bob'
946 n.job = 'Builder'
947 n._hidden = 'hidden'
948 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
949 del n.job
950 self.assertEqual(str(n), "Namespace(name='Bob')")
951 self.assertTrue(hasattr(n, 'name'))
952 self.assertTrue(not hasattr(n, 'job'))
953
954#
955#
956#
957
958def sqr(x, wait=0.0):
959 time.sleep(wait)
960 return x*x
Benjamin Petersondfd79492008-06-13 19:13:39 +0000961class _TestPool(BaseTestCase):
962
963 def test_apply(self):
964 papply = self.pool.apply
965 self.assertEqual(papply(sqr, (5,)), sqr(5))
966 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
967
968 def test_map(self):
969 pmap = self.pool.map
970 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
971 self.assertEqual(pmap(sqr, range(100), chunksize=20),
972 map(sqr, range(100)))
973
974 def test_async(self):
975 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
976 get = TimingWrapper(res.get)
977 self.assertEqual(get(), 49)
978 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
979
980 def test_async_timeout(self):
981 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
982 get = TimingWrapper(res.get)
983 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
984 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
985
986 def test_imap(self):
987 it = self.pool.imap(sqr, range(10))
988 self.assertEqual(list(it), map(sqr, range(10)))
989
990 it = self.pool.imap(sqr, range(10))
991 for i in range(10):
992 self.assertEqual(it.next(), i*i)
993 self.assertRaises(StopIteration, it.next)
994
995 it = self.pool.imap(sqr, range(1000), chunksize=100)
996 for i in range(1000):
997 self.assertEqual(it.next(), i*i)
998 self.assertRaises(StopIteration, it.next)
999
1000 def test_imap_unordered(self):
1001 it = self.pool.imap_unordered(sqr, range(1000))
1002 self.assertEqual(sorted(it), map(sqr, range(1000)))
1003
1004 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1005 self.assertEqual(sorted(it), map(sqr, range(1000)))
1006
1007 def test_make_pool(self):
1008 p = multiprocessing.Pool(3)
1009 self.assertEqual(3, len(p._pool))
1010 p.close()
1011 p.join()
1012
1013 def test_terminate(self):
1014 if self.TYPE == 'manager':
1015 # On Unix a forked process increfs each shared object to
1016 # which its parent process held a reference. If the
1017 # forked process gets terminated then there is likely to
1018 # be a reference leak. So to prevent
1019 # _TestZZZNumberOfObjects from failing we skip this test
1020 # when using a manager.
1021 return
1022
1023 result = self.pool.map_async(
1024 time.sleep, [0.1 for i in range(10000)], chunksize=1
1025 )
1026 self.pool.terminate()
1027 join = TimingWrapper(self.pool.join)
1028 join()
1029 self.assertTrue(join.elapsed < 0.2)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001030#
1031# Test that manager has expected number of shared objects left
1032#
1033
1034class _TestZZZNumberOfObjects(BaseTestCase):
1035 # Because test cases are sorted alphabetically, this one will get
1036 # run after all the other tests for the manager. It tests that
1037 # there have been no "reference leaks" for the manager's shared
1038 # objects. Note the comment in _TestPool.test_terminate().
1039 ALLOWED_TYPES = ('manager',)
1040
1041 def test_number_of_objects(self):
1042 EXPECTED_NUMBER = 1 # the pool object is still alive
1043 multiprocessing.active_children() # discard dead process objs
1044 gc.collect() # do garbage collection
1045 refs = self.manager._number_of_objects()
1046 if refs != EXPECTED_NUMBER:
Jesse Noller7fb96402008-07-17 21:01:05 +00001047 print self.manager._debug_info()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001048
1049 self.assertEqual(refs, EXPECTED_NUMBER)
1050
1051#
1052# Test of creating a customized manager class
1053#
1054
1055from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1056
1057class FooBar(object):
1058 def f(self):
1059 return 'f()'
1060 def g(self):
1061 raise ValueError
1062 def _h(self):
1063 return '_h()'
1064
1065def baz():
1066 for i in xrange(10):
1067 yield i*i
1068
1069class IteratorProxy(BaseProxy):
1070 _exposed_ = ('next', '__next__')
1071 def __iter__(self):
1072 return self
1073 def next(self):
1074 return self._callmethod('next')
1075 def __next__(self):
1076 return self._callmethod('__next__')
1077
1078class MyManager(BaseManager):
1079 pass
1080
1081MyManager.register('Foo', callable=FooBar)
1082MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1083MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1084
1085
1086class _TestMyManager(BaseTestCase):
1087
1088 ALLOWED_TYPES = ('manager',)
1089
1090 def test_mymanager(self):
1091 manager = MyManager()
1092 manager.start()
1093
1094 foo = manager.Foo()
1095 bar = manager.Bar()
1096 baz = manager.baz()
1097
1098 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1099 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1100
1101 self.assertEqual(foo_methods, ['f', 'g'])
1102 self.assertEqual(bar_methods, ['f', '_h'])
1103
1104 self.assertEqual(foo.f(), 'f()')
1105 self.assertRaises(ValueError, foo.g)
1106 self.assertEqual(foo._callmethod('f'), 'f()')
1107 self.assertRaises(RemoteError, foo._callmethod, '_h')
1108
1109 self.assertEqual(bar.f(), 'f()')
1110 self.assertEqual(bar._h(), '_h()')
1111 self.assertEqual(bar._callmethod('f'), 'f()')
1112 self.assertEqual(bar._callmethod('_h'), '_h()')
1113
1114 self.assertEqual(list(baz), [i*i for i in range(10)])
1115
1116 manager.shutdown()
1117
1118#
1119# Test of connecting to a remote server and using xmlrpclib for serialization
1120#
1121
1122_queue = Queue.Queue()
1123def get_queue():
1124 return _queue
1125
1126class QueueManager(BaseManager):
1127 '''manager class used by server process'''
1128QueueManager.register('get_queue', callable=get_queue)
1129
1130class QueueManager2(BaseManager):
1131 '''manager class which specifies the same interface as QueueManager'''
1132QueueManager2.register('get_queue')
1133
1134
1135SERIALIZER = 'xmlrpclib'
1136
1137class _TestRemoteManager(BaseTestCase):
1138
1139 ALLOWED_TYPES = ('manager',)
1140
1141 def _putter(self, address, authkey):
1142 manager = QueueManager2(
1143 address=address, authkey=authkey, serializer=SERIALIZER
1144 )
1145 manager.connect()
1146 queue = manager.get_queue()
1147 queue.put(('hello world', None, True, 2.25))
1148
1149 def test_remote(self):
1150 authkey = os.urandom(32)
1151
1152 manager = QueueManager(
1153 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1154 )
1155 manager.start()
1156
1157 p = self.Process(target=self._putter, args=(manager.address, authkey))
1158 p.start()
1159
1160 manager2 = QueueManager2(
1161 address=manager.address, authkey=authkey, serializer=SERIALIZER
1162 )
1163 manager2.connect()
1164 queue = manager2.get_queue()
1165
1166 # Note that xmlrpclib will deserialize object as a list not a tuple
1167 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1168
1169 # Because we are using xmlrpclib for serialization instead of
1170 # pickle this will cause a serialization error.
1171 self.assertRaises(Exception, queue.put, time.sleep)
1172
1173 # Make queue finalizer run before the server is stopped
1174 del queue
1175 manager.shutdown()
1176
1177#
1178#
1179#
1180
1181SENTINEL = latin('')
1182
1183class _TestConnection(BaseTestCase):
1184
1185 ALLOWED_TYPES = ('processes', 'threads')
1186
1187 def _echo(self, conn):
1188 for msg in iter(conn.recv_bytes, SENTINEL):
1189 conn.send_bytes(msg)
1190 conn.close()
1191
1192 def test_connection(self):
1193 conn, child_conn = self.Pipe()
1194
1195 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001196 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001197 p.start()
1198
1199 seq = [1, 2.25, None]
1200 msg = latin('hello world')
1201 longmsg = msg * 10
1202 arr = array.array('i', range(4))
1203
1204 if self.TYPE == 'processes':
1205 self.assertEqual(type(conn.fileno()), int)
1206
1207 self.assertEqual(conn.send(seq), None)
1208 self.assertEqual(conn.recv(), seq)
1209
1210 self.assertEqual(conn.send_bytes(msg), None)
1211 self.assertEqual(conn.recv_bytes(), msg)
1212
1213 if self.TYPE == 'processes':
1214 buffer = array.array('i', [0]*10)
1215 expected = list(arr) + [0] * (10 - len(arr))
1216 self.assertEqual(conn.send_bytes(arr), None)
1217 self.assertEqual(conn.recv_bytes_into(buffer),
1218 len(arr) * buffer.itemsize)
1219 self.assertEqual(list(buffer), expected)
1220
1221 buffer = array.array('i', [0]*10)
1222 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1223 self.assertEqual(conn.send_bytes(arr), None)
1224 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1225 len(arr) * buffer.itemsize)
1226 self.assertEqual(list(buffer), expected)
1227
1228 buffer = bytearray(latin(' ' * 40))
1229 self.assertEqual(conn.send_bytes(longmsg), None)
1230 try:
1231 res = conn.recv_bytes_into(buffer)
1232 except multiprocessing.BufferTooShort, e:
1233 self.assertEqual(e.args, (longmsg,))
1234 else:
1235 self.fail('expected BufferTooShort, got %s' % res)
1236
1237 poll = TimingWrapper(conn.poll)
1238
1239 self.assertEqual(poll(), False)
1240 self.assertTimingAlmostEqual(poll.elapsed, 0)
1241
1242 self.assertEqual(poll(TIMEOUT1), False)
1243 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1244
1245 conn.send(None)
1246
1247 self.assertEqual(poll(TIMEOUT1), True)
1248 self.assertTimingAlmostEqual(poll.elapsed, 0)
1249
1250 self.assertEqual(conn.recv(), None)
1251
1252 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1253 conn.send_bytes(really_big_msg)
1254 self.assertEqual(conn.recv_bytes(), really_big_msg)
1255
1256 conn.send_bytes(SENTINEL) # tell child to quit
1257 child_conn.close()
1258
1259 if self.TYPE == 'processes':
1260 self.assertEqual(conn.readable, True)
1261 self.assertEqual(conn.writable, True)
1262 self.assertRaises(EOFError, conn.recv)
1263 self.assertRaises(EOFError, conn.recv_bytes)
1264
1265 p.join()
1266
1267 def test_duplex_false(self):
1268 reader, writer = self.Pipe(duplex=False)
1269 self.assertEqual(writer.send(1), None)
1270 self.assertEqual(reader.recv(), 1)
1271 if self.TYPE == 'processes':
1272 self.assertEqual(reader.readable, True)
1273 self.assertEqual(reader.writable, False)
1274 self.assertEqual(writer.readable, False)
1275 self.assertEqual(writer.writable, True)
1276 self.assertRaises(IOError, reader.send, 2)
1277 self.assertRaises(IOError, writer.recv)
1278 self.assertRaises(IOError, writer.poll)
1279
1280 def test_spawn_close(self):
1281 # We test that a pipe connection can be closed by parent
1282 # process immediately after child is spawned. On Windows this
1283 # would have sometimes failed on old versions because
1284 # child_conn would be closed before the child got a chance to
1285 # duplicate it.
1286 conn, child_conn = self.Pipe()
1287
1288 p = self.Process(target=self._echo, args=(child_conn,))
1289 p.start()
1290 child_conn.close() # this might complete before child initializes
1291
1292 msg = latin('hello')
1293 conn.send_bytes(msg)
1294 self.assertEqual(conn.recv_bytes(), msg)
1295
1296 conn.send_bytes(SENTINEL)
1297 conn.close()
1298 p.join()
1299
1300 def test_sendbytes(self):
1301 if self.TYPE != 'processes':
1302 return
1303
1304 msg = latin('abcdefghijklmnopqrstuvwxyz')
1305 a, b = self.Pipe()
1306
1307 a.send_bytes(msg)
1308 self.assertEqual(b.recv_bytes(), msg)
1309
1310 a.send_bytes(msg, 5)
1311 self.assertEqual(b.recv_bytes(), msg[5:])
1312
1313 a.send_bytes(msg, 7, 8)
1314 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1315
1316 a.send_bytes(msg, 26)
1317 self.assertEqual(b.recv_bytes(), latin(''))
1318
1319 a.send_bytes(msg, 26, 0)
1320 self.assertEqual(b.recv_bytes(), latin(''))
1321
1322 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1323
1324 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1325
1326 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1327
1328 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1329
1330 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1331
Benjamin Petersondfd79492008-06-13 19:13:39 +00001332class _TestListenerClient(BaseTestCase):
1333
1334 ALLOWED_TYPES = ('processes', 'threads')
1335
1336 def _test(self, address):
1337 conn = self.connection.Client(address)
1338 conn.send('hello')
1339 conn.close()
1340
1341 def test_listener_client(self):
1342 for family in self.connection.families:
1343 l = self.connection.Listener(family=family)
1344 p = self.Process(target=self._test, args=(l.address,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001345 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001346 p.start()
1347 conn = l.accept()
1348 self.assertEqual(conn.recv(), 'hello')
1349 p.join()
1350 l.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001351#
1352# Test of sending connection and socket objects between processes
1353#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001354"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001355class _TestPicklingConnections(BaseTestCase):
1356
1357 ALLOWED_TYPES = ('processes',)
1358
1359 def _listener(self, conn, families):
1360 for fam in families:
1361 l = self.connection.Listener(family=fam)
1362 conn.send(l.address)
1363 new_conn = l.accept()
1364 conn.send(new_conn)
1365
1366 if self.TYPE == 'processes':
1367 l = socket.socket()
1368 l.bind(('localhost', 0))
1369 conn.send(l.getsockname())
1370 l.listen(1)
1371 new_conn, addr = l.accept()
1372 conn.send(new_conn)
1373
1374 conn.recv()
1375
1376 def _remote(self, conn):
1377 for (address, msg) in iter(conn.recv, None):
1378 client = self.connection.Client(address)
1379 client.send(msg.upper())
1380 client.close()
1381
1382 if self.TYPE == 'processes':
1383 address, msg = conn.recv()
1384 client = socket.socket()
1385 client.connect(address)
1386 client.sendall(msg.upper())
1387 client.close()
1388
1389 conn.close()
1390
1391 def test_pickling(self):
1392 try:
1393 multiprocessing.allow_connection_pickling()
1394 except ImportError:
1395 return
1396
1397 families = self.connection.families
1398
1399 lconn, lconn0 = self.Pipe()
1400 lp = self.Process(target=self._listener, args=(lconn0, families))
1401 lp.start()
1402 lconn0.close()
1403
1404 rconn, rconn0 = self.Pipe()
1405 rp = self.Process(target=self._remote, args=(rconn0,))
1406 rp.start()
1407 rconn0.close()
1408
1409 for fam in families:
1410 msg = ('This connection uses family %s' % fam).encode('ascii')
1411 address = lconn.recv()
1412 rconn.send((address, msg))
1413 new_conn = lconn.recv()
1414 self.assertEqual(new_conn.recv(), msg.upper())
1415
1416 rconn.send(None)
1417
1418 if self.TYPE == 'processes':
1419 msg = latin('This connection uses a normal socket')
1420 address = lconn.recv()
1421 rconn.send((address, msg))
1422 if hasattr(socket, 'fromfd'):
1423 new_conn = lconn.recv()
1424 self.assertEqual(new_conn.recv(100), msg.upper())
1425 else:
1426 # XXX On Windows with Py2.6 need to backport fromfd()
1427 discard = lconn.recv_bytes()
1428
1429 lconn.send(None)
1430
1431 rconn.close()
1432 lconn.close()
1433
1434 lp.join()
1435 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001436"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001437#
1438#
1439#
1440
1441class _TestHeap(BaseTestCase):
1442
1443 ALLOWED_TYPES = ('processes',)
1444
1445 def test_heap(self):
1446 iterations = 5000
1447 maxblocks = 50
1448 blocks = []
1449
1450 # create and destroy lots of blocks of different sizes
1451 for i in xrange(iterations):
1452 size = int(random.lognormvariate(0, 1) * 1000)
1453 b = multiprocessing.heap.BufferWrapper(size)
1454 blocks.append(b)
1455 if len(blocks) > maxblocks:
1456 i = random.randrange(maxblocks)
1457 del blocks[i]
1458
1459 # get the heap object
1460 heap = multiprocessing.heap.BufferWrapper._heap
1461
1462 # verify the state of the heap
1463 all = []
1464 occupied = 0
1465 for L in heap._len_to_seq.values():
1466 for arena, start, stop in L:
1467 all.append((heap._arenas.index(arena), start, stop,
1468 stop-start, 'free'))
1469 for arena, start, stop in heap._allocated_blocks:
1470 all.append((heap._arenas.index(arena), start, stop,
1471 stop-start, 'occupied'))
1472 occupied += (stop-start)
1473
1474 all.sort()
1475
1476 for i in range(len(all)-1):
1477 (arena, start, stop) = all[i][:3]
1478 (narena, nstart, nstop) = all[i+1][:3]
1479 self.assertTrue((arena != narena and nstart == 0) or
1480 (stop == nstart))
1481
1482#
1483#
1484#
1485
1486try:
1487 from ctypes import Structure, Value, copy, c_int, c_double
1488except ImportError:
1489 Structure = object
1490 c_int = c_double = None
1491
1492class _Foo(Structure):
1493 _fields_ = [
1494 ('x', c_int),
1495 ('y', c_double)
1496 ]
1497
1498class _TestSharedCTypes(BaseTestCase):
1499
1500 ALLOWED_TYPES = ('processes',)
1501
1502 def _double(self, x, y, foo, arr, string):
1503 x.value *= 2
1504 y.value *= 2
1505 foo.x *= 2
1506 foo.y *= 2
1507 string.value *= 2
1508 for i in range(len(arr)):
1509 arr[i] *= 2
1510
1511 def test_sharedctypes(self, lock=False):
1512 if c_int is None:
1513 return
1514
1515 x = Value('i', 7, lock=lock)
1516 y = Value(ctypes.c_double, 1.0/3.0, lock=lock)
1517 foo = Value(_Foo, 3, 2, lock=lock)
1518 arr = Array('d', range(10), lock=lock)
1519 string = Array('c', 20, lock=lock)
1520 string.value = 'hello'
1521
1522 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1523 p.start()
1524 p.join()
1525
1526 self.assertEqual(x.value, 14)
1527 self.assertAlmostEqual(y.value, 2.0/3.0)
1528 self.assertEqual(foo.x, 6)
1529 self.assertAlmostEqual(foo.y, 4.0)
1530 for i in range(10):
1531 self.assertAlmostEqual(arr[i], i*2)
1532 self.assertEqual(string.value, latin('hellohello'))
1533
1534 def test_synchronize(self):
1535 self.test_sharedctypes(lock=True)
1536
1537 def test_copy(self):
1538 if c_int is None:
1539 return
1540
1541 foo = _Foo(2, 5.0)
1542 bar = copy(foo)
1543 foo.x = 0
1544 foo.y = 0
1545 self.assertEqual(bar.x, 2)
1546 self.assertAlmostEqual(bar.y, 5.0)
1547
1548#
1549#
1550#
1551
1552class _TestFinalize(BaseTestCase):
1553
1554 ALLOWED_TYPES = ('processes',)
1555
1556 def _test_finalize(self, conn):
1557 class Foo(object):
1558 pass
1559
1560 a = Foo()
1561 util.Finalize(a, conn.send, args=('a',))
1562 del a # triggers callback for a
1563
1564 b = Foo()
1565 close_b = util.Finalize(b, conn.send, args=('b',))
1566 close_b() # triggers callback for b
1567 close_b() # does nothing because callback has already been called
1568 del b # does nothing because callback has already been called
1569
1570 c = Foo()
1571 util.Finalize(c, conn.send, args=('c',))
1572
1573 d10 = Foo()
1574 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1575
1576 d01 = Foo()
1577 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1578 d02 = Foo()
1579 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1580 d03 = Foo()
1581 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1582
1583 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1584
1585 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1586
1587 # call mutliprocessing's cleanup function then exit process without
1588 # garbage collecting locals
1589 util._exit_function()
1590 conn.close()
1591 os._exit(0)
1592
1593 def test_finalize(self):
1594 conn, child_conn = self.Pipe()
1595
1596 p = self.Process(target=self._test_finalize, args=(child_conn,))
1597 p.start()
1598 p.join()
1599
1600 result = [obj for obj in iter(conn.recv, 'STOP')]
1601 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1602
1603#
1604# Test that from ... import * works for each module
1605#
1606
1607class _TestImportStar(BaseTestCase):
1608
1609 ALLOWED_TYPES = ('processes',)
1610
1611 def test_import(self):
1612 modules = (
1613 'multiprocessing', 'multiprocessing.connection',
1614 'multiprocessing.heap', 'multiprocessing.managers',
1615 'multiprocessing.pool', 'multiprocessing.process',
1616 'multiprocessing.reduction', 'multiprocessing.sharedctypes',
1617 'multiprocessing.synchronize', 'multiprocessing.util'
1618 )
1619
1620 for name in modules:
1621 __import__(name)
1622 mod = sys.modules[name]
1623
1624 for attr in getattr(mod, '__all__', ()):
1625 self.assertTrue(
1626 hasattr(mod, attr),
1627 '%r does not have attribute %r' % (mod, attr)
1628 )
1629
1630#
1631# Quick test that logging works -- does not test logging output
1632#
1633
1634class _TestLogging(BaseTestCase):
1635
1636 ALLOWED_TYPES = ('processes',)
1637
1638 def test_enable_logging(self):
1639 logger = multiprocessing.get_logger()
1640 logger.setLevel(util.SUBWARNING)
1641 self.assertTrue(logger is not None)
1642 logger.debug('this will not be printed')
1643 logger.info('nor will this')
1644 logger.setLevel(LOG_LEVEL)
1645
1646 def _test_level(self, conn):
1647 logger = multiprocessing.get_logger()
1648 conn.send(logger.getEffectiveLevel())
1649
1650 def test_level(self):
1651 LEVEL1 = 32
1652 LEVEL2 = 37
1653
1654 logger = multiprocessing.get_logger()
1655 root_logger = logging.getLogger()
1656 root_level = root_logger.level
1657
1658 reader, writer = multiprocessing.Pipe(duplex=False)
1659
1660 logger.setLevel(LEVEL1)
1661 self.Process(target=self._test_level, args=(writer,)).start()
1662 self.assertEqual(LEVEL1, reader.recv())
1663
1664 logger.setLevel(logging.NOTSET)
1665 root_logger.setLevel(LEVEL2)
1666 self.Process(target=self._test_level, args=(writer,)).start()
1667 self.assertEqual(LEVEL2, reader.recv())
1668
1669 root_logger.setLevel(root_level)
1670 logger.setLevel(level=LOG_LEVEL)
1671
1672#
1673# Functions used to create test cases from the base ones in this module
1674#
1675
1676def get_attributes(Source, names):
1677 d = {}
1678 for name in names:
1679 obj = getattr(Source, name)
1680 if type(obj) == type(get_attributes):
1681 obj = staticmethod(obj)
1682 d[name] = obj
1683 return d
1684
1685def create_test_cases(Mixin, type):
1686 result = {}
1687 glob = globals()
1688 Type = type[0].upper() + type[1:]
1689
1690 for name in glob.keys():
1691 if name.startswith('_Test'):
1692 base = glob[name]
1693 if type in base.ALLOWED_TYPES:
1694 newname = 'With' + Type + name[1:]
1695 class Temp(base, unittest.TestCase, Mixin):
1696 pass
1697 result[newname] = Temp
1698 Temp.__name__ = newname
1699 Temp.__module__ = Mixin.__module__
1700 return result
1701
1702#
1703# Create test cases
1704#
1705
1706class ProcessesMixin(object):
1707 TYPE = 'processes'
1708 Process = multiprocessing.Process
1709 locals().update(get_attributes(multiprocessing, (
1710 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1711 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1712 'RawArray', 'current_process', 'active_children', 'Pipe',
1713 'connection', 'JoinableQueue'
1714 )))
1715
1716testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1717globals().update(testcases_processes)
1718
1719
1720class ManagerMixin(object):
1721 TYPE = 'manager'
1722 Process = multiprocessing.Process
1723 manager = object.__new__(multiprocessing.managers.SyncManager)
1724 locals().update(get_attributes(manager, (
1725 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1726 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1727 'Namespace', 'JoinableQueue'
1728 )))
1729
1730testcases_manager = create_test_cases(ManagerMixin, type='manager')
1731globals().update(testcases_manager)
1732
1733
1734class ThreadsMixin(object):
1735 TYPE = 'threads'
1736 Process = multiprocessing.dummy.Process
1737 locals().update(get_attributes(multiprocessing.dummy, (
1738 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1739 'Condition', 'Event', 'Value', 'Array', 'current_process',
1740 'active_children', 'Pipe', 'connection', 'dict', 'list',
1741 'Namespace', 'JoinableQueue'
1742 )))
1743
1744testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1745globals().update(testcases_threads)
1746
Neal Norwitz0c519b32008-08-25 01:50:24 +00001747class OtherTest(unittest.TestCase):
1748 # TODO: add more tests for deliver/answer challenge.
1749 def test_deliver_challenge_auth_failure(self):
1750 class _FakeConnection(object):
1751 def recv_bytes(self, size):
Neal Norwitz2a7767a2008-08-25 03:03:25 +00001752 return b'something bogus'
Neal Norwitz0c519b32008-08-25 01:50:24 +00001753 def send_bytes(self, data):
1754 pass
1755 self.assertRaises(multiprocessing.AuthenticationError,
1756 multiprocessing.connection.deliver_challenge,
1757 _FakeConnection(), b'abc')
1758
1759 def test_answer_challenge_auth_failure(self):
1760 class _FakeConnection(object):
1761 def __init__(self):
1762 self.count = 0
1763 def recv_bytes(self, size):
1764 self.count += 1
1765 if self.count == 1:
1766 return multiprocessing.connection.CHALLENGE
1767 elif self.count == 2:
Neal Norwitz2a7767a2008-08-25 03:03:25 +00001768 return b'something bogus'
1769 return b''
Neal Norwitz0c519b32008-08-25 01:50:24 +00001770 def send_bytes(self, data):
1771 pass
1772 self.assertRaises(multiprocessing.AuthenticationError,
1773 multiprocessing.connection.answer_challenge,
1774 _FakeConnection(), b'abc')
1775
1776testcases_other = [OtherTest]
1777
Benjamin Petersondfd79492008-06-13 19:13:39 +00001778#
1779#
1780#
1781
1782def test_main(run=None):
Jesse Noller18623822008-06-18 13:29:52 +00001783 if sys.platform.startswith("linux"):
1784 try:
1785 lock = multiprocessing.RLock()
1786 except OSError:
1787 from test.test_support import TestSkipped
1788 raise TestSkipped("OSError raises on RLock creation, see issue 3111!")
Benjamin Petersoned77f2e2008-06-17 22:40:44 +00001789
Benjamin Petersondfd79492008-06-13 19:13:39 +00001790 if run is None:
1791 from test.test_support import run_unittest as run
1792
1793 util.get_temp_dir() # creates temp directory for use by all processes
1794
1795 multiprocessing.get_logger().setLevel(LOG_LEVEL)
1796
Jesse Noller146b7ab2008-07-02 16:44:09 +00001797 ProcessesMixin.pool = multiprocessing.Pool(4)
1798 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
1799 ManagerMixin.manager.__init__()
1800 ManagerMixin.manager.start()
1801 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001802
1803 testcases = (
Jesse Noller146b7ab2008-07-02 16:44:09 +00001804 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
1805 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz0c519b32008-08-25 01:50:24 +00001806 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
1807 testcases_other
Benjamin Petersondfd79492008-06-13 19:13:39 +00001808 )
1809
1810 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
1811 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
1812 run(suite)
1813
Jesse Noller146b7ab2008-07-02 16:44:09 +00001814 ThreadsMixin.pool.terminate()
1815 ProcessesMixin.pool.terminate()
1816 ManagerMixin.pool.terminate()
1817 ManagerMixin.manager.shutdown()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001818
Jesse Noller146b7ab2008-07-02 16:44:09 +00001819 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00001820
1821def main():
1822 test_main(unittest.TextTestRunner(verbosity=2).run)
1823
1824if __name__ == '__main__':
1825 main()