blob: 4e5d7597cbe3255b22f6ae04abc86eec39914b2b [file] [log] [blame]
Georg Brandl86b2fb92008-07-16 03:43:04 +00001#!/usr/bin/env python
2
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import queue as pyqueue
10import time
11import sys
12import os
13import gc
14import signal
15import array
16import copy
17import socket
18import random
19import logging
20
Benjamin Petersone5384b02008-10-04 22:00:42 +000021
# Work around broken sem_open implementations: when the synchronize
# module cannot be imported, skip this whole test module instead of
# failing with an ImportError.
try:
    import multiprocessing.synchronize
except ImportError as e:
    # test.test_support does not exist under that name on Python 3, so
    # importing TestSkipped from it would itself raise ImportError and
    # hide the real reason.  unittest.SkipTest is the standard signal.
    raise unittest.SkipTest(e)
28
Benjamin Petersone711caf2008-06-11 16:44:04 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000033import multiprocessing.pool
34import _multiprocessing
35
36from multiprocessing import util
37
38#
39#
40#
41
def latin(s):
    """Return the string *s* encoded to bytes with the 'latin' codec."""
    return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000044
Benjamin Petersone711caf2008-06-11 16:44:04 +000045#
46# Constants
47#
48
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.WARNING

# Generic short delay (in seconds) used by the tests when sleeping.
DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
if CHECK_TIMINGS:
    # Distinct timeouts, so timing assertions can tell them apart.
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# True unless _multiprocessing advertises HAVE_BROKEN_SEM_GETVALUE.
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)
64
65#
66# Creates a wrapper for a function which records the time it takes to finish
67#
68
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    ``elapsed`` is None until the first call; afterwards it holds the
    duration in seconds (measured with time.time()) of the most recent
    call, whether that call returned or raised.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        """Call the wrapped function, passing through its result or exception."""
        t = time.time()
        try:
            return self.func(*args, **kwds)
        finally:
            # Runs on both the return and the exception path.
            self.elapsed = time.time() - t
81
82#
83# Base class for test cases
84#
85
class BaseTestCase(object):
    """Mixin with helper assertions shared by the test case classes.

    It relies on ``assertEqual``/``assertAlmostEqual`` being provided by
    another base class (presumably unittest.TestCase -- the combination
    is not made in this part of the file).
    """

    # Names of the implementation flavours a test case may be run against.
    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Check *a* ~= *b* to one decimal place, only if CHECK_TIMINGS is set."""
        if CHECK_TIMINGS:
            self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert ``func(*args) == value`` unless func raises NotImplementedError.

        A NotImplementedError from *func* is swallowed, so the check is
        silently skipped where the operation is unsupported.
        """
        try:
            res = func(*args)
        except NotImplementedError:
            pass
        else:
            return self.assertEqual(value, res)
