blob: b13d1daee20a8a848c3ff6866715b4e17e570192 [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import Queue
10import time
11import sys
12import os
13import gc
14import signal
15import array
16import copy
17import socket
18import random
19import logging
20
21import multiprocessing.dummy
22import multiprocessing.connection
23import multiprocessing.managers
24import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000025import multiprocessing.pool
26import _multiprocessing
27
28from multiprocessing import util
29
30#
31#
32#
33
# Constructor used for the character/string fixtures in this module.
latin = str

#
# Constants
#

LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.WARNING

DELTA = 0.1

# Setting CHECK_TIMINGS to True makes the tests take a lot longer and can
# sometimes cause some non-serious failures, because some calls block a
# bit longer than expected.
CHECK_TIMINGS = False

# Longer, distinct timeouts are only worth paying for when the timings
# are actually being checked.
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# True unless the C extension flags its semaphore get_value() as broken.
_BROKEN_GETVALUE = getattr(_multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False)
HAVE_GETVALUE = not _BROKEN_GETVALUE
55
56#
57# Creates a wrapper for a function which records the time it takes to finish
58#
59
class TimingWrapper(object):
    """Callable proxy that remembers how long the last call took.

    ``elapsed`` is None until the wrapper has been called once; after
    each call (successful or not) it holds the wall-clock duration of
    that call in seconds, measured with time.time().
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # recorded even when func raises, so callers can time failures
            self.elapsed = time.time() - started
        return result
72
73#
74# Base class for test cases
75#
76
class BaseTestCase(object):
    """Mixin holding the assertion helpers shared by the test classes.

    ALLOWED_TYPES lists the type names a test class applies to;
    subclasses narrow it where needed.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timing assertions are a no-op unless CHECK_TIMINGS is enabled.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Compare func(*args) with value, but treat NotImplementedError
        # raised by func as "nothing to check".
        try:
            outcome = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, outcome)
