blob: 4388fe052343bda8ad06d74040c51a0d10131751 [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import Queue
10import time
11import sys
12import os
13import gc
14import signal
15import array
16import copy
17import socket
18import random
19import logging
20
Jesse Noller37040cd2008-09-30 00:15:45 +000021
22# Work around broken sem_open implementations
23try:
24 import multiprocessing.synchronize
25except ImportError, e:
Benjamin Petersonbec087f2009-03-26 21:10:30 +000026 raise unittest.SkipTest(e)
Jesse Noller37040cd2008-09-30 00:15:45 +000027
Benjamin Petersondfd79492008-06-13 19:13:39 +000028import multiprocessing.dummy
29import multiprocessing.connection
30import multiprocessing.managers
31import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000032import multiprocessing.pool
33import _multiprocessing
34
35from multiprocessing import util
36
37#
38#
39#
40
Benjamin Petersone79edf52008-07-13 18:34:58 +000041latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000042
Benjamin Petersondfd79492008-06-13 19:13:39 +000043#
44# Constants
45#
46
47LOG_LEVEL = util.SUBWARNING
48#LOG_LEVEL = logging.WARNING
49
50DELTA = 0.1
51CHECK_TIMINGS = False # making true makes tests take a lot longer
52 # and can sometimes cause some non-serious
53 # failures because some calls block a bit
54 # longer than expected
55if CHECK_TIMINGS:
56 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
57else:
58 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
59
60HAVE_GETVALUE = not getattr(_multiprocessing,
61 'HAVE_BROKEN_SEM_GETVALUE', False)
62
Jesse Noller9a5b2ad2009-01-19 15:12:22 +000063WIN32 = (sys.platform == "win32")
64
Benjamin Petersondfd79492008-06-13 19:13:39 +000065#
66# Creates a wrapper for a function which records the time it takes to finish
67#
68
class TimingWrapper(object):
    """Callable wrapper that records how long each call to ``func`` takes.

    ``elapsed`` is ``None`` until the first call; afterwards it holds the
    duration in seconds (measured with ``time.time()``) of the most recent
    call, whether that call returned or raised.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # runs on both the normal and the exceptional path
            self.elapsed = time.time() - started
        return result
81
82#
83# Base class for test cases
84#
85
class BaseTestCase(object):
    """Mixin providing assertion helpers shared by the test classes below.

    ``ALLOWED_TYPES`` names the flavours ('processes', 'manager',
    'threads') a test class applies to; subclasses may narrow it.  How the
    attribute is consumed is not visible in this part of the file.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timing comparisons only happen when CHECK_TIMINGS is enabled.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Assert func(*args) == value, unless func raises
        # NotImplementedError, in which case nothing is checked.
        try:
            res = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, res)