101
102#
103# Return the value of a semaphore
104#
105
def get_value(self):
    """Return the current counter value of the semaphore-like object *self*.

    Tries, in order: a ``get_value()`` method, then the attributes
    ``_Semaphore__value`` and ``_value``.

    Raises NotImplementedError if none of them is available (or if the
    ``get_value()`` method itself raises it).
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    # No usable method: fall back to the private attributes.
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
117
118#
119# Testcases
120#
121
class _TestProcess(BaseTestCase):
    """Tests for Process objects and related module-level helpers.

    ``self.TYPE``, ``self.Process``, ``self.Queue``, ``self.Pipe``,
    ``self.current_process`` and ``self.active_children`` are expected to
    be supplied by whatever class this mixin is combined with (not
    visible in this part of the file).
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # Properties of the object returned by current_process().
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertTrue(isinstance(authkey, bytes))
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def _test(self, q, *args, **kwds):
        # Child side of test_process: report what the child observed.
        current = self.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if self.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        # State of a Process before start, while running and after join.
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertTrue(p not in self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())

        # The child puts its items in this exact order (see _test).
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

    def _test_terminate(self):
        # Child side of test_terminate: sleep until terminated.
        time.sleep(1000)

    def test_terminate(self):
        # terminate() stops a sleeping child so that join() returns.
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        # A process is listed in active_children() only between start and join.
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertTrue(p not in self.active_children())

        p.start()
        self.assertTrue(p in self.active_children())

        p.join()
        self.assertTrue(p not in self.active_children())

    def _test_recursion(self, wconn, id):
        # Send our id, then run two children one after the other, each
        # with a one-longer id, until ids reach length 2.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = self.Process(
                    target=self._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        # Processes started from within processes report in depth-first order.
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
267
268#
269#
270#
271
272class _UpperCaser(multiprocessing.Process):
273
274 def __init__(self):
275 multiprocessing.Process.__init__(self)
276 self.child_conn, self.parent_conn = multiprocessing.Pipe()
277
278 def run(self):
279 self.parent_conn.close()
280 for s in iter(self.child_conn.recv, None):
281 self.child_conn.send(s.upper())
282 self.child_conn.close()
283
284 def submit(self, s):
285 assert type(s) is str
286 self.parent_conn.send(s)
287 return self.parent_conn.recv()
288
289 def stop(self):
290 self.parent_conn.send(None)
291 self.parent_conn.close()
292 self.child_conn.close()
293
class _TestSubclassingProcess(BaseTestCase):
    """Check that a multiprocessing.Process subclass can be started and used."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        # Two round trips through the child, then a clean shutdown.
        uppercaser = _UpperCaser()
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        uppercaser.stop()
        uppercaser.join()
305
306#
307#
308#
309
def queue_empty(q):
    """Return whether queue *q* is empty.

    Uses ``q.empty()`` when the queue has that method, otherwise
    compares ``q.qsize()`` with zero.
    """
    if hasattr(q, 'empty'):
        return q.empty()
    else:
        return q.qsize() == 0
315
def queue_full(q, maxsize):
    """Return whether queue *q* is full.

    Uses ``q.full()`` when the queue has that method (*maxsize* is then
    ignored), otherwise compares ``q.qsize()`` with *maxsize*.
    """
    if hasattr(q, 'full'):
        return q.full()
    else:
        return q.qsize() == maxsize
321
322
class _TestQueue(BaseTestCase):
    """Tests for Queue and JoinableQueue.

    ``self.Queue``, ``self.JoinableQueue``, ``self.Event`` and
    ``self.Process`` are expected to be supplied by whatever class this
    mixin is combined with (not visible in this part of the file).
    """

    def _test_put(self, queue, child_can_start, parent_can_continue):
        # Child side of test_put: drain the six items once released.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        # put()/put_nowait() on a bounded queue, including the Full cases.
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    def _test_get(self, queue, child_can_start, parent_can_continue):
        # Child side of test_get: put four items once released.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        # get()/get_nowait(), including the Empty cases.
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    def _test_fork(self, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # qsize() is unsupported here; nothing to check.
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    def _test_task_done(self, q):
        # Worker: acknowledge each item until the None sentinel arrives.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        # JoinableQueue.join() returns once every item has been task_done().
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # One sentinel per worker so that every worker loop terminates.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
526
527#
528#
529#
530
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock (``self.Lock``/``self.RLock`` come from
    whatever class this mixin is combined with)."""

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # A second, non-blocking acquire of a held lock must fail.
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # Releasing an unheld lock is an error.
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        # Recursive acquisition: three acquires need three releases.
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), lock.release)
549
550
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore (the factories and
    ``self.TYPE`` come from whatever class this mixin is combined with)."""

    def _test_semaphore(self, sem):
        # Shared sequence for a semaphore created with initial value 2.
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # An unbounded semaphore may be released past its initial value.
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        # acquire() on an exhausted semaphore, with and without timeouts.
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
603
604
class _TestCondition(BaseTestCase):
    """Tests for Condition (``self.Condition``, ``self.Semaphore``,
    ``self.Process`` and ``self.TYPE`` come from whatever class this
    mixin is combined with)."""

    def f(self, cond, sleeping, woken, timeout=None):
        # Waiter: announce via `sleeping`, wait on cond, announce via `woken`.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # One waiter from self.Process and one plain thread.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        # Only the thread is joined here.
        thread.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        # wait() with a timeout and no notifier is expected to return None.
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
735
736
class _TestEvent(BaseTestCase):
    """Tests for Event (``self.Event`` and ``self.Process`` come from
    whatever class this mixin is combined with)."""

    def _test_event(self, event):
        # Child side: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Removed temporarily, due to API shear, this does not
        # work with threading._Event objects. is_set == isSet
        #self.assertEqual(event.is_set(), False)

        self.assertEqual(wait(0.0), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        # self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # The child sets the event, which unblocks the wait() below.
        self.Process(target=self._test_event, args=(event,)).start()
        self.assertEqual(wait(), None)
772
773#
774#
775#
776
class _TestValue(BaseTestCase):
    """Tests for Value and RawValue (the factories, ``self.Lock``,
    ``self.Process`` and ``self.TYPE`` come from whatever class this
    mixin is combined with)."""

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def _test(self, values):
        # Child side: overwrite each shared value with its third entry.
        for sv, cv in zip(values, self.codes_values):
            sv.value = cv[2]

    def test_value(self, raw=False):
        # A value changed in a child process is visible in the parent.
        if self.TYPE != 'processes':
            return

        if raw:
            values = [self.RawValue(code, value)
                      for code, value, _ in self.codes_values]
        else:
            values = [self.Value(code, value)
                      for code, value, _ in self.codes_values]

        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.start()
        proc.join()

        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # get_lock()/get_obj() availability for each form of `lock`.
        if self.TYPE != 'processes':
            return

        # The results below are unused; the calls only have to succeed.
        val1 = self.Value('i', 5)
        lock1 = val1.get_lock()
        obj1 = val1.get_obj()

        val2 = self.Value('i', 5, lock=None)
        lock2 = val2.get_lock()
        obj2 = val2.get_obj()

        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        obj3 = val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False: the result has neither accessor.
        val4 = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(val4, 'get_lock'))
        self.assertFalse(hasattr(val4, 'get_obj'))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        val5 = self.RawValue('i', 5)
        self.assertFalse(hasattr(val5, 'get_lock'))
        self.assertFalse(hasattr(val5, 'get_obj'))
842
Benjamin Petersone711caf2008-06-11 16:44:04 +0000843
class _TestArray(BaseTestCase):
    """Tests for Array and RawArray (the factories, ``self.Lock``,
    ``self.Process`` and ``self.TYPE`` come from whatever class this
    mixin is combined with)."""

    def f(self, seq):
        # Replace seq in place with its running (prefix) sums.
        for i in range(1, len(seq)):
            seq[i] += seq[i-1]

    def test_array(self, raw=False):
        # Indexing, slicing and child-process mutation of a shared array.
        if self.TYPE != 'processes':
            return

        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        if raw:
            arr = self.RawArray('i', seq)
        else:
            arr = self.Array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        # Apply f to the local list here and to the shared array in a
        # child; both must end up equal.
        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    def test_rawarray(self):
        self.test_array(raw=True)

    def test_getobj_getlock_obj(self):
        # get_lock()/get_obj() availability for each form of `lock`.
        if self.TYPE != 'processes':
            return

        # The results below are unused; the calls only have to succeed.
        arr1 = self.Array('i', list(range(10)))
        lock1 = arr1.get_lock()
        obj1 = arr1.get_obj()

        arr2 = self.Array('i', list(range(10)), lock=None)
        lock2 = arr2.get_lock()
        obj2 = arr2.get_obj()

        lock = self.Lock()
        arr3 = self.Array('i', list(range(10)), lock=lock)
        lock3 = arr3.get_lock()
        obj3 = arr3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False: the result has neither accessor.
        arr4 = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        arr5 = self.RawArray('i', range(10))
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000906
907#
908#
909#
910
class _TestContainers(BaseTestCase):
    """Tests for the manager-provided list, dict and Namespace
    (``self.list``, ``self.dict`` and ``self.Namespace`` come from
    whatever class this mixin is combined with)."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        a = self.list(list(range(10)))
        self.assertEqual(a[:], list(range(10)))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        self.assertEqual(a[:], list(range(10)))

        # A shared list built from other shared lists.
        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A later change to `a` is expected to show up through `f`.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # Only `name` is expected in the string form: `job` was deleted
        # and `_hidden` does not appear.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertTrue(not hasattr(n, 'job'))
