blob: 9a307dee7ac0a3cca63658243782a3721c65c907 [file] [log] [blame]
Georg Brandl86b2fb92008-07-16 03:43:04 +00001#!/usr/bin/env python
2
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import queue as pyqueue
10import time
11import sys
12import os
13import gc
14import signal
15import array
16import copy
17import socket
18import random
19import logging
20
Benjamin Petersone5384b02008-10-04 22:00:42 +000021
22# Work around broken sem_open implementations
try:
    import multiprocessing.synchronize
except ImportError as e:
    # The old ``test.test_support`` module does not exist on Python 3, so
    # importing TestSkipped from it would mask the real problem with a second
    # ImportError.  Raising unittest.SkipTest marks the whole module as
    # skipped instead.
    raise unittest.SkipTest(str(e))
28
Benjamin Petersone711caf2008-06-11 16:44:04 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000033import multiprocessing.pool
34import _multiprocessing
35
36from multiprocessing import util
37
38#
39#
40#
41
def latin(s):
    """Return the string *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000044
Benjamin Petersone711caf2008-06-11 16:44:04 +000045#
46# Constants
47#
48
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.WARNING

# Short pause used by the tests when they need to give other
# processes/threads a moment to make progress.
DELTA = 0.1

# Setting CHECK_TIMINGS to True makes the tests take a lot longer and can
# sometimes cause some non-serious failures because some calls block a bit
# longer than expected.
CHECK_TIMINGS = False
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# False when the C extension reports that sem_getvalue() is broken.
_BROKEN_GETVALUE = getattr(_multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False)
HAVE_GETVALUE = not _BROKEN_GETVALUE
del _BROKEN_GETVALUE

WIN32 = (sys.platform == "win32")
66
Benjamin Petersone711caf2008-06-11 16:44:04 +000067#
68# Creates a wrapper for a function which records the time it takes to finish
69#
70
class TimingWrapper(object):
    """Wrap a callable and record how long each call takes.

    ``elapsed`` is ``None`` until the wrapper has been called; afterwards it
    holds the duration in seconds (measured with ``time.time()``) of the
    most recent call, whether that call returned or raised.
    """

    def __init__(self, func):
        self.elapsed = None
        self.func = func

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Runs on both the normal and the exceptional path.
            self.elapsed = time.time() - started
        return result
83
84#
85# Base class for test cases
86#
87
class BaseTestCase(object):
    """Mixin with helper assertions shared by the test case classes below.

    It relies on the class it is mixed into to supply ``assertEqual`` and
    ``assertAlmostEqual``.
    """

    # NOTE(review): presumably the flavours ('processes', 'manager',
    # 'threads') a test class may be instantiated for -- the consumer of
    # this attribute is outside this part of the file.
    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare two durations to one decimal place, if CHECK_TIMINGS."""
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert ``func(*args) == value`` unless func is not implemented.

        A NotImplementedError raised by *func* is silently accepted.
        """
        try:
            actual = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, actual)
103
104#
105# Return the value of a semaphore
106#
107
def get_value(self):
    """Return the current value of the semaphore-like object *self*.

    The object's own ``get_value()`` method is tried first, then the
    ``_Semaphore__value`` and ``_value`` attributes.  NotImplementedError
    is raised when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