92
93#
94# Return the value of a semaphore
95#
96
def get_value(self):
    """Return the current counter of a semaphore-like object.

    Tries the object's own get_value() method first, then the
    ``_Semaphore__value`` and ``_value`` attributes in that order.
    Raises NotImplementedError when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
108
109#
110# Testcases
111#
112
class _TestProcess(BaseTestCase):
    # Exercises Process objects: identity attributes, the start/join life
    # cycle, terminate(), cpu_count(), active_children() and nested spawning.

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # not run for the 'threads' type
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertTrue(isinstance(authkey, bytes))
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def _test(self, q, *args, **kwds):
        # Child side of test_process: report what the child observed, in
        # exactly the order the parent reads it back.
        current = self.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if self.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # state before start()
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertTrue(p not in self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # state while running
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())

        # values reported by the child; args[0] is the queue itself
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # state after the child has finished
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

    def _test_terminate(self):
        # Sleep far longer than the test runs; the parent terminates us.
        time.sleep(1000)

    def test_terminate(self):
        # not run for the 'threads' type
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertTrue(p not in self.active_children())

        p.start()
        self.assertTrue(p in self.active_children())

        p.join()
        self.assertTrue(p not in self.active_children())

    def _test_recursion(self, wconn, id):
        # Send our own id, then spawn two children one level deeper (until
        # ids have length 2).  Each child is joined before the next one is
        # started, which fixes the order in which ids reach the pipe.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = self.Process(
                    target=self._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
258
259#
260#
261#
262
263class _UpperCaser(multiprocessing.Process):
264
265 def __init__(self):
266 multiprocessing.Process.__init__(self)
267 self.child_conn, self.parent_conn = multiprocessing.Pipe()
268
269 def run(self):
270 self.parent_conn.close()
271 for s in iter(self.child_conn.recv, None):
272 self.child_conn.send(s.upper())
273 self.child_conn.close()
274
275 def submit(self, s):
276 assert type(s) is str
277 self.parent_conn.send(s)
278 return self.parent_conn.recv()
279
280 def stop(self):
281 self.parent_conn.send(None)
282 self.parent_conn.close()
283 self.child_conn.close()
284
class _TestSubclassingProcess(BaseTestCase):
    # Checks that a user-defined Process subclass (_UpperCaser) works.

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.start()
        for text, shouted in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), shouted)
        worker.stop()
        worker.join()
296
297#
298#
299#
300
def queue_empty(q):
    """Return whether q is empty, falling back to qsize() if q has no empty()."""
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
306
def queue_full(q, maxsize):
    """Return whether q is full; without full(), compare qsize() to maxsize."""
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
312
313
class _TestQueue(BaseTestCase):
    # Tests for Queue/JoinableQueue: put() and get() in all their calling
    # conventions, behaviour across a new process, qsize() and task_done().

    def _test_put(self, queue, child_can_start, parent_can_continue):
        # Child side of test_put: once released, drain the six items.
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # fill the queue using every calling convention of put()
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # (timed callable, positional args, keyword args, expected duration)
        full_cases = [
            (put, (7, False), {}, 0),
            (put, (7, False, None), {}, 0),
            (put_nowait, (7,), {}, 0),
            (put, (7, True, TIMEOUT1), {}, TIMEOUT1),
            (put, (7, False, TIMEOUT2), {}, 0),
            (put, (7, True), {'timeout': TIMEOUT3}, TIMEOUT3),
            ]
        for timed, args, kwds, duration in full_cases:
            self.assertRaises(Queue.Full, timed, *args, **kwds)
            self.assertTimingAlmostEqual(timed.elapsed, duration)

        # let the child drain the queue, then check it really is empty
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    def _test_get(self, queue, child_can_start, parent_can_continue):
        # Child side of test_get: once released, enqueue 2..5.
        child_can_start.wait()
        #queue.put(1)
        for item in (2, 3, 4, 5):
            queue.put(item)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # (timed callable, positional args, keyword args, expected duration)
        empty_cases = [
            (get, (False,), {}, 0),
            (get, (False, None), {}, 0),
            (get_nowait, (), {}, 0),
            (get, (True, TIMEOUT1), {}, TIMEOUT1),
            (get, (False, TIMEOUT2), {}, 0),
            (get, (), {'timeout': TIMEOUT3}, TIMEOUT3),
            ]
        for timed, args, kwds, duration in empty_cases:
            self.assertRaises(Queue.Empty, timed, *args, **kwds)
            self.assertTimingAlmostEqual(timed.elapsed, duration)

        proc.join()

    def _test_fork(self, queue):
        for item in range(10, 20):
            queue.put(item)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for item in range(10):
            queue.put(item)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        child = self.Process(target=self._test_fork, args=(queue,))
        child.start()

        # check that all expected items are in the queue
        for expected in range(20):
            self.assertEqual(queue.get(), expected)
        self.assertRaises(Queue.Empty, queue.get, False)

        child.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # qsize() is not available here, nothing to check
            return

        # grow the queue to two items ...
        expected_size = 0
        for item in (1, 5):
            q.put(item)
            expected_size += 1
            self.assertEqual(q.qsize(), expected_size)

        # ... and shrink it back to empty
        while expected_size:
            q.get()
            expected_size -= 1
            self.assertEqual(q.qsize(), expected_size)

    def _test_task_done(self, q):
        # Worker: acknowledge every item until the None sentinel arrives.
        while True:
            obj = q.get()
            if obj is None:
                break
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            return

        workers = []
        for _ in xrange(4):
            workers.append(
                self.Process(target=self._test_task_done, args=(queue,))
                )

        for worker in workers:
            worker.start()

        for item in xrange(10):
            queue.put(item)

        # returns once every item has been acknowledged with task_done()
        queue.join()

        # one sentinel per worker
        for _ in workers:
            queue.put(None)

        for worker in workers:
            worker.join()
517
518#
519#
520#
521
class _TestLock(BaseTestCase):
    # Basic acquire/release semantics of Lock and RLock.

    def test_lock(self):
        mutex = self.Lock()
        self.assertEqual(mutex.acquire(), True)
        # a second, non-blocking acquire must fail
        self.assertEqual(mutex.acquire(False), False)
        self.assertEqual(mutex.release(), None)
        # releasing an unlocked lock is an error
        self.assertRaises((ValueError, threading.ThreadError), mutex.release)

    def test_rlock(self):
        DEPTH = 3
        mutex = self.RLock()
        for _ in range(DEPTH):
            self.assertEqual(mutex.acquire(), True)
        for _ in range(DEPTH):
            self.assertEqual(mutex.release(), None)
        # one release too many is an error
        self.assertRaises((AssertionError, RuntimeError), mutex.release)
539 self.assertRaises((AssertionError, RuntimeError), lock.release)
540
541
class _TestSemaphore(BaseTestCase):
    # Counter behaviour of Semaphore/BoundedSemaphore and acquire() timeouts.

    def _test_semaphore(self, sem):
        # Shared by the two tests below; expects sem to start at 2 and
        # leaves it at 2 again.
        check = self.assertReturnsIfImplemented
        check(2, get_value, sem)

        # (method name, args, expected result, expected counter afterwards)
        steps = [
            ('acquire', (), True, 1),
            ('acquire', (), True, 0),
            ('acquire', (False,), False, 0),
            ('release', (), None, 1),
            ('release', (), None, 2),
            ]
        for method, args, result, counter in steps:
            self.assertEqual(getattr(sem, method)(*args), result)
            check(counter, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # an unbounded semaphore may be released past its initial value
        for counter in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(counter, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected duration); every call
        # must fail because the semaphore starts at 0
        cases = [
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
            ]
        for args, kwds, duration in cases:
            self.assertEqual(acquire(*args, **kwds), False)
            self.assertTimingAlmostEqual(acquire.elapsed, duration)
594
595
class _TestCondition(BaseTestCase):
    # Tests for Condition: notify(), notify_all() and wait() timeouts.
    # Every worker started by a test is joined before the test returns.

    def f(self, cond, sleeping, woken, timeout=None):
        # Worker body: announce that we are about to wait (sleeping), wait
        # on the condition, then announce that we woke up (woken).
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Keep separate references to both workers so that both can be
        # joined at the end (reusing one name would lose the first one).
        p = self.Process(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()

        t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        t.daemon = True
        t.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # both workers have been notified, so neither join can block
        t.join()
        p.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)
        workers = []

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them all to sleep
        for i in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for i in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them to all sleep
        for i in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # the first batch timed out and the second was notified, so every
        # worker is finishing; reap them all
        for worker in workers:
            worker.join()

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
726
727
class _TestEvent(BaseTestCase):
    # Tests Event.wait() with and without timeouts, before and after set().

    def _test_event(self, event):
        # Child side of test_event: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Removed temporarily, due to API shear, this does not
        # work with threading._Event objects. is_set == isSet
        #self.assertEqual(event.is_set(), False)

        self.assertEqual(wait(0.0), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        # self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # Keep a reference to the child so it can be joined once it has
        # set the event, instead of being left behind unreaped.
        p = self.Process(target=self._test_event, args=(event,))
        p.start()
        self.assertEqual(wait(), None)
        p.join()
763
764#
765#
766#
767
class _TestValue(BaseTestCase):
    # Tests for shared Value/RawValue objects.

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def _test(self, values):
        # Child side of test_value: overwrite each shared value.
        for shared, (_code, _initial, final) in zip(values, self.codes_values):
            shared.value = final

    def test_value(self, raw=False):
        if self.TYPE != 'processes':
            return

        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        # initial values are visible in the parent
        for shared, (_code, initial, _final) in zip(values, self.codes_values):
            self.assertEqual(shared.value, initial)

        child = self.Process(target=self._test, args=(values,))
        child.start()
        child.join()

        # the child's writes are visible in the parent
        for shared, (_code, _initial, final) in zip(values, self.codes_values):
            self.assertEqual(shared.value, final)

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        if self.TYPE != 'processes':
            return

        # default lock
        val1 = self.Value('i', 5)
        val1.get_lock()
        val1.get_obj()

        # lock=None
        val2 = self.Value('i', 5, lock=None)
        val2.get_lock()
        val2.get_obj()

        # caller-supplied lock is handed back by get_lock()
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        val3.get_obj()
        self.assertEqual(lock, lock3)

        # raw values expose neither accessor
        arr4 = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr4, accessor))
827
828
class _TestArray(BaseTestCase):
    # Tests for shared Array/RawArray objects.

    def f(self, seq):
        # In-place running sum: each element becomes the sum of itself
        # and everything before it.
        for idx in range(1, len(seq)):
            seq[idx] += seq[idx - 1]

    def test_array(self, raw=False):
        if self.TYPE != 'processes':
            return

        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make_array = self.RawArray if raw else self.Array
        arr = make_array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # apply the same slice assignment to both sequences
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # transform the plain list here and the shared array in a child;
        # both must end up equal
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    def test_rawarray(self):
        self.test_array(raw=True)

    def test_getobj_getlock_obj(self):
        if self.TYPE != 'processes':
            return

        # default lock
        arr1 = self.Array('i', range(10))
        arr1.get_lock()
        arr1.get_obj()

        # lock=None
        arr2 = self.Array('i', range(10), lock=None)
        arr2.get_lock()
        arr2.get_obj()

        # caller-supplied lock is handed back by get_lock()
        lock = self.Lock()
        arr3 = self.Array('i', range(10), lock=lock)
        lock3 = arr3.get_lock()
        arr3.get_obj()
        self.assertEqual(lock, lock3)

        # raw arrays expose neither accessor
        arr4 = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr4, accessor))
885
886#
887#
888#
889
class _TestContainers(BaseTestCase):
    # Tests for the manager's list, dict and Namespace objects.

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        digits = self.list(range(10))
        self.assertEqual(digits[:], range(10))

        other = self.list()
        self.assertEqual(other[:], [])

        other.extend(range(5))
        self.assertEqual(other[:], range(5))

        self.assertEqual(other[2], 2)
        self.assertEqual(other[2:10], [2,3,4])

        other *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(other[:], doubled)

        self.assertEqual(other + [5, 6], doubled + [5, 6])

        # the first list is unaffected by all of the above
        self.assertEqual(digits[:], range(10))

        # a list holding the two lists above
        nested = self.list([digits, other])
        self.assertEqual(
            nested[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], doubled]
            )

        # a later change to an inner list is visible through the outer one
        wrapper = self.list([digits])
        digits.append('hello')
        self.assertEqual(wrapper[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        shared = self.dict()
        indices = range(65, 70)
        for code in indices:
            shared[code] = chr(code)

        letters = [chr(code) for code in indices]
        pairs = [(code, chr(code)) for code in indices]

        self.assertEqual(shared.copy(), dict(pairs))
        self.assertEqual(sorted(shared.keys()), indices)
        self.assertEqual(sorted(shared.values()), letters)
        self.assertEqual(sorted(shared.items()), pairs)

    def test_namespace(self):
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        # the expected str() lists neither the deleted attribute nor
        # the underscore-prefixed one
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertFalse(hasattr(ns, 'job'))
945
946#
947#
948#
949
def sqr(x, wait=0.0):
    """Return x squared after sleeping for `wait` seconds."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    # Tests for the pool's apply/map/imap methods and terminate().

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), [sqr(x) for x in range(10)])
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         [sqr(x) for x in range(100)])

    def test_async(self):
        pending = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(pending.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # the task sleeps longer than the timeout given to get()
        pending = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(pending.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), [sqr(x) for x in range(10)])

        # step through the iterators by hand, with and without chunksize
        it = self.pool.imap(sqr, range(10))
        for x in range(10):
            self.assertEqual(it.next(), x*x)
        self.assertRaises(StopIteration, it.next)

        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for x in range(1000):
            self.assertEqual(it.next(), x*x)
        self.assertRaises(StopIteration, it.next)

    def test_imap_unordered(self):
        squares = [sqr(x) for x in range(1000)]

        # the test sorts the results before comparing them
        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), squares)

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), squares)

    def test_make_pool(self):
        pool = multiprocessing.Pool(3)
        self.assertEqual(3, len(pool._pool))
        pool.close()
        pool.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # queue far more work than could finish, then terminate at once
        naps = [0.1 for i in range(10000)]
        result = self.pool.map_async(time.sleep, naps, chunksize=1)
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertTrue(join.elapsed < 0.2)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001022#
1023# Test that manager has expected number of shared objects left
1024#
1025
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        # the pool object is still alive
        EXPECTED_NUMBER = 1

        # discard dead process objs, then do garbage collection
        multiprocessing.active_children()
        gc.collect()

        live_objects = self.manager._number_of_objects()
        if live_objects != EXPECTED_NUMBER:
            # print the manager's debug info before the assertion fails
            print(self.manager._debug_info())

        self.assertEqual(live_objects, EXPECTED_NUMBER)