966
967#
968#
969#
970
def sqr(x, wait=0.0):
    """Sleep for *wait* seconds, then return ``x*x``."""
    time.sleep(wait)
    return x*x

class _TestPool(BaseTestCase):
    """Tests for the pool API (``self.pool`` and ``self.TYPE`` come from
    whatever class this mixin is combined with)."""

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, list(range(100)))))

    def test_async(self):
        # get() blocks until the delayed result is available.
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task takes longer than the get() timeout, so get() raises.
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, list(range(10)))))

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        # Sort before comparing, since the result order is not checked.
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

    def test_make_pool(self):
        p = multiprocessing.Pool(3)
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            # Shut the pool down even when the assertion fails.
            p.close()
            p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # Queue far more work than can finish quickly, then check that
        # terminate() lets join() return quickly.
        self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertTrue(join.elapsed < 0.2)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001043#
1044# Test that manager has expected number of shared objects left
1045#
1046
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        if refs != EXPECTED_NUMBER:
            # Print the manager's debug info before the assertion fails.
            print(self.manager._debug_info())

        self.assertEqual(refs, EXPECTED_NUMBER)
1063
1064#
1065# Test of creating a customized manager class
1066#
1067
1068from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1069
class FooBar(object):
    """Small target object served through MyManager in the tests."""

    def f(self):
        """Public method with a recognisable return value."""
        return 'f()'

    def g(self):
        """Public method that always fails with ValueError."""
        raise ValueError()

    def _h(self):
        """Underscore-prefixed method with a recognisable return value."""
        return '_h()'
