blob: a446ac304d99fc29cf1d458d638d4d346e692b89 [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import Queue
10import time
11import sys
12import os
13import gc
14import signal
15import array
16import copy
17import socket
18import random
19import logging
R. David Murray3db8a342009-03-30 23:05:48 +000020import test_support
Benjamin Petersondfd79492008-06-13 19:13:39 +000021
Jesse Noller37040cd2008-09-30 00:15:45 +000022
R. David Murray3db8a342009-03-30 23:05:48 +000023_multiprocessing = test_support.import_module('_multiprocessing')
24
Jesse Noller37040cd2008-09-30 00:15:45 +000025# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000026test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000027
Benjamin Petersondfd79492008-06-13 19:13:39 +000028import multiprocessing.dummy
29import multiprocessing.connection
30import multiprocessing.managers
31import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000032import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000033
34from multiprocessing import util
35
36#
37#
38#
39
# Constructor applied to the one-character sample values used for the 'c'
# typecode further down; on this Python version it is simply ``str``.
latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000041
Benjamin Petersondfd79492008-06-13 19:13:39 +000042#
43# Constants
44#
45
# Log level constant; presumably consumed by the test runner code that is
# outside this part of the file -- TODO confirm.
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.WARNING

# Short pause (seconds) the tests sleep for to let workers make progress.
DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
# Timeouts (seconds) handed to blocking calls; distinct values are only
# used when elapsed times are actually being checked.
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# True unless the C extension flags its semaphore get-value as broken.
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

# True when running on Windows.
WIN32 = (sys.platform == "win32")
63
Benjamin Petersondfd79492008-06-13 19:13:39 +000064#
65# Creates a wrapper for a function which records the time it takes to finish
66#
67
class TimingWrapper(object):
    """Callable proxy that remembers how long the last call took.

    ``elapsed`` is ``None`` until the wrapper has been called once.  After
    each call it holds the wall-clock duration (``time.time()`` difference,
    in seconds) of that call, whether the wrapped function returned or
    raised.
    """

    def __init__(self, func):
        self.elapsed = None
        self.func = func

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Recorded on both the normal and the exceptional path.
            self.elapsed = time.time() - started
        return result
80
81#
82# Base class for test cases
83#
84
class BaseTestCase(object):
    """Mixin providing assertion helpers shared by the test case classes.

    It relies on ``assertEqual`` / ``assertAlmostEqual`` being supplied by
    whatever class it is combined with.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare two durations to one decimal place.

        Does nothing unless the module-level CHECK_TIMINGS flag is true.
        """
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert ``func(*args) == value`` unless ``func`` is unimplemented.

        A NotImplementedError raised by ``func`` is swallowed and None is
        returned; otherwise the result of ``assertEqual`` is returned.
        """
        try:
            actual = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, actual)
100
101#
102# Return the value of a semaphore
103#
104
def get_value(self):
    """Return the current counter value of the semaphore-like object ``self``.

    Tries, in order, a ``get_value()`` method, then the
    ``_Semaphore__value`` attribute, then the ``_value`` attribute.

    Raises NotImplementedError when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
116
117#
118# Testcases
119#
120
class _TestProcess(BaseTestCase):
    """Tests of process objects: attributes, start/join, terminate, children.

    ``TYPE``, ``Process``, ``Queue``, ``Pipe``, ``current_process`` and
    ``active_children`` are read from ``self``; they are expected to be
    supplied by the concrete test class (not visible in this section).
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # Attributes of the process object for the running process.
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertTrue(isinstance(authkey, bytes))
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def _test(self, q, *args, **kwds):
        """Child side of test_process: report what this child was given.

        Puts the positional args, the keyword args and the child's name on
        ``q``; for non-thread types also its authkey and pid.
        """
        current = self.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if self.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        # Full lifecycle: before start, while running, after join.
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertTrue(p not in self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())

        # args[0] is the queue itself, which the child receives as ``q``
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

    def _test_terminate(self):
        """Child side of test_terminate: sleep until terminated."""
        time.sleep(1000)

    def test_terminate(self):
        # terminate() must stop a sleeping child so that join() returns.
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        # cpu_count() may be unimplemented; otherwise it is an int >= 1.
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        # A process is listed only between start() and join().
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertTrue(p not in self.active_children())

        p.start()
        self.assertTrue(p in self.active_children())

        p.join()
        self.assertTrue(p not in self.active_children())

    def _test_recursion(self, wconn, id):
        """Send ``id`` over ``wconn``, then recurse in two children.

        Each child is started and joined before the next one is created.
        Recursion stops once ``id`` has two elements.
        """
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = self.Process(
                    target=self._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        # Processes started from within processes report in depth-first order.
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
266
267#
268#
269#
270
class _UpperCaser(multiprocessing.Process):
    """Process subclass that upper-cases strings sent to it over a pipe.

    The parent uses ``submit()`` for a request/response round trip and
    ``stop()`` to send the ``None`` sentinel that ends ``run()``.
    """

    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.child_conn, self.parent_conn = multiprocessing.Pipe()

    def run(self):
        """Serve requests until ``None`` is received."""
        # This copy of the parent's end is not used by run()
        self.parent_conn.close()
        conn = self.child_conn
        for text in iter(conn.recv, None):
            conn.send(text.upper())
        conn.close()

    def submit(self, s):
        """Send the str ``s`` to the child and return its reply."""
        assert type(s) is str
        conn = self.parent_conn
        conn.send(s)
        return conn.recv()

    def stop(self):
        """Send the stop sentinel, then close both pipe ends."""
        self.parent_conn.send(None)
        self.parent_conn.close()
        self.child_conn.close()
292
class _TestSubclassingProcess(BaseTestCase):
    """Checks that a Process subclass with its own run() works end to end."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        # Two round trips through the child, then an orderly shutdown.
        worker = _UpperCaser()
        worker.start()
        for text, expected in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), expected)
        worker.stop()
        worker.join()