1042
1043#
1044# Test of creating a customized manager class
1045#
1046
1047from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1048
class FooBar(object):
    """Simple class registered with MyManager under two typeids."""

    def f(self):
        # public method that returns normally
        result = 'f()'
        return result

    def g(self):
        # public method that always fails
        raise ValueError

    def _h(self):
        # underscore-prefixed method
        result = '_h()'
        return result
1056
def baz():
    """Generate the squares of 0 through 9."""
    for n in range(10):
        yield n ** 2
1060
class IteratorProxy(BaseProxy):
    """Proxy that lets a remote iterator be consumed like a local one.

    Both spellings of the "advance" method are exposed and forwarded to
    the method of the same name on the referent.
    """

    _exposed_ = ('next', '__next__')

    def __iter__(self):
        # the proxy is its own iterator
        return self

    def next(self):
        value = self._callmethod('next')
        return value

    def __next__(self):
        value = self._callmethod('__next__')
        return value
1069
class MyManager(BaseManager):
    """Customized manager; its typeids are registered right below."""


# 'Foo' is registered without an explicit `exposed` tuple, 'Bar' with one
# that includes _h, and 'baz' is accessed through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1076
1077
class _TestMyManager(BaseTestCase):
    # Tests the customized manager class MyManager and its registered
    # typeids ('Foo', 'Bar' and 'baz').

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        # Shut the manager down even if an assertion below fails, so a
        # failing test does not leave the server process behind.
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            baz = manager.baz()

            foo_methods = [name for name in ('f', 'g', '_h')
                           if hasattr(foo, name)]
            bar_methods = [name for name in ('f', 'g', '_h')
                           if hasattr(bar, name)]

            # Foo's proxy has f and g; Bar's has exactly what was listed
            # in its `exposed` tuple at registration
            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            # _h is not callable through Foo's proxy
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(baz), [i*i for i in range(10)])
        finally:
            manager.shutdown()