1077
def baz():
    """Generate the squares of 0 through 9 in increasing order."""
    n = 0
    while n < 10:
        yield n * n
        n += 1
1081
class IteratorProxy(BaseProxy):
    """Proxy that lets a remote iterator be consumed like a local one."""

    # 'next' is kept in the exposed set so the declared interface of the
    # proxy type is unchanged; only '__next__' is actually invoked below.
    _exposed_ = ('next', '__next__')

    def __iter__(self):
        return self

    # The class used to define __next__ twice; the first definition
    # (forwarding to 'next') was dead code because the second one
    # replaced it.  Only the effective definition is kept.
    def __next__(self):
        return self._callmethod('__next__')
1090
class MyManager(BaseManager):
    """Customized manager; its shared types are registered below."""

# 'Foo' uses the default exposure rules for FooBar.
MyManager.register('Foo', callable=FooBar)
# 'Bar' serves the same class but with an explicit list of methods.
MyManager.register('Bar', exposed=('f', '_h'), callable=FooBar)
# 'baz' hands out the generator through the custom iterator proxy.
MyManager.register('baz', proxytype=IteratorProxy, callable=baz)
1097
1098
class _TestMyManager(BaseTestCase):
    # Exercises the customized MyManager: which methods each registered
    # type exposes on its proxy, how calls and errors are forwarded, and
    # iteration through IteratorProxy.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        # Everything after start() runs under try/finally so the manager
        # is shut down even when one of the assertions fails.
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            baz = manager.baz()

            foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
            bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]

            # 'Foo' was registered without an explicit exposed list,
            # 'Bar' with exposed=('f', '_h').
            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            # '_h' is not exposed on foo, so the call is rejected.
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(baz), [i*i for i in range(10)])
        finally:
            manager.shutdown()