304
305#
306#
307#
308
def queue_empty(q):
    """Return whether ``q`` is empty.

    Uses ``q.empty()`` when the queue has one, else compares ``q.qsize()``
    with zero.
    """
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
314
def queue_full(q, maxsize):
    """Return whether ``q`` is full.

    Uses ``q.full()`` when the queue has one, else compares ``q.qsize()``
    with ``maxsize``.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
320
321
class _TestQueue(BaseTestCase):
    """Tests of queue put/get semantics, forking, qsize() and task_done().

    ``Queue``, ``JoinableQueue``, ``Event`` and ``Process`` are read from
    ``self``; they are expected to be supplied by the concrete test class
    (not visible in this section).
    """

    def _test_put(self, queue, child_can_start, parent_can_continue):
        """Child side of test_put: drain six items once allowed to start."""
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        # Fill a bounded queue, check every flavour of put() fails with
        # Queue.Full, then let the child empty it again.
        MAXSIZE = 6
        q = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        child = self.Process(
            target=self._test_put,
            args=(q, child_can_start, parent_can_continue)
            )
        child.daemon = True
        child.start()

        self.assertEqual(queue_empty(q), True)
        self.assertEqual(queue_full(q, MAXSIZE), False)

        q.put(1)
        q.put(2, True)
        q.put(3, True, None)
        q.put(4, False)
        q.put(5, False, None)
        q.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(q), False)
        self.assertEqual(queue_full(q, MAXSIZE), True)

        put = TimingWrapper(q.put)
        put_nowait = TimingWrapper(q.put_nowait)

        # (callable, positional args, keyword args, expected elapsed time)
        attempts = [
            (put, (7, False), {}, 0),
            (put, (7, False, None), {}, 0),
            (put_nowait, (7,), {}, 0),
            (put, (7, True, TIMEOUT1), {}, TIMEOUT1),
            (put, (7, False, TIMEOUT2), {}, 0),
            (put, (7, True), {'timeout': TIMEOUT3}, TIMEOUT3),
            ]
        for call, call_args, call_kwds, expected in attempts:
            self.assertRaises(Queue.Full, call, *call_args, **call_kwds)
            self.assertTimingAlmostEqual(call.elapsed, expected)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(q), True)
        self.assertEqual(queue_full(q, MAXSIZE), False)

        child.join()

    def _test_get(self, queue, child_can_start, parent_can_continue):
        """Child side of test_get: put 2..5 on the queue once allowed."""
        child_can_start.wait()
        #queue.put(1)
        for item in (2, 3, 4, 5):
            queue.put(item)
        parent_can_continue.set()

    def test_get(self):
        # Read back what the child queued, then check every flavour of
        # get() fails with Queue.Empty on the drained queue.
        q = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        child = self.Process(
            target=self._test_get,
            args=(q, child_can_start, parent_can_continue)
            )
        child.daemon = True
        child.start()

        self.assertEqual(queue_empty(q), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(q), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(q.get(True, None), 2)
        self.assertEqual(q.get(True), 3)
        self.assertEqual(q.get(timeout=1), 4)
        self.assertEqual(q.get_nowait(), 5)

        self.assertEqual(queue_empty(q), True)

        get = TimingWrapper(q.get)
        get_nowait = TimingWrapper(q.get_nowait)

        # (callable, positional args, keyword args, expected elapsed time)
        attempts = [
            (get, (False,), {}, 0),
            (get, (False, None), {}, 0),
            (get_nowait, (), {}, 0),
            (get, (True, TIMEOUT1), {}, TIMEOUT1),
            (get, (False, TIMEOUT2), {}, 0),
            (get, (), {'timeout': TIMEOUT3}, TIMEOUT3),
            ]
        for call, call_args, call_kwds, expected in attempts:
            self.assertRaises(Queue.Empty, call, *call_args, **call_kwds)
            self.assertTimingAlmostEqual(call.elapsed, expected)

        child.join()

    def _test_fork(self, queue):
        """Child side of test_fork: put 10..19 on the queue."""
        for item in range(10, 20):
            queue.put(item)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        q = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for item in range(10):
            q.put(item)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        child = self.Process(target=self._test_fork, args=(q,))
        child.start()

        # check that all expected items are in the queue
        for expected in range(20):
            self.assertEqual(q.get(), expected)
        self.assertRaises(Queue.Empty, q.get, False)

        child.join()

    def test_qsize(self):
        # qsize() tracks puts and gets; skipped where it is unimplemented.
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            return
        for item, size_after in ((1, 1), (5, 2)):
            q.put(item)
            self.assertEqual(q.qsize(), size_after)
        for size_after in (1, 0):
            q.get()
            self.assertEqual(q.qsize(), size_after)

    def _test_task_done(self, q):
        """Worker side of test_task_done: acknowledge items until None."""
        for _ in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        # join() on the queue returns once every item has been acknowledged.
        q = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(q, 'task_done'):
            return

        workers = [self.Process(target=self._test_task_done, args=(q,))
                   for _ in xrange(4)]

        for worker in workers:
            worker.start()

        for item in xrange(10):
            q.put(item)

        q.join()

        # one stop sentinel per worker
        for _ in workers:
            q.put(None)

        for worker in workers:
            worker.join()
525
526#
527#
528#
529
class _TestLock(BaseTestCase):
    """Tests of Lock and RLock acquire/release behaviour."""

    def test_lock(self):
        # A held lock cannot be re-acquired without blocking, and releasing
        # an unheld lock is an error.
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        # An RLock may be acquired repeatedly by its owner; one release too
        # many is an error.
        lock = self.RLock()
        for _ in range(3):
            self.assertEqual(lock.acquire(), True)
        for _ in range(3):
            self.assertEqual(lock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        # Locks work as context managers.
        with self.Lock():
            pass
552
Benjamin Petersondfd79492008-06-13 19:13:39 +0000553
class _TestSemaphore(BaseTestCase):
    """Tests of Semaphore / BoundedSemaphore counting and acquire timeouts."""

    def _test_semaphore(self, sem):
        """Take ``sem`` from 2 down to 0 and back to 2, checking each step.

        Expects ``sem`` to start with a value of 2.  Value checks are
        skipped where reading the value is not implemented.
        """
        check_value = self.assertReturnsIfImplemented
        check_value(2, get_value, sem)
        for remaining in (1, 0):
            self.assertEqual(sem.acquire(), True)
            check_value(remaining, get_value, sem)
        # a non-blocking acquire on an exhausted semaphore fails
        self.assertEqual(sem.acquire(False), False)
        check_value(0, get_value, sem)
        for restored in (1, 2):
            self.assertEqual(sem.release(), None)
            check_value(restored, get_value, sem)

    def test_semaphore(self):
        # An unbounded semaphore can be released past its initial value.
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        # acquire() on an exhausted semaphore returns False, either at once
        # (non-blocking) or after the timeout (blocking).
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected elapsed time)
        attempts = [
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
            ]
        for call_args, call_kwds, expected in attempts:
            self.assertEqual(acquire(*call_args, **call_kwds), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected)
606
607
class _TestCondition(BaseTestCase):
    """Tests of Condition wait / notify / notify_all and wait timeouts.

    ``Condition``, ``Semaphore`` and ``Process`` are read from ``self``;
    they are expected to be supplied by the concrete test class (not
    visible in this section).
    """

    def f(self, cond, sleeping, woken, timeout=None):
        """Worker body: wait on ``cond`` and signal progress via semaphores.

        Releases ``sleeping`` once the condition is held and the worker is
        about to wait, and ``woken`` once the wait has returned (notified
        or timed out).
        """
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        """Check the condition's internal counters show no sleepers.

        Only done for the 'processes' type, and skipped where reading a
        semaphore value is not implemented.
        """
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        # notify() wakes exactly one waiter per call.
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Keep separate handles so that both workers can be joined below
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        thread.join()
        proc.join()

    def test_notify_all(self):
        # Waiters with a timeout wake by themselves; waiters without one
        # are all woken by a single notify_all().
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for i in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        # wait() with a timeout and no notifier returns None after the timeout.
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
738
739
class _TestEvent(BaseTestCase):
    """Tests of Event wait / set / clear, including a set from a child."""

    def _test_event(self, event):
        """Child side of test_event: set ``event`` after TIMEOUT2 seconds."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        # wait() times out while the event is clear and returns at once
        # after it has been set.
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Removed temporarily, due to API shear, this does not
        # work with threading._Event objects. is_set == isSet
        #self.assertEqual(event.is_set(), False)

        self.assertEqual(wait(0.0), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        # self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # Keep the handle so the child can be joined once it has set the
        # event, instead of being left running when the test returns.
        p = self.Process(target=self._test_event, args=(event,))
        p.start()
        self.assertEqual(wait(), None)
        p.join()
775
776#
777#
778#
779
class _TestValue(BaseTestCase):
    """Tests of shared Value / RawValue objects and their lock accessors."""

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def _test(self, values):
        """Child side of test_value: overwrite each shared value."""
        for shared, spec in zip(values, self.codes_values):
            shared.value = spec[2]

    def test_value(self, raw=False):
        # Values start at their initial value and reflect writes made by a
        # child process.
        if self.TYPE != 'processes':
            return

        factory = self.RawValue if raw else self.Value
        values = [factory(code, value)
                  for code, value, _ in self.codes_values]

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        child = self.Process(target=self._test, args=(values,))
        child.start()
        child.join()

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # get_lock() / get_obj() exist exactly when the value is
        # synchronized; a lock argument that is not a lock is rejected.
        if self.TYPE != 'processes':
            return

        # default lock
        default_locked = self.Value('i', 5)
        default_locked.get_lock()
        default_locked.get_obj()

        # lock=None
        none_locked = self.Value('i', 5, lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # caller-supplied lock is the one handed back
        lock = self.Lock()
        user_locked = self.Value('i', 5, lock=lock)
        returned_lock = user_locked.get_lock()
        user_locked.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False gives an object without the accessors
        unlocked = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        raw = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, accessor))
