blob: a8600c0fc74cd30e453a298337a93c03f1fb57ff [file] [log] [blame]
Georg Brandl86b2fb92008-07-16 03:43:04 +00001#!/usr/bin/env python
2
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import queue as pyqueue
10import time
11import sys
12import os
13import gc
14import signal
15import array
16import copy
17import socket
18import random
19import logging
20
Benjamin Petersone5384b02008-10-04 22:00:42 +000021
22# Work around broken sem_open implementations
23try:
24 import multiprocessing.synchronize
25except ImportError as e:
26 from test.test_support import TestSkipped
27 raise TestSkipped(e)
28
Benjamin Petersone711caf2008-06-11 16:44:04 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000033import multiprocessing.pool
34import _multiprocessing
35
36from multiprocessing import util
37
38#
39#
40#
41
def latin(s):
    """Return the text ``s`` encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000044
Benjamin Petersone711caf2008-06-11 16:44:04 +000045#
46# Constants
47#
48
49LOG_LEVEL = util.SUBWARNING
50#LOG_LEVEL = logging.WARNING
51
52DELTA = 0.1
53CHECK_TIMINGS = False # making true makes tests take a lot longer
54 # and can sometimes cause some non-serious
55 # failures because some calls block a bit
56 # longer than expected
57if CHECK_TIMINGS:
58 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
59else:
60 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
61
62HAVE_GETVALUE = not getattr(_multiprocessing,
63 'HAVE_BROKEN_SEM_GETVALUE', False)
64
65#
66# Creates a wrapper for a function which records the time it takes to finish
67#
68
class TimingWrapper(object):
    """Callable proxy around ``func`` that records how long each call took.

    ``elapsed`` is None until the first call; afterwards it holds the
    duration in seconds (measured with time.time()) of the most recent
    call, whether that call returned or raised.
    """

    def __init__(self, func):
        self.elapsed = None
        self.func = func

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Record the duration even when the wrapped call raises.
            self.elapsed = time.time() - started
        return result
81
82#
83# Base class for test cases
84#
85
class BaseTestCase(object):
    """Mixin providing helper assertions shared by the test case classes."""

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timing comparisons are only enforced when CHECK_TIMINGS is enabled.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # A NotImplementedError from func means there is nothing to compare.
        try:
            actual = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, actual)
101
102#
103# Return the value of a semaphore
104#
105
def get_value(self):
    """Return the current counter of the semaphore-like object ``self``.

    Tries, in order, a ``get_value()`` method, then the attributes
    ``_Semaphore__value`` and ``_value``.  Raises NotImplementedError
    when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    # Fall back to the private counter attributes, oldest spelling first.
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
117
118#
119# Testcases
120#
121
class _TestProcess(BaseTestCase):
    """Exercise the basic Process API: attributes, lifecycle and nesting.

    The concrete ``Process``, ``Queue``, ``Pipe``, ``current_process``,
    ``active_children`` and ``TYPE`` attributes are expected to be
    supplied on the class that mixes this test case in.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # The checks below are skipped for the thread-backed flavour.
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertTrue(isinstance(authkey, bytes))
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def _test(self, q, *args, **kwds):
        # Target of the child started by test_process: report what the
        # child observed back through q, in the order the parent reads it.
        current = self.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if self.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello': 23, 'bye': 2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # State before the child is started.
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertTrue(p not in self.active_children())
        self.assertEqual(type(self.active_children()), list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # State while the child is running.
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())

        # q is the first element of args and is not echoed back.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # State after the child has finished.
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

    def _test_terminate(self):
        # Sleeps far longer than the test runs, so the child only ends
        # when test_terminate terminates it.
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertTrue(p not in self.active_children())

        p.start()
        self.assertTrue(p in self.active_children())

        p.join()
        self.assertTrue(p not in self.active_children())

    def _test_recursion(self, wconn, id):
        # Send our own id, then start and join two children in turn until
        # ids reach length two, giving a depth-first ordering of ids.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = self.Process(
                    target=self._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        # Give the pipe a moment before draining whatever was sent.
        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
267
268#
269#
270#
271
272class _UpperCaser(multiprocessing.Process):
273
274 def __init__(self):
275 multiprocessing.Process.__init__(self)
276 self.child_conn, self.parent_conn = multiprocessing.Pipe()
277
278 def run(self):
279 self.parent_conn.close()
280 for s in iter(self.child_conn.recv, None):
281 self.child_conn.send(s.upper())
282 self.child_conn.close()
283
284 def submit(self, s):
285 assert type(s) is str
286 self.parent_conn.send(s)
287 return self.parent_conn.recv()
288
289 def stop(self):
290 self.parent_conn.send(None)
291 self.parent_conn.close()
292 self.child_conn.close()
293
class _TestSubclassingProcess(BaseTestCase):
    """Check that a Process subclass overriding run() works end to end."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.start()
        hello = worker.submit('hello')
        self.assertEqual(hello, 'HELLO')
        world = worker.submit('world')
        self.assertEqual(world, 'WORLD')
        worker.stop()
        worker.join()