1130
1131#
1132# Test of connecting to a remote server and using xmlrpclib for serialization
1133#
1134
# The single queue object served by QueueManager.
_queue = pyqueue.Queue()

def get_queue():
    """Return the module-level queue shared through the manager."""
    return _queue


class QueueManager(BaseManager):
    '''manager class used by server process'''

# The server side supplies the callable that produces the real queue.
QueueManager.register(typeid='get_queue', callable=get_queue)


class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''

# The client side registers the name only; no callable is given.
QueueManager2.register(typeid='get_queue')


# Name of the serializer passed to the managers in _TestRemoteManager.
SERIALIZER = 'xmlrpclib'
1149
class _TestRemoteManager(BaseTestCase):
    # Connects to a manager server by address, both from a child process
    # and from this process, using SERIALIZER instead of pickle.

    ALLOWED_TYPES = ('manager',)

    def _putter(self, address, authkey):
        # Runs in the child: connect as a client and put one item on the
        # shared queue.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()
        # From here on the server is running: make sure it is shut down
        # and the child is reaped even if an assertion fails.
        try:
            p = self.Process(target=self._putter, args=(manager.address, authkey))
            p.start()
            try:
                manager2 = QueueManager2(
                    address=manager.address, authkey=authkey, serializer=SERIALIZER
                    )
                manager2.connect()
                queue = manager2.get_queue()
                try:
                    # Note that xmlrpclib will deserialize object as a list not a tuple
                    self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

                    # Because we are using xmlrpclib for serialization instead of
                    # pickle this will cause a serialization error.
                    self.assertRaises(Exception, queue.put, time.sleep)
                finally:
                    # Make queue finalizer run before the server is stopped
                    del queue
            finally:
                p.join()
        finally:
            manager.shutdown()
1189
1190#
1191#
1192#
1193
# Empty bytes message; the echo child stops when it receives it.
SENTINEL = latin('')

class _TestConnection(BaseTestCase):
    # Tests of Pipe() connection objects: object and raw-bytes transfer,
    # receiving into buffers, polling, one-way pipes and send_bytes()
    # offset/size handling.

    ALLOWED_TYPES = ('processes', 'threads')

    def _echo(self, conn):
        # Child side: send every received bytes message straight back
        # until SENTINEL arrives, then close the connection.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        # Every send below is paired with the recv that reads the child's
        # echo, so the order of the statements matters.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zeroed buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items into the buffer.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A 40-byte buffer cannot hold longmsg: BufferTooShort is
            # expected and must carry the complete message in its args.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing is pending: poll() is False at once, and False again
        # only after the timeout has elapsed.
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # The echoed message makes poll() return True without waiting.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            # The child has closed its end, so further reads hit EOF.
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        # With duplex=False the first connection only receives and the
        # second only sends.
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            # Using an end in the wrong direction raises IOError.
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        # send_bytes(buffer, offset, size) sends the selected slice of
        # the buffer and raises ValueError for out-of-range or negative
        # offset/size.  Only run for real processes.
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the length is allowed and sends nothing.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1344
class _TestListenerClient(BaseTestCase):
    # One Listener/Client round trip per address family that the
    # connection module reports as available.

    ALLOWED_TYPES = ('processes', 'threads')

    def _test(self, address):
        # Child side: connect, send a greeting, hang up.
        client = self.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)

            child = self.Process(target=self._test, args=(listener.address,))
            child.daemon = True
            child.start()

            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')

            child.join()
            listener.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001364#