1109
1110#
1111# Test of connecting to a remote server and using xmlrpclib for serialization
1112#
1113
1114_queue = Queue.Queue()
1115def get_queue():
1116 return _queue
1117
class QueueManager(BaseManager):
    '''Manager class used by the server process.'''


# The server side supplies the callable that produces the shared queue.
QueueManager.register('get_queue', callable=get_queue)
1121
class QueueManager2(BaseManager):
    '''Manager class which specifies the same interface as QueueManager.'''


# Registered without a callable: only the typeid is declared here.
QueueManager2.register('get_queue')
1125
1126
SERIALIZER = 'xmlrpclib'


class _TestRemoteManager(BaseTestCase):
    '''Connects to a manager server by address, serializing with SERIALIZER.'''

    ALLOWED_TYPES = ('manager',)

    def _putter(self, address, authkey):
        '''Child body: connect to the server and put one tuple on its queue.'''
        client = QueueManager2(
            address=address,
            authkey=authkey,
            serializer=SERIALIZER,
            )
        client.connect()
        client.get_queue().put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        # Port 0 is passed in; the address actually bound is read back
        # from server.address.
        server = QueueManager(
            address=('localhost', 0),
            authkey=authkey,
            serializer=SERIALIZER,
            )
        server.start()
        server_address = server.address

        child = self.Process(target=self._putter,
                             args=(server_address, authkey))
        child.start()

        client = QueueManager2(
            address=server_address,
            authkey=authkey,
            serializer=SERIALIZER,
            )
        client.connect()
        queue = client.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        received = queue.get()
        self.assertEqual(received, ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        server.shutdown()
1168
1169#
1170#
1171#
1172
SENTINEL = latin('')


class _TestConnection(BaseTestCase):
    '''Exercises the connection objects returned by Pipe().'''

    ALLOWED_TYPES = ('processes', 'threads')

    def _echo(self, conn):
        '''Echo every byte message back until SENTINEL is received.'''
        while True:
            payload = conn.recv_bytes()
            if payload == SENTINEL:
                break
            conn.send_bytes(payload)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._echo, args=(child_conn,))
        child.daemon = True
        child.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', range(4))
        with_processes = (self.TYPE == 'processes')

        if with_processes:
            self.assertEqual(type(conn.fileno()), int)

        # Round trip of a pickled object.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        # Round trip of a raw byte string.
        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if with_processes:
            # Receive into the start of a zeroed buffer.
            buf = array.array('i', [0]*10)
            padding = [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buf),
                             len(arr) * buf.itemsize)
            self.assertEqual(list(buf), list(arr) + padding)

            # Receive at an offset of three items into a fresh buffer.
            buf = array.array('i', [0]*10)
            padding = [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buf, 3 * buf.itemsize),
                             len(arr) * buf.itemsize)
            self.assertEqual(list(buf), [0] * 3 + list(arr) + padding)

            # A message longer than the buffer must raise BufferTooShort
            # carrying the complete message.
            buf = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buf)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        megabyte = 1024 * 1024
        really_big_msg = latin('X') * (megabyte * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                       # tell child to quit
        child_conn.close()

        if with_processes:
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        child.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE != 'processes':
            return
        # The remaining checks are only made for process-backed pipes.
        self.assertEqual(reader.readable, True)
        self.assertEqual(reader.writable, False)
        self.assertEqual(writer.readable, False)
        self.assertEqual(writer.writable, True)
        self.assertRaises(IOError, reader.send, 2)
        self.assertRaises(IOError, writer.recv)
        self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._echo, args=(child_conn,))
        child.start()
        child_conn.close()    # this might complete before child initializes

        greeting = latin('hello')
        conn.send_bytes(greeting)
        self.assertEqual(conn.recv_bytes(), greeting)

        conn.send_bytes(SENTINEL)
        conn.close()
        child.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Offset/size combinations that must be rejected, in the original
        # order.
        for bad_args in ((27,), (22, 5), (26, 1), (-1,), (4, -1)):
            self.assertRaises(ValueError, a.send_bytes, msg, *bad_args)
