blob: be108b1d30fe906e9a92cd418c99a93178b8b005 [file] [log] [blame]
Georg Brandl86b2fb92008-07-16 03:43:04 +00001#!/usr/bin/env python
2
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import queue as pyqueue
10import time
11import sys
12import os
13import gc
14import signal
15import array
16import copy
17import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
Benjamin Petersone5384b02008-10-04 22:00:42 +000027
Benjamin Petersone711caf2008-06-11 16:44:04 +000028import multiprocessing.dummy
29import multiprocessing.connection
30import multiprocessing.managers
31import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000033
34from multiprocessing import util
35
36#
37#
38#
39
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000040def latin(s):
41 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000042
Benjamin Petersone711caf2008-06-11 16:44:04 +000043#
44# Constants
45#
46
47LOG_LEVEL = util.SUBWARNING
48#LOG_LEVEL = logging.WARNING
49
50DELTA = 0.1
51CHECK_TIMINGS = False # making true makes tests take a lot longer
52 # and can sometimes cause some non-serious
53 # failures because some calls block a bit
54 # longer than expected
55if CHECK_TIMINGS:
56 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
57else:
58 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
59
60HAVE_GETVALUE = not getattr(_multiprocessing,
61 'HAVE_BROKEN_SEM_GETVALUE', False)
62
Jesse Noller6214edd2009-01-19 16:23:53 +000063WIN32 = (sys.platform == "win32")
64
Benjamin Petersone711caf2008-06-11 16:44:04 +000065#
66# Creates a wrapper for a function which records the time it takes to finish
67#
68
69class TimingWrapper(object):
70
71 def __init__(self, func):
72 self.func = func
73 self.elapsed = None
74
75 def __call__(self, *args, **kwds):
76 t = time.time()
77 try:
78 return self.func(*args, **kwds)
79 finally:
80 self.elapsed = time.time() - t
81
82#
83# Base class for test cases
84#
85
86class BaseTestCase(object):
87
88 ALLOWED_TYPES = ('processes', 'manager', 'threads')
89
90 def assertTimingAlmostEqual(self, a, b):
91 if CHECK_TIMINGS:
92 self.assertAlmostEqual(a, b, 1)
93
94 def assertReturnsIfImplemented(self, value, func, *args):
95 try:
96 res = func(*args)
97 except NotImplementedError:
98 pass
99 else:
100 return self.assertEqual(value, res)
101
102#
103# Return the value of a semaphore
104#
105
106def get_value(self):
107 try:
108 return self.get_value()
109 except AttributeError:
110 try:
111 return self._Semaphore__value
112 except AttributeError:
113 try:
114 return self._value
115 except AttributeError:
116 raise NotImplementedError
117
118#
119# Testcases
120#
121
122class _TestProcess(BaseTestCase):
123
124 ALLOWED_TYPES = ('processes', 'threads')
125
126 def test_current(self):
127 if self.TYPE == 'threads':
128 return
129
130 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000131 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000132
133 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000134 self.assertTrue(not current.daemon)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000135 self.assertTrue(isinstance(authkey, bytes))
136 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000137 self.assertEqual(current.ident, os.getpid())
138 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000139
140 def _test(self, q, *args, **kwds):
141 current = self.current_process()
142 q.put(args)
143 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000144 q.put(current.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000145 if self.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000146 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000147 q.put(current.pid)
148
149 def test_process(self):
150 q = self.Queue(1)
151 e = self.Event()
152 args = (q, 1, 2)
153 kwargs = {'hello':23, 'bye':2.54}
154 name = 'SomeProcess'
155 p = self.Process(
156 target=self._test, args=args, kwargs=kwargs, name=name
157 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000158 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000159 current = self.current_process()
160
161 if self.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000162 self.assertEquals(p.authkey, current.authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000163 self.assertEquals(p.is_alive(), False)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000164 self.assertEquals(p.daemon, True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000165 self.assertTrue(p not in self.active_children())
166 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000167 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000168
169 p.start()
170
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000171 self.assertEquals(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000172 self.assertEquals(p.is_alive(), True)
173 self.assertTrue(p in self.active_children())
174
175 self.assertEquals(q.get(), args[1:])
176 self.assertEquals(q.get(), kwargs)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000177 self.assertEquals(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000178 if self.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000179 self.assertEquals(q.get(), current.authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000180 self.assertEquals(q.get(), p.pid)
181
182 p.join()
183
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000184 self.assertEquals(p.exitcode, 0)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000185 self.assertEquals(p.is_alive(), False)
186 self.assertTrue(p not in self.active_children())
187
188 def _test_terminate(self):
189 time.sleep(1000)
190
191 def test_terminate(self):
192 if self.TYPE == 'threads':
193 return
194
195 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000196 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000197 p.start()
198
199 self.assertEqual(p.is_alive(), True)
200 self.assertTrue(p in self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000201 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000202
203 p.terminate()
204
205 join = TimingWrapper(p.join)
206 self.assertEqual(join(), None)
207 self.assertTimingAlmostEqual(join.elapsed, 0.0)
208
209 self.assertEqual(p.is_alive(), False)
210 self.assertTrue(p not in self.active_children())
211
212 p.join()
213
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000214 # XXX sometimes get p.exitcode == 0 on Windows ...
215 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000216
217 def test_cpu_count(self):
218 try:
219 cpus = multiprocessing.cpu_count()
220 except NotImplementedError:
221 cpus = 1
222 self.assertTrue(type(cpus) is int)
223 self.assertTrue(cpus >= 1)
224
225 def test_active_children(self):
226 self.assertEqual(type(self.active_children()), list)
227
228 p = self.Process(target=time.sleep, args=(DELTA,))
229 self.assertTrue(p not in self.active_children())
230
231 p.start()
232 self.assertTrue(p in self.active_children())
233
234 p.join()
235 self.assertTrue(p not in self.active_children())
236
237 def _test_recursion(self, wconn, id):
238 from multiprocessing import forking
239 wconn.send(id)
240 if len(id) < 2:
241 for i in range(2):
242 p = self.Process(
243 target=self._test_recursion, args=(wconn, id+[i])
244 )
245 p.start()
246 p.join()
247
248 def test_recursion(self):
249 rconn, wconn = self.Pipe(duplex=False)
250 self._test_recursion(wconn, [])
251
252 time.sleep(DELTA)
253 result = []
254 while rconn.poll():
255 result.append(rconn.recv())
256
257 expected = [
258 [],
259 [0],
260 [0, 0],
261 [0, 1],
262 [1],
263 [1, 0],
264 [1, 1]
265 ]
266 self.assertEqual(result, expected)
267
268#
269#
270#
271
272class _UpperCaser(multiprocessing.Process):
273
274 def __init__(self):
275 multiprocessing.Process.__init__(self)
276 self.child_conn, self.parent_conn = multiprocessing.Pipe()
277
278 def run(self):
279 self.parent_conn.close()
280 for s in iter(self.child_conn.recv, None):
281 self.child_conn.send(s.upper())
282 self.child_conn.close()
283
284 def submit(self, s):
285 assert type(s) is str
286 self.parent_conn.send(s)
287 return self.parent_conn.recv()
288
289 def stop(self):
290 self.parent_conn.send(None)
291 self.parent_conn.close()
292 self.child_conn.close()
293
294class _TestSubclassingProcess(BaseTestCase):
295
296 ALLOWED_TYPES = ('processes',)
297
298 def test_subclassing(self):
299 uppercaser = _UpperCaser()
300 uppercaser.start()
301 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
302 self.assertEqual(uppercaser.submit('world'), 'WORLD')
303 uppercaser.stop()
304 uppercaser.join()
305
306#
307#
308#
309
310def queue_empty(q):
311 if hasattr(q, 'empty'):
312 return q.empty()
313 else:
314 return q.qsize() == 0
315
316def queue_full(q, maxsize):
317 if hasattr(q, 'full'):
318 return q.full()
319 else:
320 return q.qsize() == maxsize
321
322
323class _TestQueue(BaseTestCase):
324
325
326 def _test_put(self, queue, child_can_start, parent_can_continue):
327 child_can_start.wait()
328 for i in range(6):
329 queue.get()
330 parent_can_continue.set()
331
332 def test_put(self):
333 MAXSIZE = 6
334 queue = self.Queue(maxsize=MAXSIZE)
335 child_can_start = self.Event()
336 parent_can_continue = self.Event()
337
338 proc = self.Process(
339 target=self._test_put,
340 args=(queue, child_can_start, parent_can_continue)
341 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000342 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000343 proc.start()
344
345 self.assertEqual(queue_empty(queue), True)
346 self.assertEqual(queue_full(queue, MAXSIZE), False)
347
348 queue.put(1)
349 queue.put(2, True)
350 queue.put(3, True, None)
351 queue.put(4, False)
352 queue.put(5, False, None)
353 queue.put_nowait(6)
354
355 # the values may be in buffer but not yet in pipe so sleep a bit
356 time.sleep(DELTA)
357
358 self.assertEqual(queue_empty(queue), False)
359 self.assertEqual(queue_full(queue, MAXSIZE), True)
360
361 put = TimingWrapper(queue.put)
362 put_nowait = TimingWrapper(queue.put_nowait)
363
364 self.assertRaises(pyqueue.Full, put, 7, False)
365 self.assertTimingAlmostEqual(put.elapsed, 0)
366
367 self.assertRaises(pyqueue.Full, put, 7, False, None)
368 self.assertTimingAlmostEqual(put.elapsed, 0)
369
370 self.assertRaises(pyqueue.Full, put_nowait, 7)
371 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
372
373 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
374 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
375
376 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
377 self.assertTimingAlmostEqual(put.elapsed, 0)
378
379 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
380 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
381
382 child_can_start.set()
383 parent_can_continue.wait()
384
385 self.assertEqual(queue_empty(queue), True)
386 self.assertEqual(queue_full(queue, MAXSIZE), False)
387
388 proc.join()
389
390 def _test_get(self, queue, child_can_start, parent_can_continue):
391 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000392 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000393 queue.put(2)
394 queue.put(3)
395 queue.put(4)
396 queue.put(5)
397 parent_can_continue.set()
398
399 def test_get(self):
400 queue = self.Queue()
401 child_can_start = self.Event()
402 parent_can_continue = self.Event()
403
404 proc = self.Process(
405 target=self._test_get,
406 args=(queue, child_can_start, parent_can_continue)
407 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000408 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000409 proc.start()
410
411 self.assertEqual(queue_empty(queue), True)
412
413 child_can_start.set()
414 parent_can_continue.wait()
415
416 time.sleep(DELTA)
417 self.assertEqual(queue_empty(queue), False)
418
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000419 # Hangs unexpectedly, remove for now
420 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000421 self.assertEqual(queue.get(True, None), 2)
422 self.assertEqual(queue.get(True), 3)
423 self.assertEqual(queue.get(timeout=1), 4)
424 self.assertEqual(queue.get_nowait(), 5)
425
426 self.assertEqual(queue_empty(queue), True)
427
428 get = TimingWrapper(queue.get)
429 get_nowait = TimingWrapper(queue.get_nowait)
430
431 self.assertRaises(pyqueue.Empty, get, False)
432 self.assertTimingAlmostEqual(get.elapsed, 0)
433
434 self.assertRaises(pyqueue.Empty, get, False, None)
435 self.assertTimingAlmostEqual(get.elapsed, 0)
436
437 self.assertRaises(pyqueue.Empty, get_nowait)
438 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
439
440 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
441 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
442
443 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
444 self.assertTimingAlmostEqual(get.elapsed, 0)
445
446 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
447 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
448
449 proc.join()
450
451 def _test_fork(self, queue):
452 for i in range(10, 20):
453 queue.put(i)
454 # note that at this point the items may only be buffered, so the
455 # process cannot shutdown until the feeder thread has finished
456 # pushing items onto the pipe.
457
458 def test_fork(self):
459 # Old versions of Queue would fail to create a new feeder
460 # thread for a forked process if the original process had its
461 # own feeder thread. This test checks that this no longer
462 # happens.
463
464 queue = self.Queue()
465
466 # put items on queue so that main process starts a feeder thread
467 for i in range(10):
468 queue.put(i)
469
470 # wait to make sure thread starts before we fork a new process
471 time.sleep(DELTA)
472
473 # fork process
474 p = self.Process(target=self._test_fork, args=(queue,))
475 p.start()
476
477 # check that all expected items are in the queue
478 for i in range(20):
479 self.assertEqual(queue.get(), i)
480 self.assertRaises(pyqueue.Empty, queue.get, False)
481
482 p.join()
483
484 def test_qsize(self):
485 q = self.Queue()
486 try:
487 self.assertEqual(q.qsize(), 0)
488 except NotImplementedError:
489 return
490 q.put(1)
491 self.assertEqual(q.qsize(), 1)
492 q.put(5)
493 self.assertEqual(q.qsize(), 2)
494 q.get()
495 self.assertEqual(q.qsize(), 1)
496 q.get()
497 self.assertEqual(q.qsize(), 0)
498
499 def _test_task_done(self, q):
500 for obj in iter(q.get, None):
501 time.sleep(DELTA)
502 q.task_done()
503
504 def test_task_done(self):
505 queue = self.JoinableQueue()
506
507 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
508 return
509
510 workers = [self.Process(target=self._test_task_done, args=(queue,))
511 for i in range(4)]
512
513 for p in workers:
514 p.start()
515
516 for i in range(10):
517 queue.put(i)
518
519 queue.join()
520
521 for p in workers:
522 queue.put(None)
523
524 for p in workers:
525 p.join()
526
527#
528#
529#
530
531class _TestLock(BaseTestCase):
532
533 def test_lock(self):
534 lock = self.Lock()
535 self.assertEqual(lock.acquire(), True)
536 self.assertEqual(lock.acquire(False), False)
537 self.assertEqual(lock.release(), None)
538 self.assertRaises((ValueError, threading.ThreadError), lock.release)
539
540 def test_rlock(self):
541 lock = self.RLock()
542 self.assertEqual(lock.acquire(), True)
543 self.assertEqual(lock.acquire(), True)
544 self.assertEqual(lock.acquire(), True)
545 self.assertEqual(lock.release(), None)
546 self.assertEqual(lock.release(), None)
547 self.assertEqual(lock.release(), None)
548 self.assertRaises((AssertionError, RuntimeError), lock.release)
549
Jesse Nollerf8d00852009-03-31 03:25:07 +0000550 def test_lock_context(self):
551 with self.Lock():
552 pass
553
Benjamin Petersone711caf2008-06-11 16:44:04 +0000554
555class _TestSemaphore(BaseTestCase):
556
557 def _test_semaphore(self, sem):
558 self.assertReturnsIfImplemented(2, get_value, sem)
559 self.assertEqual(sem.acquire(), True)
560 self.assertReturnsIfImplemented(1, get_value, sem)
561 self.assertEqual(sem.acquire(), True)
562 self.assertReturnsIfImplemented(0, get_value, sem)
563 self.assertEqual(sem.acquire(False), False)
564 self.assertReturnsIfImplemented(0, get_value, sem)
565 self.assertEqual(sem.release(), None)
566 self.assertReturnsIfImplemented(1, get_value, sem)
567 self.assertEqual(sem.release(), None)
568 self.assertReturnsIfImplemented(2, get_value, sem)
569
570 def test_semaphore(self):
571 sem = self.Semaphore(2)
572 self._test_semaphore(sem)
573 self.assertEqual(sem.release(), None)
574 self.assertReturnsIfImplemented(3, get_value, sem)
575 self.assertEqual(sem.release(), None)
576 self.assertReturnsIfImplemented(4, get_value, sem)
577
578 def test_bounded_semaphore(self):
579 sem = self.BoundedSemaphore(2)
580 self._test_semaphore(sem)
581 # Currently fails on OS/X
582 #if HAVE_GETVALUE:
583 # self.assertRaises(ValueError, sem.release)
584 # self.assertReturnsIfImplemented(2, get_value, sem)
585
586 def test_timeout(self):
587 if self.TYPE != 'processes':
588 return
589
590 sem = self.Semaphore(0)
591 acquire = TimingWrapper(sem.acquire)
592
593 self.assertEqual(acquire(False), False)
594 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
595
596 self.assertEqual(acquire(False, None), False)
597 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
598
599 self.assertEqual(acquire(False, TIMEOUT1), False)
600 self.assertTimingAlmostEqual(acquire.elapsed, 0)
601
602 self.assertEqual(acquire(True, TIMEOUT2), False)
603 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
604
605 self.assertEqual(acquire(timeout=TIMEOUT3), False)
606 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
607
608
609class _TestCondition(BaseTestCase):
610
611 def f(self, cond, sleeping, woken, timeout=None):
612 cond.acquire()
613 sleeping.release()
614 cond.wait(timeout)
615 woken.release()
616 cond.release()
617
618 def check_invariant(self, cond):
619 # this is only supposed to succeed when there are no sleepers
620 if self.TYPE == 'processes':
621 try:
622 sleepers = (cond._sleeping_count.get_value() -
623 cond._woken_count.get_value())
624 self.assertEqual(sleepers, 0)
625 self.assertEqual(cond._wait_semaphore.get_value(), 0)
626 except NotImplementedError:
627 pass
628
629 def test_notify(self):
630 cond = self.Condition()
631 sleeping = self.Semaphore(0)
632 woken = self.Semaphore(0)
633
634 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000635 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000636 p.start()
637
638 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000639 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000640 p.start()
641
642 # wait for both children to start sleeping
643 sleeping.acquire()
644 sleeping.acquire()
645
646 # check no process/thread has woken up
647 time.sleep(DELTA)
648 self.assertReturnsIfImplemented(0, get_value, woken)
649
650 # wake up one process/thread
651 cond.acquire()
652 cond.notify()
653 cond.release()
654
655 # check one process/thread has woken up
656 time.sleep(DELTA)
657 self.assertReturnsIfImplemented(1, get_value, woken)
658
659 # wake up another
660 cond.acquire()
661 cond.notify()
662 cond.release()
663
664 # check other has woken up
665 time.sleep(DELTA)
666 self.assertReturnsIfImplemented(2, get_value, woken)
667
668 # check state is not mucked up
669 self.check_invariant(cond)
670 p.join()
671
672 def test_notify_all(self):
673 cond = self.Condition()
674 sleeping = self.Semaphore(0)
675 woken = self.Semaphore(0)
676
677 # start some threads/processes which will timeout
678 for i in range(3):
679 p = self.Process(target=self.f,
680 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000681 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000682 p.start()
683
684 t = threading.Thread(target=self.f,
685 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000686 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000687 t.start()
688
689 # wait for them all to sleep
690 for i in range(6):
691 sleeping.acquire()
692
693 # check they have all timed out
694 for i in range(6):
695 woken.acquire()
696 self.assertReturnsIfImplemented(0, get_value, woken)
697
698 # check state is not mucked up
699 self.check_invariant(cond)
700
701 # start some more threads/processes
702 for i in range(3):
703 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000704 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000705 p.start()
706
707 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000708 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000709 t.start()
710
711 # wait for them to all sleep
712 for i in range(6):
713 sleeping.acquire()
714
715 # check no process/thread has woken up
716 time.sleep(DELTA)
717 self.assertReturnsIfImplemented(0, get_value, woken)
718
719 # wake them all up
720 cond.acquire()
721 cond.notify_all()
722 cond.release()
723
724 # check they have all woken
725 time.sleep(DELTA)
726 self.assertReturnsIfImplemented(6, get_value, woken)
727
728 # check state is not mucked up
729 self.check_invariant(cond)
730
731 def test_timeout(self):
732 cond = self.Condition()
733 wait = TimingWrapper(cond.wait)
734 cond.acquire()
735 res = wait(TIMEOUT1)
736 cond.release()
737 self.assertEqual(res, None)
738 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
739
740
741class _TestEvent(BaseTestCase):
742
743 def _test_event(self, event):
744 time.sleep(TIMEOUT2)
745 event.set()
746
747 def test_event(self):
748 event = self.Event()
749 wait = TimingWrapper(event.wait)
750
751 # Removed temporaily, due to API shear, this does not
752 # work with threading._Event objects. is_set == isSet
753 #self.assertEqual(event.is_set(), False)
754
755 self.assertEqual(wait(0.0), None)
756 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
757 self.assertEqual(wait(TIMEOUT1), None)
758 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
759
760 event.set()
761
762 # See note above on the API differences
763 # self.assertEqual(event.is_set(), True)
764 self.assertEqual(wait(), None)
765 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
766 self.assertEqual(wait(TIMEOUT1), None)
767 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
768 # self.assertEqual(event.is_set(), True)
769
770 event.clear()
771
772 #self.assertEqual(event.is_set(), False)
773
774 self.Process(target=self._test_event, args=(event,)).start()
775 self.assertEqual(wait(), None)
776
777#
778#
779#
780
781class _TestValue(BaseTestCase):
782
783 codes_values = [
784 ('i', 4343, 24234),
785 ('d', 3.625, -4.25),
786 ('h', -232, 234),
787 ('c', latin('x'), latin('y'))
788 ]
789
790 def _test(self, values):
791 for sv, cv in zip(values, self.codes_values):
792 sv.value = cv[2]
793
794
795 def test_value(self, raw=False):
796 if self.TYPE != 'processes':
797 return
798
799 if raw:
800 values = [self.RawValue(code, value)
801 for code, value, _ in self.codes_values]
802 else:
803 values = [self.Value(code, value)
804 for code, value, _ in self.codes_values]
805
806 for sv, cv in zip(values, self.codes_values):
807 self.assertEqual(sv.value, cv[1])
808
809 proc = self.Process(target=self._test, args=(values,))
810 proc.start()
811 proc.join()
812
813 for sv, cv in zip(values, self.codes_values):
814 self.assertEqual(sv.value, cv[2])
815
816 def test_rawvalue(self):
817 self.test_value(raw=True)
818
819 def test_getobj_getlock(self):
820 if self.TYPE != 'processes':
821 return
822
823 val1 = self.Value('i', 5)
824 lock1 = val1.get_lock()
825 obj1 = val1.get_obj()
826
827 val2 = self.Value('i', 5, lock=None)
828 lock2 = val2.get_lock()
829 obj2 = val2.get_obj()
830
831 lock = self.Lock()
832 val3 = self.Value('i', 5, lock=lock)
833 lock3 = val3.get_lock()
834 obj3 = val3.get_obj()
835 self.assertEqual(lock, lock3)
836
Jesse Nollerb0516a62009-01-18 03:11:38 +0000837 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000838 self.assertFalse(hasattr(arr4, 'get_lock'))
839 self.assertFalse(hasattr(arr4, 'get_obj'))
840
Jesse Nollerb0516a62009-01-18 03:11:38 +0000841 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
842
843 arr5 = self.RawValue('i', 5)
844 self.assertFalse(hasattr(arr5, 'get_lock'))
845 self.assertFalse(hasattr(arr5, 'get_obj'))
846
Benjamin Petersone711caf2008-06-11 16:44:04 +0000847
848class _TestArray(BaseTestCase):
849
850 def f(self, seq):
851 for i in range(1, len(seq)):
852 seq[i] += seq[i-1]
853
854 def test_array(self, raw=False):
855 if self.TYPE != 'processes':
856 return
857
858 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
859 if raw:
860 arr = self.RawArray('i', seq)
861 else:
862 arr = self.Array('i', seq)
863
864 self.assertEqual(len(arr), len(seq))
865 self.assertEqual(arr[3], seq[3])
866 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
867
868 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
869
870 self.assertEqual(list(arr[:]), seq)
871
872 self.f(seq)
873
874 p = self.Process(target=self.f, args=(arr,))
875 p.start()
876 p.join()
877
878 self.assertEqual(list(arr[:]), seq)
879
880 def test_rawarray(self):
881 self.test_array(raw=True)
882
883 def test_getobj_getlock_obj(self):
884 if self.TYPE != 'processes':
885 return
886
887 arr1 = self.Array('i', list(range(10)))
888 lock1 = arr1.get_lock()
889 obj1 = arr1.get_obj()
890
891 arr2 = self.Array('i', list(range(10)), lock=None)
892 lock2 = arr2.get_lock()
893 obj2 = arr2.get_obj()
894
895 lock = self.Lock()
896 arr3 = self.Array('i', list(range(10)), lock=lock)
897 lock3 = arr3.get_lock()
898 obj3 = arr3.get_obj()
899 self.assertEqual(lock, lock3)
900
Jesse Nollerb0516a62009-01-18 03:11:38 +0000901 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000902 self.assertFalse(hasattr(arr4, 'get_lock'))
903 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +0000904 self.assertRaises(AttributeError,
905 self.Array, 'i', range(10), lock='notalock')
906
907 arr5 = self.RawArray('i', range(10))
908 self.assertFalse(hasattr(arr5, 'get_lock'))
909 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000910
911#
912#
913#
914
915class _TestContainers(BaseTestCase):
916
917 ALLOWED_TYPES = ('manager',)
918
919 def test_list(self):
920 a = self.list(list(range(10)))
921 self.assertEqual(a[:], list(range(10)))
922
923 b = self.list()
924 self.assertEqual(b[:], [])
925
926 b.extend(list(range(5)))
927 self.assertEqual(b[:], list(range(5)))
928
929 self.assertEqual(b[2], 2)
930 self.assertEqual(b[2:10], [2,3,4])
931
932 b *= 2
933 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
934
935 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
936
937 self.assertEqual(a[:], list(range(10)))
938
939 d = [a, b]
940 e = self.list(d)
941 self.assertEqual(
942 e[:],
943 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
944 )
945
946 f = self.list([a])
947 a.append('hello')
948 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
949
950 def test_dict(self):
951 d = self.dict()
952 indices = list(range(65, 70))
953 for i in indices:
954 d[i] = chr(i)
955 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
956 self.assertEqual(sorted(d.keys()), indices)
957 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
958 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
959
960 def test_namespace(self):
961 n = self.Namespace()
962 n.name = 'Bob'
963 n.job = 'Builder'
964 n._hidden = 'hidden'
965 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
966 del n.job
967 self.assertEqual(str(n), "Namespace(name='Bob')")
968 self.assertTrue(hasattr(n, 'name'))
969 self.assertTrue(not hasattr(n, 'job'))
970
971#
972#
973#
974
975def sqr(x, wait=0.0):
976 time.sleep(wait)
977 return x*x
Benjamin Petersone711caf2008-06-11 16:44:04 +0000978class _TestPool(BaseTestCase):
979
980 def test_apply(self):
981 papply = self.pool.apply
982 self.assertEqual(papply(sqr, (5,)), sqr(5))
983 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
984
985 def test_map(self):
986 pmap = self.pool.map
987 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
988 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
989 list(map(sqr, list(range(100)))))
990
991 def test_async(self):
992 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
993 get = TimingWrapper(res.get)
994 self.assertEqual(get(), 49)
995 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
996
997 def test_async_timeout(self):
998 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
999 get = TimingWrapper(res.get)
1000 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1001 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1002
1003 def test_imap(self):
1004 it = self.pool.imap(sqr, list(range(10)))
1005 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1006
1007 it = self.pool.imap(sqr, list(range(10)))
1008 for i in range(10):
1009 self.assertEqual(next(it), i*i)
1010 self.assertRaises(StopIteration, it.__next__)
1011
1012 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1013 for i in range(1000):
1014 self.assertEqual(next(it), i*i)
1015 self.assertRaises(StopIteration, it.__next__)
1016
1017 def test_imap_unordered(self):
1018 it = self.pool.imap_unordered(sqr, list(range(1000)))
1019 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1020
1021 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1022 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1023
1024 def test_make_pool(self):
1025 p = multiprocessing.Pool(3)
1026 self.assertEqual(3, len(p._pool))
1027 p.close()
1028 p.join()
1029
1030 def test_terminate(self):
1031 if self.TYPE == 'manager':
1032 # On Unix a forked process increfs each shared object to
1033 # which its parent process held a reference. If the
1034 # forked process gets terminated then there is likely to
1035 # be a reference leak. So to prevent
1036 # _TestZZZNumberOfObjects from failing we skip this test
1037 # when using a manager.
1038 return
1039
1040 result = self.pool.map_async(
1041 time.sleep, [0.1 for i in range(10000)], chunksize=1
1042 )
1043 self.pool.terminate()
1044 join = TimingWrapper(self.pool.join)
1045 join()
1046 self.assertTrue(join.elapsed < 0.2)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001047#
1048# Test that manager has expected number of shared objects left
1049#
1050
1051class _TestZZZNumberOfObjects(BaseTestCase):
1052 # Because test cases are sorted alphabetically, this one will get
1053 # run after all the other tests for the manager. It tests that
1054 # there have been no "reference leaks" for the manager's shared
1055 # objects. Note the comment in _TestPool.test_terminate().
1056 ALLOWED_TYPES = ('manager',)
1057
1058 def test_number_of_objects(self):
1059 EXPECTED_NUMBER = 1 # the pool object is still alive
1060 multiprocessing.active_children() # discard dead process objs
1061 gc.collect() # do garbage collection
1062 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001063 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001064 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001065 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001066 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001067
1068 self.assertEqual(refs, EXPECTED_NUMBER)
1069
1070#
1071# Test of creating a customized manager class
1072#
1073
1074from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1075
1076class FooBar(object):
1077 def f(self):
1078 return 'f()'
1079 def g(self):
1080 raise ValueError
1081 def _h(self):
1082 return '_h()'
1083
1084def baz():
1085 for i in range(10):
1086 yield i*i
1087
1088class IteratorProxy(BaseProxy):
1089 _exposed_ = ('next', '__next__')
1090 def __iter__(self):
1091 return self
1092 def __next__(self):
1093 return self._callmethod('next')
1094 def __next__(self):
1095 return self._callmethod('__next__')
1096
1097class MyManager(BaseManager):
1098 pass
1099
1100MyManager.register('Foo', callable=FooBar)
1101MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1102MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1103
1104
1105class _TestMyManager(BaseTestCase):
1106
1107 ALLOWED_TYPES = ('manager',)
1108
1109 def test_mymanager(self):
1110 manager = MyManager()
1111 manager.start()
1112
1113 foo = manager.Foo()
1114 bar = manager.Bar()
1115 baz = manager.baz()
1116
1117 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1118 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1119
1120 self.assertEqual(foo_methods, ['f', 'g'])
1121 self.assertEqual(bar_methods, ['f', '_h'])
1122
1123 self.assertEqual(foo.f(), 'f()')
1124 self.assertRaises(ValueError, foo.g)
1125 self.assertEqual(foo._callmethod('f'), 'f()')
1126 self.assertRaises(RemoteError, foo._callmethod, '_h')
1127
1128 self.assertEqual(bar.f(), 'f()')
1129 self.assertEqual(bar._h(), '_h()')
1130 self.assertEqual(bar._callmethod('f'), 'f()')
1131 self.assertEqual(bar._callmethod('_h'), '_h()')
1132
1133 self.assertEqual(list(baz), [i*i for i in range(10)])
1134
1135 manager.shutdown()
1136
1137#
1138# Test of connecting to a remote server and using xmlrpclib for serialization
1139#
1140
1141_queue = pyqueue.Queue()
1142def get_queue():
1143 return _queue
1144
1145class QueueManager(BaseManager):
1146 '''manager class used by server process'''
1147QueueManager.register('get_queue', callable=get_queue)
1148
1149class QueueManager2(BaseManager):
1150 '''manager class which specifies the same interface as QueueManager'''
1151QueueManager2.register('get_queue')
1152
1153
1154SERIALIZER = 'xmlrpclib'
1155
1156class _TestRemoteManager(BaseTestCase):
1157
1158 ALLOWED_TYPES = ('manager',)
1159
1160 def _putter(self, address, authkey):
1161 manager = QueueManager2(
1162 address=address, authkey=authkey, serializer=SERIALIZER
1163 )
1164 manager.connect()
1165 queue = manager.get_queue()
1166 queue.put(('hello world', None, True, 2.25))
1167
1168 def test_remote(self):
1169 authkey = os.urandom(32)
1170
1171 manager = QueueManager(
1172 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1173 )
1174 manager.start()
1175
1176 p = self.Process(target=self._putter, args=(manager.address, authkey))
1177 p.start()
1178
1179 manager2 = QueueManager2(
1180 address=manager.address, authkey=authkey, serializer=SERIALIZER
1181 )
1182 manager2.connect()
1183 queue = manager2.get_queue()
1184
1185 # Note that xmlrpclib will deserialize object as a list not a tuple
1186 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1187
1188 # Because we are using xmlrpclib for serialization instead of
1189 # pickle this will cause a serialization error.
1190 self.assertRaises(Exception, queue.put, time.sleep)
1191
1192 # Make queue finalizer run before the server is stopped
1193 del queue
1194 manager.shutdown()
1195
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001196class _TestManagerRestart(BaseTestCase):
1197
1198 def _putter(self, address, authkey):
1199 manager = QueueManager(
1200 address=address, authkey=authkey, serializer=SERIALIZER)
1201 manager.connect()
1202 queue = manager.get_queue()
1203 queue.put('hello world')
1204
1205 def test_rapid_restart(self):
1206 authkey = os.urandom(32)
1207 manager = QueueManager(
1208 address=('localhost', 9999), authkey=authkey, serializer=SERIALIZER)
1209 manager.start()
1210
1211 p = self.Process(target=self._putter, args=(manager.address, authkey))
1212 p.start()
1213 queue = manager.get_queue()
1214 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00001215 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001216 manager.shutdown()
1217 manager = QueueManager(
1218 address=('localhost', 9999), authkey=authkey, serializer=SERIALIZER)
1219 manager.start()
Jesse Noller35d1f002009-03-30 22:59:27 +00001220 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001221
Benjamin Petersone711caf2008-06-11 16:44:04 +00001222#
1223#
1224#
1225
1226SENTINEL = latin('')
1227
1228class _TestConnection(BaseTestCase):
1229
1230 ALLOWED_TYPES = ('processes', 'threads')
1231
1232 def _echo(self, conn):
1233 for msg in iter(conn.recv_bytes, SENTINEL):
1234 conn.send_bytes(msg)
1235 conn.close()
1236
1237 def test_connection(self):
1238 conn, child_conn = self.Pipe()
1239
1240 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001241 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001242 p.start()
1243
1244 seq = [1, 2.25, None]
1245 msg = latin('hello world')
1246 longmsg = msg * 10
1247 arr = array.array('i', list(range(4)))
1248
1249 if self.TYPE == 'processes':
1250 self.assertEqual(type(conn.fileno()), int)
1251
1252 self.assertEqual(conn.send(seq), None)
1253 self.assertEqual(conn.recv(), seq)
1254
1255 self.assertEqual(conn.send_bytes(msg), None)
1256 self.assertEqual(conn.recv_bytes(), msg)
1257
1258 if self.TYPE == 'processes':
1259 buffer = array.array('i', [0]*10)
1260 expected = list(arr) + [0] * (10 - len(arr))
1261 self.assertEqual(conn.send_bytes(arr), None)
1262 self.assertEqual(conn.recv_bytes_into(buffer),
1263 len(arr) * buffer.itemsize)
1264 self.assertEqual(list(buffer), expected)
1265
1266 buffer = array.array('i', [0]*10)
1267 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1268 self.assertEqual(conn.send_bytes(arr), None)
1269 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1270 len(arr) * buffer.itemsize)
1271 self.assertEqual(list(buffer), expected)
1272
1273 buffer = bytearray(latin(' ' * 40))
1274 self.assertEqual(conn.send_bytes(longmsg), None)
1275 try:
1276 res = conn.recv_bytes_into(buffer)
1277 except multiprocessing.BufferTooShort as e:
1278 self.assertEqual(e.args, (longmsg,))
1279 else:
1280 self.fail('expected BufferTooShort, got %s' % res)
1281
1282 poll = TimingWrapper(conn.poll)
1283
1284 self.assertEqual(poll(), False)
1285 self.assertTimingAlmostEqual(poll.elapsed, 0)
1286
1287 self.assertEqual(poll(TIMEOUT1), False)
1288 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1289
1290 conn.send(None)
1291
1292 self.assertEqual(poll(TIMEOUT1), True)
1293 self.assertTimingAlmostEqual(poll.elapsed, 0)
1294
1295 self.assertEqual(conn.recv(), None)
1296
1297 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1298 conn.send_bytes(really_big_msg)
1299 self.assertEqual(conn.recv_bytes(), really_big_msg)
1300
1301 conn.send_bytes(SENTINEL) # tell child to quit
1302 child_conn.close()
1303
1304 if self.TYPE == 'processes':
1305 self.assertEqual(conn.readable, True)
1306 self.assertEqual(conn.writable, True)
1307 self.assertRaises(EOFError, conn.recv)
1308 self.assertRaises(EOFError, conn.recv_bytes)
1309
1310 p.join()
1311
1312 def test_duplex_false(self):
1313 reader, writer = self.Pipe(duplex=False)
1314 self.assertEqual(writer.send(1), None)
1315 self.assertEqual(reader.recv(), 1)
1316 if self.TYPE == 'processes':
1317 self.assertEqual(reader.readable, True)
1318 self.assertEqual(reader.writable, False)
1319 self.assertEqual(writer.readable, False)
1320 self.assertEqual(writer.writable, True)
1321 self.assertRaises(IOError, reader.send, 2)
1322 self.assertRaises(IOError, writer.recv)
1323 self.assertRaises(IOError, writer.poll)
1324
1325 def test_spawn_close(self):
1326 # We test that a pipe connection can be closed by parent
1327 # process immediately after child is spawned. On Windows this
1328 # would have sometimes failed on old versions because
1329 # child_conn would be closed before the child got a chance to
1330 # duplicate it.
1331 conn, child_conn = self.Pipe()
1332
1333 p = self.Process(target=self._echo, args=(child_conn,))
1334 p.start()
1335 child_conn.close() # this might complete before child initializes
1336
1337 msg = latin('hello')
1338 conn.send_bytes(msg)
1339 self.assertEqual(conn.recv_bytes(), msg)
1340
1341 conn.send_bytes(SENTINEL)
1342 conn.close()
1343 p.join()
1344
1345 def test_sendbytes(self):
1346 if self.TYPE != 'processes':
1347 return
1348
1349 msg = latin('abcdefghijklmnopqrstuvwxyz')
1350 a, b = self.Pipe()
1351
1352 a.send_bytes(msg)
1353 self.assertEqual(b.recv_bytes(), msg)
1354
1355 a.send_bytes(msg, 5)
1356 self.assertEqual(b.recv_bytes(), msg[5:])
1357
1358 a.send_bytes(msg, 7, 8)
1359 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1360
1361 a.send_bytes(msg, 26)
1362 self.assertEqual(b.recv_bytes(), latin(''))
1363
1364 a.send_bytes(msg, 26, 0)
1365 self.assertEqual(b.recv_bytes(), latin(''))
1366
1367 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1368
1369 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1370
1371 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1372
1373 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1374
1375 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1376
Benjamin Petersone711caf2008-06-11 16:44:04 +00001377class _TestListenerClient(BaseTestCase):
1378
1379 ALLOWED_TYPES = ('processes', 'threads')
1380
1381 def _test(self, address):
1382 conn = self.connection.Client(address)
1383 conn.send('hello')
1384 conn.close()
1385
1386 def test_listener_client(self):
1387 for family in self.connection.families:
1388 l = self.connection.Listener(family=family)
1389 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001390 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001391 p.start()
1392 conn = l.accept()
1393 self.assertEqual(conn.recv(), 'hello')
1394 p.join()
1395 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001396#
1397# Test of sending connection and socket objects between processes
1398#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001399"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001400class _TestPicklingConnections(BaseTestCase):
1401
1402 ALLOWED_TYPES = ('processes',)
1403
1404 def _listener(self, conn, families):
1405 for fam in families:
1406 l = self.connection.Listener(family=fam)
1407 conn.send(l.address)
1408 new_conn = l.accept()
1409 conn.send(new_conn)
1410
1411 if self.TYPE == 'processes':
1412 l = socket.socket()
1413 l.bind(('localhost', 0))
1414 conn.send(l.getsockname())
1415 l.listen(1)
1416 new_conn, addr = l.accept()
1417 conn.send(new_conn)
1418
1419 conn.recv()
1420
1421 def _remote(self, conn):
1422 for (address, msg) in iter(conn.recv, None):
1423 client = self.connection.Client(address)
1424 client.send(msg.upper())
1425 client.close()
1426
1427 if self.TYPE == 'processes':
1428 address, msg = conn.recv()
1429 client = socket.socket()
1430 client.connect(address)
1431 client.sendall(msg.upper())
1432 client.close()
1433
1434 conn.close()
1435
1436 def test_pickling(self):
1437 try:
1438 multiprocessing.allow_connection_pickling()
1439 except ImportError:
1440 return
1441
1442 families = self.connection.families
1443
1444 lconn, lconn0 = self.Pipe()
1445 lp = self.Process(target=self._listener, args=(lconn0, families))
1446 lp.start()
1447 lconn0.close()
1448
1449 rconn, rconn0 = self.Pipe()
1450 rp = self.Process(target=self._remote, args=(rconn0,))
1451 rp.start()
1452 rconn0.close()
1453
1454 for fam in families:
1455 msg = ('This connection uses family %s' % fam).encode('ascii')
1456 address = lconn.recv()
1457 rconn.send((address, msg))
1458 new_conn = lconn.recv()
1459 self.assertEqual(new_conn.recv(), msg.upper())
1460
1461 rconn.send(None)
1462
1463 if self.TYPE == 'processes':
1464 msg = latin('This connection uses a normal socket')
1465 address = lconn.recv()
1466 rconn.send((address, msg))
1467 if hasattr(socket, 'fromfd'):
1468 new_conn = lconn.recv()
1469 self.assertEqual(new_conn.recv(100), msg.upper())
1470 else:
1471 # XXX On Windows with Py2.6 need to backport fromfd()
1472 discard = lconn.recv_bytes()
1473
1474 lconn.send(None)
1475
1476 rconn.close()
1477 lconn.close()
1478
1479 lp.join()
1480 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001481"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001482#
1483#
1484#
1485
1486class _TestHeap(BaseTestCase):
1487
1488 ALLOWED_TYPES = ('processes',)
1489
1490 def test_heap(self):
1491 iterations = 5000
1492 maxblocks = 50
1493 blocks = []
1494
1495 # create and destroy lots of blocks of different sizes
1496 for i in range(iterations):
1497 size = int(random.lognormvariate(0, 1) * 1000)
1498 b = multiprocessing.heap.BufferWrapper(size)
1499 blocks.append(b)
1500 if len(blocks) > maxblocks:
1501 i = random.randrange(maxblocks)
1502 del blocks[i]
1503
1504 # get the heap object
1505 heap = multiprocessing.heap.BufferWrapper._heap
1506
1507 # verify the state of the heap
1508 all = []
1509 occupied = 0
1510 for L in list(heap._len_to_seq.values()):
1511 for arena, start, stop in L:
1512 all.append((heap._arenas.index(arena), start, stop,
1513 stop-start, 'free'))
1514 for arena, start, stop in heap._allocated_blocks:
1515 all.append((heap._arenas.index(arena), start, stop,
1516 stop-start, 'occupied'))
1517 occupied += (stop-start)
1518
1519 all.sort()
1520
1521 for i in range(len(all)-1):
1522 (arena, start, stop) = all[i][:3]
1523 (narena, nstart, nstop) = all[i+1][:3]
1524 self.assertTrue((arena != narena and nstart == 0) or
1525 (stop == nstart))
1526
1527#
1528#
1529#
1530
1531try:
1532 from ctypes import Structure, Value, copy, c_int, c_double
1533except ImportError:
1534 Structure = object
1535 c_int = c_double = None
1536
1537class _Foo(Structure):
1538 _fields_ = [
1539 ('x', c_int),
1540 ('y', c_double)
1541 ]
1542
1543class _TestSharedCTypes(BaseTestCase):
1544
1545 ALLOWED_TYPES = ('processes',)
1546
1547 def _double(self, x, y, foo, arr, string):
1548 x.value *= 2
1549 y.value *= 2
1550 foo.x *= 2
1551 foo.y *= 2
1552 string.value *= 2
1553 for i in range(len(arr)):
1554 arr[i] *= 2
1555
1556 def test_sharedctypes(self, lock=False):
1557 if c_int is None:
1558 return
1559
1560 x = Value('i', 7, lock=lock)
1561 y = Value(ctypes.c_double, 1.0/3.0, lock=lock)
1562 foo = Value(_Foo, 3, 2, lock=lock)
1563 arr = Array('d', list(range(10)), lock=lock)
1564 string = Array('c', 20, lock=lock)
1565 string.value = 'hello'
1566
1567 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1568 p.start()
1569 p.join()
1570
1571 self.assertEqual(x.value, 14)
1572 self.assertAlmostEqual(y.value, 2.0/3.0)
1573 self.assertEqual(foo.x, 6)
1574 self.assertAlmostEqual(foo.y, 4.0)
1575 for i in range(10):
1576 self.assertAlmostEqual(arr[i], i*2)
1577 self.assertEqual(string.value, latin('hellohello'))
1578
1579 def test_synchronize(self):
1580 self.test_sharedctypes(lock=True)
1581
1582 def test_copy(self):
1583 if c_int is None:
1584 return
1585
1586 foo = _Foo(2, 5.0)
1587 bar = copy(foo)
1588 foo.x = 0
1589 foo.y = 0
1590 self.assertEqual(bar.x, 2)
1591 self.assertAlmostEqual(bar.y, 5.0)
1592
1593#
1594#
1595#
1596
1597class _TestFinalize(BaseTestCase):
1598
1599 ALLOWED_TYPES = ('processes',)
1600
1601 def _test_finalize(self, conn):
1602 class Foo(object):
1603 pass
1604
1605 a = Foo()
1606 util.Finalize(a, conn.send, args=('a',))
1607 del a # triggers callback for a
1608
1609 b = Foo()
1610 close_b = util.Finalize(b, conn.send, args=('b',))
1611 close_b() # triggers callback for b
1612 close_b() # does nothing because callback has already been called
1613 del b # does nothing because callback has already been called
1614
1615 c = Foo()
1616 util.Finalize(c, conn.send, args=('c',))
1617
1618 d10 = Foo()
1619 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1620
1621 d01 = Foo()
1622 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1623 d02 = Foo()
1624 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1625 d03 = Foo()
1626 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1627
1628 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1629
1630 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1631
1632 # call mutliprocessing's cleanup function then exit process without
1633 # garbage collecting locals
1634 util._exit_function()
1635 conn.close()
1636 os._exit(0)
1637
1638 def test_finalize(self):
1639 conn, child_conn = self.Pipe()
1640
1641 p = self.Process(target=self._test_finalize, args=(child_conn,))
1642 p.start()
1643 p.join()
1644
1645 result = [obj for obj in iter(conn.recv, 'STOP')]
1646 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1647
1648#
1649# Test that from ... import * works for each module
1650#
1651
1652class _TestImportStar(BaseTestCase):
1653
1654 ALLOWED_TYPES = ('processes',)
1655
1656 def test_import(self):
1657 modules = (
1658 'multiprocessing', 'multiprocessing.connection',
1659 'multiprocessing.heap', 'multiprocessing.managers',
1660 'multiprocessing.pool', 'multiprocessing.process',
1661 'multiprocessing.reduction', 'multiprocessing.sharedctypes',
1662 'multiprocessing.synchronize', 'multiprocessing.util'
1663 )
1664
1665 for name in modules:
1666 __import__(name)
1667 mod = sys.modules[name]
1668
1669 for attr in getattr(mod, '__all__', ()):
1670 self.assertTrue(
1671 hasattr(mod, attr),
1672 '%r does not have attribute %r' % (mod, attr)
1673 )
1674
1675#
1676# Quick test that logging works -- does not test logging output
1677#
1678
1679class _TestLogging(BaseTestCase):
1680
1681 ALLOWED_TYPES = ('processes',)
1682
1683 def test_enable_logging(self):
1684 logger = multiprocessing.get_logger()
1685 logger.setLevel(util.SUBWARNING)
1686 self.assertTrue(logger is not None)
1687 logger.debug('this will not be printed')
1688 logger.info('nor will this')
1689 logger.setLevel(LOG_LEVEL)
1690
1691 def _test_level(self, conn):
1692 logger = multiprocessing.get_logger()
1693 conn.send(logger.getEffectiveLevel())
1694
1695 def test_level(self):
1696 LEVEL1 = 32
1697 LEVEL2 = 37
1698
1699 logger = multiprocessing.get_logger()
1700 root_logger = logging.getLogger()
1701 root_level = root_logger.level
1702
1703 reader, writer = multiprocessing.Pipe(duplex=False)
1704
1705 logger.setLevel(LEVEL1)
1706 self.Process(target=self._test_level, args=(writer,)).start()
1707 self.assertEqual(LEVEL1, reader.recv())
1708
1709 logger.setLevel(logging.NOTSET)
1710 root_logger.setLevel(LEVEL2)
1711 self.Process(target=self._test_level, args=(writer,)).start()
1712 self.assertEqual(LEVEL2, reader.recv())
1713
1714 root_logger.setLevel(root_level)
1715 logger.setLevel(level=LOG_LEVEL)
1716
1717#
Jesse Noller6214edd2009-01-19 16:23:53 +00001718# Test to verify handle verification, see issue 3321
1719#
1720
1721class TestInvalidHandle(unittest.TestCase):
1722
1723 def test_invalid_handles(self):
1724 if WIN32:
1725 return
1726 conn = _multiprocessing.Connection(44977608)
1727 self.assertRaises(IOError, conn.poll)
1728 self.assertRaises(IOError, _multiprocessing.Connection, -1)
1729#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001730# Functions used to create test cases from the base ones in this module
1731#
1732
1733def get_attributes(Source, names):
1734 d = {}
1735 for name in names:
1736 obj = getattr(Source, name)
1737 if type(obj) == type(get_attributes):
1738 obj = staticmethod(obj)
1739 d[name] = obj
1740 return d
1741
1742def create_test_cases(Mixin, type):
1743 result = {}
1744 glob = globals()
1745 Type = type[0].upper() + type[1:]
1746
1747 for name in list(glob.keys()):
1748 if name.startswith('_Test'):
1749 base = glob[name]
1750 if type in base.ALLOWED_TYPES:
1751 newname = 'With' + Type + name[1:]
1752 class Temp(base, unittest.TestCase, Mixin):
1753 pass
1754 result[newname] = Temp
1755 Temp.__name__ = newname
1756 Temp.__module__ = Mixin.__module__
1757 return result
1758
1759#
1760# Create test cases
1761#
1762
1763class ProcessesMixin(object):
1764 TYPE = 'processes'
1765 Process = multiprocessing.Process
1766 locals().update(get_attributes(multiprocessing, (
1767 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1768 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1769 'RawArray', 'current_process', 'active_children', 'Pipe',
1770 'connection', 'JoinableQueue'
1771 )))
1772
1773testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1774globals().update(testcases_processes)
1775
1776
1777class ManagerMixin(object):
1778 TYPE = 'manager'
1779 Process = multiprocessing.Process
1780 manager = object.__new__(multiprocessing.managers.SyncManager)
1781 locals().update(get_attributes(manager, (
1782 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1783 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1784 'Namespace', 'JoinableQueue'
1785 )))
1786
1787testcases_manager = create_test_cases(ManagerMixin, type='manager')
1788globals().update(testcases_manager)
1789
1790
1791class ThreadsMixin(object):
1792 TYPE = 'threads'
1793 Process = multiprocessing.dummy.Process
1794 locals().update(get_attributes(multiprocessing.dummy, (
1795 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1796 'Condition', 'Event', 'Value', 'Array', 'current_process',
1797 'active_children', 'Pipe', 'connection', 'dict', 'list',
1798 'Namespace', 'JoinableQueue'
1799 )))
1800
1801testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1802globals().update(testcases_threads)
1803
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001804class OtherTest(unittest.TestCase):
1805 # TODO: add more tests for deliver/answer challenge.
1806 def test_deliver_challenge_auth_failure(self):
1807 class _FakeConnection(object):
1808 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00001809 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001810 def send_bytes(self, data):
1811 pass
1812 self.assertRaises(multiprocessing.AuthenticationError,
1813 multiprocessing.connection.deliver_challenge,
1814 _FakeConnection(), b'abc')
1815
1816 def test_answer_challenge_auth_failure(self):
1817 class _FakeConnection(object):
1818 def __init__(self):
1819 self.count = 0
1820 def recv_bytes(self, size):
1821 self.count += 1
1822 if self.count == 1:
1823 return multiprocessing.connection.CHALLENGE
1824 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00001825 return b'something bogus'
1826 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001827 def send_bytes(self, data):
1828 pass
1829 self.assertRaises(multiprocessing.AuthenticationError,
1830 multiprocessing.connection.answer_challenge,
1831 _FakeConnection(), b'abc')
1832
Jesse Noller6214edd2009-01-19 16:23:53 +00001833testcases_other = [OtherTest, TestInvalidHandle]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001834
Benjamin Petersone711caf2008-06-11 16:44:04 +00001835#
1836#
1837#
1838
1839def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00001840 if sys.platform.startswith("linux"):
1841 try:
1842 lock = multiprocessing.RLock()
1843 except OSError:
Amaury Forgeot d'Arc620626b2008-06-19 22:03:50 +00001844 from test.support import TestSkipped
Benjamin Petersone549ead2009-03-28 21:42:05 +00001845 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00001846
Benjamin Petersone711caf2008-06-11 16:44:04 +00001847 if run is None:
1848 from test.support import run_unittest as run
1849
1850 util.get_temp_dir() # creates temp directory for use by all processes
1851
1852 multiprocessing.get_logger().setLevel(LOG_LEVEL)
1853
Benjamin Peterson41181742008-07-02 20:22:54 +00001854 ProcessesMixin.pool = multiprocessing.Pool(4)
1855 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
1856 ManagerMixin.manager.__init__()
1857 ManagerMixin.manager.start()
1858 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001859
1860 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00001861 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
1862 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001863 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
1864 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00001865 )
1866
1867 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
1868 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
1869 run(suite)
1870
Benjamin Peterson41181742008-07-02 20:22:54 +00001871 ThreadsMixin.pool.terminate()
1872 ProcessesMixin.pool.terminate()
1873 ManagerMixin.pool.terminate()
1874 ManagerMixin.manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001875
Benjamin Peterson41181742008-07-02 20:22:54 +00001876 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00001877
1878def main():
1879 test_main(unittest.TextTestRunner(verbosity=2).run)
1880
1881if __name__ == '__main__':
1882 main()