119
120#
121# Testcases
122#
123
class _TestProcess(BaseTestCase):
    """Tests of process objects (or their thread-based stand-ins).

    The concrete test class is expected to provide ``TYPE``, ``Process``,
    ``current_process``, ``active_children``, ``Queue`` and ``Pipe``.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        """Check the attributes of the object describing this process."""
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertTrue(isinstance(authkey, bytes))
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def _test(self, q, *args, **kwds):
        """Child side of test_process: report what the child sees via *q*."""
        current = self.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if self.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        """Check a child's state before start, while running and after join."""
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello': 23, 'bye': 2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertTrue(p not in self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())

        # The child echoes back, in this order, what _test() puts on q.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

    def _test_terminate(self):
        """Child side of test_terminate: sleep until terminated."""
        time.sleep(1000)

    def test_terminate(self):
        """Check that terminate() stops a sleeping child process."""
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        """cpu_count() must be a positive int when it is implemented."""
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        """A child is listed by active_children() only while it runs."""
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertTrue(p not in self.active_children())

        p.start()
        self.assertTrue(p in self.active_children())

        p.join()
        self.assertTrue(p not in self.active_children())

    def _test_recursion(self, wconn, id):
        """Send *id* over *wconn*, then recurse into two children.

        Recursion stops once *id* has two elements; each child is joined
        before the next one is started.
        """
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = self.Process(
                    target=self._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        """Children started from within children all report back in order."""
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
269
270#
271#
272#
273
274class _UpperCaser(multiprocessing.Process):
275
276 def __init__(self):
277 multiprocessing.Process.__init__(self)
278 self.child_conn, self.parent_conn = multiprocessing.Pipe()
279
280 def run(self):
281 self.parent_conn.close()
282 for s in iter(self.child_conn.recv, None):
283 self.child_conn.send(s.upper())
284 self.child_conn.close()
285
286 def submit(self, s):
287 assert type(s) is str
288 self.parent_conn.send(s)
289 return self.parent_conn.recv()
290
291 def stop(self):
292 self.parent_conn.send(None)
293 self.parent_conn.close()
294 self.child_conn.close()
295
class _TestSubclassingProcess(BaseTestCase):
    """Test that a Process subclass overriding run() works."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        """Round-trip two strings through an _UpperCaser child process."""
        worker = _UpperCaser()
        worker.start()
        for text, expected in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), expected)
        worker.stop()
        worker.join()
307
308#
309#
310#
311
def queue_empty(q):
    """Return whether *q* is empty.

    Uses ``q.empty()`` when the queue has that method and falls back to
    comparing ``q.qsize()`` with zero otherwise.
    """
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
317
def queue_full(q, maxsize):
    """Return whether *q* is full.

    Uses ``q.full()`` when the queue has that method and falls back to
    comparing ``q.qsize()`` with *maxsize* otherwise.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
323
324
class _TestQueue(BaseTestCase):
    """Tests of the put/get, fork, qsize and task_done behaviour of queues."""

    def _test_put(self, queue, child_can_start, parent_can_continue):
        """Child side of test_put: drain six items once allowed to start."""
        child_can_start.wait()
        remaining = 6
        while remaining:
            queue.get()
            remaining -= 1
        parent_can_continue.set()

    def test_put(self):
        """Fill a bounded queue and check every way put() can report Full."""
        MAXSIZE = 6
        q = self.Queue(maxsize=MAXSIZE)
        start_child = self.Event()
        child_done = self.Event()

        child = self.Process(
            target=self._test_put,
            args=(q, start_child, child_done)
            )
        child.daemon = True
        child.start()

        self.assertEqual(queue_empty(q), True)
        self.assertEqual(queue_full(q, MAXSIZE), False)

        # Fill the queue using each supported calling convention.
        q.put(1)
        q.put(2, True)
        q.put(3, True, None)
        q.put(4, False)
        q.put(5, False, None)
        q.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(q), False)
        self.assertEqual(queue_full(q, MAXSIZE), True)

        timed_put = TimingWrapper(q.put)
        timed_put_nowait = TimingWrapper(q.put_nowait)

        # Non-blocking puts on a full queue fail straight away.
        self.assertRaises(pyqueue.Full, timed_put, 7, False)
        self.assertTimingAlmostEqual(timed_put.elapsed, 0)

        self.assertRaises(pyqueue.Full, timed_put, 7, False, None)
        self.assertTimingAlmostEqual(timed_put.elapsed, 0)

        self.assertRaises(pyqueue.Full, timed_put_nowait, 7)
        self.assertTimingAlmostEqual(timed_put_nowait.elapsed, 0)

        # Blocking puts fail only once their timeout has expired.
        self.assertRaises(pyqueue.Full, timed_put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(timed_put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, timed_put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(timed_put.elapsed, 0)

        self.assertRaises(pyqueue.Full, timed_put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(timed_put.elapsed, TIMEOUT3)

        # Let the child drain the queue, then check it is empty again.
        start_child.set()
        child_done.wait()

        self.assertEqual(queue_empty(q), True)
        self.assertEqual(queue_full(q, MAXSIZE), False)

        child.join()

    def _test_get(self, queue, child_can_start, parent_can_continue):
        """Child side of test_get: put 2..5 on the queue once allowed."""
        child_can_start.wait()
        #queue.put(1)
        for item in (2, 3, 4, 5):
            queue.put(item)
        parent_can_continue.set()

    def test_get(self):
        """Read back items and check every way get() can report Empty."""
        q = self.Queue()
        start_child = self.Event()
        child_done = self.Event()

        child = self.Process(
            target=self._test_get,
            args=(q, start_child, child_done)
            )
        child.daemon = True
        child.start()

        self.assertEqual(queue_empty(q), True)

        start_child.set()
        child_done.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(q), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(q.get(True, None), 2)
        self.assertEqual(q.get(True), 3)
        self.assertEqual(q.get(timeout=1), 4)
        self.assertEqual(q.get_nowait(), 5)

        self.assertEqual(queue_empty(q), True)

        timed_get = TimingWrapper(q.get)
        timed_get_nowait = TimingWrapper(q.get_nowait)

        # Non-blocking gets on an empty queue fail straight away.
        self.assertRaises(pyqueue.Empty, timed_get, False)
        self.assertTimingAlmostEqual(timed_get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, timed_get, False, None)
        self.assertTimingAlmostEqual(timed_get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, timed_get_nowait)
        self.assertTimingAlmostEqual(timed_get_nowait.elapsed, 0)

        # Blocking gets fail only once their timeout has expired.
        self.assertRaises(pyqueue.Empty, timed_get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, timed_get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(timed_get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, timed_get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT3)

        child.join()

    def _test_fork(self, queue):
        """Child side of test_fork: put 10..19 on the queue."""
        item = 10
        while item < 20:
            queue.put(item)
            item += 1
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        """Items put by parent and by a later child all arrive, in order."""
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        q = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for item in range(10):
            q.put(item)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        child = self.Process(target=self._test_fork, args=(q,))
        child.start()

        # check that all expected items are in the queue
        for expected in range(20):
            self.assertEqual(q.get(), expected)
        self.assertRaises(pyqueue.Empty, q.get, False)

        child.join()

    def test_qsize(self):
        """qsize() follows puts and gets (skipped when not implemented)."""
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    def _test_task_done(self, q):
        """Worker: acknowledge each item from *q* until ``None`` arrives."""
        for _item in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        """join() on a JoinableQueue returns once workers acknowledge all."""
        q = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(q, 'task_done'):
            return

        workers = []
        for _ in range(4):
            workers.append(
                self.Process(target=self._test_task_done, args=(q,)))

        for worker in workers:
            worker.start()

        for item in range(10):
            q.put(item)

        q.join()

        # One ``None`` stop marker per worker.
        for _ in workers:
            q.put(None)

        for worker in workers:
            worker.join()
528
529#
530#
531#
532
class _TestLock(BaseTestCase):
    """Tests of the return values and error behaviour of Lock and RLock."""

    def test_lock(self):
        """A plain lock cannot be re-acquired or released twice."""
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        expected_errors = (ValueError, threading.ThreadError)
        self.assertRaises(expected_errors, lock.release)

    def test_rlock(self):
        """An RLock can be acquired three times and released three times."""
        lock = self.RLock()
        for _ in range(3):
            self.assertEqual(lock.acquire(), True)
        for _ in range(3):
            self.assertEqual(lock.release(), None)
        expected_errors = (AssertionError, RuntimeError)
        self.assertRaises(expected_errors, lock.release)
551
552
class _TestSemaphore(BaseTestCase):
    """Tests of Semaphore and BoundedSemaphore counting and timeouts."""

    def _test_semaphore(self, sem):
        """Exercise a semaphore whose initial value is 2.

        The value is checked after every step through get_value(), when
        that is implemented for the semaphore.
        """
        check = self.assertReturnsIfImplemented
        check(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        check(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        check(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        check(2, get_value, sem)

    def test_semaphore(self):
        """A plain semaphore may be released above its initial value."""
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        """Run the common semaphore checks on a BoundedSemaphore."""
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        """acquire() on an exhausted semaphore honours block and timeout."""
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        timed_acquire = TimingWrapper(sem.acquire)

        # Non-blocking attempts return immediately, whatever the timeout.
        self.assertEqual(timed_acquire(False), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, 0.0)

        self.assertEqual(timed_acquire(False, None), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, 0.0)

        self.assertEqual(timed_acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, 0)

        # Blocking attempts give up after the timeout.
        self.assertEqual(timed_acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, TIMEOUT2)

        self.assertEqual(timed_acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, TIMEOUT3)
605
606
class _TestCondition(BaseTestCase):
    """Tests of Condition.notify(), notify_all() and wait() timeouts."""

    def f(self, cond, sleeping, woken, timeout=None):
        """Waiter: wait on *cond*, signalling *sleeping* and then *woken*.

        *sleeping* is released just before waiting (with the condition
        held) and *woken* is released once wait() has returned.
        """
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        """Check the internal counters of *cond* (process type only)."""
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE != 'processes':
            return
        try:
            sleeping_count = cond._sleeping_count.get_value()
            woken_count = cond._woken_count.get_value()
            self.assertEqual(sleeping_count - woken_count, 0)
            self.assertEqual(cond._wait_semaphore.get_value(), 0)
        except NotImplementedError:
            pass

    def test_notify(self):
        """notify() wakes exactly one waiter per call."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)
        waiter_args = (cond, sleeping, woken)

        proc = self.Process(target=self.f, args=waiter_args)
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=waiter_args)
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        thread.join()

    def test_notify_all(self):
        """Waiters with a timeout wake by themselves; others on notify_all()."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        timed_args = (cond, sleeping, woken, TIMEOUT1)
        for _ in range(3):
            proc = self.Process(target=self.f, args=timed_args)
            proc.daemon = True
            proc.start()

            thread = threading.Thread(target=self.f, args=timed_args)
            thread.daemon = True
            thread.start()

        # wait for them all to sleep
        for _ in range(6):
            sleeping.acquire()

        # check they have all timed out
        for _ in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        untimed_args = (cond, sleeping, woken)
        for _ in range(3):
            proc = self.Process(target=self.f, args=untimed_args)
            proc.daemon = True
            proc.start()

            thread = threading.Thread(target=self.f, args=untimed_args)
            thread.daemon = True
            thread.start()

        # wait for them to all sleep
        for _ in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        """wait() with a timeout returns None after about that long."""
        cond = self.Condition()
        timed_wait = TimingWrapper(cond.wait)
        cond.acquire()
        outcome = timed_wait(TIMEOUT1)
        cond.release()
        self.assertEqual(outcome, None)
        self.assertTimingAlmostEqual(timed_wait.elapsed, TIMEOUT1)
737
738
class _TestEvent(BaseTestCase):
    """Tests of Event.wait(), set() and clear()."""

    def _test_event(self, event):
        """Child side of test_event: set *event* after TIMEOUT2 seconds."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        """Check wait() before set(), after set() and after clear()."""
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Removed temporarily: because of API shear this does not work
        # with threading._Event objects (is_set == isSet).
        #self.assertEqual(event.is_set(), False)

        self.assertEqual(wait(0.0), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        # self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        p = self.Process(target=self._test_event, args=(event,))
        p.start()
        try:
            self.assertEqual(wait(), None)
        finally:
            # Reap the child so that it is not left behind once the test
            # has finished.
            p.join()
774
775#
776#
777#
778
class _TestValue(BaseTestCase):
    """Tests of Value and RawValue shared objects."""

    # (typecode, initial value, value assigned by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def _test(self, values):
        """Child side of test_value: store each entry's third element."""
        for shared, (_code, _initial, final) in zip(values,
                                                    self.codes_values):
            shared.value = final

    def test_value(self, raw=False):
        """Values start at their initial value and see a child's writes."""
        if self.TYPE != 'processes':
            return

        make_value = self.RawValue if raw else self.Value
        values = [make_value(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, entry in zip(values, self.codes_values):
            self.assertEqual(shared.value, entry[1])

        child = self.Process(target=self._test, args=(values,))
        child.start()
        child.join()

        for shared, entry in zip(values, self.codes_values):
            self.assertEqual(shared.value, entry[2])

    def test_rawvalue(self):
        """Run test_value() against RawValue objects."""
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        """get_lock()/get_obj() exist only on values created with a lock."""
        if self.TYPE != 'processes':
            return

        # Default lock and lock=None: both accessors must be callable.
        default_locked = self.Value('i', 5)
        default_locked.get_lock()
        default_locked.get_obj()

        none_locked = self.Value('i', 5, lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # An explicit lock is handed back by get_lock().
        lock = self.Lock()
        explicit = self.Value('i', 5, lock=lock)
        returned_lock = explicit.get_lock()
        explicit.get_obj()
        self.assertEqual(lock, returned_lock)

        unlocked = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        raw = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, accessor))
844
Benjamin Petersone711caf2008-06-11 16:44:04 +0000845
class _TestArray(BaseTestCase):
    """Tests of Array and RawArray shared objects."""

    def f(self, seq):
        """Add to each element of *seq*, in place, the element before it."""
        for idx in range(len(seq) - 1):
            seq[idx + 1] += seq[idx]

    def test_array(self, raw=False):
        """Indexing, slicing and a child's in-place update all work."""
        if self.TYPE != 'processes':
            return

        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make_array = self.RawArray if raw else self.Array
        arr = make_array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        replacement = array.array('i', [1, 2, 3, 4])
        arr[4:8] = replacement
        seq[4:8] = replacement

        self.assertEqual(list(arr[:]), seq)

        # Apply the same update locally and in a child process, then
        # compare the two results.
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    def test_rawarray(self):
        """Run test_array() against RawArray objects."""
        self.test_array(raw=True)

    def test_getobj_getlock_obj(self):
        """get_lock()/get_obj() exist only on arrays created with a lock."""
        if self.TYPE != 'processes':
            return

        # Default lock and lock=None: both accessors must be callable.
        default_locked = self.Array('i', list(range(10)))
        default_locked.get_lock()
        default_locked.get_obj()

        none_locked = self.Array('i', list(range(10)), lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # An explicit lock is handed back by get_lock().
        lock = self.Lock()
        explicit = self.Array('i', list(range(10)), lock=lock)
        returned_lock = explicit.get_lock()
        explicit.get_obj()
        self.assertEqual(lock, returned_lock)

        unlocked = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        raw = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, accessor))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000908
909#
910#
911#
912
class _TestContainers(BaseTestCase):
    """Tests of the list, dict and Namespace objects created via ``self``."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        """Shared lists support slicing, extend, ``*=``, ``+`` and nesting."""
        first_ten = list(range(10))
        a = self.list(list(range(10)))
        self.assertEqual(a[:], first_ten)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], [0, 1, 2, 3, 4])

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2, 3, 4])

        b *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        self.assertEqual(a[:], first_ten)

        pair = [a, b]
        e = self.list(pair)
        self.assertEqual(e[:], [first_ten, doubled])

        # A change made through ``a`` is visible through the list that
        # was built from it.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [first_ten + ['hello']])

    def test_dict(self):
        """Shared dicts support item assignment, copy and the three views."""
        d = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            d[code] = chr(code)
        letters = [chr(code) for code in indices]
        self.assertEqual(d.copy(), dict(zip(indices, letters)))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), list(zip(indices, letters)))

    def test_namespace(self):
        """Namespace attributes can be set and deleted; str() hides ``_`` names."""
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
968
969#
970#
971#
972
def sqr(x, wait=0.0):
    """Sleep for *wait* seconds, then return *x* multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared

class _TestPool(BaseTestCase):
    """Tests of the pool object available as ``self.pool``."""

    def test_apply(self):
        """apply() accepts positional and keyword arguments."""
        pool_apply = self.pool.apply
        self.assertEqual(pool_apply(sqr, (5,)), sqr(5))
        self.assertEqual(pool_apply(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        """map() matches the serial result, with or without a chunksize."""
        pool_map = self.pool.map
        self.assertEqual(pool_map(sqr, list(range(10))),
                         [sqr(v) for v in range(10)])
        self.assertEqual(pool_map(sqr, list(range(100)), chunksize=20),
                         [sqr(v) for v in range(100)])

    def test_async(self):
        """apply_async() delivers its result after the task's delay."""
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        timed_get = TimingWrapper(res.get)
        self.assertEqual(timed_get(), 49)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        """get() raises TimeoutError when the result is not ready in time."""
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        timed_get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError,
                          timed_get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT2)

    def test_imap(self):
        """imap() yields results in order and then raises StopIteration."""
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), [sqr(v) for v in range(10)])

        it = self.pool.imap(sqr, list(range(10)))
        for n in range(10):
            self.assertEqual(next(it), n*n)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for n in range(1000):
            self.assertEqual(next(it), n*n)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        """imap_unordered() yields every result, in some order."""
        expected = [sqr(v) for v in range(1000)]

        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), expected)

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), expected)

    def test_make_pool(self):
        """Pool(3) starts exactly three worker processes."""
        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        """terminate() lets join() return quickly despite pending work."""
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        delays = [0.1] * 10000
        result = self.pool.map_async(time.sleep, delays, chunksize=1)
        self.pool.terminate()
        timed_join = TimingWrapper(self.pool.join)
        timed_join()
        self.assertTrue(timed_join.elapsed < 0.2)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001045#
1046# Test that manager has expected number of shared objects left
1047#
1048
class _TestZZZNumberOfObjects(BaseTestCase):
    """Check that the manager has the expected number of shared objects."""
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        """Only the pool object should still be registered."""
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        live_objects = self.manager._number_of_objects()
        if live_objects != EXPECTED_NUMBER:
            # Print the manager's debug info to help diagnose the failure.
            print(self.manager._debug_info())

        self.assertEqual(live_objects, EXPECTED_NUMBER)
1065
#
# Test of creating a customized manager class
#
1069
1070from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1071
class FooBar(object):
    """Small referent type shared through MyManager in the tests below."""

    def f(self):
        """Return a fixed marker string."""
        marker = 'f()'
        return marker

    def g(self):
        """Always fail with ValueError."""
        raise ValueError()

    def _h(self):
        """Underscore-prefixed method returning a fixed marker string."""
        marker = '_h()'
        return marker
1079
def baz():
    """Generate the squares of 0 through 9, in order."""
    number = 0
    while number < 10:
        yield number * number
        number += 1
1083
class IteratorProxy(BaseProxy):
    """Proxy that lets a manager-held iterator be consumed with ``for``/``next``.

    Every step is forwarded to the referent through ``_callmethod``.
    """
    # 'next' stays in the tuple so the set of exposed method names is
    # unchanged for anything that inspects it.
    _exposed_ = ('next', '__next__')

    def __iter__(self):
        """Return the proxy itself; it is its own iterator."""
        return self

    def __next__(self):
        """Fetch the next item by calling ``__next__`` on the referent."""
        # The class used to define ``__next__`` twice.  The first copy
        # (forwarding 'next') was shadowed by this one and could never run,
        # so it has been removed.
        return self._callmethod('__next__')
1092
class MyManager(BaseManager):
    """Customized manager; its shared types are registered right below."""

# 'Foo' is registered without an explicit method list, 'Bar' exposes exactly
# f and _h, and 'baz' returns the generator wrapped in IteratorProxy.
MyManager.register(typeid='Foo', callable=FooBar)
MyManager.register(typeid='Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register(typeid='baz', callable=baz, proxytype=IteratorProxy)
1099
1100
class _TestMyManager(BaseTestCase):
    """Exercise the proxies produced by the customized MyManager."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        """Check which methods each proxy exposes and how calls behave."""
        manager = MyManager()
        manager.start()
        # Shut the server down even when an assertion fails, so a failing
        # run does not leave the manager process behind.
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            # Named 'squares' so the module-level baz() is not shadowed.
            squares = manager.baz()

            candidates = ('f', 'g', '_h')
            foo_methods = [name for name in candidates if hasattr(foo, name)]
            bar_methods = [name for name in candidates if hasattr(bar, name)]

            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            # _h is not exposed on Foo, so the server rejects the call.
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(squares), [i*i for i in range(10)])
        finally:
            manager.shutdown()
1132
#
# Test of connecting to a remote server and using xmlrpclib for serialization
#
1136
# Single module-level queue handed out by get_queue().
_queue = pyqueue.Queue()

def get_queue():
    """Return the module-level queue; every call yields the same object."""
    shared = _queue
    return shared
1140
class QueueManager(BaseManager):
    """Manager used by the server process; it supplies the real queue."""

QueueManager.register(typeid='get_queue', callable=get_queue)
1144
class QueueManager2(BaseManager):
    """Client-side manager declaring the same interface as QueueManager."""

# No callable is given: this class only declares the typeid.
QueueManager2.register(typeid='get_queue')
1148
1149
# Serializer name passed to every manager in _TestRemoteManager.
SERIALIZER = 'xmlrpclib'

class _TestRemoteManager(BaseTestCase):
    """Connect to a manager server over a socket using the xmlrpclib serializer."""

    ALLOWED_TYPES = ('manager',)

    def _putter(self, address, authkey):
        """Child-process body: connect to the server and enqueue one tuple."""
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        """Read back what the child enqueued and check serializer limits."""
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()
        # The nested try/finally blocks guarantee, in this order, that the
        # queue proxy is dropped, the child is joined and the server is
        # stopped, even when an assertion fails.
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.start()
            try:
                manager2 = QueueManager2(
                    address=manager.address, authkey=authkey,
                    serializer=SERIALIZER
                    )
                manager2.connect()
                queue = manager2.get_queue()
                try:
                    # Note that xmlrpclib will deserialize object as a list
                    # not a tuple
                    self.assertEqual(queue.get(),
                                     ['hello world', None, True, 2.25])

                    # Because we are using xmlrpclib for serialization
                    # instead of pickle this will cause a serialization error.
                    self.assertRaises(Exception, queue.put, time.sleep)
                finally:
                    # Make queue finalizer run before the server is stopped
                    del queue
            finally:
                # Reap the child; it only connects, puts one item and returns.
                p.join()
        finally:
            manager.shutdown()
1191
1192#
1193#
1194#
1195
# Empty byte string; receiving it tells the _echo loop to stop.
SENTINEL = latin('')

class _TestConnection(BaseTestCase):
    """Tests for the connection objects returned by ``self.Pipe()``."""

    ALLOWED_TYPES = ('processes', 'threads')

    def _echo(self, conn):
        """Echo every byte message received on *conn* until SENTINEL arrives."""
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        """Round-trip objects, bytes and buffers through an echoing child."""
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        # send()/send_bytes() return None; the child echoes the payload back.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zero-filled buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items into the buffer.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A 40-byte buffer is too small for longmsg; the exception is
            # expected to carry the complete message in its args.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        # Wrap poll() so the duration of each call can be checked.
        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # Once the echo is pending, poll() should report True without waiting.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            # The test expects EOFError once the other end has been closed.
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        """A one-way pipe yields a read-only end and a write-only end."""
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            # Using an end in the wrong direction is expected to raise IOError.
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        """The parent may close its copy of the child's end right after start()."""
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        """Check the optional offset and size arguments of send_bytes()."""
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # Offset only: everything from index 5 onwards.
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        # Offset and size: 8 bytes starting at index 7.
        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the message length yields an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Offsets or sizes outside the message, or negative, must be rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1346
class _TestListenerClient(BaseTestCase):
    """Listener/Client round trip for each available address family."""

    ALLOWED_TYPES = ('processes', 'threads')

    def _test(self, address):
        """Child body: connect to *address*, send one greeting, disconnect."""
        client = self.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        """A greeting sent by the child arrives on the accepted connection."""
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            sender = self.Process(target=self._test,
                                  args=(listener.address,))
            sender.daemon = True
            sender.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            sender.join()
            listener.close()
#
# Test of sending connection and socket objects between processes
#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001369"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001370class _TestPicklingConnections(BaseTestCase):
1371
1372 ALLOWED_TYPES = ('processes',)
1373
1374 def _listener(self, conn, families):
1375 for fam in families:
1376 l = self.connection.Listener(family=fam)
1377 conn.send(l.address)
1378 new_conn = l.accept()
1379 conn.send(new_conn)
1380
1381 if self.TYPE == 'processes':
1382 l = socket.socket()
1383 l.bind(('localhost', 0))
1384 conn.send(l.getsockname())
1385 l.listen(1)
1386 new_conn, addr = l.accept()
1387 conn.send(new_conn)
1388
1389 conn.recv()
1390
1391 def _remote(self, conn):
1392 for (address, msg) in iter(conn.recv, None):
1393 client = self.connection.Client(address)
1394 client.send(msg.upper())
1395 client.close()
1396
1397 if self.TYPE == 'processes':
1398 address, msg = conn.recv()
1399 client = socket.socket()
1400 client.connect(address)
1401 client.sendall(msg.upper())
1402 client.close()
1403
1404 conn.close()
1405
1406 def test_pickling(self):
1407 try:
1408 multiprocessing.allow_connection_pickling()
1409 except ImportError:
1410 return
1411
1412 families = self.connection.families
1413
1414 lconn, lconn0 = self.Pipe()
1415 lp = self.Process(target=self._listener, args=(lconn0, families))
1416 lp.start()
1417 lconn0.close()
1418
1419 rconn, rconn0 = self.Pipe()
1420 rp = self.Process(target=self._remote, args=(rconn0,))
1421 rp.start()
1422 rconn0.close()
1423
1424 for fam in families:
1425 msg = ('This connection uses family %s' % fam).encode('ascii')
1426 address = lconn.recv()
1427 rconn.send((address, msg))
1428 new_conn = lconn.recv()
1429 self.assertEqual(new_conn.recv(), msg.upper())
1430
1431 rconn.send(None)
1432
1433 if self.TYPE == 'processes':
1434 msg = latin('This connection uses a normal socket')
1435 address = lconn.recv()
1436 rconn.send((address, msg))
1437 if hasattr(socket, 'fromfd'):
1438 new_conn = lconn.recv()
1439 self.assertEqual(new_conn.recv(100), msg.upper())
1440 else:
1441 # XXX On Windows with Py2.6 need to backport fromfd()
1442 discard = lconn.recv_bytes()
1443
1444 lconn.send(None)
1445
1446 rconn.close()
1447 lconn.close()
1448
1449 lp.join()
1450 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001451"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001452#
1453#
1454#
1455
class _TestHeap(BaseTestCase):
    """Consistency check of the shared-memory heap after heavy churn."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        """Allocate and free many blocks, then verify that blocks tile arenas."""
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # Collect every free and occupied block as
        # (arena index, start, stop, length, state).
        segments = []
        for free_blocks in list(heap._len_to_seq.values()):
            for arena, start, stop in free_blocks:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # After sorting, each block must either end exactly where the next
        # one begins, or the next one must open a different arena at offset 0.
        for current, following in zip(segments, segments[1:]):
            arena, start, stop = current[:3]
            narena, nstart = following[:2]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1496
1497#
1498#
1499#
1500
# Value and copy live in multiprocessing.sharedctypes, not in ctypes.
# Importing them from ctypes always raised ImportError, which silently
# turned every test below into a no-op.
try:
    from ctypes import Structure, c_int, c_double
    # copy is imported under an alias so the standard ``copy`` module
    # imported at the top of the file is not rebound.
    from multiprocessing.sharedctypes import Value, copy as _sharedctypes_copy
except ImportError:
    Structure = object
    c_int = c_double = None

class _Foo(Structure):
    """Two-field structure (int ``x``, double ``y``) shared between processes."""
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]

class _TestSharedCTypes(BaseTestCase):
    """Tests for ctypes objects placed in shared memory."""

    ALLOWED_TYPES = ('processes',)

    def _double(self, x, y, foo, arr, string):
        """Child body: double every shared value in place."""
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for i in range(len(arr)):
            arr[i] *= 2

    def test_sharedctypes(self, lock=False):
        """Changes made by a child process are visible to the parent.

        *lock* is passed through to every shared object that is created.
        """
        if c_int is None:
            # ctypes support is unavailable; nothing to test.
            return

        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        # A 'c' array holds bytes, so the value must be a byte string.
        string.value = latin('hello')

        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
        p.start()
        p.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        """Repeat test_sharedctypes with locking enabled."""
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        """A copied structure keeps its values when the original changes."""
        if c_int is None:
            return

        foo = _Foo(2, 5.0)
        bar = _sharedctypes_copy(foo)
        foo.x = 0
        foo.y = 0
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)
1562
1563#
1564#
1565#
1566
class _TestFinalize(BaseTestCase):
    """Check when, and in which order, ``util.Finalize`` callbacks run."""

    ALLOWED_TYPES = ('processes',)

    def _test_finalize(self, conn):
        """Child body: register finalizers that report their tag over *conn*."""
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # No exitpriority is given for c; test_finalize expects that 'c' is
        # never reported.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing priority 0; the expected result lists them
        # in reverse order of registration.
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        # Lowest priority: 'STOP' is the sentinel the parent reads up to.
        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        """Collect the tags sent by the child and compare with the expected order."""
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.start()
        p.join()

        # Read messages until the 'STOP' sentinel (itself excluded).
        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1617
#
# Test that from ... import * works for each module
#
1621
class _TestImportStar(BaseTestCase):
    """Every name listed in a module's ``__all__`` must actually exist."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        """Import each multiprocessing module and verify its ``__all__``."""
        module_names = (
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.reduction', 'multiprocessing.sharedctypes',
            'multiprocessing.synchronize', 'multiprocessing.util'
            )

        for module_name in module_names:
            __import__(module_name)
            module = sys.modules[module_name]

            # Modules without __all__ contribute nothing to check.
            exported = getattr(module, '__all__', ())
            for public_name in exported:
                present = hasattr(module, public_name)
                self.assertTrue(
                    present,
                    '%r does not have attribute %r' % (module, public_name)
                    )
1644
#
# Quick test that logging works -- does not test logging output
#
1648
class _TestLogging(BaseTestCase):
    """Smoke tests for the multiprocessing logger."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        """The logger exists and accepts messages below its level."""
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertTrue(logger is not None)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    def _test_level(self, conn):
        """Child body: report the logger's effective level over *conn*."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _child_effective_level(self, reader, writer):
        """Run _test_level in a child and return the level it reports.

        The child is always joined so that no process is left behind.
        """
        child = self.Process(target=self._test_level, args=(writer,))
        child.start()
        try:
            return reader.recv()
        finally:
            child.join()

    def test_level(self):
        """A child process sees the logger level configured in the parent."""
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            # Level set directly on the multiprocessing logger.
            logger.setLevel(LEVEL1)
            self.assertEqual(LEVEL1,
                             self._child_effective_level(reader, writer))

            # With NOTSET, the effective level comes from the root logger.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self.assertEqual(LEVEL2,
                             self._child_effective_level(reader, writer))
        finally:
            # Restore the logging configuration and release the pipe even
            # when an assertion above fails.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
1686
#
# Test to verify handle verification, see issue 3321
#
1690
class TestInvalidHandle(unittest.TestCase):
    """Connection objects built from bad handles must fail with IOError."""

    def test_invalid_handles(self):
        """Nothing is checked when WIN32 is true."""
        if not WIN32:
            bogus_handle = 44977608
            conn = _multiprocessing.Connection(bogus_handle)
            self.assertRaises(IOError, conn.poll)
            # A negative handle is rejected at construction time.
            self.assertRaises(IOError, _multiprocessing.Connection, -1)
#
# Functions used to create test cases from the base ones in this module
#
1702
def get_attributes(Source, names):
    """Return a dict mapping each of *names* to ``getattr(Source, name)``.

    Values that are plain Python functions are wrapped in ``staticmethod``
    so that they do not turn into bound methods once the dict is injected
    into a class body.  All other values are stored unchanged.
    """
    plain_function = type(get_attributes)

    def fetch(name):
        value = getattr(Source, name)
        if type(value) == plain_function:
            return staticmethod(value)
        return value

    return {name: fetch(name) for name in names}
1711
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes from the module's ``_Test*`` bases.

    Every module global whose name starts with ``_Test`` and whose
    ``ALLOWED_TYPES`` contains *type* yields a class deriving from that
    base, ``unittest.TestCase`` and *Mixin*.  The class is named
    ``'With' + <Type> + <base name without the leading underscore>`` and
    its ``__module__`` is copied from *Mixin*.

    Returns a dict mapping the new class names to the classes.
    """
    namespace = globals()
    type_label = type[0].upper() + type[1:]
    generated = {}

    # Iterate over a snapshot of the module globals.
    for base_name, base in list(namespace.items()):
        if not base_name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue

        case_name = 'With' + type_label + base_name[1:]

        class Temp(base, unittest.TestCase, Mixin):
            pass

        Temp.__name__ = case_name
        Temp.__module__ = Mixin.__module__
        generated[case_name] = Temp

    return generated
1728
#
# Create test cases
#
1732
# Mixin supplying the process-based implementations to the generated tests.
class ProcessesMixin(object):
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Inject the listed multiprocessing attributes into the class namespace;
    # get_attributes wraps plain functions in staticmethod.
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

# Generate the 'WithProcesses...' classes and publish them as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1745
1746
# Mixin supplying the manager-backed implementations to the generated tests.
class ManagerMixin(object):
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Create the SyncManager without running __init__, so its attributes can
    # be looked up while the class body executes; test_main() calls
    # __init__() and start() on this same object later.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Inject the listed attributes of that manager object into the class.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))

# Generate the 'WithManager...' classes and publish them as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1759
1760
# Mixin supplying the multiprocessing.dummy implementations to the
# generated tests.
class ThreadsMixin(object):
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Inject the listed multiprocessing.dummy attributes into the class
    # namespace; get_attributes wraps plain functions in staticmethod.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))

# Generate the 'WithThreads...' classes and publish them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1773
class OtherTest(unittest.TestCase):
    """Authentication-handshake failures, checked against fake connections."""

    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        """deliver_challenge() rejects a peer that answers with garbage."""
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.deliver_challenge,
                          _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        """answer_challenge() fails when the peer never sends a welcome."""
        connection_module = multiprocessing.connection
        # The challenge marker is looked up under its public name first.
        # Fall back to the private spelling for interpreter versions that
        # expose only ``_CHALLENGE`` (assumption about newer releases;
        # verify against the target interpreter).
        challenge = getattr(connection_module, 'CHALLENGE', None)
        if challenge is None:
            challenge = connection_module._CHALLENGE

        class _FakeConnection(object):
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                # First read: a bare challenge.  Second read: a bogus reply.
                # Any later read: empty bytes.
                self.count += 1
                if self.count == 1:
                    return challenge
                elif self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass
        self.assertRaises(multiprocessing.AuthenticationError,
                          connection_module.answer_challenge,
                          _FakeConnection(), b'abc')
1802
# Test cases that are not generated from a mixin; test_main() appends them
# after the generated ones.
testcases_other = [OtherTest, TestInvalidHandle]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001804
Benjamin Petersone711caf2008-06-11 16:44:04 +00001805#
1806#
1807#
1808
def test_main(run=None):
    """Build the complete suite, run it, and always tear down shared state.

    *run* is a callable that receives the assembled ``unittest.TestSuite``.
    When omitted, ``test.support.run_unittest`` is used.

    Raises ``TestSkipped`` on Linux when an ``RLock`` cannot be created.
    """
    if sys.platform.startswith("linux"):
        try:
            # Only a probe: the lock itself is not needed afterwards.
            multiprocessing.RLock()
        except OSError:
            from test.support import TestSkipped
            raise TestSkipped("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared pools and manager used by the generated test cases.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    # Everything below runs under try/finally so that the pools and the
    # manager are released even if run(suite) raises.
    try:
        def by_name(testcase):
            return testcase.__name__

        testcases = (
            sorted(testcases_processes.values(), key=by_name) +
            sorted(testcases_threads.values(), key=by_name) +
            sorted(testcases_manager.values(), key=by_name) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00001847
def main():
    """Run the whole suite with a verbose plain-text unittest runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

if __name__ == '__main__':
    main()