305
306#
307#
308#
309
def queue_empty(q):
    """Return whether ``q`` is empty, via ``empty()`` or else ``qsize()``."""
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
315
def queue_full(q, maxsize):
    """Return whether ``q`` is full, via ``full()`` or else ``qsize()``.

    ``maxsize`` is only consulted when ``q`` has no ``full()`` method.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
321
322
class _TestQueue(BaseTestCase):
    """Tests for Queue put/get/qsize and JoinableQueue task_done/join."""

    def _test_put(self, queue, child_can_start, parent_can_continue):
        # Child side of test_put: once released, drain six items and
        # signal the parent.
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        consumer = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        consumer.daemon = True
        consumer.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue, going through each supported call signature.
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # (callable, positional args, keyword args, expected elapsed time)
        attempts = [
            (put, (7, False), {}, 0),
            (put, (7, False, None), {}, 0),
            (put_nowait, (7,), {}, 0),
            (put, (7, True, TIMEOUT1), {}, TIMEOUT1),
            (put, (7, False, TIMEOUT2), {}, 0),
            (put, (7, True), {'timeout': TIMEOUT3}, TIMEOUT3),
            ]
        for call, args, kwds, expected in attempts:
            self.assertRaises(pyqueue.Full, call, *args, **kwds)
            self.assertTimingAlmostEqual(call.elapsed, expected)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        consumer.join()

    def _test_get(self, queue, child_can_start, parent_can_continue):
        # Child side of test_get: once released, enqueue 2..5 and signal
        # the parent.
        child_can_start.wait()
        #queue.put(1)
        for item in (2, 3, 4, 5):
            queue.put(item)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        producer = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        producer.daemon = True
        producer.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # (callable, positional args, keyword args, expected elapsed time)
        attempts = [
            (get, (False,), {}, 0),
            (get, (False, None), {}, 0),
            (get_nowait, (), {}, 0),
            (get, (True, TIMEOUT1), {}, TIMEOUT1),
            (get, (False, TIMEOUT2), {}, 0),
            (get, (), {'timeout': TIMEOUT3}, TIMEOUT3),
            ]
        for call, args, kwds, expected in attempts:
            self.assertRaises(pyqueue.Empty, call, *args, **kwds)
            self.assertTimingAlmostEqual(call.elapsed, expected)

        producer.join()

    def _test_fork(self, queue):
        for item in range(10, 20):
            queue.put(item)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for item in range(10):
            queue.put(item)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        child = self.Process(target=self._test_fork, args=(queue,))
        child.start()

        # check that all expected items are in the queue
        for expected in range(20):
            self.assertEqual(queue.get(), expected)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        child.join()

    def test_qsize(self):
        q = self.Queue()
        # A qsize() that raises NotImplementedError ends the test early.
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    def _test_task_done(self, q):
        # Worker loop: acknowledge every item until the None sentinel.
        for _ in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            return

        workers = []
        for _ in range(4):
            workers.append(
                self.Process(target=self._test_task_done, args=(queue,))
                )

        for worker in workers:
            worker.start()

        for item in range(10):
            queue.put(item)

        queue.join()

        # One sentinel per worker so that each of them exits its loop.
        for _ in workers:
            queue.put(None)

        for worker in workers:
            worker.join()
526
527#
528#
529#
530
class _TestLock(BaseTestCase):
    """Tests for the acquire/release behaviour of Lock and RLock."""

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # A second, non-blocking acquire must fail while the lock is held.
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # Releasing a lock that is not held is an error.
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        depth = 3
        for _ in range(depth):
            self.assertEqual(lock.acquire(), True)
        for _ in range(depth):
            self.assertEqual(lock.release(), None)
        # One release more than the number of acquires is an error.
        self.assertRaises((AssertionError, RuntimeError), lock.release)
549
550
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore counting and timeouts."""

    def _test_semaphore(self, sem):
        # Expects a semaphore whose counter starts at 2; takes it down to
        # 0 and back up to 2, checking the counter where it is readable.
        self.assertReturnsIfImplemented(2, get_value, sem)
        for remaining in (1, 0):
            self.assertEqual(sem.acquire(), True)
            self.assertReturnsIfImplemented(remaining, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        for available in (1, 2):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(available, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # Releasing beyond the initial value keeps incrementing the counter.
        for available in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(available, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        timed_acquire = TimingWrapper(sem.acquire)

        # Non-blocking attempts return at once, whatever the timeout.
        self.assertEqual(timed_acquire(False), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, 0.0)

        self.assertEqual(timed_acquire(False, None), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, 0.0)

        self.assertEqual(timed_acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, 0)

        # Blocking attempts wait for the full timeout before giving up.
        self.assertEqual(timed_acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, TIMEOUT2)

        self.assertEqual(timed_acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, TIMEOUT3)
603
604
class _TestCondition(BaseTestCase):
    """Tests for Condition.wait(), notify() and notify_all()."""

    def f(self, cond, sleeping, woken, timeout=None):
        # Waiter body: announce that we are about to wait on cond, wait
        # (optionally with a timeout), then announce that we woke up.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE != 'processes':
            return
        try:
            sleepers = (cond._sleeping_count.get_value() -
                        cond._woken_count.get_value())
            self.assertEqual(sleepers, 0)
            self.assertEqual(cond._wait_semaphore.get_value(), 0)
        except NotImplementedError:
            pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)
        waiter_args = (cond, sleeping, woken)

        p = self.Process(target=self.f, args=waiter_args)
        p.daemon = True
        p.start()

        # p is rebound here, so the join() at the end waits on this thread.
        p = threading.Thread(target=self.f, args=waiter_args)
        p.daemon = True
        p.start()

        # wait for both children to start sleeping
        for _ in range(2):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake the waiters up one at a time, checking the count each time
        for expected_woken in (1, 2):
            cond.acquire()
            cond.notify()
            cond.release()

            time.sleep(DELTA)
            self.assertReturnsIfImplemented(expected_woken, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        p.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        timed_args = (cond, sleeping, woken, TIMEOUT1)
        for _ in range(3):
            p = self.Process(target=self.f, args=timed_args)
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=timed_args)
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for _ in range(6):
            sleeping.acquire()

        # check they have all timed out
        for _ in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        untimed_args = (cond, sleeping, woken)
        for _ in range(3):
            p = self.Process(target=self.f, args=untimed_args)
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=untimed_args)
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for _ in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        cond = self.Condition()
        timed_wait = TimingWrapper(cond.wait)
        cond.acquire()
        outcome = timed_wait(TIMEOUT1)
        cond.release()
        self.assertEqual(outcome, None)
        self.assertTimingAlmostEqual(timed_wait.elapsed, TIMEOUT1)
735
736
class _TestEvent(BaseTestCase):
    """Tests for Event.wait(), set() and clear()."""

    def _test_event(self, event):
        # Target of the child in test_event: set the event after a delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Removed temporarily, due to API shear, this does not
        # work with threading._Event objects. is_set == isSet
        #self.assertEqual(event.is_set(), False)

        # While the event is clear, wait() runs for the whole timeout.
        for timeout in (0.0, TIMEOUT1):
            self.assertEqual(wait(timeout), None)
            self.assertTimingAlmostEqual(wait.elapsed, timeout)

        event.set()

        # See note above on the API differences
        # self.assertEqual(event.is_set(), True)
        # Once the event is set, wait() returns immediately.
        self.assertEqual(wait(), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        setter = self.Process(target=self._test_event, args=(event,))
        setter.start()
        self.assertEqual(wait(), None)
772
773#
774#
775#
776
class _TestValue(BaseTestCase):
    """Tests for shared Value and RawValue objects."""

    # (typecode, initial value, value assigned by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def _test(self, values):
        # Target of the child in test_value: store each entry's third field.
        for shared, entry in zip(values, self.codes_values):
            shared.value = entry[2]

    def test_value(self, raw=False):
        if self.TYPE != 'processes':
            return

        values = []
        for code, initial, _ in self.codes_values:
            if raw:
                values.append(self.RawValue(code, initial))
            else:
                values.append(self.Value(code, initial))

        for shared, entry in zip(values, self.codes_values):
            self.assertEqual(shared.value, entry[1])

        child = self.Process(target=self._test, args=(values,))
        child.start()
        child.join()

        # The child's assignments must be visible in this process.
        for shared, entry in zip(values, self.codes_values):
            self.assertEqual(shared.value, entry[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        if self.TYPE != 'processes':
            return

        # get_lock()/get_obj() are called for each way of passing ``lock``;
        # only the explicit-lock case has its result checked.
        val1 = self.Value('i', 5)
        lock1 = val1.get_lock()
        obj1 = val1.get_obj()

        val2 = self.Value('i', 5, lock=None)
        lock2 = val2.get_lock()
        obj2 = val2.get_obj()

        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        obj3 = val3.get_obj()
        self.assertEqual(lock, lock3)

        # Raw values expose neither accessor.
        arr4 = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr4, accessor))
836
837
class _TestArray(BaseTestCase):
    """Tests for shared Array and RawArray objects."""

    def f(self, seq):
        # Replace seq in place with its running (prefix) sums.
        for pos in range(1, len(seq)):
            seq[pos] += seq[pos - 1]

    def test_array(self, raw=False):
        if self.TYPE != 'processes':
            return

        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        if raw:
            arr = self.RawArray('i', seq)
        else:
            arr = self.Array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Slice assignment, applied to the shared array and to the list.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Transform the list here and the shared array in a child; the
        # two must end up equal.
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    def test_rawarray(self):
        self.test_array(raw=True)

    def test_getobj_getlock_obj(self):
        if self.TYPE != 'processes':
            return

        # get_lock()/get_obj() are called for each way of passing ``lock``;
        # only the explicit-lock case has its result checked.
        arr1 = self.Array('i', list(range(10)))
        lock1 = arr1.get_lock()
        obj1 = arr1.get_obj()

        arr2 = self.Array('i', list(range(10)), lock=None)
        lock2 = arr2.get_lock()
        obj2 = arr2.get_obj()

        lock = self.Lock()
        arr3 = self.Array('i', list(range(10)), lock=lock)
        lock3 = arr3.get_lock()
        obj3 = arr3.get_obj()
        self.assertEqual(lock, lock3)

        # Raw arrays expose neither accessor.
        arr4 = self.RawArray('i', list(range(10)))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr4, accessor))