845
Benjamin Petersondfd79492008-06-13 19:13:39 +0000846
class _TestArray(BaseTestCase):
    """Tests of shared Array / RawArray objects and their lock accessors."""

    def f(self, seq):
        """Replace ``seq`` in place with its running (prefix) sums."""
        for index in range(1, len(seq)):
            seq[index] += seq[index - 1]

    def test_array(self, raw=False):
        # A shared array behaves like the equivalent list, both locally and
        # when modified by a child process.
        if self.TYPE != 'processes':
            return

        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        factory = self.RawArray if raw else self.Array
        arr = factory('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # same slice assignment on both, shared array first
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # apply f locally to the list and in a child to the shared array
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    def test_rawarray(self):
        self.test_array(raw=True)

    def test_getobj_getlock_obj(self):
        # get_lock() / get_obj() exist exactly when the array is
        # synchronized; a lock argument that is not a lock is rejected.
        if self.TYPE != 'processes':
            return

        # default lock
        default_locked = self.Array('i', range(10))
        default_locked.get_lock()
        default_locked.get_obj()

        # lock=None
        none_locked = self.Array('i', range(10), lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # caller-supplied lock is the one handed back
        lock = self.Lock()
        user_locked = self.Array('i', range(10), lock=lock)
        returned_lock = user_locked.get_lock()
        user_locked.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False gives an object without the accessors
        unlocked = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        raw = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, accessor))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000909