101
102#
103# Return the value of a semaphore
104#
105
def get_value(self):
    """Return the current value of the semaphore-like object ``self``.

    Tries ``self.get_value()`` first, then the ``_Semaphore__value`` and
    ``_value`` attributes in that order.  Raises NotImplementedError when
    none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
117
118#
119# Testcases
120#
121
class _TestProcess(BaseTestCase):
    """Tests for process objects: attributes, start/join, terminate,
    active children and nested process creation.

    The deprecated ``assertEquals`` alias is replaced by ``assertEqual``
    throughout, so the class uses one spelling consistently.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # Attribute checks on the current process; skipped for threads.
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertTrue(isinstance(authkey, bytes))
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def _test(self, q, *args, **kwds):
        # Child body for test_process: report the call arguments and the
        # child's identity back through the queue, in a fixed order.
        current = self.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if self.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # state before start()
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertTrue(p not in self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # state while running
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())

        # values arrive in the order _test() put them
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # state after join()
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

    def _test_terminate(self):
        # Child body for test_terminate: sleep long enough that the child
        # only ends by being terminated.
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertTrue(p not in self.active_children())

        p.start()
        self.assertTrue(p in self.active_children())

        p.join()
        self.assertTrue(p not in self.active_children())

    def _test_recursion(self, wconn, id):
        # Send ``id`` over ``wconn``; while it has fewer than two elements,
        # run two children one after the other, each extending it by one.
        # NOTE(review): ``forking`` is imported but never used in this
        # method; kept in case the import matters for its side effects.
        from multiprocessing import forking
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = self.Process(
                    target=self._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
267
268#
269#
270#
271
class _UpperCaser(multiprocessing.Process):
    """Process subclass that upper-cases strings received over a pipe."""

    def __init__(self):
        multiprocessing.Process.__init__(self)
        ends = multiprocessing.Pipe()
        self.child_conn = ends[0]
        self.parent_conn = ends[1]

    def run(self):
        """Echo each received string back upper-cased until ``None``."""
        self.parent_conn.close()
        for text in iter(self.child_conn.recv, None):
            self.child_conn.send(text.upper())
        self.child_conn.close()

    def submit(self, s):
        """Send ``s`` over the parent connection and return the reply."""
        assert type(s) is str
        self.parent_conn.send(s)
        reply = self.parent_conn.recv()
        return reply

    def stop(self):
        """Send the ``None`` sentinel, then close both connection ends."""
        self.parent_conn.send(None)
        self.parent_conn.close()
        self.child_conn.close()
293
class _TestSubclassingProcess(BaseTestCase):
    """Checks that a user-defined Process subclass can be started and used."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        # Round-trip two strings through the child, then shut it down.
        worker = _UpperCaser()
        worker.start()
        for sent, expected in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(sent), expected)
        worker.stop()
        worker.join()
305
306#
307#
308#
309
def queue_empty(q):
    """Return ``q.empty()`` if ``q`` has that method, else ``qsize() == 0``."""
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
315
def queue_full(q, maxsize):
    """Return ``q.full()`` if ``q`` has that method, else compare
    ``q.qsize()`` with ``maxsize``."""
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
321
322
class _TestQueue(BaseTestCase):
    """Tests for queue objects: put/get variants, forking, qsize, task_done."""

    def _test_put(self, queue, child_can_start, parent_can_continue):
        # Child body for test_put: once released, remove six items and
        # tell the parent it may carry on.
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # fill the queue using each supported call signature
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # (callable, positional args, keyword args, expected blocking time)
        full_cases = [
            (put, (7, False), {}, 0),
            (put, (7, False, None), {}, 0),
            (put_nowait, (7,), {}, 0),
            (put, (7, True, TIMEOUT1), {}, TIMEOUT1),
            (put, (7, False, TIMEOUT2), {}, 0),
            (put, (7, True), {'timeout': TIMEOUT3}, TIMEOUT3),
            ]
        for call, args, kwds, expected in full_cases:
            self.assertRaises(Queue.Full, call, *args, **kwds)
            self.assertTimingAlmostEqual(call.elapsed, expected)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    def _test_get(self, queue, child_can_start, parent_can_continue):
        # Child body for test_get: once released, put 2..5 on the queue and
        # signal the parent.  (Putting 1 first is disabled; see test_get.)
        child_can_start.wait()
        #queue.put(1)
        for item in (2, 3, 4, 5):
            queue.put(item)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # (callable, positional args, keyword args, expected blocking time)
        empty_cases = [
            (get, (False,), {}, 0),
            (get, (False, None), {}, 0),
            (get_nowait, (), {}, 0),
            (get, (True, TIMEOUT1), {}, TIMEOUT1),
            (get, (False, TIMEOUT2), {}, 0),
            (get, (), {'timeout': TIMEOUT3}, TIMEOUT3),
            ]
        for call, args, kwds, expected in empty_cases:
            self.assertRaises(Queue.Empty, call, *args, **kwds)
            self.assertTimingAlmostEqual(call.elapsed, expected)

        proc.join()

    def _test_fork(self, queue):
        # Child body for test_fork: append 10..19 to the queue.
        for item in range(10, 20):
            queue.put(item)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for item in range(10):
            queue.put(item)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for expected in range(20):
            self.assertEqual(queue.get(), expected)
        self.assertRaises(Queue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # qsize() is unavailable here, so there is nothing to check
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    def _test_task_done(self, q):
        # Worker body for test_task_done: acknowledge every item, after a
        # short delay, until the None sentinel arrives.
        for _ in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            return

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for _ in xrange(4)]

        for worker in workers:
            worker.start()

        for item in xrange(10):
            queue.put(item)

        queue.join()

        # one sentinel per worker
        for _ in workers:
            queue.put(None)

        for worker in workers:
            worker.join()
526
527#
528#
529#
530
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock acquire/release behaviour."""

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # a second, non-blocking acquire must fail
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # releasing a lock that is not held is an error
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        depth = 3
        for _ in range(depth):
            self.assertEqual(lock.acquire(), True)
        for _ in range(depth):
            self.assertEqual(lock.release(), None)
        # one release too many is an error
        self.assertRaises((AssertionError, RuntimeError), lock.release)
549
550
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore counting and timeouts."""

    def _test_semaphore(self, sem):
        # Walk a semaphore from 2 down to 0 and back up to 2, checking the
        # value (where it can be read) after every step.
        check = self.assertReturnsIfImplemented
        check(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        check(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        check(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        check(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # further releases raise the value above the initial one
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected blocking time)
        cases = [
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
            ]
        for args, kwds, expected in cases:
            self.assertEqual(acquire(*args, **kwds), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected)
603
604
class _TestCondition(BaseTestCase):
    """Tests for Condition: notify, notify_all and wait timeouts."""

    def f(self, cond, sleeping, woken, timeout=None):
        # Sleeper body: announce via ``sleeping``, wait on the condition,
        # then announce via ``woken`` once the wait has returned.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE != 'processes':
            return
        try:
            sleepers = (cond._sleeping_count.get_value() -
                        cond._woken_count.get_value())
            self.assertEqual(sleepers, 0)
            self.assertEqual(cond._wait_semaphore.get_value(), 0)
        except NotImplementedError:
            pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        p = self.Process(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()

        t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        t.daemon = True
        t.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        # only the thread is joined here
        t.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for _ in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for _ in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for _ in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for _ in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for _ in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        cond = self.Condition()
        timed_wait = TimingWrapper(cond.wait)
        cond.acquire()
        outcome = timed_wait(TIMEOUT1)
        cond.release()
        self.assertEqual(outcome, None)
        self.assertTimingAlmostEqual(timed_wait.elapsed, TIMEOUT1)
735
736
class _TestEvent(BaseTestCase):
    """Tests for Event objects: wait() with and without a timeout, set()
    and clear(), and being set from another process/thread."""

    def _test_event(self, event):
        # Helper body: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Removed temporarily, due to API shear, this does not
        # work with threading._Event objects. is_set == isSet
        #self.assertEqual(event.is_set(), False)

        self.assertEqual(wait(0.0), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        # self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # Keep a handle on the helper so it can be joined; previously it
        # was started anonymously and never joined.
        setter = self.Process(target=self._test_event, args=(event,))
        setter.start()
        self.assertEqual(wait(), None)
        setter.join()
772
773#
774#
775#
776
class _TestValue(BaseTestCase):
    """Tests for shared Value / RawValue objects."""

    # (typecode, initial value, value assigned by the child)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def _test(self, values):
        # Child body: overwrite every shared value with its third entry.
        for shared, spec in zip(values, self.codes_values):
            shared.value = spec[2]

    def test_value(self, raw=False):
        if self.TYPE != 'processes':
            return

        factory = self.RawValue if raw else self.Value
        values = [factory(code, value)
                  for code, value, _ in self.codes_values]

        # initial values are visible in the parent
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.start()
        proc.join()

        # the child's assignments are visible in the parent
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        if self.TYPE != 'processes':
            return

        # default lock: both accessors must be callable
        default_val = self.Value('i', 5)
        default_val.get_lock()
        default_val.get_obj()

        # lock=None: both accessors must be callable
        none_val = self.Value('i', 5, lock=None)
        none_val.get_lock()
        none_val.get_obj()

        # an explicit lock is handed back by get_lock()
        lock = self.Lock()
        locked_val = self.Value('i', 5, lock=lock)
        lock3 = locked_val.get_lock()
        locked_val.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False gives an object without the accessors
        unlocked_val = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(unlocked_val, 'get_lock'))
        self.assertFalse(hasattr(unlocked_val, 'get_obj'))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        raw_val = self.RawValue('i', 5)
        self.assertFalse(hasattr(raw_val, 'get_lock'))
        self.assertFalse(hasattr(raw_val, 'get_obj'))
842
Benjamin Petersondfd79492008-06-13 19:13:39 +0000843
class _TestArray(BaseTestCase):
    """Tests for shared Array / RawArray objects."""

    def f(self, seq):
        # Replace ``seq`` in place by its running (prefix) sums.
        for idx in range(1, len(seq)):
            seq[idx] += seq[idx - 1]

    def test_array(self, raw=False):
        if self.TYPE != 'processes':
            return

        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        factory = self.RawArray if raw else self.Array
        arr = factory('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # apply the same slice assignment to the shared array and the list
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # do the prefix sums locally on the list ...
        self.f(seq)

        # ... and in a child on the shared array; results must agree
        p = self.Process(target=self.f, args=(arr,))
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    def test_rawarray(self):
        self.test_array(raw=True)

    def test_getobj_getlock_obj(self):
        if self.TYPE != 'processes':
            return

        # default lock: both accessors must be callable
        default_arr = self.Array('i', range(10))
        default_arr.get_lock()
        default_arr.get_obj()

        # lock=None: both accessors must be callable
        none_arr = self.Array('i', range(10), lock=None)
        none_arr.get_lock()
        none_arr.get_obj()

        # an explicit lock is handed back by get_lock()
        lock = self.Lock()
        locked_arr = self.Array('i', range(10), lock=lock)
        lock3 = locked_arr.get_lock()
        locked_arr.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False gives an object without the accessors
        unlocked_arr = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(unlocked_arr, 'get_lock'))
        self.assertFalse(hasattr(unlocked_arr, 'get_obj'))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        raw_arr = self.RawArray('i', range(10))
        self.assertFalse(hasattr(raw_arr, 'get_lock'))
        self.assertFalse(hasattr(raw_arr, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000906
907#
908#
909#
910
class _TestContainers(BaseTestCase):
    """Tests for the manager's list, dict and Namespace proxies."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        ten = range(10)
        a = self.list(ten)
        self.assertEqual(a[:], ten)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(range(5))
        self.assertEqual(b[:], range(5))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        # ``a`` has not been modified so far
        self.assertEqual(a[:], range(10))

        # a shared list built from a plain list holding the two proxies
        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # appending to ``a`` shows up through the list that contains it
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = range(65, 70)
        for code in indices:
            d[code] = chr(code)
        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # only ``name`` is left in the string form
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertTrue(not hasattr(n, 'job'))
966
967#
968#
969#
970
def sqr(x, wait=0.0):
    """Sleep for ``wait`` seconds, then return ``x`` multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    """Tests for the pool API: apply, map, async results, imap, terminate."""

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), [sqr(i) for i in range(10)])
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         [sqr(i) for i in range(100)])

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # the task sleeps longer than the timeout given to get()
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), [sqr(i) for i in range(10)])

        # step through by hand and check the iterator is then exhausted
        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, next, it)

        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, next, it)

    def test_imap_unordered(self):
        expected = [sqr(i) for i in range(1000)]

        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), expected)

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), expected)

    def test_make_pool(self):
        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        result = self.pool.map_async(
            time.sleep, [0.1] * 10000, chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertTrue(join.elapsed < 0.2)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001043#
1044# Test that manager has expected number of shared objects left
1045#
1046
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Taken right after the count.  It is printed once on a mismatch;
        # the old code re-queried the manager and printed the info twice.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1065
1066#
1067# Test of creating a customized manager class
1068#
1069
1070from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1071
class FooBar(object):
    '''plain class shared through MyManager under the names Foo and Bar'''

    def f(self):
        return 'f()'

    def g(self):
        # always fails, so tests can check how the error reaches the caller
        raise ValueError()

    def _h(self):
        # underscore-prefixed on purpose: the tests expect it to be callable
        # through a proxy only when it is listed in ``exposed``
        return '_h()'
1079
def baz():
    '''generate the squares 0, 1, 4, ..., 81 one at a time'''
    n = 0
    while n < 10:
        yield n * n
        n += 1
1083
class IteratorProxy(BaseProxy):
    '''proxy type which forwards iteration to its referent'''

    # both spellings of the "advance" method are made callable remotely
    _exposed_ = ('next', '__next__')

    def __iter__(self):
        # the proxy acts as its own iterator
        return self

    def next(self):
        item = self._callmethod('next')
        return item

    def __next__(self):
        item = self._callmethod('__next__')
        return item
1092
class MyManager(BaseManager):
    '''customized manager; its shared types are registered just below'''

# Three typeids are registered:
#   'baz' -- the generator function, handed out through IteratorProxy
#   'Bar' -- FooBar with an explicit list of exposed methods
#   'Foo' -- FooBar with no ``exposed`` argument
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('Foo', callable=FooBar)
1099
1100
class _TestMyManager(BaseTestCase):
    # Exercises the customized MyManager: which methods each registered
    # typeid exposes, how errors raised by the referent reach the caller,
    # and iteration through IteratorProxy.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            # named ``squares`` so the module-level baz() is not shadowed
            squares = manager.baz()

            foo_methods = [name for name in ('f', 'g', '_h')
                           if hasattr(foo, name)]
            bar_methods = [name for name in ('f', 'g', '_h')
                           if hasattr(bar, name)]

            # Foo was registered without ``exposed``; Bar listed f and _h
            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            # _h is not exposed on Foo, so the server refuses the call
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(squares), [i*i for i in range(10)])
        finally:
            # Stop the server process even when an assertion above fails;
            # otherwise a failing run leaves the manager process behind.
            manager.shutdown()
1132
1133#
1134# Test of connecting to a remote server and using xmlrpclib for serialization
1135#
1136
1137_queue = Queue.Queue()
1138def get_queue():
1139 return _queue
1140
1141class QueueManager(BaseManager):
1142 '''manager class used by server process'''
1143QueueManager.register('get_queue', callable=get_queue)
1144
1145class QueueManager2(BaseManager):
1146 '''manager class which specifies the same interface as QueueManager'''
1147QueueManager2.register('get_queue')
1148
1149
1150SERIALIZER = 'xmlrpclib'
1151
class _TestRemoteManager(BaseTestCase):
    # Starts a QueueManager server, then reaches its queue through two
    # QueueManager2 clients connected by address: one in a child process
    # (which puts an item) and one in the test process (which gets it).
    # All managers use SERIALIZER instead of the default serializer.

    ALLOWED_TYPES = ('manager',)

    def _putter(self, address, authkey):
        # Runs in the child process: connect to the server at ``address``
        # and put one tuple on the shared queue.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        # Port 0 is requested here; the address actually in use is read
        # back from manager.address once the server has started.
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
1191
class _TestManagerRestart(BaseTestCase):
    # Checks that a manager can be started again on an address which a
    # previous manager has only just released.

    def _putter(self, address, authkey):
        # Runs in the child process: connect to the running manager and
        # put one item on its queue.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        # Ask for port 0 instead of a fixed port number, so the test does
        # not fail just because something else is using that port.  The
        # address actually bound is read back after start() and reused for
        # the second manager, which is what exercises the rapid restart.
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        manager.start()
        address = manager.address

        p = self.Process(target=self._putter, args=(address, authkey))
        p.start()
        queue = manager.get_queue()
        item = queue.get()
        # The child has delivered its item; wait for it to exit so it is
        # not still around when the server is shut down.
        p.join()
        self.assertEqual(item, 'hello world')
        # Drop the proxy before stopping the server it refers to
        del queue
        manager.shutdown()

        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001217
Benjamin Petersondfd79492008-06-13 19:13:39 +00001218#
1219#
1220#
1221
# Empty message which tells the _echo() child to stop
SENTINEL = latin('')

class _TestConnection(BaseTestCase):
    # Tests of the connection objects returned by self.Pipe(): sending and
    # receiving objects and byte strings, polling, one-way pipes, and
    # closing the parent's copy of the child end right after spawning.

    ALLOWED_TYPES = ('processes', 'threads')

    def _echo(self, conn):
        # Child side: send every received message straight back until
        # SENTINEL arrives, then close the connection.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', range(4))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        # round trip of a pickled object
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        # round trip of a raw byte string
        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # receive into the start of a larger buffer
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # receive at an offset of three items into the buffer
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # a message longer than the buffer must raise BufferTooShort
            # carrying the complete message
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort, e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # nothing pending: poll() returns False at once ...
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        # ... and with a timeout returns False after that timeout
        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # the echoed message is pending: poll() returns True without waiting
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            # receiving is expected to fail with EOFError from here on
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        # With duplex=False the first connection only receives and the
        # second only sends.
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            # using an end in the wrong direction raises IOError
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        # Checks the optional offset and size arguments of send_bytes();
        # only run for real processes.
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # offset only: everything from index 5 onwards
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        # offset and size: 8 bytes starting at index 7
        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # an offset equal to the length sends an empty message
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # offsets/sizes outside the message, or negative, are rejected
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1372
class _TestListenerClient(BaseTestCase):
    # For every address family the connection module offers, a child
    # connects to a listener and sends one greeting.

    ALLOWED_TYPES = ('processes', 'threads')

    def _test(self, address):
        # Child side: connect to the listener, send the greeting, close.
        conn = self.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            try:
                p = self.Process(target=self._test,
                                 args=(listener.address,))
                p.daemon = True
                p.start()
                conn = listener.accept()
                try:
                    self.assertEqual(conn.recv(), 'hello')
                finally:
                    # the accepted connection is ours to close
                    conn.close()
                p.join()
            finally:
                # release the listening address even if the check failed
                listener.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001392#
1393# Test of sending connection and socket objects between processes
1394#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001395"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001396class _TestPicklingConnections(BaseTestCase):
1397
1398 ALLOWED_TYPES = ('processes',)
1399
1400 def _listener(self, conn, families):
1401 for fam in families:
1402 l = self.connection.Listener(family=fam)
1403 conn.send(l.address)
1404 new_conn = l.accept()
1405 conn.send(new_conn)
1406
1407 if self.TYPE == 'processes':
1408 l = socket.socket()
1409 l.bind(('localhost', 0))
1410 conn.send(l.getsockname())
1411 l.listen(1)
1412 new_conn, addr = l.accept()
1413 conn.send(new_conn)
1414
1415 conn.recv()
1416
1417 def _remote(self, conn):
1418 for (address, msg) in iter(conn.recv, None):
1419 client = self.connection.Client(address)
1420 client.send(msg.upper())
1421 client.close()
1422
1423 if self.TYPE == 'processes':
1424 address, msg = conn.recv()
1425 client = socket.socket()
1426 client.connect(address)
1427 client.sendall(msg.upper())
1428 client.close()
1429
1430 conn.close()
1431
1432 def test_pickling(self):
1433 try:
1434 multiprocessing.allow_connection_pickling()
1435 except ImportError:
1436 return
1437
1438 families = self.connection.families
1439
1440 lconn, lconn0 = self.Pipe()
1441 lp = self.Process(target=self._listener, args=(lconn0, families))
1442 lp.start()
1443 lconn0.close()
1444
1445 rconn, rconn0 = self.Pipe()
1446 rp = self.Process(target=self._remote, args=(rconn0,))
1447 rp.start()
1448 rconn0.close()
1449
1450 for fam in families:
1451 msg = ('This connection uses family %s' % fam).encode('ascii')
1452 address = lconn.recv()
1453 rconn.send((address, msg))
1454 new_conn = lconn.recv()
1455 self.assertEqual(new_conn.recv(), msg.upper())
1456
1457 rconn.send(None)
1458
1459 if self.TYPE == 'processes':
1460 msg = latin('This connection uses a normal socket')
1461 address = lconn.recv()
1462 rconn.send((address, msg))
1463 if hasattr(socket, 'fromfd'):
1464 new_conn = lconn.recv()
1465 self.assertEqual(new_conn.recv(100), msg.upper())
1466 else:
1467 # XXX On Windows with Py2.6 need to backport fromfd()
1468 discard = lconn.recv_bytes()
1469
1470 lconn.send(None)
1471
1472 rconn.close()
1473 lconn.close()
1474
1475 lp.join()
1476 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001477"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001478#
1479#
1480#
1481
class _TestHeap(BaseTestCase):
    # Stress test of multiprocessing.heap: after many allocations and
    # frees, the free and occupied blocks of each arena must be contiguous.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                # drop a random block so that it is given back to the heap
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap: gather every free and occupied
        # block as (arena index, start, stop, length, state)
        entries = []
        for seq in heap._len_to_seq.values():
            for arena, start, stop in seq:
                entries.append((heap._arenas.index(arena), start, stop,
                                stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            entries.append((heap._arenas.index(arena), start, stop,
                            stop-start, 'occupied'))

        entries.sort()

        # Each block must end where the next one starts, unless the next
        # block is the first one (offset 0) of a different arena.
        for current, following in zip(entries, entries[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart) = following[:2]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1522
1523#
1524#
1525#
1526
try:
    from ctypes import Structure, c_int, c_double
    # Value and copy are provided by multiprocessing.sharedctypes, not by
    # ctypes.  Asking ctypes for them always raised ImportError, which
    # silently turned every test in _TestSharedCTypes into a no-op.
    from multiprocessing.sharedctypes import Value
    # Imported under an alias so the ``copy`` module imported at the top of
    # this file is not shadowed.
    from multiprocessing.sharedctypes import copy as _sharedctypes_copy
except ImportError:
    # No usable ctypes: keep _Foo definable and let the tests detect the
    # situation through ``c_int is None``.
    Structure = object
    c_int = c_double = None

class _Foo(Structure):
    # structure with one int and one double field, shared with the child
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]

class _TestSharedCTypes(BaseTestCase):
    # Checks that ctypes objects allocated from shared memory are really
    # shared: a child process doubles every value and the parent sees it.

    ALLOWED_TYPES = ('processes',)

    def _double(self, x, y, foo, arr, string):
        # Runs in the child process: double everything in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for i in range(len(arr)):
            arr[i] *= 2

    def test_sharedctypes(self, lock=False):
        # ``lock`` is forwarded to every Value/Array created here;
        # test_synchronize() reruns this test with lock=True.
        if c_int is None:
            return

        x = Value('i', 7, lock=lock)
        # c_double is imported directly; there is no ``ctypes`` name here
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        # Array comes from the mixin, like the other shared types
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
        p.start()
        p.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # A copy must keep its values when the original is changed.
        if c_int is None:
            return

        foo = _Foo(2, 5.0)
        bar = _sharedctypes_copy(foo)
        foo.x = 0
        foo.y = 0
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)
1588
1589#
1590#
1591#
1592
class _TestFinalize(BaseTestCase):
    # Checks when and in which order util.Finalize callbacks run.  Each
    # callback sends a short tag over a pipe; the parent compares the
    # sequence of tags it receives with the expected order.

    ALLOWED_TYPES = ('processes',)

    def _test_finalize(self, conn):
        # Runs in the child process.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # no exitpriority given: 'c' is absent from the expected result
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # three finalizers sharing priority 0; the expected result lists
        # them in reverse order of creation
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        # lowest priority of all: marks the end of the tags for the parent
        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.start()
        p.join()

        # read tags until the 'STOP' marker
        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1643
1644#
1645# Test that from ... import * works for each module
1646#
1647
class _TestImportStar(BaseTestCase):
    # Every name a multiprocessing module lists in __all__ must really be
    # an attribute of that module.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        for modname in (
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.reduction', 'multiprocessing.sharedctypes',
            'multiprocessing.synchronize', 'multiprocessing.util'
            ):
            __import__(modname)
            module = sys.modules[modname]

            # a module without __all__ has nothing to check
            exported = getattr(module, '__all__', ())
            for attrname in exported:
                present = hasattr(module, attrname)
                self.assertTrue(
                    present,
                    '%r does not have attribute %r' % (module, attrname)
                    )
1670
1671#
1672# Quick test that logging works -- does not test logging output
1673#
1674
class _TestLogging(BaseTestCase):
    # Quick checks of multiprocessing's logger; the logging output itself
    # is not examined.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertTrue(logger is not None)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    def _test_level(self, conn):
        # Runs in the child process: report the effective logger level.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        try:
            # a level set on multiprocessing's logger is seen by the child
            logger.setLevel(LEVEL1)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            level = reader.recv()
            p.join()
            self.assertEqual(LEVEL1, level)

            # with that level unset, the child reports the root level
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            level = reader.recv()
            p.join()
            self.assertEqual(LEVEL2, level)
        finally:
            # Put both loggers back even when an assertion failed, so the
            # unusual levels cannot leak into the tests that run later.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
1712
1713#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001714# Test to verify handle verification, see issue 3321
1715#
1716
class TestInvalidHandle(unittest.TestCase):
    # Connection objects must reject handles which are not valid
    # (see issue 3321).

    def test_invalid_handles(self):
        # nothing is checked on Windows
        if not WIN32:
            # an arbitrary number which is not expected to be an open handle
            bogus = _multiprocessing.Connection(44977608)
            self.assertRaises(IOError, bogus.poll)
            self.assertRaises(IOError, _multiprocessing.Connection, -1)
1724 self.assertRaises(IOError, _multiprocessing.Connection, -1)
1725#
Benjamin Petersondfd79492008-06-13 19:13:39 +00001726# Functions used to create test cases from the base ones in this module
1727#
1728
def get_attributes(Source, names):
    '''Return a dict mapping each of *names* to that attribute of *Source*.

    Attributes which are plain Python functions are wrapped in
    staticmethod(), so that the dict can be merged into a class namespace
    without the functions turning into methods.
    '''
    function_type = type(get_attributes)
    attrs = {}
    for name in names:
        value = getattr(Source, name)
        attrs[name] = (staticmethod(value)
                       if type(value) == function_type else value)
    return attrs
1737
def create_test_cases(Mixin, type):
    '''Build the concrete test case classes for one flavour of test.

    Every module global whose name starts with '_Test' and whose
    ALLOWED_TYPES contains *type* is combined with unittest.TestCase and
    *Mixin* into a new class named 'With<Type><name minus underscore>'.
    Returns a dict mapping those new names to the new classes.
    '''
    Type = type[0].upper() + type[1:]
    glob = globals()
    result = {}

    for name in glob.keys():
        if not name.startswith('_Test'):
            continue
        base = glob[name]
        if type not in base.ALLOWED_TYPES:
            continue
        newname = 'With' + Type + name[1:]
        class Temp(base, unittest.TestCase, Mixin):
            pass
        # give the class the identity it will have as a module global
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        result[newname] = Temp
    return result
1754
1755#
1756# Create test cases
1757#
1758
class ProcessesMixin(object):
    # Mixin for the 'processes' flavour of the tests: the names the base
    # test cases reach through ``self`` resolve to the multiprocessing
    # package itself.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed attributes of multiprocessing into this class body;
    # get_attributes() wraps plain functions in staticmethod.
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

# Build the With Processes* test case classes and publish them as module
# globals under their generated names.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1771
1772
class ManagerMixin(object):
    # Mixin for the 'manager' flavour of the tests: shared objects are
    # created through one SyncManager instance.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Created with object.__new__, i.e. without running __init__ here;
    # test_main() calls __init__() and start() on it before the tests run.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed attributes of that manager instance into this class
    # body.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
       'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))

# Build the WithManager* test case classes and publish them as module
# globals under their generated names.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1785
1786
class ThreadsMixin(object):
    # Mixin for the 'threads' flavour of the tests: the same names are
    # taken from multiprocessing.dummy instead of multiprocessing.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed attributes of multiprocessing.dummy into this class
    # body; get_attributes() wraps plain functions in staticmethod.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))

# Build the WithThreads* test case classes and publish them as module
# globals under their generated names.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1799
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        class _BogusPeer(object):
            # ignores whatever is sent and always answers with junk
            def send_bytes(self, data):
                pass
            def recv_bytes(self, size):
                return b'something bogus'

        deliver = multiprocessing.connection.deliver_challenge
        self.assertRaises(multiprocessing.AuthenticationError,
                          deliver, _BogusPeer(), b'abc')

    def test_answer_challenge_auth_failure(self):
        class _BogusPeer(object):
            # first read yields the challenge marker, the second junk and
            # any further read an empty string
            def __init__(self):
                self.count = 0
            def send_bytes(self, data):
                pass
            def recv_bytes(self, size):
                self.count += 1
                if self.count > 2:
                    return b''
                if self.count == 2:
                    return b'something bogus'
                return multiprocessing.connection.CHALLENGE

        answer = multiprocessing.connection.answer_challenge
        self.assertRaises(multiprocessing.AuthenticationError,
                          answer, _BogusPeer(), b'abc')
1828
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001829testcases_other = [OtherTest, TestInvalidHandle]
Neal Norwitz0c519b32008-08-25 01:50:24 +00001830
Benjamin Petersondfd79492008-06-13 19:13:39 +00001831#
1832#
1833#
1834
def test_main(run=None):
    # Entry point used by the regression test runner.
    #
    # ``run`` is a callable taking the assembled unittest.TestSuite; when
    # it is None, test.test_support.run_unittest is used.
    if sys.platform.startswith("linux"):
        try:
            # only probing whether an RLock can be created at all
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared fixtures: one pool per flavour, plus the running manager which
    # the ManagerMixin tests use.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    # Sorted by name within each flavour, so that e.g. the "ZZZ" test case
    # runs after the other test cases of its flavour.
    testcases = (
        sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
        sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
        sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
        testcases_other
        )

    loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
    try:
        run(suite)
    finally:
        # Tear the fixtures down even when the run raises (run_unittest
        # reports failures by raising), so no worker or manager process
        # is left behind.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00001873
def main():
    '''run all the tests with a verbose plain-text runner'''
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

# only run the tests when this file is executed as a script
if __name__ == '__main__':
    main()