1323
class _TestListenerClient(BaseTestCase):
    '''Sends one message from a Client to a Listener for each family.'''

    ALLOWED_TYPES = ('processes', 'threads')

    def _test(self, address):
        '''Child body: connect to address, send 'hello' and close.'''
        client = self.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test,
                                 args=(listener.address,))
            child.daemon = True
            child.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            child.join()
            listener.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001343#
1344# Test of sending connection and socket objects between processes
1345#
# NOTE: the test case below is disabled -- it is wrapped in a bare string
# literal, so it is never defined or run.
"""
class _TestPicklingConnections(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def _listener(self, conn, families):
        for fam in families:
            l = self.connection.Listener(family=fam)
            conn.send(l.address)
            new_conn = l.accept()
            conn.send(new_conn)

        if self.TYPE == 'processes':
            l = socket.socket()
            l.bind(('localhost', 0))
            conn.send(l.getsockname())
            l.listen(1)
            new_conn, addr = l.accept()
            conn.send(new_conn)

        conn.recv()

    def _remote(self, conn):
        for (address, msg) in iter(conn.recv, None):
            client = self.connection.Client(address)
            client.send(msg.upper())
            client.close()

        if self.TYPE == 'processes':
            address, msg = conn.recv()
            client = socket.socket()
            client.connect(address)
            client.sendall(msg.upper())
            client.close()

        conn.close()

    def test_pickling(self):
        try:
            multiprocessing.allow_connection_pickling()
        except ImportError:
            return

        families = self.connection.families

        lconn, lconn0 = self.Pipe()
        lp = self.Process(target=self._listener, args=(lconn0, families))
        lp.start()
        lconn0.close()

        rconn, rconn0 = self.Pipe()
        rp = self.Process(target=self._remote, args=(rconn0,))
        rp.start()
        rconn0.close()

        for fam in families:
            msg = ('This connection uses family %s' % fam).encode('ascii')
            address = lconn.recv()
            rconn.send((address, msg))
            new_conn = lconn.recv()
            self.assertEqual(new_conn.recv(), msg.upper())

        rconn.send(None)

        if self.TYPE == 'processes':
            msg = latin('This connection uses a normal socket')
            address = lconn.recv()
            rconn.send((address, msg))
            if hasattr(socket, 'fromfd'):
                new_conn = lconn.recv()
                self.assertEqual(new_conn.recv(100), msg.upper())
            else:
                # XXX On Windows with Py2.6 need to backport fromfd()
                discard = lconn.recv_bytes()

        lconn.send(None)

        rconn.close()
        lconn.close()

        lp.join()
        rp.join()
"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001429#
1430#
1431#
1432
class _TestHeap(BaseTestCase):
    '''Churns shared-memory blocks and checks that the heap stays contiguous.'''

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in xrange(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap: gather every free and every
        # allocated block as (arena index, start, stop, length, state)
        segments = []
        for free_list in heap._len_to_seq.values():
            for arena, start, stop in free_list:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop - start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop - start, 'occupied'))

        segments.sort()

        # Each block must end where its successor starts, unless the
        # successor is the first block of a different arena.
        for current, following in zip(segments, segments[1:]):
            arena, start, stop = current[:3]
            narena, nstart, nstop = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1473
1474#
1475#
1476#
1477
# Only names that really live in ctypes are imported here.  The previous
# import also asked ctypes for 'Value' and 'copy', which it does not
# provide, so the import always failed and every test in
# _TestSharedCTypes silently returned without checking anything.
try:
    from ctypes import Structure, c_int, c_double
except ImportError:
    Structure = object
    c_int = c_double = None


class _Foo(Structure):
    '''Two-field structure shared between processes in the tests below.'''
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]


class _TestSharedCTypes(BaseTestCase):
    '''Tests ctypes objects shared through multiprocessing Value/Array.'''

    ALLOWED_TYPES = ('processes',)

    def _double(self, x, y, foo, arr, string):
        '''Child body: double every shared object in place.'''
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for i in range(len(arr)):
            arr[i] *= 2

    def test_sharedctypes(self, lock=False):
        '''Double shared values in a child process and check the results.

        lock is forwarded unchanged to every Value/Array factory call.
        '''
        if c_int is None:
            # ctypes is unavailable; nothing can be tested.
            return

        # Value and Array come from the mixin (bound to the multiprocessing
        # factories), matching how the rest of the file reaches them.
        x = self.Value('i', 7, lock=lock)
        y = self.Value(c_double, 1.0/3.0, lock=lock)
        foo = self.Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
        p.start()
        p.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        '''Run the same scenario with lock=True.'''
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        '''A copy must not change when the original is modified.'''
        if c_int is None:
            return

        # Imported locally under an alias so the module-level 'copy'
        # module imported at the top of the file is not shadowed.
        from multiprocessing.sharedctypes import copy as shared_copy

        foo = _Foo(2, 5.0)
        bar = shared_copy(foo)
        foo.x = 0
        foo.y = 0
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)
1539
1540#
1541#
1542#
1543
class _TestFinalize(BaseTestCase):
    '''Checks the order in which util.Finalize callbacks are run.'''

    ALLOWED_TYPES = ('processes',)

    def _test_finalize(self, conn):
        '''Child body: register finalizers that report their tag over conn.'''
        class Foo(object):
            pass

        def watch(obj, tag, **options):
            # Register a finalizer that sends `tag` through the pipe.
            return util.Finalize(obj, conn.send, args=(tag,), **options)

        a = Foo()
        watch(a, 'a')
        del a           # triggers callback for a

        b = Foo()
        close_b = watch(b, 'b')
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        c = Foo()
        watch(c, 'c')

        d10 = Foo()
        watch(d10, 'd10', exitpriority=1)

        d01 = Foo()
        watch(d01, 'd01', exitpriority=0)
        d02 = Foo()
        watch(d02, 'd02', exitpriority=0)
        d03 = Foo()
        watch(d03, 'd03', exitpriority=0)

        watch(None, 'e', exitpriority=-10)

        watch(None, 'STOP', exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.start()
        child.join()

        # Read tags until the 'STOP' marker sent by the last finalizer.
        result = list(iter(conn.recv, 'STOP'))
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1594
1595#
1596# Test that from ... import * works for each module
1597#
1598
class _TestImportStar(BaseTestCase):
    '''Checks that every name listed in a module's __all__ really exists.'''

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = (
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.reduction', 'multiprocessing.sharedctypes',
            'multiprocessing.synchronize', 'multiprocessing.util'
            )

        for name in modules:
            __import__(name)
            mod = sys.modules[name]

            # Modules without __all__ contribute nothing to check.
            exported = getattr(mod, '__all__', ())
            for attr in exported:
                present = hasattr(mod, attr)
                self.assertTrue(
                    present,
                    '%r does not have attribute %r' % (mod, attr)
                    )
1621
1622#
1623# Quick test that logging works -- does not test logging output
1624#
1625
class _TestLogging(BaseTestCase):
    '''Quick checks of the multiprocessing logger; output is not inspected.'''

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertTrue(logger is not None)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    def _test_level(self, conn):
        '''Child body: report the logger's effective level over conn.'''
        effective = multiprocessing.get_logger().getEffectiveLevel()
        conn.send(effective)

    def test_level(self):
        LEVEL1, LEVEL2 = 32, 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        # Level set directly on the multiprocessing logger.
        logger.setLevel(LEVEL1)
        child = self.Process(target=self._test_level, args=(writer,))
        child.start()
        self.assertEqual(LEVEL1, reader.recv())

        # With the logger at NOTSET, the child is expected to report the
        # level set on the root logger.
        logger.setLevel(logging.NOTSET)
        root_logger.setLevel(LEVEL2)
        child = self.Process(target=self._test_level, args=(writer,))
        child.start()
        self.assertEqual(LEVEL2, reader.recv())

        # Restore the levels that were in effect before the test.
        root_logger.setLevel(root_level)
        logger.setLevel(level=LOG_LEVEL)
1663
1664#
1665# Functions used to create test cases from the base ones in this module
1666#
1667
def get_attributes(Source, names):
    '''Return a dict mapping each name to the attribute of that name on Source.

    Attributes that are plain functions are wrapped in staticmethod; all
    other attributes are stored unchanged.
    '''
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        value = getattr(Source, name)
        # The function type cannot be subclassed, so isinstance() matches
        # exactly the same objects as an exact type comparison.
        if isinstance(value, function_type):
            value = staticmethod(value)
        attributes[name] = value
    return attributes
1676
def create_test_cases(Mixin, type):
    '''Build concrete TestCase classes for one flavour of the API.

    Every module global whose name starts with '_Test' and whose
    ALLOWED_TYPES contains `type` is combined with unittest.TestCase and
    Mixin.  Returns a dict mapping the new class name
    ('With' + capitalized type + base name minus its leading underscore)
    to the new class.
    '''
    capitalized = type[0].upper() + type[1:]
    namespace = globals()
    cases = {}

    for name in list(namespace):
        if not name.startswith('_Test'):
            continue
        base = namespace[name]
        if type not in base.ALLOWED_TYPES:
            continue
        newname = 'With' + capitalized + name[1:]

        class Temp(base, unittest.TestCase, Mixin):
            pass

        # Present the class under its generated name and under the
        # module that Mixin reports.
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        cases[newname] = Temp
    return cases
1693
1694#
1695# Create test cases
1696#
1697
class ProcessesMixin(object):
    '''Mixin binding the test cases to the multiprocessing package itself.'''
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed attributes of the multiprocessing package into the
    # class namespace; get_attributes() wraps plain functions in
    # staticmethod.
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue',
        )))


# Generate the 'processes' test cases and publish them as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1710
1711
class ManagerMixin(object):
    '''Mixin binding the test cases to proxies created by a SyncManager.'''
    TYPE = 'manager'
    Process = multiprocessing.Process
    # The manager is allocated without running __init__ here; test_main()
    # calls __init__() and start() on it before the tests run.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue',
        )))


# Generate the 'manager' test cases and publish them as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1724
1725
class ThreadsMixin(object):
    '''Mixin binding the test cases to multiprocessing.dummy.'''
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed attributes of multiprocessing.dummy into the class
    # namespace; get_attributes() wraps plain functions in staticmethod.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue',
        )))


# Generate the 'threads' test cases and publish them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1738
1739#
1740#
1741#
1742
def test_main(run=None):
    '''Set up the shared pools and manager, run every test case, tear down.

    run -- callable taking a unittest.TestSuite; when None,
           test.test_support.run_unittest is used.

    Raises test.test_support.TestSkipped on Linux when creating an RLock
    fails with OSError (see issue 3111).
    '''
    if sys.platform.startswith("linux"):
        try:
            # Only creating the lock matters; the object itself is unused.
            multiprocessing.RLock()
        except OSError:
            from test.test_support import TestSkipped
            raise TestSkipped("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    # The teardown sits in a finally clause so that the pools and the
    # manager are shut down even when run() raises.
    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__)
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00001780
def main():
    '''Run the whole suite through a verbose text runner.'''
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)


if __name__ == '__main__':
    main()