1365# Test of sending connection and socket objects between processes
1366#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001367"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001368class _TestPicklingConnections(BaseTestCase):
1369
1370 ALLOWED_TYPES = ('processes',)
1371
1372 def _listener(self, conn, families):
1373 for fam in families:
1374 l = self.connection.Listener(family=fam)
1375 conn.send(l.address)
1376 new_conn = l.accept()
1377 conn.send(new_conn)
1378
1379 if self.TYPE == 'processes':
1380 l = socket.socket()
1381 l.bind(('localhost', 0))
1382 conn.send(l.getsockname())
1383 l.listen(1)
1384 new_conn, addr = l.accept()
1385 conn.send(new_conn)
1386
1387 conn.recv()
1388
1389 def _remote(self, conn):
1390 for (address, msg) in iter(conn.recv, None):
1391 client = self.connection.Client(address)
1392 client.send(msg.upper())
1393 client.close()
1394
1395 if self.TYPE == 'processes':
1396 address, msg = conn.recv()
1397 client = socket.socket()
1398 client.connect(address)
1399 client.sendall(msg.upper())
1400 client.close()
1401
1402 conn.close()
1403
1404 def test_pickling(self):
1405 try:
1406 multiprocessing.allow_connection_pickling()
1407 except ImportError:
1408 return
1409
1410 families = self.connection.families
1411
1412 lconn, lconn0 = self.Pipe()
1413 lp = self.Process(target=self._listener, args=(lconn0, families))
1414 lp.start()
1415 lconn0.close()
1416
1417 rconn, rconn0 = self.Pipe()
1418 rp = self.Process(target=self._remote, args=(rconn0,))
1419 rp.start()
1420 rconn0.close()
1421
1422 for fam in families:
1423 msg = ('This connection uses family %s' % fam).encode('ascii')
1424 address = lconn.recv()
1425 rconn.send((address, msg))
1426 new_conn = lconn.recv()
1427 self.assertEqual(new_conn.recv(), msg.upper())
1428
1429 rconn.send(None)
1430
1431 if self.TYPE == 'processes':
1432 msg = latin('This connection uses a normal socket')
1433 address = lconn.recv()
1434 rconn.send((address, msg))
1435 if hasattr(socket, 'fromfd'):
1436 new_conn = lconn.recv()
1437 self.assertEqual(new_conn.recv(100), msg.upper())
1438 else:
1439 # XXX On Windows with Py2.6 need to backport fromfd()
1440 discard = lconn.recv_bytes()
1441
1442 lconn.send(None)
1443
1444 rconn.close()
1445 lconn.close()
1446
1447 lp.join()
1448 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001449"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001450#
1451#
1452#
1453
class _TestHeap(BaseTestCase):
    # Stress test of multiprocessing.heap: after a lot of allocation
    # churn, the free and occupied blocks of every arena must tile it
    # without gaps or overlaps.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                # Drop a random survivor so its memory returns to the heap.
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap: collect every block, free or
        # occupied, as (arena index, start, stop, length, state)
        segments = []
        # list() snapshots the values before iterating, as the original
        # code did.
        for free_list in list(heap._len_to_seq.values()):
            for arena, start, stop in free_list:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Each block must either end where the next one starts, or be
        # followed by the first block (offset 0) of another arena.
        for current, following in zip(segments, segments[1:]):
            arena, start, stop = current[:3]
            narena, nstart = following[:2]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1494
1495#
1496#
1497#
1498
# Only names that ctypes really provides are imported here.  The previous
# import also asked ctypes for `Value` and `copy`, which live in
# multiprocessing.sharedctypes; that made the import fail every time, so
# c_int was always None and every test below silently skipped itself.
try:
    from ctypes import Structure, c_int, c_double
except ImportError:
    Structure = object
    c_int = c_double = None

class _Foo(Structure):
    # Two-field structure shared between processes in the tests below.
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]

