blob: 2daff7edfdb2605689aa9a80ad9aabfdd1a1f964 [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import Queue
10import time
11import sys
12import os
13import gc
14import signal
15import array
16import copy
17import socket
18import random
19import logging
20
Jesse Noller37040cd2008-09-30 00:15:45 +000021
22# Work around broken sem_open implementations
23try:
24 import multiprocessing.synchronize
25except ImportError, e:
26 from test.test_support import TestSkipped
27 raise TestSkipped(e)
28
Benjamin Petersondfd79492008-06-13 19:13:39 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000033import multiprocessing.pool
34import _multiprocessing
35
36from multiprocessing import util
37
38#
39#
40#
41
Benjamin Petersone79edf52008-07-13 18:34:58 +000042latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000043
Benjamin Petersondfd79492008-06-13 19:13:39 +000044#
45# Constants
46#
47
48LOG_LEVEL = util.SUBWARNING
49#LOG_LEVEL = logging.WARNING
50
51DELTA = 0.1
52CHECK_TIMINGS = False # making true makes tests take a lot longer
53 # and can sometimes cause some non-serious
54 # failures because some calls block a bit
55 # longer than expected
56if CHECK_TIMINGS:
57 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
58else:
59 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
60
61HAVE_GETVALUE = not getattr(_multiprocessing,
62 'HAVE_BROKEN_SEM_GETVALUE', False)
63
64#
65# Creates a wrapper for a function which records the time it takes to finish
66#
67
68class TimingWrapper(object):
69
70 def __init__(self, func):
71 self.func = func
72 self.elapsed = None
73
74 def __call__(self, *args, **kwds):
75 t = time.time()
76 try:
77 return self.func(*args, **kwds)
78 finally:
79 self.elapsed = time.time() - t
80
81#
82# Base class for test cases
83#
84
85class BaseTestCase(object):
86
87 ALLOWED_TYPES = ('processes', 'manager', 'threads')
88
89 def assertTimingAlmostEqual(self, a, b):
90 if CHECK_TIMINGS:
91 self.assertAlmostEqual(a, b, 1)
92
93 def assertReturnsIfImplemented(self, value, func, *args):
94 try:
95 res = func(*args)
96 except NotImplementedError:
97 pass
98 else:
99 return self.assertEqual(value, res)
100
101#
102# Return the value of a semaphore
103#
104
105def get_value(self):
106 try:
107 return self.get_value()
108 except AttributeError:
109 try:
110 return self._Semaphore__value
111 except AttributeError:
112 try:
113 return self._value
114 except AttributeError:
115 raise NotImplementedError
116
117#
118# Testcases
119#
120
121class _TestProcess(BaseTestCase):
122
123 ALLOWED_TYPES = ('processes', 'threads')
124
125 def test_current(self):
126 if self.TYPE == 'threads':
127 return
128
129 current = self.current_process()
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000130 authkey = current.authkey
Benjamin Petersondfd79492008-06-13 19:13:39 +0000131
132 self.assertTrue(current.is_alive())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000133 self.assertTrue(not current.daemon)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000134 self.assertTrue(isinstance(authkey, bytes))
135 self.assertTrue(len(authkey) > 0)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000136 self.assertEqual(current.ident, os.getpid())
137 self.assertEqual(current.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000138
139 def _test(self, q, *args, **kwds):
140 current = self.current_process()
141 q.put(args)
142 q.put(kwds)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000143 q.put(current.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000144 if self.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000145 q.put(bytes(current.authkey))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000146 q.put(current.pid)
147
148 def test_process(self):
149 q = self.Queue(1)
150 e = self.Event()
151 args = (q, 1, 2)
152 kwargs = {'hello':23, 'bye':2.54}
153 name = 'SomeProcess'
154 p = self.Process(
155 target=self._test, args=args, kwargs=kwargs, name=name
156 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000157 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000158 current = self.current_process()
159
160 if self.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000161 self.assertEquals(p.authkey, current.authkey)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000162 self.assertEquals(p.is_alive(), False)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000163 self.assertEquals(p.daemon, True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000164 self.assertTrue(p not in self.active_children())
165 self.assertTrue(type(self.active_children()) is list)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000166 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000167
168 p.start()
169
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000170 self.assertEquals(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000171 self.assertEquals(p.is_alive(), True)
172 self.assertTrue(p in self.active_children())
173
174 self.assertEquals(q.get(), args[1:])
175 self.assertEquals(q.get(), kwargs)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000176 self.assertEquals(q.get(), p.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000177 if self.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000178 self.assertEquals(q.get(), current.authkey)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000179 self.assertEquals(q.get(), p.pid)
180
181 p.join()
182
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000183 self.assertEquals(p.exitcode, 0)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000184 self.assertEquals(p.is_alive(), False)
185 self.assertTrue(p not in self.active_children())
186
187 def _test_terminate(self):
188 time.sleep(1000)
189
190 def test_terminate(self):
191 if self.TYPE == 'threads':
192 return
193
194 p = self.Process(target=self._test_terminate)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000195 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000196 p.start()
197
198 self.assertEqual(p.is_alive(), True)
199 self.assertTrue(p in self.active_children())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000200 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000201
202 p.terminate()
203
204 join = TimingWrapper(p.join)
205 self.assertEqual(join(), None)
206 self.assertTimingAlmostEqual(join.elapsed, 0.0)
207
208 self.assertEqual(p.is_alive(), False)
209 self.assertTrue(p not in self.active_children())
210
211 p.join()
212
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000213 # XXX sometimes get p.exitcode == 0 on Windows ...
214 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000215
216 def test_cpu_count(self):
217 try:
218 cpus = multiprocessing.cpu_count()
219 except NotImplementedError:
220 cpus = 1
221 self.assertTrue(type(cpus) is int)
222 self.assertTrue(cpus >= 1)
223
224 def test_active_children(self):
225 self.assertEqual(type(self.active_children()), list)
226
227 p = self.Process(target=time.sleep, args=(DELTA,))
228 self.assertTrue(p not in self.active_children())
229
230 p.start()
231 self.assertTrue(p in self.active_children())
232
233 p.join()
234 self.assertTrue(p not in self.active_children())
235
236 def _test_recursion(self, wconn, id):
237 from multiprocessing import forking
238 wconn.send(id)
239 if len(id) < 2:
240 for i in range(2):
241 p = self.Process(
242 target=self._test_recursion, args=(wconn, id+[i])
243 )
244 p.start()
245 p.join()
246
247 def test_recursion(self):
248 rconn, wconn = self.Pipe(duplex=False)
249 self._test_recursion(wconn, [])
250
251 time.sleep(DELTA)
252 result = []
253 while rconn.poll():
254 result.append(rconn.recv())
255
256 expected = [
257 [],
258 [0],
259 [0, 0],
260 [0, 1],
261 [1],
262 [1, 0],
263 [1, 1]
264 ]
265 self.assertEqual(result, expected)
266
267#
268#
269#
270
271class _UpperCaser(multiprocessing.Process):
272
273 def __init__(self):
274 multiprocessing.Process.__init__(self)
275 self.child_conn, self.parent_conn = multiprocessing.Pipe()
276
277 def run(self):
278 self.parent_conn.close()
279 for s in iter(self.child_conn.recv, None):
280 self.child_conn.send(s.upper())
281 self.child_conn.close()
282
283 def submit(self, s):
284 assert type(s) is str
285 self.parent_conn.send(s)
286 return self.parent_conn.recv()
287
288 def stop(self):
289 self.parent_conn.send(None)
290 self.parent_conn.close()
291 self.child_conn.close()
292
293class _TestSubclassingProcess(BaseTestCase):
294
295 ALLOWED_TYPES = ('processes',)
296
297 def test_subclassing(self):
298 uppercaser = _UpperCaser()
299 uppercaser.start()
300 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
301 self.assertEqual(uppercaser.submit('world'), 'WORLD')
302 uppercaser.stop()
303 uppercaser.join()
304
305#
306#
307#
308
309def queue_empty(q):
310 if hasattr(q, 'empty'):
311 return q.empty()
312 else:
313 return q.qsize() == 0
314
315def queue_full(q, maxsize):
316 if hasattr(q, 'full'):
317 return q.full()
318 else:
319 return q.qsize() == maxsize
320
321
322class _TestQueue(BaseTestCase):
323
324
325 def _test_put(self, queue, child_can_start, parent_can_continue):
326 child_can_start.wait()
327 for i in range(6):
328 queue.get()
329 parent_can_continue.set()
330
331 def test_put(self):
332 MAXSIZE = 6
333 queue = self.Queue(maxsize=MAXSIZE)
334 child_can_start = self.Event()
335 parent_can_continue = self.Event()
336
337 proc = self.Process(
338 target=self._test_put,
339 args=(queue, child_can_start, parent_can_continue)
340 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000341 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000342 proc.start()
343
344 self.assertEqual(queue_empty(queue), True)
345 self.assertEqual(queue_full(queue, MAXSIZE), False)
346
347 queue.put(1)
348 queue.put(2, True)
349 queue.put(3, True, None)
350 queue.put(4, False)
351 queue.put(5, False, None)
352 queue.put_nowait(6)
353
354 # the values may be in buffer but not yet in pipe so sleep a bit
355 time.sleep(DELTA)
356
357 self.assertEqual(queue_empty(queue), False)
358 self.assertEqual(queue_full(queue, MAXSIZE), True)
359
360 put = TimingWrapper(queue.put)
361 put_nowait = TimingWrapper(queue.put_nowait)
362
363 self.assertRaises(Queue.Full, put, 7, False)
364 self.assertTimingAlmostEqual(put.elapsed, 0)
365
366 self.assertRaises(Queue.Full, put, 7, False, None)
367 self.assertTimingAlmostEqual(put.elapsed, 0)
368
369 self.assertRaises(Queue.Full, put_nowait, 7)
370 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
371
372 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
373 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
374
375 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
376 self.assertTimingAlmostEqual(put.elapsed, 0)
377
378 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
379 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
380
381 child_can_start.set()
382 parent_can_continue.wait()
383
384 self.assertEqual(queue_empty(queue), True)
385 self.assertEqual(queue_full(queue, MAXSIZE), False)
386
387 proc.join()
388
389 def _test_get(self, queue, child_can_start, parent_can_continue):
390 child_can_start.wait()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000391 #queue.put(1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000392 queue.put(2)
393 queue.put(3)
394 queue.put(4)
395 queue.put(5)
396 parent_can_continue.set()
397
398 def test_get(self):
399 queue = self.Queue()
400 child_can_start = self.Event()
401 parent_can_continue = self.Event()
402
403 proc = self.Process(
404 target=self._test_get,
405 args=(queue, child_can_start, parent_can_continue)
406 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000407 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000408 proc.start()
409
410 self.assertEqual(queue_empty(queue), True)
411
412 child_can_start.set()
413 parent_can_continue.wait()
414
415 time.sleep(DELTA)
416 self.assertEqual(queue_empty(queue), False)
417
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000418 # Hangs unexpectedly, remove for now
419 #self.assertEqual(queue.get(), 1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000420 self.assertEqual(queue.get(True, None), 2)
421 self.assertEqual(queue.get(True), 3)
422 self.assertEqual(queue.get(timeout=1), 4)
423 self.assertEqual(queue.get_nowait(), 5)
424
425 self.assertEqual(queue_empty(queue), True)
426
427 get = TimingWrapper(queue.get)
428 get_nowait = TimingWrapper(queue.get_nowait)
429
430 self.assertRaises(Queue.Empty, get, False)
431 self.assertTimingAlmostEqual(get.elapsed, 0)
432
433 self.assertRaises(Queue.Empty, get, False, None)
434 self.assertTimingAlmostEqual(get.elapsed, 0)
435
436 self.assertRaises(Queue.Empty, get_nowait)
437 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
438
439 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
440 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
441
442 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
443 self.assertTimingAlmostEqual(get.elapsed, 0)
444
445 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
446 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
447
448 proc.join()
449
450 def _test_fork(self, queue):
451 for i in range(10, 20):
452 queue.put(i)
453 # note that at this point the items may only be buffered, so the
454 # process cannot shutdown until the feeder thread has finished
455 # pushing items onto the pipe.
456
457 def test_fork(self):
458 # Old versions of Queue would fail to create a new feeder
459 # thread for a forked process if the original process had its
460 # own feeder thread. This test checks that this no longer
461 # happens.
462
463 queue = self.Queue()
464
465 # put items on queue so that main process starts a feeder thread
466 for i in range(10):
467 queue.put(i)
468
469 # wait to make sure thread starts before we fork a new process
470 time.sleep(DELTA)
471
472 # fork process
473 p = self.Process(target=self._test_fork, args=(queue,))
474 p.start()
475
476 # check that all expected items are in the queue
477 for i in range(20):
478 self.assertEqual(queue.get(), i)
479 self.assertRaises(Queue.Empty, queue.get, False)
480
481 p.join()
482
483 def test_qsize(self):
484 q = self.Queue()
485 try:
486 self.assertEqual(q.qsize(), 0)
487 except NotImplementedError:
488 return
489 q.put(1)
490 self.assertEqual(q.qsize(), 1)
491 q.put(5)
492 self.assertEqual(q.qsize(), 2)
493 q.get()
494 self.assertEqual(q.qsize(), 1)
495 q.get()
496 self.assertEqual(q.qsize(), 0)
497
498 def _test_task_done(self, q):
499 for obj in iter(q.get, None):
500 time.sleep(DELTA)
501 q.task_done()
502
503 def test_task_done(self):
504 queue = self.JoinableQueue()
505
506 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
507 return
508
509 workers = [self.Process(target=self._test_task_done, args=(queue,))
510 for i in xrange(4)]
511
512 for p in workers:
513 p.start()
514
515 for i in xrange(10):
516 queue.put(i)
517
518 queue.join()
519
520 for p in workers:
521 queue.put(None)
522
523 for p in workers:
524 p.join()
525
526#
527#
528#
529
530class _TestLock(BaseTestCase):
531
532 def test_lock(self):
533 lock = self.Lock()
534 self.assertEqual(lock.acquire(), True)
535 self.assertEqual(lock.acquire(False), False)
536 self.assertEqual(lock.release(), None)
537 self.assertRaises((ValueError, threading.ThreadError), lock.release)
538
539 def test_rlock(self):
540 lock = self.RLock()
541 self.assertEqual(lock.acquire(), True)
542 self.assertEqual(lock.acquire(), True)
543 self.assertEqual(lock.acquire(), True)
544 self.assertEqual(lock.release(), None)
545 self.assertEqual(lock.release(), None)
546 self.assertEqual(lock.release(), None)
547 self.assertRaises((AssertionError, RuntimeError), lock.release)
548
549
550class _TestSemaphore(BaseTestCase):
551
552 def _test_semaphore(self, sem):
553 self.assertReturnsIfImplemented(2, get_value, sem)
554 self.assertEqual(sem.acquire(), True)
555 self.assertReturnsIfImplemented(1, get_value, sem)
556 self.assertEqual(sem.acquire(), True)
557 self.assertReturnsIfImplemented(0, get_value, sem)
558 self.assertEqual(sem.acquire(False), False)
559 self.assertReturnsIfImplemented(0, get_value, sem)
560 self.assertEqual(sem.release(), None)
561 self.assertReturnsIfImplemented(1, get_value, sem)
562 self.assertEqual(sem.release(), None)
563 self.assertReturnsIfImplemented(2, get_value, sem)
564
565 def test_semaphore(self):
566 sem = self.Semaphore(2)
567 self._test_semaphore(sem)
568 self.assertEqual(sem.release(), None)
569 self.assertReturnsIfImplemented(3, get_value, sem)
570 self.assertEqual(sem.release(), None)
571 self.assertReturnsIfImplemented(4, get_value, sem)
572
573 def test_bounded_semaphore(self):
574 sem = self.BoundedSemaphore(2)
575 self._test_semaphore(sem)
576 # Currently fails on OS/X
577 #if HAVE_GETVALUE:
578 # self.assertRaises(ValueError, sem.release)
579 # self.assertReturnsIfImplemented(2, get_value, sem)
580
581 def test_timeout(self):
582 if self.TYPE != 'processes':
583 return
584
585 sem = self.Semaphore(0)
586 acquire = TimingWrapper(sem.acquire)
587
588 self.assertEqual(acquire(False), False)
589 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
590
591 self.assertEqual(acquire(False, None), False)
592 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
593
594 self.assertEqual(acquire(False, TIMEOUT1), False)
595 self.assertTimingAlmostEqual(acquire.elapsed, 0)
596
597 self.assertEqual(acquire(True, TIMEOUT2), False)
598 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
599
600 self.assertEqual(acquire(timeout=TIMEOUT3), False)
601 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
602
603
604class _TestCondition(BaseTestCase):
605
606 def f(self, cond, sleeping, woken, timeout=None):
607 cond.acquire()
608 sleeping.release()
609 cond.wait(timeout)
610 woken.release()
611 cond.release()
612
613 def check_invariant(self, cond):
614 # this is only supposed to succeed when there are no sleepers
615 if self.TYPE == 'processes':
616 try:
617 sleepers = (cond._sleeping_count.get_value() -
618 cond._woken_count.get_value())
619 self.assertEqual(sleepers, 0)
620 self.assertEqual(cond._wait_semaphore.get_value(), 0)
621 except NotImplementedError:
622 pass
623
624 def test_notify(self):
625 cond = self.Condition()
626 sleeping = self.Semaphore(0)
627 woken = self.Semaphore(0)
628
629 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000630 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000631 p.start()
632
633 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000634 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000635 p.start()
636
637 # wait for both children to start sleeping
638 sleeping.acquire()
639 sleeping.acquire()
640
641 # check no process/thread has woken up
642 time.sleep(DELTA)
643 self.assertReturnsIfImplemented(0, get_value, woken)
644
645 # wake up one process/thread
646 cond.acquire()
647 cond.notify()
648 cond.release()
649
650 # check one process/thread has woken up
651 time.sleep(DELTA)
652 self.assertReturnsIfImplemented(1, get_value, woken)
653
654 # wake up another
655 cond.acquire()
656 cond.notify()
657 cond.release()
658
659 # check other has woken up
660 time.sleep(DELTA)
661 self.assertReturnsIfImplemented(2, get_value, woken)
662
663 # check state is not mucked up
664 self.check_invariant(cond)
665 p.join()
666
667 def test_notify_all(self):
668 cond = self.Condition()
669 sleeping = self.Semaphore(0)
670 woken = self.Semaphore(0)
671
672 # start some threads/processes which will timeout
673 for i in range(3):
674 p = self.Process(target=self.f,
675 args=(cond, sleeping, woken, TIMEOUT1))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000676 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000677 p.start()
678
679 t = threading.Thread(target=self.f,
680 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000681 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000682 t.start()
683
684 # wait for them all to sleep
685 for i in xrange(6):
686 sleeping.acquire()
687
688 # check they have all timed out
689 for i in xrange(6):
690 woken.acquire()
691 self.assertReturnsIfImplemented(0, get_value, woken)
692
693 # check state is not mucked up
694 self.check_invariant(cond)
695
696 # start some more threads/processes
697 for i in range(3):
698 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000699 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000700 p.start()
701
702 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000703 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000704 t.start()
705
706 # wait for them to all sleep
707 for i in xrange(6):
708 sleeping.acquire()
709
710 # check no process/thread has woken up
711 time.sleep(DELTA)
712 self.assertReturnsIfImplemented(0, get_value, woken)
713
714 # wake them all up
715 cond.acquire()
716 cond.notify_all()
717 cond.release()
718
719 # check they have all woken
720 time.sleep(DELTA)
721 self.assertReturnsIfImplemented(6, get_value, woken)
722
723 # check state is not mucked up
724 self.check_invariant(cond)
725
726 def test_timeout(self):
727 cond = self.Condition()
728 wait = TimingWrapper(cond.wait)
729 cond.acquire()
730 res = wait(TIMEOUT1)
731 cond.release()
732 self.assertEqual(res, None)
733 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
734
735
736class _TestEvent(BaseTestCase):
737
738 def _test_event(self, event):
739 time.sleep(TIMEOUT2)
740 event.set()
741
742 def test_event(self):
743 event = self.Event()
744 wait = TimingWrapper(event.wait)
745
746 # Removed temporaily, due to API shear, this does not
747 # work with threading._Event objects. is_set == isSet
748 #self.assertEqual(event.is_set(), False)
749
750 self.assertEqual(wait(0.0), None)
751 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
752 self.assertEqual(wait(TIMEOUT1), None)
753 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
754
755 event.set()
756
757 # See note above on the API differences
758 # self.assertEqual(event.is_set(), True)
759 self.assertEqual(wait(), None)
760 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
761 self.assertEqual(wait(TIMEOUT1), None)
762 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
763 # self.assertEqual(event.is_set(), True)
764
765 event.clear()
766
767 #self.assertEqual(event.is_set(), False)
768
769 self.Process(target=self._test_event, args=(event,)).start()
770 self.assertEqual(wait(), None)
771
772#
773#
774#
775
776class _TestValue(BaseTestCase):
777
778 codes_values = [
779 ('i', 4343, 24234),
780 ('d', 3.625, -4.25),
781 ('h', -232, 234),
782 ('c', latin('x'), latin('y'))
783 ]
784
785 def _test(self, values):
786 for sv, cv in zip(values, self.codes_values):
787 sv.value = cv[2]
788
789
790 def test_value(self, raw=False):
791 if self.TYPE != 'processes':
792 return
793
794 if raw:
795 values = [self.RawValue(code, value)
796 for code, value, _ in self.codes_values]
797 else:
798 values = [self.Value(code, value)
799 for code, value, _ in self.codes_values]
800
801 for sv, cv in zip(values, self.codes_values):
802 self.assertEqual(sv.value, cv[1])
803
804 proc = self.Process(target=self._test, args=(values,))
805 proc.start()
806 proc.join()
807
808 for sv, cv in zip(values, self.codes_values):
809 self.assertEqual(sv.value, cv[2])
810
811 def test_rawvalue(self):
812 self.test_value(raw=True)
813
814 def test_getobj_getlock(self):
815 if self.TYPE != 'processes':
816 return
817
818 val1 = self.Value('i', 5)
819 lock1 = val1.get_lock()
820 obj1 = val1.get_obj()
821
822 val2 = self.Value('i', 5, lock=None)
823 lock2 = val2.get_lock()
824 obj2 = val2.get_obj()
825
826 lock = self.Lock()
827 val3 = self.Value('i', 5, lock=lock)
828 lock3 = val3.get_lock()
829 obj3 = val3.get_obj()
830 self.assertEqual(lock, lock3)
831
Jesse Noller6ab22152009-01-18 02:45:38 +0000832 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000833 self.assertFalse(hasattr(arr4, 'get_lock'))
834 self.assertFalse(hasattr(arr4, 'get_obj'))
835
Jesse Noller6ab22152009-01-18 02:45:38 +0000836 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
837
838 arr5 = self.RawValue('i', 5)
839 self.assertFalse(hasattr(arr5, 'get_lock'))
840 self.assertFalse(hasattr(arr5, 'get_obj'))
841
Benjamin Petersondfd79492008-06-13 19:13:39 +0000842
843class _TestArray(BaseTestCase):
844
845 def f(self, seq):
846 for i in range(1, len(seq)):
847 seq[i] += seq[i-1]
848
849 def test_array(self, raw=False):
850 if self.TYPE != 'processes':
851 return
852
853 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
854 if raw:
855 arr = self.RawArray('i', seq)
856 else:
857 arr = self.Array('i', seq)
858
859 self.assertEqual(len(arr), len(seq))
860 self.assertEqual(arr[3], seq[3])
861 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
862
863 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
864
865 self.assertEqual(list(arr[:]), seq)
866
867 self.f(seq)
868
869 p = self.Process(target=self.f, args=(arr,))
870 p.start()
871 p.join()
872
873 self.assertEqual(list(arr[:]), seq)
874
875 def test_rawarray(self):
876 self.test_array(raw=True)
877
878 def test_getobj_getlock_obj(self):
879 if self.TYPE != 'processes':
880 return
881
882 arr1 = self.Array('i', range(10))
883 lock1 = arr1.get_lock()
884 obj1 = arr1.get_obj()
885
886 arr2 = self.Array('i', range(10), lock=None)
887 lock2 = arr2.get_lock()
888 obj2 = arr2.get_obj()
889
890 lock = self.Lock()
891 arr3 = self.Array('i', range(10), lock=lock)
892 lock3 = arr3.get_lock()
893 obj3 = arr3.get_obj()
894 self.assertEqual(lock, lock3)
895
Jesse Noller6ab22152009-01-18 02:45:38 +0000896 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000897 self.assertFalse(hasattr(arr4, 'get_lock'))
898 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Noller6ab22152009-01-18 02:45:38 +0000899 self.assertRaises(AttributeError,
900 self.Array, 'i', range(10), lock='notalock')
901
902 arr5 = self.RawArray('i', range(10))
903 self.assertFalse(hasattr(arr5, 'get_lock'))
904 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000905
906#
907#
908#
909
910class _TestContainers(BaseTestCase):
911
912 ALLOWED_TYPES = ('manager',)
913
914 def test_list(self):
915 a = self.list(range(10))
916 self.assertEqual(a[:], range(10))
917
918 b = self.list()
919 self.assertEqual(b[:], [])
920
921 b.extend(range(5))
922 self.assertEqual(b[:], range(5))
923
924 self.assertEqual(b[2], 2)
925 self.assertEqual(b[2:10], [2,3,4])
926
927 b *= 2
928 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
929
930 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
931
932 self.assertEqual(a[:], range(10))
933
934 d = [a, b]
935 e = self.list(d)
936 self.assertEqual(
937 e[:],
938 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
939 )
940
941 f = self.list([a])
942 a.append('hello')
943 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
944
945 def test_dict(self):
946 d = self.dict()
947 indices = range(65, 70)
948 for i in indices:
949 d[i] = chr(i)
950 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
951 self.assertEqual(sorted(d.keys()), indices)
952 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
953 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
954
955 def test_namespace(self):
956 n = self.Namespace()
957 n.name = 'Bob'
958 n.job = 'Builder'
959 n._hidden = 'hidden'
960 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
961 del n.job
962 self.assertEqual(str(n), "Namespace(name='Bob')")
963 self.assertTrue(hasattr(n, 'name'))
964 self.assertTrue(not hasattr(n, 'job'))
965
966#
967#
968#
969
970def sqr(x, wait=0.0):
971 time.sleep(wait)
972 return x*x
Benjamin Petersondfd79492008-06-13 19:13:39 +0000973class _TestPool(BaseTestCase):
974
975 def test_apply(self):
976 papply = self.pool.apply
977 self.assertEqual(papply(sqr, (5,)), sqr(5))
978 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
979
980 def test_map(self):
981 pmap = self.pool.map
982 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
983 self.assertEqual(pmap(sqr, range(100), chunksize=20),
984 map(sqr, range(100)))
985
986 def test_async(self):
987 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
988 get = TimingWrapper(res.get)
989 self.assertEqual(get(), 49)
990 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
991
992 def test_async_timeout(self):
993 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
994 get = TimingWrapper(res.get)
995 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
996 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
997
998 def test_imap(self):
999 it = self.pool.imap(sqr, range(10))
1000 self.assertEqual(list(it), map(sqr, range(10)))
1001
1002 it = self.pool.imap(sqr, range(10))
1003 for i in range(10):
1004 self.assertEqual(it.next(), i*i)
1005 self.assertRaises(StopIteration, it.next)
1006
1007 it = self.pool.imap(sqr, range(1000), chunksize=100)
1008 for i in range(1000):
1009 self.assertEqual(it.next(), i*i)
1010 self.assertRaises(StopIteration, it.next)
1011
1012 def test_imap_unordered(self):
1013 it = self.pool.imap_unordered(sqr, range(1000))
1014 self.assertEqual(sorted(it), map(sqr, range(1000)))
1015
1016 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1017 self.assertEqual(sorted(it), map(sqr, range(1000)))
1018
1019 def test_make_pool(self):
1020 p = multiprocessing.Pool(3)
1021 self.assertEqual(3, len(p._pool))
1022 p.close()
1023 p.join()
1024
1025 def test_terminate(self):
1026 if self.TYPE == 'manager':
1027 # On Unix a forked process increfs each shared object to
1028 # which its parent process held a reference. If the
1029 # forked process gets terminated then there is likely to
1030 # be a reference leak. So to prevent
1031 # _TestZZZNumberOfObjects from failing we skip this test
1032 # when using a manager.
1033 return
1034
1035 result = self.pool.map_async(
1036 time.sleep, [0.1 for i in range(10000)], chunksize=1
1037 )
1038 self.pool.terminate()
1039 join = TimingWrapper(self.pool.join)
1040 join()
1041 self.assertTrue(join.elapsed < 0.2)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001042#
1043# Test that manager has expected number of shared objects left
1044#
1045
1046class _TestZZZNumberOfObjects(BaseTestCase):
1047 # Because test cases are sorted alphabetically, this one will get
1048 # run after all the other tests for the manager. It tests that
1049 # there have been no "reference leaks" for the manager's shared
1050 # objects. Note the comment in _TestPool.test_terminate().
1051 ALLOWED_TYPES = ('manager',)
1052
1053 def test_number_of_objects(self):
1054 EXPECTED_NUMBER = 1 # the pool object is still alive
1055 multiprocessing.active_children() # discard dead process objs
1056 gc.collect() # do garbage collection
1057 refs = self.manager._number_of_objects()
1058 if refs != EXPECTED_NUMBER:
Jesse Noller7fb96402008-07-17 21:01:05 +00001059 print self.manager._debug_info()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001060
1061 self.assertEqual(refs, EXPECTED_NUMBER)
1062
1063#
1064# Test of creating a customized manager class
1065#
1066
1067from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1068
1069class FooBar(object):
1070 def f(self):
1071 return 'f()'
1072 def g(self):
1073 raise ValueError
1074 def _h(self):
1075 return '_h()'
1076
1077def baz():
1078 for i in xrange(10):
1079 yield i*i
1080
1081class IteratorProxy(BaseProxy):
1082 _exposed_ = ('next', '__next__')
1083 def __iter__(self):
1084 return self
1085 def next(self):
1086 return self._callmethod('next')
1087 def __next__(self):
1088 return self._callmethod('__next__')
1089
1090class MyManager(BaseManager):
1091 pass
1092
1093MyManager.register('Foo', callable=FooBar)
1094MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1095MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1096
1097
1098class _TestMyManager(BaseTestCase):
1099
1100 ALLOWED_TYPES = ('manager',)
1101
1102 def test_mymanager(self):
1103 manager = MyManager()
1104 manager.start()
1105
1106 foo = manager.Foo()
1107 bar = manager.Bar()
1108 baz = manager.baz()
1109
1110 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1111 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1112
1113 self.assertEqual(foo_methods, ['f', 'g'])
1114 self.assertEqual(bar_methods, ['f', '_h'])
1115
1116 self.assertEqual(foo.f(), 'f()')
1117 self.assertRaises(ValueError, foo.g)
1118 self.assertEqual(foo._callmethod('f'), 'f()')
1119 self.assertRaises(RemoteError, foo._callmethod, '_h')
1120
1121 self.assertEqual(bar.f(), 'f()')
1122 self.assertEqual(bar._h(), '_h()')
1123 self.assertEqual(bar._callmethod('f'), 'f()')
1124 self.assertEqual(bar._callmethod('_h'), '_h()')
1125
1126 self.assertEqual(list(baz), [i*i for i in range(10)])
1127
1128 manager.shutdown()
1129
1130#
1131# Test of connecting to a remote server and using xmlrpclib for serialization
1132#
1133
1134_queue = Queue.Queue()
1135def get_queue():
1136 return _queue
1137
1138class QueueManager(BaseManager):
1139 '''manager class used by server process'''
1140QueueManager.register('get_queue', callable=get_queue)
1141
1142class QueueManager2(BaseManager):
1143 '''manager class which specifies the same interface as QueueManager'''
1144QueueManager2.register('get_queue')
1145
1146
1147SERIALIZER = 'xmlrpclib'
1148
1149class _TestRemoteManager(BaseTestCase):
1150
1151 ALLOWED_TYPES = ('manager',)
1152
1153 def _putter(self, address, authkey):
1154 manager = QueueManager2(
1155 address=address, authkey=authkey, serializer=SERIALIZER
1156 )
1157 manager.connect()
1158 queue = manager.get_queue()
1159 queue.put(('hello world', None, True, 2.25))
1160
1161 def test_remote(self):
1162 authkey = os.urandom(32)
1163
1164 manager = QueueManager(
1165 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1166 )
1167 manager.start()
1168
1169 p = self.Process(target=self._putter, args=(manager.address, authkey))
1170 p.start()
1171
1172 manager2 = QueueManager2(
1173 address=manager.address, authkey=authkey, serializer=SERIALIZER
1174 )
1175 manager2.connect()
1176 queue = manager2.get_queue()
1177
1178 # Note that xmlrpclib will deserialize object as a list not a tuple
1179 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1180
1181 # Because we are using xmlrpclib for serialization instead of
1182 # pickle this will cause a serialization error.
1183 self.assertRaises(Exception, queue.put, time.sleep)
1184
1185 # Make queue finalizer run before the server is stopped
1186 del queue
1187 manager.shutdown()
1188
1189#
1190#
1191#
1192
1193SENTINEL = latin('')
1194
1195class _TestConnection(BaseTestCase):
1196
1197 ALLOWED_TYPES = ('processes', 'threads')
1198
1199 def _echo(self, conn):
1200 for msg in iter(conn.recv_bytes, SENTINEL):
1201 conn.send_bytes(msg)
1202 conn.close()
1203
1204 def test_connection(self):
1205 conn, child_conn = self.Pipe()
1206
1207 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001208 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001209 p.start()
1210
1211 seq = [1, 2.25, None]
1212 msg = latin('hello world')
1213 longmsg = msg * 10
1214 arr = array.array('i', range(4))
1215
1216 if self.TYPE == 'processes':
1217 self.assertEqual(type(conn.fileno()), int)
1218
1219 self.assertEqual(conn.send(seq), None)
1220 self.assertEqual(conn.recv(), seq)
1221
1222 self.assertEqual(conn.send_bytes(msg), None)
1223 self.assertEqual(conn.recv_bytes(), msg)
1224
1225 if self.TYPE == 'processes':
1226 buffer = array.array('i', [0]*10)
1227 expected = list(arr) + [0] * (10 - len(arr))
1228 self.assertEqual(conn.send_bytes(arr), None)
1229 self.assertEqual(conn.recv_bytes_into(buffer),
1230 len(arr) * buffer.itemsize)
1231 self.assertEqual(list(buffer), expected)
1232
1233 buffer = array.array('i', [0]*10)
1234 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1235 self.assertEqual(conn.send_bytes(arr), None)
1236 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1237 len(arr) * buffer.itemsize)
1238 self.assertEqual(list(buffer), expected)
1239
1240 buffer = bytearray(latin(' ' * 40))
1241 self.assertEqual(conn.send_bytes(longmsg), None)
1242 try:
1243 res = conn.recv_bytes_into(buffer)
1244 except multiprocessing.BufferTooShort, e:
1245 self.assertEqual(e.args, (longmsg,))
1246 else:
1247 self.fail('expected BufferTooShort, got %s' % res)
1248
1249 poll = TimingWrapper(conn.poll)
1250
1251 self.assertEqual(poll(), False)
1252 self.assertTimingAlmostEqual(poll.elapsed, 0)
1253
1254 self.assertEqual(poll(TIMEOUT1), False)
1255 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1256
1257 conn.send(None)
1258
1259 self.assertEqual(poll(TIMEOUT1), True)
1260 self.assertTimingAlmostEqual(poll.elapsed, 0)
1261
1262 self.assertEqual(conn.recv(), None)
1263
1264 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1265 conn.send_bytes(really_big_msg)
1266 self.assertEqual(conn.recv_bytes(), really_big_msg)
1267
1268 conn.send_bytes(SENTINEL) # tell child to quit
1269 child_conn.close()
1270
1271 if self.TYPE == 'processes':
1272 self.assertEqual(conn.readable, True)
1273 self.assertEqual(conn.writable, True)
1274 self.assertRaises(EOFError, conn.recv)
1275 self.assertRaises(EOFError, conn.recv_bytes)
1276
1277 p.join()
1278
1279 def test_duplex_false(self):
1280 reader, writer = self.Pipe(duplex=False)
1281 self.assertEqual(writer.send(1), None)
1282 self.assertEqual(reader.recv(), 1)
1283 if self.TYPE == 'processes':
1284 self.assertEqual(reader.readable, True)
1285 self.assertEqual(reader.writable, False)
1286 self.assertEqual(writer.readable, False)
1287 self.assertEqual(writer.writable, True)
1288 self.assertRaises(IOError, reader.send, 2)
1289 self.assertRaises(IOError, writer.recv)
1290 self.assertRaises(IOError, writer.poll)
1291
1292 def test_spawn_close(self):
1293 # We test that a pipe connection can be closed by parent
1294 # process immediately after child is spawned. On Windows this
1295 # would have sometimes failed on old versions because
1296 # child_conn would be closed before the child got a chance to
1297 # duplicate it.
1298 conn, child_conn = self.Pipe()
1299
1300 p = self.Process(target=self._echo, args=(child_conn,))
1301 p.start()
1302 child_conn.close() # this might complete before child initializes
1303
1304 msg = latin('hello')
1305 conn.send_bytes(msg)
1306 self.assertEqual(conn.recv_bytes(), msg)
1307
1308 conn.send_bytes(SENTINEL)
1309 conn.close()
1310 p.join()
1311
1312 def test_sendbytes(self):
1313 if self.TYPE != 'processes':
1314 return
1315
1316 msg = latin('abcdefghijklmnopqrstuvwxyz')
1317 a, b = self.Pipe()
1318
1319 a.send_bytes(msg)
1320 self.assertEqual(b.recv_bytes(), msg)
1321
1322 a.send_bytes(msg, 5)
1323 self.assertEqual(b.recv_bytes(), msg[5:])
1324
1325 a.send_bytes(msg, 7, 8)
1326 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1327
1328 a.send_bytes(msg, 26)
1329 self.assertEqual(b.recv_bytes(), latin(''))
1330
1331 a.send_bytes(msg, 26, 0)
1332 self.assertEqual(b.recv_bytes(), latin(''))
1333
1334 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1335
1336 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1337
1338 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1339
1340 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1341
1342 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1343
Benjamin Petersondfd79492008-06-13 19:13:39 +00001344class _TestListenerClient(BaseTestCase):
1345
1346 ALLOWED_TYPES = ('processes', 'threads')
1347
1348 def _test(self, address):
1349 conn = self.connection.Client(address)
1350 conn.send('hello')
1351 conn.close()
1352
1353 def test_listener_client(self):
1354 for family in self.connection.families:
1355 l = self.connection.Listener(family=family)
1356 p = self.Process(target=self._test, args=(l.address,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001357 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001358 p.start()
1359 conn = l.accept()
1360 self.assertEqual(conn.recv(), 'hello')
1361 p.join()
1362 l.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001363#
1364# Test of sending connection and socket objects between processes
1365#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001366"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001367class _TestPicklingConnections(BaseTestCase):
1368
1369 ALLOWED_TYPES = ('processes',)
1370
1371 def _listener(self, conn, families):
1372 for fam in families:
1373 l = self.connection.Listener(family=fam)
1374 conn.send(l.address)
1375 new_conn = l.accept()
1376 conn.send(new_conn)
1377
1378 if self.TYPE == 'processes':
1379 l = socket.socket()
1380 l.bind(('localhost', 0))
1381 conn.send(l.getsockname())
1382 l.listen(1)
1383 new_conn, addr = l.accept()
1384 conn.send(new_conn)
1385
1386 conn.recv()
1387
1388 def _remote(self, conn):
1389 for (address, msg) in iter(conn.recv, None):
1390 client = self.connection.Client(address)
1391 client.send(msg.upper())
1392 client.close()
1393
1394 if self.TYPE == 'processes':
1395 address, msg = conn.recv()
1396 client = socket.socket()
1397 client.connect(address)
1398 client.sendall(msg.upper())
1399 client.close()
1400
1401 conn.close()
1402
1403 def test_pickling(self):
1404 try:
1405 multiprocessing.allow_connection_pickling()
1406 except ImportError:
1407 return
1408
1409 families = self.connection.families
1410
1411 lconn, lconn0 = self.Pipe()
1412 lp = self.Process(target=self._listener, args=(lconn0, families))
1413 lp.start()
1414 lconn0.close()
1415
1416 rconn, rconn0 = self.Pipe()
1417 rp = self.Process(target=self._remote, args=(rconn0,))
1418 rp.start()
1419 rconn0.close()
1420
1421 for fam in families:
1422 msg = ('This connection uses family %s' % fam).encode('ascii')
1423 address = lconn.recv()
1424 rconn.send((address, msg))
1425 new_conn = lconn.recv()
1426 self.assertEqual(new_conn.recv(), msg.upper())
1427
1428 rconn.send(None)
1429
1430 if self.TYPE == 'processes':
1431 msg = latin('This connection uses a normal socket')
1432 address = lconn.recv()
1433 rconn.send((address, msg))
1434 if hasattr(socket, 'fromfd'):
1435 new_conn = lconn.recv()
1436 self.assertEqual(new_conn.recv(100), msg.upper())
1437 else:
1438 # XXX On Windows with Py2.6 need to backport fromfd()
1439 discard = lconn.recv_bytes()
1440
1441 lconn.send(None)
1442
1443 rconn.close()
1444 lconn.close()
1445
1446 lp.join()
1447 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001448"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001449#
1450#
1451#
1452
1453class _TestHeap(BaseTestCase):
1454
1455 ALLOWED_TYPES = ('processes',)
1456
1457 def test_heap(self):
1458 iterations = 5000
1459 maxblocks = 50
1460 blocks = []
1461
1462 # create and destroy lots of blocks of different sizes
1463 for i in xrange(iterations):
1464 size = int(random.lognormvariate(0, 1) * 1000)
1465 b = multiprocessing.heap.BufferWrapper(size)
1466 blocks.append(b)
1467 if len(blocks) > maxblocks:
1468 i = random.randrange(maxblocks)
1469 del blocks[i]
1470
1471 # get the heap object
1472 heap = multiprocessing.heap.BufferWrapper._heap
1473
1474 # verify the state of the heap
1475 all = []
1476 occupied = 0
1477 for L in heap._len_to_seq.values():
1478 for arena, start, stop in L:
1479 all.append((heap._arenas.index(arena), start, stop,
1480 stop-start, 'free'))
1481 for arena, start, stop in heap._allocated_blocks:
1482 all.append((heap._arenas.index(arena), start, stop,
1483 stop-start, 'occupied'))
1484 occupied += (stop-start)
1485
1486 all.sort()
1487
1488 for i in range(len(all)-1):
1489 (arena, start, stop) = all[i][:3]
1490 (narena, nstart, nstop) = all[i+1][:3]
1491 self.assertTrue((arena != narena and nstart == 0) or
1492 (stop == nstart))
1493
1494#
1495#
1496#
1497
1498try:
1499 from ctypes import Structure, Value, copy, c_int, c_double
1500except ImportError:
1501 Structure = object
1502 c_int = c_double = None
1503
1504class _Foo(Structure):
1505 _fields_ = [
1506 ('x', c_int),
1507 ('y', c_double)
1508 ]
1509
1510class _TestSharedCTypes(BaseTestCase):
1511
1512 ALLOWED_TYPES = ('processes',)
1513
1514 def _double(self, x, y, foo, arr, string):
1515 x.value *= 2
1516 y.value *= 2
1517 foo.x *= 2
1518 foo.y *= 2
1519 string.value *= 2
1520 for i in range(len(arr)):
1521 arr[i] *= 2
1522
1523 def test_sharedctypes(self, lock=False):
1524 if c_int is None:
1525 return
1526
1527 x = Value('i', 7, lock=lock)
1528 y = Value(ctypes.c_double, 1.0/3.0, lock=lock)
1529 foo = Value(_Foo, 3, 2, lock=lock)
1530 arr = Array('d', range(10), lock=lock)
1531 string = Array('c', 20, lock=lock)
1532 string.value = 'hello'
1533
1534 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1535 p.start()
1536 p.join()
1537
1538 self.assertEqual(x.value, 14)
1539 self.assertAlmostEqual(y.value, 2.0/3.0)
1540 self.assertEqual(foo.x, 6)
1541 self.assertAlmostEqual(foo.y, 4.0)
1542 for i in range(10):
1543 self.assertAlmostEqual(arr[i], i*2)
1544 self.assertEqual(string.value, latin('hellohello'))
1545
1546 def test_synchronize(self):
1547 self.test_sharedctypes(lock=True)
1548
1549 def test_copy(self):
1550 if c_int is None:
1551 return
1552
1553 foo = _Foo(2, 5.0)
1554 bar = copy(foo)
1555 foo.x = 0
1556 foo.y = 0
1557 self.assertEqual(bar.x, 2)
1558 self.assertAlmostEqual(bar.y, 5.0)
1559
1560#
1561#
1562#
1563
1564class _TestFinalize(BaseTestCase):
1565
1566 ALLOWED_TYPES = ('processes',)
1567
1568 def _test_finalize(self, conn):
1569 class Foo(object):
1570 pass
1571
1572 a = Foo()
1573 util.Finalize(a, conn.send, args=('a',))
1574 del a # triggers callback for a
1575
1576 b = Foo()
1577 close_b = util.Finalize(b, conn.send, args=('b',))
1578 close_b() # triggers callback for b
1579 close_b() # does nothing because callback has already been called
1580 del b # does nothing because callback has already been called
1581
1582 c = Foo()
1583 util.Finalize(c, conn.send, args=('c',))
1584
1585 d10 = Foo()
1586 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1587
1588 d01 = Foo()
1589 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1590 d02 = Foo()
1591 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1592 d03 = Foo()
1593 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1594
1595 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1596
1597 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1598
1599 # call mutliprocessing's cleanup function then exit process without
1600 # garbage collecting locals
1601 util._exit_function()
1602 conn.close()
1603 os._exit(0)
1604
1605 def test_finalize(self):
1606 conn, child_conn = self.Pipe()
1607
1608 p = self.Process(target=self._test_finalize, args=(child_conn,))
1609 p.start()
1610 p.join()
1611
1612 result = [obj for obj in iter(conn.recv, 'STOP')]
1613 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1614
1615#
1616# Test that from ... import * works for each module
1617#
1618
1619class _TestImportStar(BaseTestCase):
1620
1621 ALLOWED_TYPES = ('processes',)
1622
1623 def test_import(self):
1624 modules = (
1625 'multiprocessing', 'multiprocessing.connection',
1626 'multiprocessing.heap', 'multiprocessing.managers',
1627 'multiprocessing.pool', 'multiprocessing.process',
1628 'multiprocessing.reduction', 'multiprocessing.sharedctypes',
1629 'multiprocessing.synchronize', 'multiprocessing.util'
1630 )
1631
1632 for name in modules:
1633 __import__(name)
1634 mod = sys.modules[name]
1635
1636 for attr in getattr(mod, '__all__', ()):
1637 self.assertTrue(
1638 hasattr(mod, attr),
1639 '%r does not have attribute %r' % (mod, attr)
1640 )
1641
1642#
1643# Quick test that logging works -- does not test logging output
1644#
1645
1646class _TestLogging(BaseTestCase):
1647
1648 ALLOWED_TYPES = ('processes',)
1649
1650 def test_enable_logging(self):
1651 logger = multiprocessing.get_logger()
1652 logger.setLevel(util.SUBWARNING)
1653 self.assertTrue(logger is not None)
1654 logger.debug('this will not be printed')
1655 logger.info('nor will this')
1656 logger.setLevel(LOG_LEVEL)
1657
1658 def _test_level(self, conn):
1659 logger = multiprocessing.get_logger()
1660 conn.send(logger.getEffectiveLevel())
1661
1662 def test_level(self):
1663 LEVEL1 = 32
1664 LEVEL2 = 37
1665
1666 logger = multiprocessing.get_logger()
1667 root_logger = logging.getLogger()
1668 root_level = root_logger.level
1669
1670 reader, writer = multiprocessing.Pipe(duplex=False)
1671
1672 logger.setLevel(LEVEL1)
1673 self.Process(target=self._test_level, args=(writer,)).start()
1674 self.assertEqual(LEVEL1, reader.recv())
1675
1676 logger.setLevel(logging.NOTSET)
1677 root_logger.setLevel(LEVEL2)
1678 self.Process(target=self._test_level, args=(writer,)).start()
1679 self.assertEqual(LEVEL2, reader.recv())
1680
1681 root_logger.setLevel(root_level)
1682 logger.setLevel(level=LOG_LEVEL)
1683
1684#
1685# Functions used to create test cases from the base ones in this module
1686#
1687
1688def get_attributes(Source, names):
1689 d = {}
1690 for name in names:
1691 obj = getattr(Source, name)
1692 if type(obj) == type(get_attributes):
1693 obj = staticmethod(obj)
1694 d[name] = obj
1695 return d
1696
1697def create_test_cases(Mixin, type):
1698 result = {}
1699 glob = globals()
1700 Type = type[0].upper() + type[1:]
1701
1702 for name in glob.keys():
1703 if name.startswith('_Test'):
1704 base = glob[name]
1705 if type in base.ALLOWED_TYPES:
1706 newname = 'With' + Type + name[1:]
1707 class Temp(base, unittest.TestCase, Mixin):
1708 pass
1709 result[newname] = Temp
1710 Temp.__name__ = newname
1711 Temp.__module__ = Mixin.__module__
1712 return result
1713
1714#
1715# Create test cases
1716#
1717
1718class ProcessesMixin(object):
1719 TYPE = 'processes'
1720 Process = multiprocessing.Process
1721 locals().update(get_attributes(multiprocessing, (
1722 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1723 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1724 'RawArray', 'current_process', 'active_children', 'Pipe',
1725 'connection', 'JoinableQueue'
1726 )))
1727
1728testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1729globals().update(testcases_processes)
1730
1731
1732class ManagerMixin(object):
1733 TYPE = 'manager'
1734 Process = multiprocessing.Process
1735 manager = object.__new__(multiprocessing.managers.SyncManager)
1736 locals().update(get_attributes(manager, (
1737 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1738 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1739 'Namespace', 'JoinableQueue'
1740 )))
1741
1742testcases_manager = create_test_cases(ManagerMixin, type='manager')
1743globals().update(testcases_manager)
1744
1745
1746class ThreadsMixin(object):
1747 TYPE = 'threads'
1748 Process = multiprocessing.dummy.Process
1749 locals().update(get_attributes(multiprocessing.dummy, (
1750 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1751 'Condition', 'Event', 'Value', 'Array', 'current_process',
1752 'active_children', 'Pipe', 'connection', 'dict', 'list',
1753 'Namespace', 'JoinableQueue'
1754 )))
1755
1756testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1757globals().update(testcases_threads)
1758
Neal Norwitz0c519b32008-08-25 01:50:24 +00001759class OtherTest(unittest.TestCase):
1760 # TODO: add more tests for deliver/answer challenge.
1761 def test_deliver_challenge_auth_failure(self):
1762 class _FakeConnection(object):
1763 def recv_bytes(self, size):
Neal Norwitz2a7767a2008-08-25 03:03:25 +00001764 return b'something bogus'
Neal Norwitz0c519b32008-08-25 01:50:24 +00001765 def send_bytes(self, data):
1766 pass
1767 self.assertRaises(multiprocessing.AuthenticationError,
1768 multiprocessing.connection.deliver_challenge,
1769 _FakeConnection(), b'abc')
1770
1771 def test_answer_challenge_auth_failure(self):
1772 class _FakeConnection(object):
1773 def __init__(self):
1774 self.count = 0
1775 def recv_bytes(self, size):
1776 self.count += 1
1777 if self.count == 1:
1778 return multiprocessing.connection.CHALLENGE
1779 elif self.count == 2:
Neal Norwitz2a7767a2008-08-25 03:03:25 +00001780 return b'something bogus'
1781 return b''
Neal Norwitz0c519b32008-08-25 01:50:24 +00001782 def send_bytes(self, data):
1783 pass
1784 self.assertRaises(multiprocessing.AuthenticationError,
1785 multiprocessing.connection.answer_challenge,
1786 _FakeConnection(), b'abc')
1787
1788testcases_other = [OtherTest]
1789
Benjamin Petersondfd79492008-06-13 19:13:39 +00001790#
1791#
1792#
1793
1794def test_main(run=None):
Jesse Noller18623822008-06-18 13:29:52 +00001795 if sys.platform.startswith("linux"):
1796 try:
1797 lock = multiprocessing.RLock()
1798 except OSError:
1799 from test.test_support import TestSkipped
1800 raise TestSkipped("OSError raises on RLock creation, see issue 3111!")
Benjamin Petersoned77f2e2008-06-17 22:40:44 +00001801
Benjamin Petersondfd79492008-06-13 19:13:39 +00001802 if run is None:
1803 from test.test_support import run_unittest as run
1804
1805 util.get_temp_dir() # creates temp directory for use by all processes
1806
1807 multiprocessing.get_logger().setLevel(LOG_LEVEL)
1808
Jesse Noller146b7ab2008-07-02 16:44:09 +00001809 ProcessesMixin.pool = multiprocessing.Pool(4)
1810 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
1811 ManagerMixin.manager.__init__()
1812 ManagerMixin.manager.start()
1813 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001814
1815 testcases = (
Jesse Noller146b7ab2008-07-02 16:44:09 +00001816 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
1817 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz0c519b32008-08-25 01:50:24 +00001818 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
1819 testcases_other
Benjamin Petersondfd79492008-06-13 19:13:39 +00001820 )
1821
1822 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
1823 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
1824 run(suite)
1825
Jesse Noller146b7ab2008-07-02 16:44:09 +00001826 ThreadsMixin.pool.terminate()
1827 ProcessesMixin.pool.terminate()
1828 ManagerMixin.pool.terminate()
1829 ManagerMixin.manager.shutdown()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001830
Jesse Noller146b7ab2008-07-02 16:44:09 +00001831 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00001832
1833def main():
1834 test_main(unittest.TextTestRunner(verbosity=2).run)
1835
1836if __name__ == '__main__':
1837 main()