blob: 58647ed0f8766204163114ec0cbb9ac97585e841 [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import Queue
10import time
11import sys
12import os
13import gc
14import signal
15import array
16import copy
17import socket
18import random
19import logging
R. David Murray17438dc2009-07-21 17:02:14 +000020from StringIO import StringIO
Benjamin Petersondfd79492008-06-13 19:13:39 +000021
Jesse Noller37040cd2008-09-30 00:15:45 +000022
23# Work around broken sem_open implementations
24try:
25 import multiprocessing.synchronize
26except ImportError, e:
27 from test.test_support import TestSkipped
28 raise TestSkipped(e)
29
Benjamin Petersondfd79492008-06-13 19:13:39 +000030import multiprocessing.dummy
31import multiprocessing.connection
32import multiprocessing.managers
33import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000034import multiprocessing.pool
35import _multiprocessing
36
37from multiprocessing import util
38
39#
40#
41#
42
Benjamin Petersone79edf52008-07-13 18:34:58 +000043latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000044
Benjamin Petersondfd79492008-06-13 19:13:39 +000045#
46# Constants
47#
48
49LOG_LEVEL = util.SUBWARNING
50#LOG_LEVEL = logging.WARNING
51
52DELTA = 0.1
53CHECK_TIMINGS = False # making true makes tests take a lot longer
54 # and can sometimes cause some non-serious
55 # failures because some calls block a bit
56 # longer than expected
57if CHECK_TIMINGS:
58 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
59else:
60 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
61
62HAVE_GETVALUE = not getattr(_multiprocessing,
63 'HAVE_BROKEN_SEM_GETVALUE', False)
64
Jesse Nollere6bab482009-03-30 16:11:16 +000065WIN32 = (sys.platform == "win32")
66
Benjamin Petersondfd79492008-06-13 19:13:39 +000067#
68# Creates a wrapper for a function which records the time it takes to finish
69#
70
71class TimingWrapper(object):
72
73 def __init__(self, func):
74 self.func = func
75 self.elapsed = None
76
77 def __call__(self, *args, **kwds):
78 t = time.time()
79 try:
80 return self.func(*args, **kwds)
81 finally:
82 self.elapsed = time.time() - t
83
84#
85# Base class for test cases
86#
87
88class BaseTestCase(object):
89
90 ALLOWED_TYPES = ('processes', 'manager', 'threads')
91
92 def assertTimingAlmostEqual(self, a, b):
93 if CHECK_TIMINGS:
94 self.assertAlmostEqual(a, b, 1)
95
96 def assertReturnsIfImplemented(self, value, func, *args):
97 try:
98 res = func(*args)
99 except NotImplementedError:
100 pass
101 else:
102 return self.assertEqual(value, res)
103
104#
105# Return the value of a semaphore
106#
107
108def get_value(self):
109 try:
110 return self.get_value()
111 except AttributeError:
112 try:
113 return self._Semaphore__value
114 except AttributeError:
115 try:
116 return self._value
117 except AttributeError:
118 raise NotImplementedError
119
120#
121# Testcases
122#
123
124class _TestProcess(BaseTestCase):
125
126 ALLOWED_TYPES = ('processes', 'threads')
127
128 def test_current(self):
129 if self.TYPE == 'threads':
130 return
131
132 current = self.current_process()
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000133 authkey = current.authkey
Benjamin Petersondfd79492008-06-13 19:13:39 +0000134
135 self.assertTrue(current.is_alive())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000136 self.assertTrue(not current.daemon)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000137 self.assertTrue(isinstance(authkey, bytes))
138 self.assertTrue(len(authkey) > 0)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000139 self.assertEqual(current.ident, os.getpid())
140 self.assertEqual(current.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000141
142 def _test(self, q, *args, **kwds):
143 current = self.current_process()
144 q.put(args)
145 q.put(kwds)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000146 q.put(current.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000147 if self.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000148 q.put(bytes(current.authkey))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000149 q.put(current.pid)
150
151 def test_process(self):
152 q = self.Queue(1)
153 e = self.Event()
154 args = (q, 1, 2)
155 kwargs = {'hello':23, 'bye':2.54}
156 name = 'SomeProcess'
157 p = self.Process(
158 target=self._test, args=args, kwargs=kwargs, name=name
159 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000160 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000161 current = self.current_process()
162
163 if self.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000164 self.assertEquals(p.authkey, current.authkey)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000165 self.assertEquals(p.is_alive(), False)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000166 self.assertEquals(p.daemon, True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000167 self.assertTrue(p not in self.active_children())
168 self.assertTrue(type(self.active_children()) is list)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000169 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000170
171 p.start()
172
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000173 self.assertEquals(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000174 self.assertEquals(p.is_alive(), True)
175 self.assertTrue(p in self.active_children())
176
177 self.assertEquals(q.get(), args[1:])
178 self.assertEquals(q.get(), kwargs)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000179 self.assertEquals(q.get(), p.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000180 if self.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000181 self.assertEquals(q.get(), current.authkey)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000182 self.assertEquals(q.get(), p.pid)
183
184 p.join()
185
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000186 self.assertEquals(p.exitcode, 0)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000187 self.assertEquals(p.is_alive(), False)
188 self.assertTrue(p not in self.active_children())
189
190 def _test_terminate(self):
191 time.sleep(1000)
192
193 def test_terminate(self):
194 if self.TYPE == 'threads':
195 return
196
197 p = self.Process(target=self._test_terminate)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000198 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000199 p.start()
200
201 self.assertEqual(p.is_alive(), True)
202 self.assertTrue(p in self.active_children())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000203 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000204
205 p.terminate()
206
207 join = TimingWrapper(p.join)
208 self.assertEqual(join(), None)
209 self.assertTimingAlmostEqual(join.elapsed, 0.0)
210
211 self.assertEqual(p.is_alive(), False)
212 self.assertTrue(p not in self.active_children())
213
214 p.join()
215
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000216 # XXX sometimes get p.exitcode == 0 on Windows ...
217 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000218
219 def test_cpu_count(self):
220 try:
221 cpus = multiprocessing.cpu_count()
222 except NotImplementedError:
223 cpus = 1
224 self.assertTrue(type(cpus) is int)
225 self.assertTrue(cpus >= 1)
226
227 def test_active_children(self):
228 self.assertEqual(type(self.active_children()), list)
229
230 p = self.Process(target=time.sleep, args=(DELTA,))
231 self.assertTrue(p not in self.active_children())
232
233 p.start()
234 self.assertTrue(p in self.active_children())
235
236 p.join()
237 self.assertTrue(p not in self.active_children())
238
239 def _test_recursion(self, wconn, id):
240 from multiprocessing import forking
241 wconn.send(id)
242 if len(id) < 2:
243 for i in range(2):
244 p = self.Process(
245 target=self._test_recursion, args=(wconn, id+[i])
246 )
247 p.start()
248 p.join()
249
250 def test_recursion(self):
251 rconn, wconn = self.Pipe(duplex=False)
252 self._test_recursion(wconn, [])
253
254 time.sleep(DELTA)
255 result = []
256 while rconn.poll():
257 result.append(rconn.recv())
258
259 expected = [
260 [],
261 [0],
262 [0, 0],
263 [0, 1],
264 [1],
265 [1, 0],
266 [1, 1]
267 ]
268 self.assertEqual(result, expected)
269
270#
271#
272#
273
274class _UpperCaser(multiprocessing.Process):
275
276 def __init__(self):
277 multiprocessing.Process.__init__(self)
278 self.child_conn, self.parent_conn = multiprocessing.Pipe()
279
280 def run(self):
281 self.parent_conn.close()
282 for s in iter(self.child_conn.recv, None):
283 self.child_conn.send(s.upper())
284 self.child_conn.close()
285
286 def submit(self, s):
287 assert type(s) is str
288 self.parent_conn.send(s)
289 return self.parent_conn.recv()
290
291 def stop(self):
292 self.parent_conn.send(None)
293 self.parent_conn.close()
294 self.child_conn.close()
295
296class _TestSubclassingProcess(BaseTestCase):
297
298 ALLOWED_TYPES = ('processes',)
299
300 def test_subclassing(self):
301 uppercaser = _UpperCaser()
302 uppercaser.start()
303 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
304 self.assertEqual(uppercaser.submit('world'), 'WORLD')
305 uppercaser.stop()
306 uppercaser.join()
307
308#
309#
310#
311
312def queue_empty(q):
313 if hasattr(q, 'empty'):
314 return q.empty()
315 else:
316 return q.qsize() == 0
317
318def queue_full(q, maxsize):
319 if hasattr(q, 'full'):
320 return q.full()
321 else:
322 return q.qsize() == maxsize
323
324
325class _TestQueue(BaseTestCase):
326
327
328 def _test_put(self, queue, child_can_start, parent_can_continue):
329 child_can_start.wait()
330 for i in range(6):
331 queue.get()
332 parent_can_continue.set()
333
334 def test_put(self):
335 MAXSIZE = 6
336 queue = self.Queue(maxsize=MAXSIZE)
337 child_can_start = self.Event()
338 parent_can_continue = self.Event()
339
340 proc = self.Process(
341 target=self._test_put,
342 args=(queue, child_can_start, parent_can_continue)
343 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000344 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000345 proc.start()
346
347 self.assertEqual(queue_empty(queue), True)
348 self.assertEqual(queue_full(queue, MAXSIZE), False)
349
350 queue.put(1)
351 queue.put(2, True)
352 queue.put(3, True, None)
353 queue.put(4, False)
354 queue.put(5, False, None)
355 queue.put_nowait(6)
356
357 # the values may be in buffer but not yet in pipe so sleep a bit
358 time.sleep(DELTA)
359
360 self.assertEqual(queue_empty(queue), False)
361 self.assertEqual(queue_full(queue, MAXSIZE), True)
362
363 put = TimingWrapper(queue.put)
364 put_nowait = TimingWrapper(queue.put_nowait)
365
366 self.assertRaises(Queue.Full, put, 7, False)
367 self.assertTimingAlmostEqual(put.elapsed, 0)
368
369 self.assertRaises(Queue.Full, put, 7, False, None)
370 self.assertTimingAlmostEqual(put.elapsed, 0)
371
372 self.assertRaises(Queue.Full, put_nowait, 7)
373 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
374
375 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
376 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
377
378 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
379 self.assertTimingAlmostEqual(put.elapsed, 0)
380
381 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
382 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
383
384 child_can_start.set()
385 parent_can_continue.wait()
386
387 self.assertEqual(queue_empty(queue), True)
388 self.assertEqual(queue_full(queue, MAXSIZE), False)
389
390 proc.join()
391
392 def _test_get(self, queue, child_can_start, parent_can_continue):
393 child_can_start.wait()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000394 #queue.put(1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000395 queue.put(2)
396 queue.put(3)
397 queue.put(4)
398 queue.put(5)
399 parent_can_continue.set()
400
401 def test_get(self):
402 queue = self.Queue()
403 child_can_start = self.Event()
404 parent_can_continue = self.Event()
405
406 proc = self.Process(
407 target=self._test_get,
408 args=(queue, child_can_start, parent_can_continue)
409 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000410 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000411 proc.start()
412
413 self.assertEqual(queue_empty(queue), True)
414
415 child_can_start.set()
416 parent_can_continue.wait()
417
418 time.sleep(DELTA)
419 self.assertEqual(queue_empty(queue), False)
420
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000421 # Hangs unexpectedly, remove for now
422 #self.assertEqual(queue.get(), 1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000423 self.assertEqual(queue.get(True, None), 2)
424 self.assertEqual(queue.get(True), 3)
425 self.assertEqual(queue.get(timeout=1), 4)
426 self.assertEqual(queue.get_nowait(), 5)
427
428 self.assertEqual(queue_empty(queue), True)
429
430 get = TimingWrapper(queue.get)
431 get_nowait = TimingWrapper(queue.get_nowait)
432
433 self.assertRaises(Queue.Empty, get, False)
434 self.assertTimingAlmostEqual(get.elapsed, 0)
435
436 self.assertRaises(Queue.Empty, get, False, None)
437 self.assertTimingAlmostEqual(get.elapsed, 0)
438
439 self.assertRaises(Queue.Empty, get_nowait)
440 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
441
442 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
443 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
444
445 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
446 self.assertTimingAlmostEqual(get.elapsed, 0)
447
448 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
449 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
450
451 proc.join()
452
453 def _test_fork(self, queue):
454 for i in range(10, 20):
455 queue.put(i)
456 # note that at this point the items may only be buffered, so the
457 # process cannot shutdown until the feeder thread has finished
458 # pushing items onto the pipe.
459
460 def test_fork(self):
461 # Old versions of Queue would fail to create a new feeder
462 # thread for a forked process if the original process had its
463 # own feeder thread. This test checks that this no longer
464 # happens.
465
466 queue = self.Queue()
467
468 # put items on queue so that main process starts a feeder thread
469 for i in range(10):
470 queue.put(i)
471
472 # wait to make sure thread starts before we fork a new process
473 time.sleep(DELTA)
474
475 # fork process
476 p = self.Process(target=self._test_fork, args=(queue,))
477 p.start()
478
479 # check that all expected items are in the queue
480 for i in range(20):
481 self.assertEqual(queue.get(), i)
482 self.assertRaises(Queue.Empty, queue.get, False)
483
484 p.join()
485
486 def test_qsize(self):
487 q = self.Queue()
488 try:
489 self.assertEqual(q.qsize(), 0)
490 except NotImplementedError:
491 return
492 q.put(1)
493 self.assertEqual(q.qsize(), 1)
494 q.put(5)
495 self.assertEqual(q.qsize(), 2)
496 q.get()
497 self.assertEqual(q.qsize(), 1)
498 q.get()
499 self.assertEqual(q.qsize(), 0)
500
501 def _test_task_done(self, q):
502 for obj in iter(q.get, None):
503 time.sleep(DELTA)
504 q.task_done()
505
506 def test_task_done(self):
507 queue = self.JoinableQueue()
508
509 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
510 return
511
512 workers = [self.Process(target=self._test_task_done, args=(queue,))
513 for i in xrange(4)]
514
515 for p in workers:
516 p.start()
517
518 for i in xrange(10):
519 queue.put(i)
520
521 queue.join()
522
523 for p in workers:
524 queue.put(None)
525
526 for p in workers:
527 p.join()
528
529#
530#
531#
532
533class _TestLock(BaseTestCase):
534
535 def test_lock(self):
536 lock = self.Lock()
537 self.assertEqual(lock.acquire(), True)
538 self.assertEqual(lock.acquire(False), False)
539 self.assertEqual(lock.release(), None)
540 self.assertRaises((ValueError, threading.ThreadError), lock.release)
541
542 def test_rlock(self):
543 lock = self.RLock()
544 self.assertEqual(lock.acquire(), True)
545 self.assertEqual(lock.acquire(), True)
546 self.assertEqual(lock.acquire(), True)
547 self.assertEqual(lock.release(), None)
548 self.assertEqual(lock.release(), None)
549 self.assertEqual(lock.release(), None)
550 self.assertRaises((AssertionError, RuntimeError), lock.release)
551
Jesse Nollerf0d21c72009-03-30 23:38:36 +0000552 def test_lock_context(self):
553 with self.Lock():
554 pass
555
Benjamin Petersondfd79492008-06-13 19:13:39 +0000556
557class _TestSemaphore(BaseTestCase):
558
559 def _test_semaphore(self, sem):
560 self.assertReturnsIfImplemented(2, get_value, sem)
561 self.assertEqual(sem.acquire(), True)
562 self.assertReturnsIfImplemented(1, get_value, sem)
563 self.assertEqual(sem.acquire(), True)
564 self.assertReturnsIfImplemented(0, get_value, sem)
565 self.assertEqual(sem.acquire(False), False)
566 self.assertReturnsIfImplemented(0, get_value, sem)
567 self.assertEqual(sem.release(), None)
568 self.assertReturnsIfImplemented(1, get_value, sem)
569 self.assertEqual(sem.release(), None)
570 self.assertReturnsIfImplemented(2, get_value, sem)
571
572 def test_semaphore(self):
573 sem = self.Semaphore(2)
574 self._test_semaphore(sem)
575 self.assertEqual(sem.release(), None)
576 self.assertReturnsIfImplemented(3, get_value, sem)
577 self.assertEqual(sem.release(), None)
578 self.assertReturnsIfImplemented(4, get_value, sem)
579
580 def test_bounded_semaphore(self):
581 sem = self.BoundedSemaphore(2)
582 self._test_semaphore(sem)
583 # Currently fails on OS/X
584 #if HAVE_GETVALUE:
585 # self.assertRaises(ValueError, sem.release)
586 # self.assertReturnsIfImplemented(2, get_value, sem)
587
588 def test_timeout(self):
589 if self.TYPE != 'processes':
590 return
591
592 sem = self.Semaphore(0)
593 acquire = TimingWrapper(sem.acquire)
594
595 self.assertEqual(acquire(False), False)
596 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
597
598 self.assertEqual(acquire(False, None), False)
599 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
600
601 self.assertEqual(acquire(False, TIMEOUT1), False)
602 self.assertTimingAlmostEqual(acquire.elapsed, 0)
603
604 self.assertEqual(acquire(True, TIMEOUT2), False)
605 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
606
607 self.assertEqual(acquire(timeout=TIMEOUT3), False)
608 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
609
610
611class _TestCondition(BaseTestCase):
612
613 def f(self, cond, sleeping, woken, timeout=None):
614 cond.acquire()
615 sleeping.release()
616 cond.wait(timeout)
617 woken.release()
618 cond.release()
619
620 def check_invariant(self, cond):
621 # this is only supposed to succeed when there are no sleepers
622 if self.TYPE == 'processes':
623 try:
624 sleepers = (cond._sleeping_count.get_value() -
625 cond._woken_count.get_value())
626 self.assertEqual(sleepers, 0)
627 self.assertEqual(cond._wait_semaphore.get_value(), 0)
628 except NotImplementedError:
629 pass
630
631 def test_notify(self):
632 cond = self.Condition()
633 sleeping = self.Semaphore(0)
634 woken = self.Semaphore(0)
635
636 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000637 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000638 p.start()
639
640 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000641 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000642 p.start()
643
644 # wait for both children to start sleeping
645 sleeping.acquire()
646 sleeping.acquire()
647
648 # check no process/thread has woken up
649 time.sleep(DELTA)
650 self.assertReturnsIfImplemented(0, get_value, woken)
651
652 # wake up one process/thread
653 cond.acquire()
654 cond.notify()
655 cond.release()
656
657 # check one process/thread has woken up
658 time.sleep(DELTA)
659 self.assertReturnsIfImplemented(1, get_value, woken)
660
661 # wake up another
662 cond.acquire()
663 cond.notify()
664 cond.release()
665
666 # check other has woken up
667 time.sleep(DELTA)
668 self.assertReturnsIfImplemented(2, get_value, woken)
669
670 # check state is not mucked up
671 self.check_invariant(cond)
672 p.join()
673
674 def test_notify_all(self):
675 cond = self.Condition()
676 sleeping = self.Semaphore(0)
677 woken = self.Semaphore(0)
678
679 # start some threads/processes which will timeout
680 for i in range(3):
681 p = self.Process(target=self.f,
682 args=(cond, sleeping, woken, TIMEOUT1))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000683 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000684 p.start()
685
686 t = threading.Thread(target=self.f,
687 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000688 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000689 t.start()
690
691 # wait for them all to sleep
692 for i in xrange(6):
693 sleeping.acquire()
694
695 # check they have all timed out
696 for i in xrange(6):
697 woken.acquire()
698 self.assertReturnsIfImplemented(0, get_value, woken)
699
700 # check state is not mucked up
701 self.check_invariant(cond)
702
703 # start some more threads/processes
704 for i in range(3):
705 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000706 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000707 p.start()
708
709 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000710 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000711 t.start()
712
713 # wait for them to all sleep
714 for i in xrange(6):
715 sleeping.acquire()
716
717 # check no process/thread has woken up
718 time.sleep(DELTA)
719 self.assertReturnsIfImplemented(0, get_value, woken)
720
721 # wake them all up
722 cond.acquire()
723 cond.notify_all()
724 cond.release()
725
726 # check they have all woken
727 time.sleep(DELTA)
728 self.assertReturnsIfImplemented(6, get_value, woken)
729
730 # check state is not mucked up
731 self.check_invariant(cond)
732
733 def test_timeout(self):
734 cond = self.Condition()
735 wait = TimingWrapper(cond.wait)
736 cond.acquire()
737 res = wait(TIMEOUT1)
738 cond.release()
739 self.assertEqual(res, None)
740 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
741
742
743class _TestEvent(BaseTestCase):
744
745 def _test_event(self, event):
746 time.sleep(TIMEOUT2)
747 event.set()
748
749 def test_event(self):
750 event = self.Event()
751 wait = TimingWrapper(event.wait)
752
753 # Removed temporaily, due to API shear, this does not
754 # work with threading._Event objects. is_set == isSet
755 #self.assertEqual(event.is_set(), False)
756
757 self.assertEqual(wait(0.0), None)
758 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
759 self.assertEqual(wait(TIMEOUT1), None)
760 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
761
762 event.set()
763
764 # See note above on the API differences
765 # self.assertEqual(event.is_set(), True)
766 self.assertEqual(wait(), None)
767 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
768 self.assertEqual(wait(TIMEOUT1), None)
769 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
770 # self.assertEqual(event.is_set(), True)
771
772 event.clear()
773
774 #self.assertEqual(event.is_set(), False)
775
776 self.Process(target=self._test_event, args=(event,)).start()
777 self.assertEqual(wait(), None)
778
779#
780#
781#
782
783class _TestValue(BaseTestCase):
784
785 codes_values = [
786 ('i', 4343, 24234),
787 ('d', 3.625, -4.25),
788 ('h', -232, 234),
789 ('c', latin('x'), latin('y'))
790 ]
791
792 def _test(self, values):
793 for sv, cv in zip(values, self.codes_values):
794 sv.value = cv[2]
795
796
797 def test_value(self, raw=False):
798 if self.TYPE != 'processes':
799 return
800
801 if raw:
802 values = [self.RawValue(code, value)
803 for code, value, _ in self.codes_values]
804 else:
805 values = [self.Value(code, value)
806 for code, value, _ in self.codes_values]
807
808 for sv, cv in zip(values, self.codes_values):
809 self.assertEqual(sv.value, cv[1])
810
811 proc = self.Process(target=self._test, args=(values,))
812 proc.start()
813 proc.join()
814
815 for sv, cv in zip(values, self.codes_values):
816 self.assertEqual(sv.value, cv[2])
817
818 def test_rawvalue(self):
819 self.test_value(raw=True)
820
821 def test_getobj_getlock(self):
822 if self.TYPE != 'processes':
823 return
824
825 val1 = self.Value('i', 5)
826 lock1 = val1.get_lock()
827 obj1 = val1.get_obj()
828
829 val2 = self.Value('i', 5, lock=None)
830 lock2 = val2.get_lock()
831 obj2 = val2.get_obj()
832
833 lock = self.Lock()
834 val3 = self.Value('i', 5, lock=lock)
835 lock3 = val3.get_lock()
836 obj3 = val3.get_obj()
837 self.assertEqual(lock, lock3)
838
Benjamin Petersonafd7eaa2009-01-18 04:01:18 +0000839 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000840 self.assertFalse(hasattr(arr4, 'get_lock'))
841 self.assertFalse(hasattr(arr4, 'get_obj'))
842
Benjamin Petersonafd7eaa2009-01-18 04:01:18 +0000843 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
844
845 arr5 = self.RawValue('i', 5)
846 self.assertFalse(hasattr(arr5, 'get_lock'))
847 self.assertFalse(hasattr(arr5, 'get_obj'))
848
Benjamin Petersondfd79492008-06-13 19:13:39 +0000849
850class _TestArray(BaseTestCase):
851
852 def f(self, seq):
853 for i in range(1, len(seq)):
854 seq[i] += seq[i-1]
855
856 def test_array(self, raw=False):
857 if self.TYPE != 'processes':
858 return
859
860 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
861 if raw:
862 arr = self.RawArray('i', seq)
863 else:
864 arr = self.Array('i', seq)
865
866 self.assertEqual(len(arr), len(seq))
867 self.assertEqual(arr[3], seq[3])
868 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
869
870 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
871
872 self.assertEqual(list(arr[:]), seq)
873
874 self.f(seq)
875
876 p = self.Process(target=self.f, args=(arr,))
877 p.start()
878 p.join()
879
880 self.assertEqual(list(arr[:]), seq)
881
882 def test_rawarray(self):
883 self.test_array(raw=True)
884
885 def test_getobj_getlock_obj(self):
886 if self.TYPE != 'processes':
887 return
888
889 arr1 = self.Array('i', range(10))
890 lock1 = arr1.get_lock()
891 obj1 = arr1.get_obj()
892
893 arr2 = self.Array('i', range(10), lock=None)
894 lock2 = arr2.get_lock()
895 obj2 = arr2.get_obj()
896
897 lock = self.Lock()
898 arr3 = self.Array('i', range(10), lock=lock)
899 lock3 = arr3.get_lock()
900 obj3 = arr3.get_obj()
901 self.assertEqual(lock, lock3)
902
Benjamin Petersonafd7eaa2009-01-18 04:01:18 +0000903 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000904 self.assertFalse(hasattr(arr4, 'get_lock'))
905 self.assertFalse(hasattr(arr4, 'get_obj'))
Benjamin Petersonafd7eaa2009-01-18 04:01:18 +0000906 self.assertRaises(AttributeError,
907 self.Array, 'i', range(10), lock='notalock')
908
909 arr5 = self.RawArray('i', range(10))
910 self.assertFalse(hasattr(arr5, 'get_lock'))
911 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000912
913#
914#
915#
916
917class _TestContainers(BaseTestCase):
918
919 ALLOWED_TYPES = ('manager',)
920
921 def test_list(self):
922 a = self.list(range(10))
923 self.assertEqual(a[:], range(10))
924
925 b = self.list()
926 self.assertEqual(b[:], [])
927
928 b.extend(range(5))
929 self.assertEqual(b[:], range(5))
930
931 self.assertEqual(b[2], 2)
932 self.assertEqual(b[2:10], [2,3,4])
933
934 b *= 2
935 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
936
937 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
938
939 self.assertEqual(a[:], range(10))
940
941 d = [a, b]
942 e = self.list(d)
943 self.assertEqual(
944 e[:],
945 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
946 )
947
948 f = self.list([a])
949 a.append('hello')
950 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
951
952 def test_dict(self):
953 d = self.dict()
954 indices = range(65, 70)
955 for i in indices:
956 d[i] = chr(i)
957 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
958 self.assertEqual(sorted(d.keys()), indices)
959 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
960 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
961
962 def test_namespace(self):
963 n = self.Namespace()
964 n.name = 'Bob'
965 n.job = 'Builder'
966 n._hidden = 'hidden'
967 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
968 del n.job
969 self.assertEqual(str(n), "Namespace(name='Bob')")
970 self.assertTrue(hasattr(n, 'name'))
971 self.assertTrue(not hasattr(n, 'job'))
972
973#
974#
975#
976
977def sqr(x, wait=0.0):
978 time.sleep(wait)
979 return x*x
Benjamin Petersondfd79492008-06-13 19:13:39 +0000980class _TestPool(BaseTestCase):
981
982 def test_apply(self):
983 papply = self.pool.apply
984 self.assertEqual(papply(sqr, (5,)), sqr(5))
985 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
986
987 def test_map(self):
988 pmap = self.pool.map
989 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
990 self.assertEqual(pmap(sqr, range(100), chunksize=20),
991 map(sqr, range(100)))
992
993 def test_async(self):
994 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
995 get = TimingWrapper(res.get)
996 self.assertEqual(get(), 49)
997 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
998
999 def test_async_timeout(self):
1000 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1001 get = TimingWrapper(res.get)
1002 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1003 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1004
1005 def test_imap(self):
1006 it = self.pool.imap(sqr, range(10))
1007 self.assertEqual(list(it), map(sqr, range(10)))
1008
1009 it = self.pool.imap(sqr, range(10))
1010 for i in range(10):
1011 self.assertEqual(it.next(), i*i)
1012 self.assertRaises(StopIteration, it.next)
1013
1014 it = self.pool.imap(sqr, range(1000), chunksize=100)
1015 for i in range(1000):
1016 self.assertEqual(it.next(), i*i)
1017 self.assertRaises(StopIteration, it.next)
1018
1019 def test_imap_unordered(self):
1020 it = self.pool.imap_unordered(sqr, range(1000))
1021 self.assertEqual(sorted(it), map(sqr, range(1000)))
1022
1023 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1024 self.assertEqual(sorted(it), map(sqr, range(1000)))
1025
1026 def test_make_pool(self):
1027 p = multiprocessing.Pool(3)
1028 self.assertEqual(3, len(p._pool))
1029 p.close()
1030 p.join()
1031
1032 def test_terminate(self):
1033 if self.TYPE == 'manager':
1034 # On Unix a forked process increfs each shared object to
1035 # which its parent process held a reference. If the
1036 # forked process gets terminated then there is likely to
1037 # be a reference leak. So to prevent
1038 # _TestZZZNumberOfObjects from failing we skip this test
1039 # when using a manager.
1040 return
1041
1042 result = self.pool.map_async(
1043 time.sleep, [0.1 for i in range(10000)], chunksize=1
1044 )
1045 self.pool.terminate()
1046 join = TimingWrapper(self.pool.join)
1047 join()
1048 self.assertTrue(join.elapsed < 0.2)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001049#
1050# Test that manager has expected number of shared objects left
1051#
1052
1053class _TestZZZNumberOfObjects(BaseTestCase):
1054 # Because test cases are sorted alphabetically, this one will get
1055 # run after all the other tests for the manager. It tests that
1056 # there have been no "reference leaks" for the manager's shared
1057 # objects. Note the comment in _TestPool.test_terminate().
1058 ALLOWED_TYPES = ('manager',)
1059
1060 def test_number_of_objects(self):
1061 EXPECTED_NUMBER = 1 # the pool object is still alive
1062 multiprocessing.active_children() # discard dead process objs
1063 gc.collect() # do garbage collection
1064 refs = self.manager._number_of_objects()
1065 if refs != EXPECTED_NUMBER:
Jesse Noller7fb96402008-07-17 21:01:05 +00001066 print self.manager._debug_info()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001067
1068 self.assertEqual(refs, EXPECTED_NUMBER)
1069
1070#
1071# Test of creating a customized manager class
1072#
1073
1074from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1075
1076class FooBar(object):
1077 def f(self):
1078 return 'f()'
1079 def g(self):
1080 raise ValueError
1081 def _h(self):
1082 return '_h()'
1083
1084def baz():
1085 for i in xrange(10):
1086 yield i*i
1087
1088class IteratorProxy(BaseProxy):
1089 _exposed_ = ('next', '__next__')
1090 def __iter__(self):
1091 return self
1092 def next(self):
1093 return self._callmethod('next')
1094 def __next__(self):
1095 return self._callmethod('__next__')
1096
1097class MyManager(BaseManager):
1098 pass
1099
1100MyManager.register('Foo', callable=FooBar)
1101MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1102MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1103
1104
1105class _TestMyManager(BaseTestCase):
1106
1107 ALLOWED_TYPES = ('manager',)
1108
1109 def test_mymanager(self):
1110 manager = MyManager()
1111 manager.start()
1112
1113 foo = manager.Foo()
1114 bar = manager.Bar()
1115 baz = manager.baz()
1116
1117 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1118 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1119
1120 self.assertEqual(foo_methods, ['f', 'g'])
1121 self.assertEqual(bar_methods, ['f', '_h'])
1122
1123 self.assertEqual(foo.f(), 'f()')
1124 self.assertRaises(ValueError, foo.g)
1125 self.assertEqual(foo._callmethod('f'), 'f()')
1126 self.assertRaises(RemoteError, foo._callmethod, '_h')
1127
1128 self.assertEqual(bar.f(), 'f()')
1129 self.assertEqual(bar._h(), '_h()')
1130 self.assertEqual(bar._callmethod('f'), 'f()')
1131 self.assertEqual(bar._callmethod('_h'), '_h()')
1132
1133 self.assertEqual(list(baz), [i*i for i in range(10)])
1134
1135 manager.shutdown()
1136
1137#
1138# Test of connecting to a remote server and using xmlrpclib for serialization
1139#
1140
1141_queue = Queue.Queue()
1142def get_queue():
1143 return _queue
1144
1145class QueueManager(BaseManager):
1146 '''manager class used by server process'''
1147QueueManager.register('get_queue', callable=get_queue)
1148
1149class QueueManager2(BaseManager):
1150 '''manager class which specifies the same interface as QueueManager'''
1151QueueManager2.register('get_queue')
1152
1153
1154SERIALIZER = 'xmlrpclib'
1155
1156class _TestRemoteManager(BaseTestCase):
1157
1158 ALLOWED_TYPES = ('manager',)
1159
1160 def _putter(self, address, authkey):
1161 manager = QueueManager2(
1162 address=address, authkey=authkey, serializer=SERIALIZER
1163 )
1164 manager.connect()
1165 queue = manager.get_queue()
1166 queue.put(('hello world', None, True, 2.25))
1167
1168 def test_remote(self):
1169 authkey = os.urandom(32)
1170
1171 manager = QueueManager(
1172 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1173 )
1174 manager.start()
1175
1176 p = self.Process(target=self._putter, args=(manager.address, authkey))
1177 p.start()
1178
1179 manager2 = QueueManager2(
1180 address=manager.address, authkey=authkey, serializer=SERIALIZER
1181 )
1182 manager2.connect()
1183 queue = manager2.get_queue()
1184
1185 # Note that xmlrpclib will deserialize object as a list not a tuple
1186 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1187
1188 # Because we are using xmlrpclib for serialization instead of
1189 # pickle this will cause a serialization error.
1190 self.assertRaises(Exception, queue.put, time.sleep)
1191
1192 # Make queue finalizer run before the server is stopped
1193 del queue
1194 manager.shutdown()
1195
Jesse Nollerb48cfa62009-03-30 16:19:10 +00001196class _TestManagerRestart(BaseTestCase):
1197
1198 def _putter(self, address, authkey):
1199 manager = QueueManager(
1200 address=address, authkey=authkey, serializer=SERIALIZER)
1201 manager.connect()
1202 queue = manager.get_queue()
1203 queue.put('hello world')
1204
1205 def test_rapid_restart(self):
1206 authkey = os.urandom(32)
1207 manager = QueueManager(
1208 address=('localhost', 9999), authkey=authkey, serializer=SERIALIZER)
1209 manager.start()
1210
1211 p = self.Process(target=self._putter, args=(manager.address, authkey))
1212 p.start()
1213 queue = manager.get_queue()
1214 self.assertEqual(queue.get(), 'hello world')
Jesse Nollerd5ac4442009-03-30 21:57:36 +00001215 del queue
Jesse Nollerb48cfa62009-03-30 16:19:10 +00001216 manager.shutdown()
1217 manager = QueueManager(
1218 address=('localhost', 9999), authkey=authkey, serializer=SERIALIZER)
1219 manager.start()
Jesse Nollerd5ac4442009-03-30 21:57:36 +00001220 manager.shutdown()
Jesse Nollerb48cfa62009-03-30 16:19:10 +00001221
Benjamin Petersondfd79492008-06-13 19:13:39 +00001222#
1223#
1224#
1225
1226SENTINEL = latin('')
1227
1228class _TestConnection(BaseTestCase):
1229
1230 ALLOWED_TYPES = ('processes', 'threads')
1231
1232 def _echo(self, conn):
1233 for msg in iter(conn.recv_bytes, SENTINEL):
1234 conn.send_bytes(msg)
1235 conn.close()
1236
1237 def test_connection(self):
1238 conn, child_conn = self.Pipe()
1239
1240 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001241 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001242 p.start()
1243
1244 seq = [1, 2.25, None]
1245 msg = latin('hello world')
1246 longmsg = msg * 10
1247 arr = array.array('i', range(4))
1248
1249 if self.TYPE == 'processes':
1250 self.assertEqual(type(conn.fileno()), int)
1251
1252 self.assertEqual(conn.send(seq), None)
1253 self.assertEqual(conn.recv(), seq)
1254
1255 self.assertEqual(conn.send_bytes(msg), None)
1256 self.assertEqual(conn.recv_bytes(), msg)
1257
1258 if self.TYPE == 'processes':
1259 buffer = array.array('i', [0]*10)
1260 expected = list(arr) + [0] * (10 - len(arr))
1261 self.assertEqual(conn.send_bytes(arr), None)
1262 self.assertEqual(conn.recv_bytes_into(buffer),
1263 len(arr) * buffer.itemsize)
1264 self.assertEqual(list(buffer), expected)
1265
1266 buffer = array.array('i', [0]*10)
1267 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1268 self.assertEqual(conn.send_bytes(arr), None)
1269 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1270 len(arr) * buffer.itemsize)
1271 self.assertEqual(list(buffer), expected)
1272
1273 buffer = bytearray(latin(' ' * 40))
1274 self.assertEqual(conn.send_bytes(longmsg), None)
1275 try:
1276 res = conn.recv_bytes_into(buffer)
1277 except multiprocessing.BufferTooShort, e:
1278 self.assertEqual(e.args, (longmsg,))
1279 else:
1280 self.fail('expected BufferTooShort, got %s' % res)
1281
1282 poll = TimingWrapper(conn.poll)
1283
1284 self.assertEqual(poll(), False)
1285 self.assertTimingAlmostEqual(poll.elapsed, 0)
1286
1287 self.assertEqual(poll(TIMEOUT1), False)
1288 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1289
1290 conn.send(None)
1291
1292 self.assertEqual(poll(TIMEOUT1), True)
1293 self.assertTimingAlmostEqual(poll.elapsed, 0)
1294
1295 self.assertEqual(conn.recv(), None)
1296
1297 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1298 conn.send_bytes(really_big_msg)
1299 self.assertEqual(conn.recv_bytes(), really_big_msg)
1300
1301 conn.send_bytes(SENTINEL) # tell child to quit
1302 child_conn.close()
1303
1304 if self.TYPE == 'processes':
1305 self.assertEqual(conn.readable, True)
1306 self.assertEqual(conn.writable, True)
1307 self.assertRaises(EOFError, conn.recv)
1308 self.assertRaises(EOFError, conn.recv_bytes)
1309
1310 p.join()
1311
1312 def test_duplex_false(self):
1313 reader, writer = self.Pipe(duplex=False)
1314 self.assertEqual(writer.send(1), None)
1315 self.assertEqual(reader.recv(), 1)
1316 if self.TYPE == 'processes':
1317 self.assertEqual(reader.readable, True)
1318 self.assertEqual(reader.writable, False)
1319 self.assertEqual(writer.readable, False)
1320 self.assertEqual(writer.writable, True)
1321 self.assertRaises(IOError, reader.send, 2)
1322 self.assertRaises(IOError, writer.recv)
1323 self.assertRaises(IOError, writer.poll)
1324
1325 def test_spawn_close(self):
1326 # We test that a pipe connection can be closed by parent
1327 # process immediately after child is spawned. On Windows this
1328 # would have sometimes failed on old versions because
1329 # child_conn would be closed before the child got a chance to
1330 # duplicate it.
1331 conn, child_conn = self.Pipe()
1332
1333 p = self.Process(target=self._echo, args=(child_conn,))
1334 p.start()
1335 child_conn.close() # this might complete before child initializes
1336
1337 msg = latin('hello')
1338 conn.send_bytes(msg)
1339 self.assertEqual(conn.recv_bytes(), msg)
1340
1341 conn.send_bytes(SENTINEL)
1342 conn.close()
1343 p.join()
1344
1345 def test_sendbytes(self):
1346 if self.TYPE != 'processes':
1347 return
1348
1349 msg = latin('abcdefghijklmnopqrstuvwxyz')
1350 a, b = self.Pipe()
1351
1352 a.send_bytes(msg)
1353 self.assertEqual(b.recv_bytes(), msg)
1354
1355 a.send_bytes(msg, 5)
1356 self.assertEqual(b.recv_bytes(), msg[5:])
1357
1358 a.send_bytes(msg, 7, 8)
1359 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1360
1361 a.send_bytes(msg, 26)
1362 self.assertEqual(b.recv_bytes(), latin(''))
1363
1364 a.send_bytes(msg, 26, 0)
1365 self.assertEqual(b.recv_bytes(), latin(''))
1366
1367 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1368
1369 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1370
1371 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1372
1373 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1374
1375 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1376
Benjamin Petersondfd79492008-06-13 19:13:39 +00001377class _TestListenerClient(BaseTestCase):
1378
1379 ALLOWED_TYPES = ('processes', 'threads')
1380
1381 def _test(self, address):
1382 conn = self.connection.Client(address)
1383 conn.send('hello')
1384 conn.close()
1385
1386 def test_listener_client(self):
1387 for family in self.connection.families:
1388 l = self.connection.Listener(family=family)
1389 p = self.Process(target=self._test, args=(l.address,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001390 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001391 p.start()
1392 conn = l.accept()
1393 self.assertEqual(conn.recv(), 'hello')
1394 p.join()
1395 l.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001396#
1397# Test of sending connection and socket objects between processes
1398#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001399"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001400class _TestPicklingConnections(BaseTestCase):
1401
1402 ALLOWED_TYPES = ('processes',)
1403
1404 def _listener(self, conn, families):
1405 for fam in families:
1406 l = self.connection.Listener(family=fam)
1407 conn.send(l.address)
1408 new_conn = l.accept()
1409 conn.send(new_conn)
1410
1411 if self.TYPE == 'processes':
1412 l = socket.socket()
1413 l.bind(('localhost', 0))
1414 conn.send(l.getsockname())
1415 l.listen(1)
1416 new_conn, addr = l.accept()
1417 conn.send(new_conn)
1418
1419 conn.recv()
1420
1421 def _remote(self, conn):
1422 for (address, msg) in iter(conn.recv, None):
1423 client = self.connection.Client(address)
1424 client.send(msg.upper())
1425 client.close()
1426
1427 if self.TYPE == 'processes':
1428 address, msg = conn.recv()
1429 client = socket.socket()
1430 client.connect(address)
1431 client.sendall(msg.upper())
1432 client.close()
1433
1434 conn.close()
1435
1436 def test_pickling(self):
1437 try:
1438 multiprocessing.allow_connection_pickling()
1439 except ImportError:
1440 return
1441
1442 families = self.connection.families
1443
1444 lconn, lconn0 = self.Pipe()
1445 lp = self.Process(target=self._listener, args=(lconn0, families))
1446 lp.start()
1447 lconn0.close()
1448
1449 rconn, rconn0 = self.Pipe()
1450 rp = self.Process(target=self._remote, args=(rconn0,))
1451 rp.start()
1452 rconn0.close()
1453
1454 for fam in families:
1455 msg = ('This connection uses family %s' % fam).encode('ascii')
1456 address = lconn.recv()
1457 rconn.send((address, msg))
1458 new_conn = lconn.recv()
1459 self.assertEqual(new_conn.recv(), msg.upper())
1460
1461 rconn.send(None)
1462
1463 if self.TYPE == 'processes':
1464 msg = latin('This connection uses a normal socket')
1465 address = lconn.recv()
1466 rconn.send((address, msg))
1467 if hasattr(socket, 'fromfd'):
1468 new_conn = lconn.recv()
1469 self.assertEqual(new_conn.recv(100), msg.upper())
1470 else:
1471 # XXX On Windows with Py2.6 need to backport fromfd()
1472 discard = lconn.recv_bytes()
1473
1474 lconn.send(None)
1475
1476 rconn.close()
1477 lconn.close()
1478
1479 lp.join()
1480 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001481"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001482#
1483#
1484#
1485
1486class _TestHeap(BaseTestCase):
1487
1488 ALLOWED_TYPES = ('processes',)
1489
1490 def test_heap(self):
1491 iterations = 5000
1492 maxblocks = 50
1493 blocks = []
1494
1495 # create and destroy lots of blocks of different sizes
1496 for i in xrange(iterations):
1497 size = int(random.lognormvariate(0, 1) * 1000)
1498 b = multiprocessing.heap.BufferWrapper(size)
1499 blocks.append(b)
1500 if len(blocks) > maxblocks:
1501 i = random.randrange(maxblocks)
1502 del blocks[i]
1503
1504 # get the heap object
1505 heap = multiprocessing.heap.BufferWrapper._heap
1506
1507 # verify the state of the heap
1508 all = []
1509 occupied = 0
1510 for L in heap._len_to_seq.values():
1511 for arena, start, stop in L:
1512 all.append((heap._arenas.index(arena), start, stop,
1513 stop-start, 'free'))
1514 for arena, start, stop in heap._allocated_blocks:
1515 all.append((heap._arenas.index(arena), start, stop,
1516 stop-start, 'occupied'))
1517 occupied += (stop-start)
1518
1519 all.sort()
1520
1521 for i in range(len(all)-1):
1522 (arena, start, stop) = all[i][:3]
1523 (narena, nstart, nstop) = all[i+1][:3]
1524 self.assertTrue((arena != narena and nstart == 0) or
1525 (stop == nstart))
1526
1527#
1528#
1529#
1530
1531try:
1532 from ctypes import Structure, Value, copy, c_int, c_double
1533except ImportError:
1534 Structure = object
1535 c_int = c_double = None
1536
1537class _Foo(Structure):
1538 _fields_ = [
1539 ('x', c_int),
1540 ('y', c_double)
1541 ]
1542
1543class _TestSharedCTypes(BaseTestCase):
1544
1545 ALLOWED_TYPES = ('processes',)
1546
1547 def _double(self, x, y, foo, arr, string):
1548 x.value *= 2
1549 y.value *= 2
1550 foo.x *= 2
1551 foo.y *= 2
1552 string.value *= 2
1553 for i in range(len(arr)):
1554 arr[i] *= 2
1555
1556 def test_sharedctypes(self, lock=False):
1557 if c_int is None:
1558 return
1559
1560 x = Value('i', 7, lock=lock)
1561 y = Value(ctypes.c_double, 1.0/3.0, lock=lock)
1562 foo = Value(_Foo, 3, 2, lock=lock)
1563 arr = Array('d', range(10), lock=lock)
1564 string = Array('c', 20, lock=lock)
1565 string.value = 'hello'
1566
1567 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1568 p.start()
1569 p.join()
1570
1571 self.assertEqual(x.value, 14)
1572 self.assertAlmostEqual(y.value, 2.0/3.0)
1573 self.assertEqual(foo.x, 6)
1574 self.assertAlmostEqual(foo.y, 4.0)
1575 for i in range(10):
1576 self.assertAlmostEqual(arr[i], i*2)
1577 self.assertEqual(string.value, latin('hellohello'))
1578
1579 def test_synchronize(self):
1580 self.test_sharedctypes(lock=True)
1581
1582 def test_copy(self):
1583 if c_int is None:
1584 return
1585
1586 foo = _Foo(2, 5.0)
1587 bar = copy(foo)
1588 foo.x = 0
1589 foo.y = 0
1590 self.assertEqual(bar.x, 2)
1591 self.assertAlmostEqual(bar.y, 5.0)
1592
1593#
1594#
1595#
1596
1597class _TestFinalize(BaseTestCase):
1598
1599 ALLOWED_TYPES = ('processes',)
1600
1601 def _test_finalize(self, conn):
1602 class Foo(object):
1603 pass
1604
1605 a = Foo()
1606 util.Finalize(a, conn.send, args=('a',))
1607 del a # triggers callback for a
1608
1609 b = Foo()
1610 close_b = util.Finalize(b, conn.send, args=('b',))
1611 close_b() # triggers callback for b
1612 close_b() # does nothing because callback has already been called
1613 del b # does nothing because callback has already been called
1614
1615 c = Foo()
1616 util.Finalize(c, conn.send, args=('c',))
1617
1618 d10 = Foo()
1619 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1620
1621 d01 = Foo()
1622 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1623 d02 = Foo()
1624 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1625 d03 = Foo()
1626 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1627
1628 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1629
1630 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1631
1632 # call mutliprocessing's cleanup function then exit process without
1633 # garbage collecting locals
1634 util._exit_function()
1635 conn.close()
1636 os._exit(0)
1637
1638 def test_finalize(self):
1639 conn, child_conn = self.Pipe()
1640
1641 p = self.Process(target=self._test_finalize, args=(child_conn,))
1642 p.start()
1643 p.join()
1644
1645 result = [obj for obj in iter(conn.recv, 'STOP')]
1646 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1647
1648#
1649# Test that from ... import * works for each module
1650#
1651
1652class _TestImportStar(BaseTestCase):
1653
1654 ALLOWED_TYPES = ('processes',)
1655
1656 def test_import(self):
1657 modules = (
1658 'multiprocessing', 'multiprocessing.connection',
1659 'multiprocessing.heap', 'multiprocessing.managers',
1660 'multiprocessing.pool', 'multiprocessing.process',
1661 'multiprocessing.reduction', 'multiprocessing.sharedctypes',
1662 'multiprocessing.synchronize', 'multiprocessing.util'
1663 )
1664
1665 for name in modules:
1666 __import__(name)
1667 mod = sys.modules[name]
1668
1669 for attr in getattr(mod, '__all__', ()):
1670 self.assertTrue(
1671 hasattr(mod, attr),
1672 '%r does not have attribute %r' % (mod, attr)
1673 )
1674
1675#
1676# Quick test that logging works -- does not test logging output
1677#
1678
1679class _TestLogging(BaseTestCase):
1680
1681 ALLOWED_TYPES = ('processes',)
1682
1683 def test_enable_logging(self):
1684 logger = multiprocessing.get_logger()
1685 logger.setLevel(util.SUBWARNING)
1686 self.assertTrue(logger is not None)
1687 logger.debug('this will not be printed')
1688 logger.info('nor will this')
1689 logger.setLevel(LOG_LEVEL)
1690
1691 def _test_level(self, conn):
1692 logger = multiprocessing.get_logger()
1693 conn.send(logger.getEffectiveLevel())
1694
1695 def test_level(self):
1696 LEVEL1 = 32
1697 LEVEL2 = 37
1698
1699 logger = multiprocessing.get_logger()
1700 root_logger = logging.getLogger()
1701 root_level = root_logger.level
1702
1703 reader, writer = multiprocessing.Pipe(duplex=False)
1704
1705 logger.setLevel(LEVEL1)
1706 self.Process(target=self._test_level, args=(writer,)).start()
1707 self.assertEqual(LEVEL1, reader.recv())
1708
1709 logger.setLevel(logging.NOTSET)
1710 root_logger.setLevel(LEVEL2)
1711 self.Process(target=self._test_level, args=(writer,)).start()
1712 self.assertEqual(LEVEL2, reader.recv())
1713
1714 root_logger.setLevel(root_level)
1715 logger.setLevel(level=LOG_LEVEL)
1716
1717#
Jesse Nollere6bab482009-03-30 16:11:16 +00001718# Test to verify handle verification, see issue 3321
1719#
1720
1721class TestInvalidHandle(unittest.TestCase):
1722
1723 def test_invalid_handles(self):
1724 if WIN32:
1725 return
1726 conn = _multiprocessing.Connection(44977608)
1727 self.assertRaises(IOError, conn.poll)
1728 self.assertRaises(IOError, _multiprocessing.Connection, -1)
1729#
Benjamin Petersondfd79492008-06-13 19:13:39 +00001730# Functions used to create test cases from the base ones in this module
1731#
1732
1733def get_attributes(Source, names):
1734 d = {}
1735 for name in names:
1736 obj = getattr(Source, name)
1737 if type(obj) == type(get_attributes):
1738 obj = staticmethod(obj)
1739 d[name] = obj
1740 return d
1741
1742def create_test_cases(Mixin, type):
1743 result = {}
1744 glob = globals()
1745 Type = type[0].upper() + type[1:]
1746
1747 for name in glob.keys():
1748 if name.startswith('_Test'):
1749 base = glob[name]
1750 if type in base.ALLOWED_TYPES:
1751 newname = 'With' + Type + name[1:]
1752 class Temp(base, unittest.TestCase, Mixin):
1753 pass
1754 result[newname] = Temp
1755 Temp.__name__ = newname
1756 Temp.__module__ = Mixin.__module__
1757 return result
1758
1759#
1760# Create test cases
1761#
1762
1763class ProcessesMixin(object):
1764 TYPE = 'processes'
1765 Process = multiprocessing.Process
1766 locals().update(get_attributes(multiprocessing, (
1767 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1768 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1769 'RawArray', 'current_process', 'active_children', 'Pipe',
1770 'connection', 'JoinableQueue'
1771 )))
1772
1773testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1774globals().update(testcases_processes)
1775
1776
1777class ManagerMixin(object):
1778 TYPE = 'manager'
1779 Process = multiprocessing.Process
1780 manager = object.__new__(multiprocessing.managers.SyncManager)
1781 locals().update(get_attributes(manager, (
1782 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1783 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1784 'Namespace', 'JoinableQueue'
1785 )))
1786
1787testcases_manager = create_test_cases(ManagerMixin, type='manager')
1788globals().update(testcases_manager)
1789
1790
1791class ThreadsMixin(object):
1792 TYPE = 'threads'
1793 Process = multiprocessing.dummy.Process
1794 locals().update(get_attributes(multiprocessing.dummy, (
1795 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1796 'Condition', 'Event', 'Value', 'Array', 'current_process',
1797 'active_children', 'Pipe', 'connection', 'dict', 'list',
1798 'Namespace', 'JoinableQueue'
1799 )))
1800
1801testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1802globals().update(testcases_threads)
1803
Neal Norwitz0c519b32008-08-25 01:50:24 +00001804class OtherTest(unittest.TestCase):
1805 # TODO: add more tests for deliver/answer challenge.
1806 def test_deliver_challenge_auth_failure(self):
1807 class _FakeConnection(object):
1808 def recv_bytes(self, size):
Neal Norwitz2a7767a2008-08-25 03:03:25 +00001809 return b'something bogus'
Neal Norwitz0c519b32008-08-25 01:50:24 +00001810 def send_bytes(self, data):
1811 pass
1812 self.assertRaises(multiprocessing.AuthenticationError,
1813 multiprocessing.connection.deliver_challenge,
1814 _FakeConnection(), b'abc')
1815
1816 def test_answer_challenge_auth_failure(self):
1817 class _FakeConnection(object):
1818 def __init__(self):
1819 self.count = 0
1820 def recv_bytes(self, size):
1821 self.count += 1
1822 if self.count == 1:
1823 return multiprocessing.connection.CHALLENGE
1824 elif self.count == 2:
Neal Norwitz2a7767a2008-08-25 03:03:25 +00001825 return b'something bogus'
1826 return b''
Neal Norwitz0c519b32008-08-25 01:50:24 +00001827 def send_bytes(self, data):
1828 pass
1829 self.assertRaises(multiprocessing.AuthenticationError,
1830 multiprocessing.connection.answer_challenge,
1831 _FakeConnection(), b'abc')
1832
R. David Murray17438dc2009-07-21 17:02:14 +00001833#
1834# Issue 5155, 5313, 5331: Test process in processes
1835# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1836#
1837
1838def _ThisSubProcess(q):
1839 try:
1840 item = q.get(block=False)
1841 except Queue.Empty:
1842 pass
1843
1844def _TestProcess(q):
1845 queue = multiprocessing.Queue()
1846 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
1847 subProc.start()
1848 subProc.join()
1849
1850def _afunc(x):
1851 return x*x
1852
1853def pool_in_process():
1854 pool = multiprocessing.Pool(processes=4)
1855 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
1856
1857class _file_like(object):
1858 def __init__(self, delegate):
1859 self._delegate = delegate
1860 self._pid = None
1861
1862 @property
1863 def cache(self):
1864 pid = os.getpid()
1865 # There are no race conditions since fork keeps only the running thread
1866 if pid != self._pid:
1867 self._pid = pid
1868 self._cache = []
1869 return self._cache
1870
1871 def write(self, data):
1872 self.cache.append(data)
1873
1874 def flush(self):
1875 self._delegate.write(''.join(self.cache))
1876 self._cache = []
1877
1878class TestStdinBadfiledescriptor(unittest.TestCase):
1879
1880 def test_queue_in_process(self):
1881 queue = multiprocessing.Queue()
1882 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
1883 proc.start()
1884 proc.join()
1885
1886 def test_pool_in_process(self):
1887 p = multiprocessing.Process(target=pool_in_process)
1888 p.start()
1889 p.join()
1890
1891 def test_flushing(self):
1892 sio = StringIO()
1893 flike = _file_like(sio)
1894 flike.write('foo')
1895 proc = multiprocessing.Process(target=lambda: flike.flush())
1896 flike.flush()
1897 assert sio.getvalue() == 'foo'
1898
1899testcases_other = [OtherTest, TestInvalidHandle, TestStdinBadfiledescriptor]
Neal Norwitz0c519b32008-08-25 01:50:24 +00001900
Benjamin Petersondfd79492008-06-13 19:13:39 +00001901#
1902#
1903#
1904
1905def test_main(run=None):
Jesse Noller18623822008-06-18 13:29:52 +00001906 if sys.platform.startswith("linux"):
1907 try:
1908 lock = multiprocessing.RLock()
1909 except OSError:
1910 from test.test_support import TestSkipped
1911 raise TestSkipped("OSError raises on RLock creation, see issue 3111!")
Benjamin Petersoned77f2e2008-06-17 22:40:44 +00001912
Benjamin Petersondfd79492008-06-13 19:13:39 +00001913 if run is None:
1914 from test.test_support import run_unittest as run
1915
1916 util.get_temp_dir() # creates temp directory for use by all processes
1917
1918 multiprocessing.get_logger().setLevel(LOG_LEVEL)
1919
Jesse Noller146b7ab2008-07-02 16:44:09 +00001920 ProcessesMixin.pool = multiprocessing.Pool(4)
1921 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
1922 ManagerMixin.manager.__init__()
1923 ManagerMixin.manager.start()
1924 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001925
1926 testcases = (
Jesse Noller146b7ab2008-07-02 16:44:09 +00001927 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
1928 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz0c519b32008-08-25 01:50:24 +00001929 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
1930 testcases_other
Benjamin Petersondfd79492008-06-13 19:13:39 +00001931 )
1932
1933 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
1934 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
1935 run(suite)
1936
Jesse Noller146b7ab2008-07-02 16:44:09 +00001937 ThreadsMixin.pool.terminate()
1938 ProcessesMixin.pool.terminate()
1939 ManagerMixin.pool.terminate()
1940 ManagerMixin.manager.shutdown()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001941
Jesse Noller146b7ab2008-07-02 16:44:09 +00001942 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00001943
1944def main():
1945 test_main(unittest.TextTestRunner(verbosity=2).run)
1946
1947if __name__ == '__main__':
1948 main()