class _TestSharedCTypes(BaseTestCase):
    # Tests of multiprocessing.sharedctypes: values, structures and
    # arrays modified by a child process must show the change in the
    # parent.

    ALLOWED_TYPES = ('processes',)

    def _double(self, x, y, foo, arr, string):
        # Runs in the child: double every shared object in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for i in range(len(arr)):
            arr[i] *= 2

    def test_sharedctypes(self, lock=False):
        if c_int is None:
            return

        # Imported locally so the module-level `copy` module and the
        # mixin-provided Value/Array attributes are not shadowed.
        from multiprocessing.sharedctypes import Value, Array

        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = Array('d', list(range(10)), lock=lock)
        string = Array('c', 20, lock=lock)
        # A 'c' array holds bytes, so the value must be bytes as well.
        string.value = latin('hello')

        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
        p.start()
        p.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario, but with lock-protected wrappers.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        if c_int is None:
            return

        # Local import: see the note in test_sharedctypes().
        from multiprocessing.sharedctypes import copy

        foo = _Foo(2, 5.0)
        bar = copy(foo)
        # Changing the original must not affect the copy.
        foo.x = 0
        foo.y = 0
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)
1560
1561#
1562#
1563#
1564
class _TestFinalize(BaseTestCase):
    # Tests of util.Finalize: callbacks triggered by deletion, by an
    # explicit call, and by util._exit_function() at process exit.

    ALLOWED_TYPES = ('processes',)

    def _test_finalize(self, conn):
        # Runs in the child.  Each callback sends its tag over conn, so
        # the order in which tags arrive in the parent records the order
        # in which the callbacks ran.  Statement order matters here.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # No exitpriority is given for c; the parent expects no 'c' tag.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing the same exitpriority; the parent
        # expects them in reverse order of registration.
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        # Lowest priority of all: serves as the end marker for the parent.
        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.start()
        p.join()

        # Read tags until the 'STOP' marker sent by the last finalizer.
        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1615
1616#
1617# Test that from ... import * works for each module
1618#
1619
class _TestImportStar(BaseTestCase):
    # Every name a multiprocessing module lists in __all__ must really
    # exist on that module, otherwise "from ... import *" would fail.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        module_names = (
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.reduction', 'multiprocessing.sharedctypes',
            'multiprocessing.synchronize', 'multiprocessing.util'
            )

        for module_name in module_names:
            __import__(module_name)
            # __import__ returns the top-level package for dotted names,
            # so fetch the submodule itself from sys.modules.
            module = sys.modules[module_name]
            exported = getattr(module, '__all__', ())

            for attr in exported:
                failure_text = '%r does not have attribute %r' % (module, attr)
                self.assertTrue(hasattr(module, attr), failure_text)
1642
1643#
1644# Quick test that logging works -- does not test logging output
1645#
1646
class _TestLogging(BaseTestCase):
    # Quick checks that the multiprocessing logger can be used and that
    # a child process sees the parent's effective log level.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertTrue(logger is not None)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    def _test_level(self, conn):
        # Runs in the child: report the effective level it observes.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        # The logger levels are process-wide state: restore them (and
        # release the pipe) even if an assertion fails, so later tests
        # are not affected.
        try:
            # Level set directly on the multiprocessing logger.
            logger.setLevel(LEVEL1)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            level = reader.recv()
            p.join()
            self.assertEqual(LEVEL1, level)

            # With NOTSET, the effective level comes from the root logger.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            level = reader.recv()
            p.join()
            self.assertEqual(LEVEL2, level)
        finally:
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
1684
1685#
1686# Functions used to create test cases from the base ones in this module
1687#
1688
def get_attributes(Source, names):
    """Collect the attributes *names* of *Source* into a dict.

    Plain Python functions are wrapped in ``staticmethod`` so that they
    can be stored in a class namespace without turning into methods;
    every other object is stored unchanged.
    """
    function_type = type(get_attributes)
    collected = {}
    for name in names:
        value = getattr(Source, name)
        collected[name] = (
            staticmethod(value) if type(value) == function_type else value
            )
    return collected