910#
911#
912#
913
class _TestContainers(BaseTestCase):
    """Tests of the manager's list, dict and Namespace proxies."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        # Slicing, extend, in-place repeat, concatenation and nesting.
        first = self.list(range(10))
        self.assertEqual(first[:], list(range(10)))

        second = self.list()
        self.assertEqual(second[:], [])

        second.extend(range(5))
        self.assertEqual(second[:], list(range(5)))

        self.assertEqual(second[2], 2)
        self.assertEqual(second[2:10], [2,3,4])

        second *= 2
        self.assertEqual(second[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(second + [5, 6],
                         [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        self.assertEqual(first[:], list(range(10)))

        # a shared list built from other shared lists
        pair = self.list([first, second])
        self.assertEqual(
            pair[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # a later change to the inner list is visible through the outer one
        wrapper = self.list([first])
        first.append('hello')
        self.assertEqual(wrapper[:],
                         [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        # Item assignment plus copy / keys / values / items.
        shared = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            shared[code] = chr(code)
        self.assertEqual(shared.copy(),
                         dict((code, chr(code)) for code in indices))
        self.assertEqual(sorted(shared.keys()), indices)
        self.assertEqual(sorted(shared.values()),
                         [chr(code) for code in indices])
        self.assertEqual(sorted(shared.items()),
                         [(code, chr(code)) for code in indices])

    def test_namespace(self):
        # Attribute set / delete; str() leaves out the underscore attribute.
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertTrue(not hasattr(ns, 'job'))
969
970#
971#
972#
973
def sqr(x, wait=0.0):
    """Sleep for ``wait`` seconds, then return ``x`` multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    """Tests of Pool: apply, map, async results, imap variants, terminate.

    ``self.pool`` is expected to be supplied by the concrete test class
    (not visible in this section).
    """

    def test_apply(self):
        # Positional and keyword arguments both reach the worker.
        apply_in_pool = self.pool.apply
        self.assertEqual(apply_in_pool(sqr, (5,)), sqr(5))
        self.assertEqual(apply_in_pool(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        # Results come back in input order, with and without chunking.
        map_in_pool = self.pool.map
        self.assertEqual(map_in_pool(sqr, range(10)),
                         [sqr(i) for i in range(10)])
        self.assertEqual(map_in_pool(sqr, range(100), chunksize=20),
                         [sqr(i) for i in range(100)])

    def test_async(self):
        # get() blocks until the delayed result is available.
        pending = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(pending.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # get() gives up with TimeoutError before the slow task finishes.
        pending = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(pending.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        # imap() yields results lazily and in order.
        results = self.pool.imap(sqr, range(10))
        self.assertEqual(list(results), [sqr(i) for i in range(10)])

        results = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(results.next(), i*i)
        self.assertRaises(StopIteration, results.next)

        results = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(results.next(), i*i)
        self.assertRaises(StopIteration, results.next)

    def test_imap_unordered(self):
        # Every result is produced, in whatever order.
        expected = [sqr(i) for i in range(1000)]

        results = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(results), expected)

        results = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(results), expected)

    def test_make_pool(self):
        # A pool created with 3 workers has 3 worker processes.
        pool = multiprocessing.Pool(3)
        self.assertEqual(3, len(pool._pool))
        pool.close()
        pool.join()

    def test_terminate(self):
        # terminate() drops outstanding work so that join() returns quickly.
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        delays = [0.1] * 10000
        result = self.pool.map_async(time.sleep, delays, chunksize=1)
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertTrue(join.elapsed < 0.2)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001046#
1047# Test that manager has expected number of shared objects left
1048#
1049
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        if refs != EXPECTED_NUMBER:
            # Ask the manager for its debug report only when it is needed,
            # and print it once.
            print(self.manager._debug_info())

        self.assertEqual(refs, EXPECTED_NUMBER)
1068
1069#
1070# Test of creating a customized manager class
1071#
1072
1073from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1074
class FooBar(object):
    """Simple class registered with MyManager as both 'Foo' and 'Bar'."""

    def f(self):
        """Return the fixed string 'f()'."""
        marker = 'f()'
        return marker

    def g(self):
        """Always raise ValueError."""
        raise ValueError()

    def _h(self):
        """Return the fixed string '_h()'."""
        marker = '_h()'
        return marker
1082
def baz():
    """Yield the squares of 0 through 9 in increasing order."""
    for n in range(10):
        yield n * n
1086
class IteratorProxy(BaseProxy):
    """Proxy that forwards iterator stepping to its referent."""

    # Referent methods this proxy is allowed to call.
    _exposed_ = ('next', '__next__')

    def __next__(self):
        """Forward '__next__' to the referent and return its result."""
        return self._callmethod('__next__')

    def next(self):
        """Forward 'next' to the referent and return its result."""
        return self._callmethod('next')

    def __iter__(self):
        """Return the proxy itself so it can be used in a for loop."""
        return self
1095
class MyManager(BaseManager):
    """Manager subclass that carries the custom registrations below."""

# 'Foo' uses the default exposure, 'Bar' names its exposed methods
# explicitly, and 'baz' is served through the custom IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', exposed=('f', '_h'), callable=FooBar)
MyManager.register('baz', proxytype=IteratorProxy, callable=baz)
1102
1103
class _TestMyManager(BaseTestCase):
    """Exercise the types registered on the customized MyManager class."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        """Start a MyManager, check its proxies, and always shut it down."""
        manager = MyManager()
        manager.start()
        try:
            self._check_registered_types(manager)
        finally:
            # Stop the server process even when an assertion failed.
            manager.shutdown()

    def _check_registered_types(self, manager):
        """Check method exposure and call results for Foo, Bar and baz."""
        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
        bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]

        # Foo was registered without 'exposed'; Bar listed ('f', '_h').
        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        # '_h' is not exposed on Foo, so calling it remotely must fail.
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(baz), [i*i for i in range(10)])
1135
1136#
1137# Test of connecting to a remote server and using xmlrpclib for serialization
1138#
1139
# Single queue instance handed out by get_queue().
_queue = Queue.Queue()
def get_queue():
    """Return the module-level queue."""
    return _queue

class QueueManager(BaseManager):
    '''manager class used by server process'''
# Registered with a callable, so this manager can create the referent.
QueueManager.register('get_queue', callable=get_queue)

class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''
# Registered by name only, with no callable.
QueueManager2.register('get_queue')


# Serializer name passed to the managers created by the tests below.
SERIALIZER = 'xmlrpclib'
1154
class _TestRemoteManager(BaseTestCase):
    """Connect to a running manager server using the xmlrpclib serializer."""

    ALLOWED_TYPES = ('manager',)

    def _putter(self, address, authkey):
        """Child-process body: connect to the server and put one item."""
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        """Check an item put by a child process arrives through a second client.

        The server is shut down even when an assertion fails.
        """
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.start()

            manager2 = QueueManager2(
                address=manager.address, authkey=authkey,
                serializer=SERIALIZER
                )
            manager2.connect()
            queue = manager2.get_queue()
            try:
                # Note that xmlrpclib will deserialize object as a list
                # not a tuple
                self.assertEqual(queue.get(),
                                 ['hello world', None, True, 2.25])

                # Because we are using xmlrpclib for serialization
                # instead of pickle this will cause a serialization error.
                self.assertRaises(Exception, queue.put, time.sleep)
            finally:
                # Make queue finalizer run before the server is stopped
                del queue
        finally:
            manager.shutdown()
1194
class _TestManagerRestart(BaseTestCase):
    """Check a manager can be restarted on an address it has just released."""

    def _putter(self, address, authkey):
        """Child body: connect to the server and put 'hello world'."""
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        """Start, use and stop a manager, then restart it on the same address.

        Port 0 is requested instead of a fixed port number, so the test
        cannot collide with another process already using that port.
        The second manager reuses the address the first one reported.
        """
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        manager.start()
        try:
            # Address reported by the started manager.
            address = manager.address

            p = self.Process(target=self._putter, args=(address, authkey))
            p.start()
            queue = manager.get_queue()
            self.assertEqual(queue.get(), 'hello world')
            del queue
        finally:
            # Stop the first server even when the assertion failed.
            manager.shutdown()

        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001220
Benjamin Petersondfd79492008-06-13 19:13:39 +00001221#
1222#
1223#
1224
1225SENTINEL = latin('')
1226
class _TestConnection(BaseTestCase):
    """Tests for the connection objects returned by self.Pipe()."""

    ALLOWED_TYPES = ('processes', 'threads')

    def _echo(self, conn):
        """Echo every byte message received on conn until SENTINEL arrives."""
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        """Round-trip objects, byte strings and buffers through an echo child."""
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', range(4))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zero-filled buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items into the buffer.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # longmsg (110 bytes) does not fit the 40-byte buffer; the
            # exception is expected to carry the whole message.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort, e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing is pending: poll() is expected to return False at once.
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        # With a timeout and nothing pending, the elapsed time is
        # expected to be about TIMEOUT1.
        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # The echoed message is expected to make poll() return True
        # without waiting for the timeout.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            # After the sentinel has been sent and child_conn closed,
            # both receive calls are expected to raise EOFError.
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        """Check a one-way pipe: the reader only reads, the writer only writes."""
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            # Using an end in the wrong direction must raise IOError.
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        """Close the child's end in the parent right after starting the child."""
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        """Check the optional offset and size arguments of send_bytes()."""
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # Second argument: offset of the first byte to send.
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        # Third argument: number of bytes to send from the offset.
        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to len(msg) sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Offsets or sizes outside the message, or negative ones, must
        # raise ValueError.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1375
class _TestListenerClient(BaseTestCase):
    """Check Listener/Client pairs for every available address family."""

    ALLOWED_TYPES = ('processes', 'threads')

    def _test(self, address):
        """Child body: connect to address, send 'hello' and disconnect."""
        client = self.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        """Accept one client per family and check the greeting it sends."""
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test,
                                 args=(listener.address,))
            child.daemon = True
            child.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            child.join()
            listener.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001395#
1396# Test of sending connection and socket objects between processes
1397#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001398"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001399class _TestPicklingConnections(BaseTestCase):
1400
1401 ALLOWED_TYPES = ('processes',)
1402
1403 def _listener(self, conn, families):
1404 for fam in families:
1405 l = self.connection.Listener(family=fam)
1406 conn.send(l.address)
1407 new_conn = l.accept()
1408 conn.send(new_conn)
1409
1410 if self.TYPE == 'processes':
1411 l = socket.socket()
1412 l.bind(('localhost', 0))
1413 conn.send(l.getsockname())
1414 l.listen(1)
1415 new_conn, addr = l.accept()
1416 conn.send(new_conn)
1417
1418 conn.recv()
1419
1420 def _remote(self, conn):
1421 for (address, msg) in iter(conn.recv, None):
1422 client = self.connection.Client(address)
1423 client.send(msg.upper())
1424 client.close()
1425
1426 if self.TYPE == 'processes':
1427 address, msg = conn.recv()
1428 client = socket.socket()
1429 client.connect(address)
1430 client.sendall(msg.upper())
1431 client.close()
1432
1433 conn.close()
1434
1435 def test_pickling(self):
1436 try:
1437 multiprocessing.allow_connection_pickling()
1438 except ImportError:
1439 return
1440
1441 families = self.connection.families
1442
1443 lconn, lconn0 = self.Pipe()
1444 lp = self.Process(target=self._listener, args=(lconn0, families))
1445 lp.start()
1446 lconn0.close()
1447
1448 rconn, rconn0 = self.Pipe()
1449 rp = self.Process(target=self._remote, args=(rconn0,))
1450 rp.start()
1451 rconn0.close()
1452
1453 for fam in families:
1454 msg = ('This connection uses family %s' % fam).encode('ascii')
1455 address = lconn.recv()
1456 rconn.send((address, msg))
1457 new_conn = lconn.recv()
1458 self.assertEqual(new_conn.recv(), msg.upper())
1459
1460 rconn.send(None)
1461
1462 if self.TYPE == 'processes':
1463 msg = latin('This connection uses a normal socket')
1464 address = lconn.recv()
1465 rconn.send((address, msg))
1466 if hasattr(socket, 'fromfd'):
1467 new_conn = lconn.recv()
1468 self.assertEqual(new_conn.recv(100), msg.upper())
1469 else:
1470 # XXX On Windows with Py2.6 need to backport fromfd()
1471 discard = lconn.recv_bytes()
1472
1473 lconn.send(None)
1474
1475 rconn.close()
1476 lconn.close()
1477
1478 lp.join()
1479 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001480"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001481#
1482#
1483#
1484
class _TestHeap(BaseTestCase):
    """Consistency check of the multiprocessing.heap allocator."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        """Churn many blocks, then check free and used blocks tile each arena."""
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in xrange(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            wrapper = multiprocessing.heap.BufferWrapper(size)
            blocks.append(wrapper)
            if len(blocks) > maxblocks:
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap
        segments = []
        occupied = 0
        for free_list in heap._len_to_seq.values():
            for arena, start, stop in free_list:
                arena_index = heap._arenas.index(arena)
                segments.append(
                    (arena_index, start, stop, stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            arena_index = heap._arenas.index(arena)
            segments.append(
                (arena_index, start, stop, stop-start, 'occupied'))
            occupied += (stop-start)

        segments.sort()

        # Each segment must end where the next one starts, unless the
        # next one is the first segment of a different arena.
        for current, following in zip(segments, segments[1:]):
            arena, start, stop = current[:3]
            narena, nstart, nstop = following[:3]
            starts_new_arena = (arena != narena and nstart == 0)
            self.assertTrue(starts_new_arena or (stop == nstart))
1525
1526#
1527#
1528#
1529
try:
    from ctypes import Structure, c_int, c_double
    # Value, Array and copy are provided by multiprocessing.sharedctypes,
    # not by ctypes.  Asking ctypes for them raised ImportError on every
    # platform, which made each test below return without checking
    # anything.  copy is imported under a private alias so that it does
    # not rebind the module-level name of the standard 'copy' module.
    from multiprocessing.sharedctypes import Value, Array
    from multiprocessing.sharedctypes import copy as _sharedctypes_copy
except ImportError:
    Structure = object
    c_int = c_double = None

class _Foo(Structure):
    # Structure with one int field and one double field, used as a
    # shared value in the tests below.
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]

class _TestSharedCTypes(BaseTestCase):
    """Tests for shared ctypes values, arrays and structures."""

    ALLOWED_TYPES = ('processes',)

    def _double(self, x, y, foo, arr, string):
        """Child body: double every shared value in place."""
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for i in range(len(arr)):
            arr[i] *= 2

    def test_sharedctypes(self, lock=False):
        """Check changes made by a child process are visible in the parent.

        lock is passed through to every Value() and Array() call.
        """
        if c_int is None:
            return

        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = Array('d', range(10), lock=lock)
        string = Array('c', 20, lock=lock)
        string.value = 'hello'

        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
        p.start()
        p.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        """Repeat test_sharedctypes with lock=True."""
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        """Check a copied structure keeps its values when the source changes."""
        if c_int is None:
            return

        foo = _Foo(2, 5.0)
        bar = _sharedctypes_copy(foo)
        foo.x = 0
        foo.y = 0
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)
1591
1592#
1593#
1594#
1595
class _TestFinalize(BaseTestCase):
    """Check when and in which order util.Finalize callbacks are run."""

    ALLOWED_TYPES = ('processes',)

    def _test_finalize(self, conn):
        """Child body: register finalizers that report their name on conn."""
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # Registered without an exitpriority; the list expected by
        # test_finalize() does not contain 'c'.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers with the same priority; the expected list has
        # them in the reverse of their registration order.
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        # Lowest priority of all: 'STOP' marks the end of the results
        # for the reader in test_finalize().
        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        """Run _test_finalize in a child and check the reported order."""
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.start()
        p.join()

        # Read messages until the 'STOP' marker.
        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1646
1647#
1648# Test that from ... import * works for each module
1649#
1650
class _TestImportStar(BaseTestCase):
    """Check every name listed in a module's __all__ really exists."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        """Import each multiprocessing module and verify its __all__ entries."""
        module_names = (
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.reduction', 'multiprocessing.sharedctypes',
            'multiprocessing.synchronize', 'multiprocessing.util'
            )

        for module_name in module_names:
            __import__(module_name)
            module = sys.modules[module_name]

            # A module without __all__ has nothing to verify.
            exported = getattr(module, '__all__', ())
            for attr in exported:
                message = '%r does not have attribute %r' % (module, attr)
                self.assertTrue(hasattr(module, attr), message)
1673
1674#
1675# Quick test that logging works -- does not test logging output
1676#
1677
class _TestLogging(BaseTestCase):
    """Quick checks of the multiprocessing logger (not of its output)."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        """Check a logger is available and accepts low-level messages."""
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        try:
            self.assertTrue(logger is not None)
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # The logger is shared state: always put its level back.
            logger.setLevel(LOG_LEVEL)

    def _test_level(self, conn):
        """Child body: report the logger's effective level on conn."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        """Check a child process sees the parent's effective log level.

        The levels of the multiprocessing logger and of the root logger
        are restored even when an assertion fails, so that a failure
        here does not change the logging setup of later tests.
        """
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        try:
            # Level set directly on the multiprocessing logger.
            logger.setLevel(LEVEL1)
            self.Process(target=self._test_level, args=(writer,)).start()
            self.assertEqual(LEVEL1, reader.recv())

            # With NOTSET the effective level comes from the root logger.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self.Process(target=self._test_level, args=(writer,)).start()
            self.assertEqual(LEVEL2, reader.recv())
        finally:
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
1715
1716#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001717# Test to verify handle verification, see issue 3321
1718#
1719
class TestInvalidHandle(unittest.TestCase):
    """Check that connections reject invalid handles (see issue 3321)."""

    def test_invalid_handles(self):
        """Expect IOError when polling or creating a connection on a bad handle.

        Nothing is checked when WIN32 is true.
        """
        if not WIN32:
            bogus = _multiprocessing.Connection(44977608)
            self.assertRaises(IOError, bogus.poll)
            self.assertRaises(IOError, _multiprocessing.Connection, -1)
1727 self.assertRaises(IOError, _multiprocessing.Connection, -1)
1728#
Benjamin Petersondfd79492008-06-13 19:13:39 +00001729# Functions used to create test cases from the base ones in this module
1730#
1731
def get_attributes(Source, names):
    """Return a dict mapping each name to the attribute of that name on Source.

    Plain Python functions are wrapped in staticmethod so that they are
    not turned into bound methods once the dict is merged into a class
    namespace.  Every other object is stored unchanged.
    """
    function_type = type(get_attributes)
    collected = {}
    for name in names:
        value = getattr(Source, name)
        if type(value) == function_type:
            collected[name] = staticmethod(value)
        else:
            collected[name] = value
    return collected
1740
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes for one flavour of the tests.

    Every module-level name starting with '_Test' whose ALLOWED_TYPES
    contains `type` is combined with unittest.TestCase and Mixin.  The
    new class is named 'With' + capitalized type + the base name without
    its leading underscore, and reports Mixin's module as its own.

    Returns a dict mapping the new class names to the new classes.
    """
    capitalized = type[0].upper() + type[1:]
    cases = {}

    for name, base in list(globals().items()):
        if not name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        Temp.__name__ = 'With' + capitalized + name[1:]
        Temp.__module__ = Mixin.__module__
        cases[Temp.__name__] = Temp

    return cases
1757
1758#
1759# Create test cases
1760#
1761
class ProcessesMixin(object):
    # Mixin that makes the generic tests use real processes.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed multiprocessing attributes into the class body so
    # that tests can reach them as self.Queue, self.Lock and so on.
    # get_attributes() wraps plain functions in staticmethod.
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

# Generate the 'processes' test classes and publish them as module
# globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1774
1775
class ManagerMixin(object):
    # Mixin that makes the generic tests use objects created by a
    # SyncManager.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Created without running __init__; test_main() calls __init__()
    # and start() on this same instance before running the tests.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed attributes of the manager instance into the class
    # body so that tests can reach them as self.Queue, self.Lock and so
    # on.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
       'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))

# Generate the 'manager' test classes and publish them as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1788
1789
class ThreadsMixin(object):
    # Mixin that makes the generic tests use multiprocessing.dummy.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed multiprocessing.dummy attributes into the class
    # body so that tests can reach them as self.Queue, self.Lock and so
    # on.  get_attributes() wraps plain functions in staticmethod.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))

# Generate the 'threads' test classes and publish them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1802
class OtherTest(unittest.TestCase):
    """Authentication failure paths of multiprocessing.connection."""
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        """deliver_challenge() must reject a bogus response."""
        class _FakeConnection(object):
            def send_bytes(self, data):
                pass
            def recv_bytes(self, size):
                return b'something bogus'

        fake = _FakeConnection()
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.deliver_challenge,
                          fake, b'abc')

    def test_answer_challenge_auth_failure(self):
        """answer_challenge() must reject a bogus reply to its answer."""
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0
            def send_bytes(self, data):
                pass
            def recv_bytes(self, size):
                # First call: the challenge marker.  Second call: a
                # bogus reply.  Any later call: an empty message.
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''

        fake = _FakeConnection()
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.answer_challenge,
                          fake, b'abc')
1831
# Test classes that are not generated from a mixin; test_main() adds
# them after the generated ones.
testcases_other = [OtherTest, TestInvalidHandle]
Neal Norwitz0c519b32008-08-25 01:50:24 +00001833
Benjamin Petersondfd79492008-06-13 19:13:39 +00001834#
1835#
1836#
1837
def test_main(run=None):
    """Build the complete suite, run it, and release the shared resources.

    run -- callable that receives the unittest.TestSuite; when None,
           test.test_support.run_unittest is used.

    Raises unittest.SkipTest on Linux when an RLock cannot be created
    (see issue 3111).  The shared pools and the manager are torn down
    even when run() raises.
    """
    if sys.platform.startswith("linux"):
        try:
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared fixtures used by the generated test classes.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        # Each group is sorted by class name; the manager group comes
        # after the processes and threads groups.
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        # run() may raise; release the workers and the manager server
        # process in every case.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00001876
def main():
    """Run the whole suite through a TextTestRunner with verbosity 2."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)
1879
1880if __name__ == '__main__':
1881 main()