894
895#
896#
897#
898
class _TestContainers(BaseTestCase):
    """Tests for the manager-provided list, dict and Namespace objects."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        digits = list(range(10))
        a = self.list(digits)
        self.assertEqual(a[:], digits)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2, 3, 4])

        b *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        self.assertEqual(a[:], list(range(10)))

        # A list built from other managed lists compares equal to the
        # plain nested list.
        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A later append to ``a`` is visible through the list wrapping it.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            d[code] = chr(code)
        letters = [chr(code) for code in indices]
        self.assertEqual(d.copy(), {code: chr(code) for code in indices})
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), list(zip(indices, letters)))

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The expected string shows neither the deleted nor the
        # underscore-prefixed attribute.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertTrue(not hasattr(n, 'job'))
954
955#
956#
957#
958
def sqr(x, wait=0.0):
    """Sleep for ``wait`` seconds, then return ``x`` multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    """Tests for the Pool API, run against the shared ``self.pool``."""

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        small = list(range(10))
        large = list(range(100))
        self.assertEqual(pmap(sqr, small), [sqr(n) for n in small])
        self.assertEqual(pmap(sqr, large, chunksize=20),
                         [sqr(n) for n in large])

    def test_async(self):
        pending = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(pending.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the timeout passed to get().
        pending = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(pending.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), [sqr(n) for n in range(10)])

        it = self.pool.imap(sqr, list(range(10)))
        for n in range(10):
            self.assertEqual(next(it), n*n)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for n in range(1000):
            self.assertEqual(next(it), n*n)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        expected = [sqr(n) for n in range(1000)]

        # Results are sorted before comparing.
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), expected)

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), expected)

    def test_make_pool(self):
        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # Queue far more sleeping work than could finish in 0.2 seconds.
        result = self.pool.map_async(
            time.sleep, [0.1] * 10000, chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertTrue(join.elapsed < 0.2)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001031#
1032# Test that manager has expected number of shared objects left
1033#
1034
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        remaining = self.manager._number_of_objects()
        if remaining != EXPECTED_NUMBER:
            # Print the manager's debug info before the assertion fails.
            print(self.manager._debug_info())

        self.assertEqual(remaining, EXPECTED_NUMBER)
1051
1052#
1053# Test of creating a customized manager class
1054#
1055
1056from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1057
class FooBar(object):
    """Plain class registered with MyManager under the names Foo and Bar."""

    def f(self):
        result = 'f()'
        return result

    def g(self):
        # Always fails.
        raise ValueError

    def _h(self):
        result = '_h()'
        return result
1065
def baz():
    """Generate the squares of 0 through 9 in increasing order."""
    n = 0
    while n < 10:
        yield n * n
        n += 1
1069
class IteratorProxy(BaseProxy):
    """Proxy that lets a manager-hosted iterator be iterated locally.

    Each ``next()`` on the proxy is forwarded to the referent's
    ``__next__`` via ``_callmethod``.
    """

    # 'next' is kept alongside '__next__' so the exposed set is unchanged.
    _exposed_ = ('next', '__next__')

    def __iter__(self):
        return self

    # The original defined __next__ twice; the first definition (which
    # called 'next') was always overridden by this one, so it was dropped.
    def __next__(self):
        return self._callmethod('__next__')
1078
class MyManager(BaseManager):
    """BaseManager subclass that adds nothing of its own."""
1081
1082MyManager.register('Foo', callable=FooBar)
1083MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1084MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1085
1086
class _TestMyManager(BaseTestCase):
    """Tests for the customised manager class MyManager."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        # Which of the candidate methods each proxy exposes.
        candidates = ('f', 'g', '_h')
        foo_methods = [name for name in candidates if hasattr(foo, name)]
        bar_methods = [name for name in candidates if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        # _h is not among Foo's exposed methods, so calling it fails.
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        squares = [n*n for n in range(10)]
        self.assertEqual(list(baz), squares)

        manager.shutdown()
1118
1119#
1120# Test of connecting to a remote server and using xmlrpclib for serialization
1121#
1122
# The single queue object served by QueueManager.
_queue = pyqueue.Queue()

def get_queue():
    # Callable registered with QueueManager; every call returns the same
    # module-level queue.
    return _queue


class QueueManager(BaseManager):
    '''manager class used by server process'''

QueueManager.register('get_queue', callable=get_queue)


class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''

# No callable here: only the name of the shared type is registered.
QueueManager2.register('get_queue')


# Name of the serializer passed to the managers in the remote test.
SERIALIZER = 'xmlrpclib'
1137
class _TestRemoteManager(BaseTestCase):
    # Starts a QueueManager server and talks to it from two clients (one
    # in a child process, one in the test itself) using the serializer
    # named by SERIALIZER instead of pickle.

    ALLOWED_TYPES = ('manager',)

    def _putter(self, address, authkey):
        # Runs in the child process: connect to the server as a client
        # and put a single item on the shared queue.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
        )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
        )
        manager.start()

        # Stop the server even when something below fails.
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.start()

            manager2 = QueueManager2(
                address=manager.address, authkey=authkey,
                serializer=SERIALIZER
            )
            manager2.connect()
            queue = manager2.get_queue()

            try:
                # Note that xmlrpclib will deserialize object as a list
                # not a tuple
                self.assertEqual(queue.get(),
                                 ['hello world', None, True, 2.25])

                # Because we are using xmlrpclib for serialization
                # instead of pickle this will cause a serialization error.
                self.assertRaises(Exception, queue.put, time.sleep)
            finally:
                # Make queue finalizer run before the server is stopped
                del queue

            # The item has been received, so the putter has done its
            # work; reap it instead of leaving the child unjoined.
            p.join()
        finally:
            manager.shutdown()
1177
1178#
1179#
1180#
1181
# Empty message that tells the echo child to stop.
SENTINEL = latin('')

class _TestConnection(BaseTestCase):

    ALLOWED_TYPES = ('processes', 'threads')

    def _echo(self, conn):
        # Child side: send every received message straight back until
        # the SENTINEL message arrives, then close the connection.
        while True:
            data = conn.recv_bytes()
            if data == SENTINEL:
                break
            conn.send_bytes(data)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        echo = self.Process(target=self._echo, args=(child_conn,))
        echo.daemon = True
        echo.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        # an object and a byte string both come back unchanged
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # receive into the start of a zero-filled array
            target = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(target),
                             len(arr) * target.itemsize)
            self.assertEqual(list(target), expected)

            # receive at an offset of three items into the array
            target = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(
                conn.recv_bytes_into(target, 3 * target.itemsize),
                len(arr) * target.itemsize)
            self.assertEqual(list(target), expected)

            # a message longer than the buffer must raise BufferTooShort
            # with the complete message as its only argument
            target = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(target)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # nothing pending: poll() reports False, with or without timeout
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # the echoed message is now pending, so poll() returns at once
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (16 * 1024 * 1024)    # 16 MiB
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)       # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        echo.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            # each end works in one direction only
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        echo = self.Process(target=self._echo, args=(child_conn,))
        echo.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        echo.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        # (extra send_bytes arguments, bytes expected at the other end)
        accepted = (
            ((), msg),
            ((5,), msg[5:]),
            ((7, 8), msg[7:7+8]),
            ((26,), latin('')),
            ((26, 0), latin('')),
        )
        for extra, expected in accepted:
            a.send_bytes(msg, *extra)
            self.assertEqual(b.recv_bytes(), expected)

        # offset/size combinations that must be rejected
        rejected = ((27,), (22, 5), (26, 1), (-1,), (4, -1))
        for extra in rejected:
            self.assertRaises(ValueError, a.send_bytes, msg, *extra)
1332
class _TestListenerClient(BaseTestCase):
    # For every address family offered by self.connection, checks that a
    # client in another process (or thread) can connect to a Listener
    # and deliver one message.

    ALLOWED_TYPES = ('processes', 'threads')

    def _test(self, address):
        # Child side: connect, send a single message, disconnect.
        conn = self.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            # Close the listener, and the accepted connection below,
            # even when an assertion fails.
            try:
                p = self.Process(target=self._test,
                                 args=(listener.address,))
                p.daemon = True
                p.start()
                conn = listener.accept()
                try:
                    self.assertEqual(conn.recv(), 'hello')
                finally:
                    conn.close()
                p.join()
            finally:
                listener.close()
#
# Test of sending connection and socket objects between processes
# (currently disabled: the test class below is wrapped in a string literal)
#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001355"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001356class _TestPicklingConnections(BaseTestCase):
1357
1358 ALLOWED_TYPES = ('processes',)
1359
1360 def _listener(self, conn, families):
1361 for fam in families:
1362 l = self.connection.Listener(family=fam)
1363 conn.send(l.address)
1364 new_conn = l.accept()
1365 conn.send(new_conn)
1366
1367 if self.TYPE == 'processes':
1368 l = socket.socket()
1369 l.bind(('localhost', 0))
1370 conn.send(l.getsockname())
1371 l.listen(1)
1372 new_conn, addr = l.accept()
1373 conn.send(new_conn)
1374
1375 conn.recv()
1376
1377 def _remote(self, conn):
1378 for (address, msg) in iter(conn.recv, None):
1379 client = self.connection.Client(address)
1380 client.send(msg.upper())
1381 client.close()
1382
1383 if self.TYPE == 'processes':
1384 address, msg = conn.recv()
1385 client = socket.socket()
1386 client.connect(address)
1387 client.sendall(msg.upper())
1388 client.close()
1389
1390 conn.close()
1391
1392 def test_pickling(self):
1393 try:
1394 multiprocessing.allow_connection_pickling()
1395 except ImportError:
1396 return
1397
1398 families = self.connection.families
1399
1400 lconn, lconn0 = self.Pipe()
1401 lp = self.Process(target=self._listener, args=(lconn0, families))
1402 lp.start()
1403 lconn0.close()
1404
1405 rconn, rconn0 = self.Pipe()
1406 rp = self.Process(target=self._remote, args=(rconn0,))
1407 rp.start()
1408 rconn0.close()
1409
1410 for fam in families:
1411 msg = ('This connection uses family %s' % fam).encode('ascii')
1412 address = lconn.recv()
1413 rconn.send((address, msg))
1414 new_conn = lconn.recv()
1415 self.assertEqual(new_conn.recv(), msg.upper())
1416
1417 rconn.send(None)
1418
1419 if self.TYPE == 'processes':
1420 msg = latin('This connection uses a normal socket')
1421 address = lconn.recv()
1422 rconn.send((address, msg))
1423 if hasattr(socket, 'fromfd'):
1424 new_conn = lconn.recv()
1425 self.assertEqual(new_conn.recv(100), msg.upper())
1426 else:
1427 # XXX On Windows with Py2.6 need to backport fromfd()
1428 discard = lconn.recv_bytes()
1429
1430 lconn.send(None)
1431
1432 rconn.close()
1433 lconn.close()
1434
1435 lp.join()
1436 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001437"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001438#
1439#
1440#
1441
class _TestHeap(BaseTestCase):
    # Stress test for multiprocessing.heap: after many allocations and
    # releases, the free and occupied blocks recorded by the heap must
    # line up end to end inside each arena.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap: gather every free and occupied
        # block as (arena index, start, stop, length, state)
        segments = []
        # iterate over a snapshot of the free lists
        for free_blocks in list(heap._len_to_seq.values()):
            for arena, start, stop in free_blocks:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # each block must end where the next one starts, unless the next
        # one is the first block of a different arena
        for current, following in zip(segments, segments[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, _) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1482
1483#
1484#
1485#
1486
# Only names that ctypes really provides are imported here.  Asking
# ctypes for `Value` or `copy` raises ImportError on every platform,
# which would disable all of the tests below.
try:
    from ctypes import Structure, c_int, c_double
except ImportError:
    Structure = object
    c_int = c_double = None

class _Foo(Structure):
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
    ]

class _TestSharedCTypes(BaseTestCase):
    # Tests for ctypes objects shared between processes.  Each test
    # returns early when ctypes could not be imported.

    ALLOWED_TYPES = ('processes',)

    def _double(self, x, y, foo, arr, string):
        # Runs in the child process: double every shared value in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for i in range(len(arr)):
            arr[i] *= 2

    def test_sharedctypes(self, lock=False):
        if c_int is None:
            return

        # Value and Array are the shared-memory constructors supplied by
        # the mixin this case is combined with.
        x = self.Value('i', 7, lock=lock)
        y = self.Value(c_double, 1.0/3.0, lock=lock)
        foo = self.Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        # a char array holds bytes, not str
        string.value = latin('hello')

        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
        p.start()
        p.join()

        # changes made by the child must be visible in the parent
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        if c_int is None:
            return

        # Imported locally, after the ctypes check, under a name that
        # does not hide the standard `copy` module at file level.
        from multiprocessing.sharedctypes import copy as shared_copy

        foo = _Foo(2, 5.0)
        bar = shared_copy(foo)
        foo.x = 0
        foo.y = 0
        # the copy keeps the values it was made with
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)
1548
1549#
1550#
1551#
1552
class _TestFinalize(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def _test_finalize(self, conn):
        # Runs in the child process.  Each finalizer callback sends its
        # tag through `conn`, so the parent can see which callbacks ran
        # and in which order.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # registered without an exit priority
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # three finalizers sharing exit priority 0; the objects are kept
        # in a list so that they stay alive until the process exits
        same_priority = []
        for tag in ('d01', 'd02', 'd03'):
            obj = Foo()
            same_priority.append(obj)
            util.Finalize(obj, conn.send, args=(tag,), exitpriority=0)

        # finalizers that are not attached to any object
        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.start()
        p.join()

        # read everything the child sent, up to the 'STOP' marker
        result = list(iter(conn.recv, 'STOP'))
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1603
1604#
1605# Test that from ... import * works for each module
1606#
1607
class _TestImportStar(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        # the package itself followed by its submodules
        submodules = (
            'connection', 'heap', 'managers', 'pool', 'process',
            'reduction', 'sharedctypes', 'synchronize', 'util'
        )
        modules = ['multiprocessing']
        modules.extend('multiprocessing.' + sub for sub in submodules)

        for name in modules:
            __import__(name)
            mod = sys.modules[name]

            # every name advertised in __all__ must really exist
            exported = getattr(mod, '__all__', ())
            for attr in exported:
                self.assertTrue(
                    hasattr(mod, attr),
                    '%r does not have attribute %r' % (mod, attr)
                )
1630
1631#
1632# Quick test that logging works -- does not test logging output
1633#
1634
class _TestLogging(BaseTestCase):
    # Quick checks that the multiprocessing logger can be used and that
    # a child process sees the effective log level set in the parent.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertTrue(logger is not None)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    def _test_level(self, conn):
        # Runs in the child process: report the effective log level.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _level_seen_by_child(self, reader, writer):
        # Start a child that reports its effective log level through
        # `writer`, wait for the child to finish, and return that level.
        p = self.Process(target=self._test_level, args=(writer,))
        p.start()
        level = reader.recv()
        p.join()
        return level

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        # Restore both loggers and close the pipe even when an assertion
        # fails, so later tests are not run with the levels set here.
        try:
            # a level set on the multiprocessing logger itself
            logger.setLevel(LEVEL1)
            self.assertEqual(LEVEL1,
                             self._level_seen_by_child(reader, writer))

            # with no level of its own, the root logger's level applies
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self.assertEqual(LEVEL2,
                             self._level_seen_by_child(reader, writer))
        finally:
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
1672
1673#
1674# Functions used to create test cases from the base ones in this module
1675#
1676
def get_attributes(Source, names):
    """Return a dict mapping each name in `names` to that attribute of `Source`.

    Attributes that are plain Python functions are wrapped in
    staticmethod, so they stay unbound when the dict is merged into a
    class namespace.  Every other attribute is stored unchanged.
    A missing attribute raises AttributeError.
    """
    # type of an ordinary function defined with `def`
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        obj = getattr(Source, name)
        if isinstance(obj, function_type):
            obj = staticmethod(obj)
        attributes[name] = obj
    return attributes
1685
def create_test_cases(Mixin, type):
    # Build one concrete unittest.TestCase subclass for every module
    # global whose name starts with '_Test' and whose ALLOWED_TYPES
    # contains `type`.  Returns a dict mapping the generated class names
    # ('With' + capitalized type + base name without its underscore) to
    # the generated classes.
    suffix = type[0].upper() + type[1:]
    namespace = globals()
    cases = {}

    for name in list(namespace.keys()):
        if not name.startswith('_Test'):
            continue
        base = namespace[name]
        if type not in base.ALLOWED_TYPES:
            continue

        newname = 'With' + suffix + name[1:]

        class Temp(base, unittest.TestCase, Mixin):
            pass

        # present the class under its generated name, as if it had been
        # defined in the mixin's module
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        cases[newname] = Temp

    return cases
1702
1703#
1704# Create test cases
1705#
1706
class ProcessesMixin(object):
    # Mixin that runs the generic test cases against real processes,
    # using the top-level multiprocessing API.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed multiprocessing attributes into this class body.
    locals().update(get_attributes(multiprocessing, (
        'Queue',
        'Lock', 'RLock',
        'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array',
        'RawValue', 'RawArray',
        'current_process', 'active_children',
        'Pipe', 'connection',
        'JoinableQueue'
    )))

# Generate the 'processes' flavour of every eligible test case and
# publish the generated classes as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1719
1720
class ManagerMixin(object):
    # Mixin that runs the generic test cases against objects served by
    # a SyncManager.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # The manager is allocated here without calling __init__.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed manager attributes into this class body.
    locals().update(get_attributes(manager, (
        'Queue',
        'Lock', 'RLock',
        'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array',
        'list', 'dict',
        'Namespace',
        'JoinableQueue'
    )))

# Generate the 'manager' flavour of every eligible test case and
# publish the generated classes as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1733
1734
class ThreadsMixin(object):
    # Mixin that runs the generic test cases against
    # multiprocessing.dummy.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed multiprocessing.dummy attributes into this class
    # body.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue',
        'Lock', 'RLock',
        'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array',
        'current_process', 'active_children',
        'Pipe', 'connection',
        'dict', 'list',
        'Namespace',
        'JoinableQueue'
    )))

# Generate the 'threads' flavour of every eligible test case and
# publish the generated classes as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1747
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        # The peer replies to the challenge with a bogus message.
        class _FakeConnection(object):
            def send_bytes(self, data):
                pass
            def recv_bytes(self, size):
                return b'something bogus'

        fake = _FakeConnection()
        self.assertRaises(
            multiprocessing.AuthenticationError,
            multiprocessing.connection.deliver_challenge,
            fake, b'abc'
        )

    def test_answer_challenge_auth_failure(self):
        # The peer sends the challenge marker first, then a bogus
        # message, then empty messages.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0
            def send_bytes(self, data):
                pass
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''

        fake = _FakeConnection()
        self.assertRaises(
            multiprocessing.AuthenticationError,
            multiprocessing.connection.answer_challenge,
            fake, b'abc'
        )

# test cases that are not generated from a mixin
testcases_other = [OtherTest]
1778
Benjamin Petersone711caf2008-06-11 16:44:04 +00001779#
1780#
1781#
1782
def test_main(run=None):
    """Build the complete multiprocessing test suite and run it.

    `run` is a callable that receives the unittest.TestSuite; when it is
    None, test.support.run_unittest is used.  The pools and the manager
    shared by the test cases are created first and are torn down again
    even if `run` raises.
    """
    if sys.platform.startswith("linux"):
        # only the ability to create an RLock matters here
        try:
            multiprocessing.RLock()
        except OSError:
            from test.support import TestSkipped
            raise TestSkipped("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    # A runner may raise instead of returning; the worker pools and the
    # manager process must still be cleaned up in that case.
    try:
        def by_name(tc):
            return tc.__name__

        testcases = (
            sorted(testcases_processes.values(), key=by_name) +
            sorted(testcases_threads.values(), key=by_name) +
            sorted(testcases_manager.values(), key=by_name) +
            testcases_other
        )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00001821
def main():
    # Run the whole suite through a verbose text runner.
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

if __name__ == '__main__':
    main()