1697
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes for one flavour of the tests.

    Every module-level name starting with ``_Test`` whose
    ``ALLOWED_TYPES`` contains *type* is combined with
    ``unittest.TestCase`` and *Mixin* into a new class named
    ``With<Type><name without the leading underscore>``.  The new
    classes are returned in a dict keyed by that name.
    """
    prefix = 'With' + type[0].upper() + type[1:]
    namespace = globals()
    cases = {}

    # Iterate over a copy of the keys: callers add the generated classes
    # back into the module namespace.
    for name in list(namespace.keys()):
        if not name.startswith('_Test'):
            continue
        base = namespace[name]
        if type not in base.ALLOWED_TYPES:
            continue

        newname = prefix + name[1:]
        class Temp(base, unittest.TestCase, Mixin):
            pass
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        cases[newname] = Temp

    return cases
1714
1715#
1716# Create test cases
1717#
1718
class ProcessesMixin(object):
    # Mixin for the test cases that run against real processes.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed names from the multiprocessing package into this
    # class body; get_attributes() wraps plain functions in staticmethod.
    # NOTE(review): this relies on locals() being the live class
    # namespace while the class body executes.
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

# Generate the "processes" test classes and publish them at module level.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1731
1732
class ManagerMixin(object):
    # Mixin for the test cases that run against manager proxies.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocated without calling __init__: test_main() initialises and
    # starts this manager before the suite runs.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed attributes of the manager into this class body.
    # NOTE(review): this relies on locals() being the live class
    # namespace while the class body executes.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))

# Generate the "manager" test classes and publish them at module level.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1745
1746
class ThreadsMixin(object):
    # Mixin for the test cases that run against multiprocessing.dummy.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed names from multiprocessing.dummy into this class
    # body; get_attributes() wraps plain functions in staticmethod.
    # NOTE(review): this relies on locals() being the live class
    # namespace while the class body executes.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))

# Generate the "threads" test classes and publish them at module level.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1759
class OtherTest(unittest.TestCase):
    # Authentication-handshake failures, tested against fake connection
    # objects so that no real pipe or socket is needed.
    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        class _FakeConnection(object):
            # Always answers the challenge with an invalid response.
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.deliver_challenge,
                          _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # Look the challenge marker up defensively: if the public
        # constant is unavailable, fall back to the underscore-prefixed
        # spelling.  Otherwise the test would die with AttributeError
        # inside recv_bytes() without ever reaching the handshake.
        connection = multiprocessing.connection
        challenge = getattr(connection, 'CHALLENGE', None)
        if challenge is None:
            challenge = connection._CHALLENGE

        class _FakeConnection(object):
            # First read: the challenge marker.  Second read: an invalid
            # reply where the welcome message is expected.
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return challenge
                elif self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.answer_challenge,
                          _FakeConnection(), b'abc')

testcases_other = [OtherTest]
1790
Benjamin Petersone711caf2008-06-11 16:44:04 +00001791#
1792#
1793#
1794
def test_main(run=None):
    """Set up the shared pools and manager, run every test case, clean up.

    *run* is a callable that accepts a ``unittest.TestSuite``; when it is
    omitted, ``test.support.run_unittest`` is used.
    """
    if sys.platform.startswith("linux"):
        try:
            # Only the side effect matters: creating the lock fails with
            # OSError on platforms affected by issue 3111.
            multiprocessing.RLock()
        except OSError:
            from test.support import TestSkipped
            raise TestSkipped("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    # The manager object was allocated without __init__ in ManagerMixin;
    # initialise and start it now.
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    # Tear the pools and the manager down even if the runner raises, so
    # no worker or server processes are left behind.
    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00001833
def main():
    """Run the complete suite through a verbose text test runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

if __name__ == '__main__':
    main()