blob: ed8c62630b289f222a2f470a4fb86446ca7e51e6 [file] [log] [blame]
Georg Brandl86b2fb92008-07-16 03:43:04 +00001#!/usr/bin/env python
2
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import queue as pyqueue
10import time
11import sys
12import os
13import gc
14import signal
15import array
16import copy
17import socket
18import random
19import logging
20
Benjamin Petersone5384b02008-10-04 22:00:42 +000021
22# Work around broken sem_open implementations
23try:
24 import multiprocessing.synchronize
25except ImportError as e:
Benjamin Petersone549ead2009-03-28 21:42:05 +000026 raise unittest.SkipTest(e)
Benjamin Petersone5384b02008-10-04 22:00:42 +000027
Benjamin Petersone711caf2008-06-11 16:44:04 +000028import multiprocessing.dummy
29import multiprocessing.connection
30import multiprocessing.managers
31import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.pool
33import _multiprocessing
34
35from multiprocessing import util
36
37#
38#
39#
40
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000041def latin(s):
42 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000043
Benjamin Petersone711caf2008-06-11 16:44:04 +000044#
45# Constants
46#
47
48LOG_LEVEL = util.SUBWARNING
49#LOG_LEVEL = logging.WARNING
50
51DELTA = 0.1
52CHECK_TIMINGS = False # making true makes tests take a lot longer
53 # and can sometimes cause some non-serious
54 # failures because some calls block a bit
55 # longer than expected
56if CHECK_TIMINGS:
57 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
58else:
59 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
60
61HAVE_GETVALUE = not getattr(_multiprocessing,
62 'HAVE_BROKEN_SEM_GETVALUE', False)
63
Jesse Noller6214edd2009-01-19 16:23:53 +000064WIN32 = (sys.platform == "win32")
65
Benjamin Petersone711caf2008-06-11 16:44:04 +000066#
67# Creates a wrapper for a function which records the time it takes to finish
68#
69
70class TimingWrapper(object):
71
72 def __init__(self, func):
73 self.func = func
74 self.elapsed = None
75
76 def __call__(self, *args, **kwds):
77 t = time.time()
78 try:
79 return self.func(*args, **kwds)
80 finally:
81 self.elapsed = time.time() - t
82
83#
84# Base class for test cases
85#
86
87class BaseTestCase(object):
88
89 ALLOWED_TYPES = ('processes', 'manager', 'threads')
90
91 def assertTimingAlmostEqual(self, a, b):
92 if CHECK_TIMINGS:
93 self.assertAlmostEqual(a, b, 1)
94
95 def assertReturnsIfImplemented(self, value, func, *args):
96 try:
97 res = func(*args)
98 except NotImplementedError:
99 pass
100 else:
101 return self.assertEqual(value, res)
102
103#
104# Return the value of a semaphore
105#
106
107def get_value(self):
108 try:
109 return self.get_value()
110 except AttributeError:
111 try:
112 return self._Semaphore__value
113 except AttributeError:
114 try:
115 return self._value
116 except AttributeError:
117 raise NotImplementedError
118
119#
120# Testcases
121#
122
123class _TestProcess(BaseTestCase):
124
125 ALLOWED_TYPES = ('processes', 'threads')
126
127 def test_current(self):
128 if self.TYPE == 'threads':
129 return
130
131 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000132 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000133
134 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000135 self.assertTrue(not current.daemon)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000136 self.assertTrue(isinstance(authkey, bytes))
137 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000138 self.assertEqual(current.ident, os.getpid())
139 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000140
141 def _test(self, q, *args, **kwds):
142 current = self.current_process()
143 q.put(args)
144 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000145 q.put(current.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000146 if self.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000147 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000148 q.put(current.pid)
149
150 def test_process(self):
151 q = self.Queue(1)
152 e = self.Event()
153 args = (q, 1, 2)
154 kwargs = {'hello':23, 'bye':2.54}
155 name = 'SomeProcess'
156 p = self.Process(
157 target=self._test, args=args, kwargs=kwargs, name=name
158 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000159 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000160 current = self.current_process()
161
162 if self.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000163 self.assertEquals(p.authkey, current.authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000164 self.assertEquals(p.is_alive(), False)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000165 self.assertEquals(p.daemon, True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000166 self.assertTrue(p not in self.active_children())
167 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000168 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000169
170 p.start()
171
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000172 self.assertEquals(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000173 self.assertEquals(p.is_alive(), True)
174 self.assertTrue(p in self.active_children())
175
176 self.assertEquals(q.get(), args[1:])
177 self.assertEquals(q.get(), kwargs)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000178 self.assertEquals(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000179 if self.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000180 self.assertEquals(q.get(), current.authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000181 self.assertEquals(q.get(), p.pid)
182
183 p.join()
184
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000185 self.assertEquals(p.exitcode, 0)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000186 self.assertEquals(p.is_alive(), False)
187 self.assertTrue(p not in self.active_children())
188
189 def _test_terminate(self):
190 time.sleep(1000)
191
192 def test_terminate(self):
193 if self.TYPE == 'threads':
194 return
195
196 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000197 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000198 p.start()
199
200 self.assertEqual(p.is_alive(), True)
201 self.assertTrue(p in self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000202 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000203
204 p.terminate()
205
206 join = TimingWrapper(p.join)
207 self.assertEqual(join(), None)
208 self.assertTimingAlmostEqual(join.elapsed, 0.0)
209
210 self.assertEqual(p.is_alive(), False)
211 self.assertTrue(p not in self.active_children())
212
213 p.join()
214
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000215 # XXX sometimes get p.exitcode == 0 on Windows ...
216 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000217
218 def test_cpu_count(self):
219 try:
220 cpus = multiprocessing.cpu_count()
221 except NotImplementedError:
222 cpus = 1
223 self.assertTrue(type(cpus) is int)
224 self.assertTrue(cpus >= 1)
225
226 def test_active_children(self):
227 self.assertEqual(type(self.active_children()), list)
228
229 p = self.Process(target=time.sleep, args=(DELTA,))
230 self.assertTrue(p not in self.active_children())
231
232 p.start()
233 self.assertTrue(p in self.active_children())
234
235 p.join()
236 self.assertTrue(p not in self.active_children())
237
238 def _test_recursion(self, wconn, id):
239 from multiprocessing import forking
240 wconn.send(id)
241 if len(id) < 2:
242 for i in range(2):
243 p = self.Process(
244 target=self._test_recursion, args=(wconn, id+[i])
245 )
246 p.start()
247 p.join()
248
249 def test_recursion(self):
250 rconn, wconn = self.Pipe(duplex=False)
251 self._test_recursion(wconn, [])
252
253 time.sleep(DELTA)
254 result = []
255 while rconn.poll():
256 result.append(rconn.recv())
257
258 expected = [
259 [],
260 [0],
261 [0, 0],
262 [0, 1],
263 [1],
264 [1, 0],
265 [1, 1]
266 ]
267 self.assertEqual(result, expected)
268
269#
270#
271#
272
273class _UpperCaser(multiprocessing.Process):
274
275 def __init__(self):
276 multiprocessing.Process.__init__(self)
277 self.child_conn, self.parent_conn = multiprocessing.Pipe()
278
279 def run(self):
280 self.parent_conn.close()
281 for s in iter(self.child_conn.recv, None):
282 self.child_conn.send(s.upper())
283 self.child_conn.close()
284
285 def submit(self, s):
286 assert type(s) is str
287 self.parent_conn.send(s)
288 return self.parent_conn.recv()
289
290 def stop(self):
291 self.parent_conn.send(None)
292 self.parent_conn.close()
293 self.child_conn.close()
294
295class _TestSubclassingProcess(BaseTestCase):
296
297 ALLOWED_TYPES = ('processes',)
298
299 def test_subclassing(self):
300 uppercaser = _UpperCaser()
301 uppercaser.start()
302 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
303 self.assertEqual(uppercaser.submit('world'), 'WORLD')
304 uppercaser.stop()
305 uppercaser.join()
306
307#
308#
309#
310
311def queue_empty(q):
312 if hasattr(q, 'empty'):
313 return q.empty()
314 else:
315 return q.qsize() == 0
316
317def queue_full(q, maxsize):
318 if hasattr(q, 'full'):
319 return q.full()
320 else:
321 return q.qsize() == maxsize
322
323
324class _TestQueue(BaseTestCase):
325
326
327 def _test_put(self, queue, child_can_start, parent_can_continue):
328 child_can_start.wait()
329 for i in range(6):
330 queue.get()
331 parent_can_continue.set()
332
333 def test_put(self):
334 MAXSIZE = 6
335 queue = self.Queue(maxsize=MAXSIZE)
336 child_can_start = self.Event()
337 parent_can_continue = self.Event()
338
339 proc = self.Process(
340 target=self._test_put,
341 args=(queue, child_can_start, parent_can_continue)
342 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000343 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000344 proc.start()
345
346 self.assertEqual(queue_empty(queue), True)
347 self.assertEqual(queue_full(queue, MAXSIZE), False)
348
349 queue.put(1)
350 queue.put(2, True)
351 queue.put(3, True, None)
352 queue.put(4, False)
353 queue.put(5, False, None)
354 queue.put_nowait(6)
355
356 # the values may be in buffer but not yet in pipe so sleep a bit
357 time.sleep(DELTA)
358
359 self.assertEqual(queue_empty(queue), False)
360 self.assertEqual(queue_full(queue, MAXSIZE), True)
361
362 put = TimingWrapper(queue.put)
363 put_nowait = TimingWrapper(queue.put_nowait)
364
365 self.assertRaises(pyqueue.Full, put, 7, False)
366 self.assertTimingAlmostEqual(put.elapsed, 0)
367
368 self.assertRaises(pyqueue.Full, put, 7, False, None)
369 self.assertTimingAlmostEqual(put.elapsed, 0)
370
371 self.assertRaises(pyqueue.Full, put_nowait, 7)
372 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
373
374 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
375 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
376
377 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
378 self.assertTimingAlmostEqual(put.elapsed, 0)
379
380 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
381 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
382
383 child_can_start.set()
384 parent_can_continue.wait()
385
386 self.assertEqual(queue_empty(queue), True)
387 self.assertEqual(queue_full(queue, MAXSIZE), False)
388
389 proc.join()
390
391 def _test_get(self, queue, child_can_start, parent_can_continue):
392 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000393 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000394 queue.put(2)
395 queue.put(3)
396 queue.put(4)
397 queue.put(5)
398 parent_can_continue.set()
399
400 def test_get(self):
401 queue = self.Queue()
402 child_can_start = self.Event()
403 parent_can_continue = self.Event()
404
405 proc = self.Process(
406 target=self._test_get,
407 args=(queue, child_can_start, parent_can_continue)
408 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000409 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000410 proc.start()
411
412 self.assertEqual(queue_empty(queue), True)
413
414 child_can_start.set()
415 parent_can_continue.wait()
416
417 time.sleep(DELTA)
418 self.assertEqual(queue_empty(queue), False)
419
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000420 # Hangs unexpectedly, remove for now
421 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000422 self.assertEqual(queue.get(True, None), 2)
423 self.assertEqual(queue.get(True), 3)
424 self.assertEqual(queue.get(timeout=1), 4)
425 self.assertEqual(queue.get_nowait(), 5)
426
427 self.assertEqual(queue_empty(queue), True)
428
429 get = TimingWrapper(queue.get)
430 get_nowait = TimingWrapper(queue.get_nowait)
431
432 self.assertRaises(pyqueue.Empty, get, False)
433 self.assertTimingAlmostEqual(get.elapsed, 0)
434
435 self.assertRaises(pyqueue.Empty, get, False, None)
436 self.assertTimingAlmostEqual(get.elapsed, 0)
437
438 self.assertRaises(pyqueue.Empty, get_nowait)
439 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
440
441 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
442 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
443
444 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
445 self.assertTimingAlmostEqual(get.elapsed, 0)
446
447 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
448 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
449
450 proc.join()
451
452 def _test_fork(self, queue):
453 for i in range(10, 20):
454 queue.put(i)
455 # note that at this point the items may only be buffered, so the
456 # process cannot shutdown until the feeder thread has finished
457 # pushing items onto the pipe.
458
459 def test_fork(self):
460 # Old versions of Queue would fail to create a new feeder
461 # thread for a forked process if the original process had its
462 # own feeder thread. This test checks that this no longer
463 # happens.
464
465 queue = self.Queue()
466
467 # put items on queue so that main process starts a feeder thread
468 for i in range(10):
469 queue.put(i)
470
471 # wait to make sure thread starts before we fork a new process
472 time.sleep(DELTA)
473
474 # fork process
475 p = self.Process(target=self._test_fork, args=(queue,))
476 p.start()
477
478 # check that all expected items are in the queue
479 for i in range(20):
480 self.assertEqual(queue.get(), i)
481 self.assertRaises(pyqueue.Empty, queue.get, False)
482
483 p.join()
484
485 def test_qsize(self):
486 q = self.Queue()
487 try:
488 self.assertEqual(q.qsize(), 0)
489 except NotImplementedError:
490 return
491 q.put(1)
492 self.assertEqual(q.qsize(), 1)
493 q.put(5)
494 self.assertEqual(q.qsize(), 2)
495 q.get()
496 self.assertEqual(q.qsize(), 1)
497 q.get()
498 self.assertEqual(q.qsize(), 0)
499
500 def _test_task_done(self, q):
501 for obj in iter(q.get, None):
502 time.sleep(DELTA)
503 q.task_done()
504
505 def test_task_done(self):
506 queue = self.JoinableQueue()
507
508 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
509 return
510
511 workers = [self.Process(target=self._test_task_done, args=(queue,))
512 for i in range(4)]
513
514 for p in workers:
515 p.start()
516
517 for i in range(10):
518 queue.put(i)
519
520 queue.join()
521
522 for p in workers:
523 queue.put(None)
524
525 for p in workers:
526 p.join()
527
528#
529#
530#
531
532class _TestLock(BaseTestCase):
533
534 def test_lock(self):
535 lock = self.Lock()
536 self.assertEqual(lock.acquire(), True)
537 self.assertEqual(lock.acquire(False), False)
538 self.assertEqual(lock.release(), None)
539 self.assertRaises((ValueError, threading.ThreadError), lock.release)
540
541 def test_rlock(self):
542 lock = self.RLock()
543 self.assertEqual(lock.acquire(), True)
544 self.assertEqual(lock.acquire(), True)
545 self.assertEqual(lock.acquire(), True)
546 self.assertEqual(lock.release(), None)
547 self.assertEqual(lock.release(), None)
548 self.assertEqual(lock.release(), None)
549 self.assertRaises((AssertionError, RuntimeError), lock.release)
550
551
552class _TestSemaphore(BaseTestCase):
553
554 def _test_semaphore(self, sem):
555 self.assertReturnsIfImplemented(2, get_value, sem)
556 self.assertEqual(sem.acquire(), True)
557 self.assertReturnsIfImplemented(1, get_value, sem)
558 self.assertEqual(sem.acquire(), True)
559 self.assertReturnsIfImplemented(0, get_value, sem)
560 self.assertEqual(sem.acquire(False), False)
561 self.assertReturnsIfImplemented(0, get_value, sem)
562 self.assertEqual(sem.release(), None)
563 self.assertReturnsIfImplemented(1, get_value, sem)
564 self.assertEqual(sem.release(), None)
565 self.assertReturnsIfImplemented(2, get_value, sem)
566
567 def test_semaphore(self):
568 sem = self.Semaphore(2)
569 self._test_semaphore(sem)
570 self.assertEqual(sem.release(), None)
571 self.assertReturnsIfImplemented(3, get_value, sem)
572 self.assertEqual(sem.release(), None)
573 self.assertReturnsIfImplemented(4, get_value, sem)
574
575 def test_bounded_semaphore(self):
576 sem = self.BoundedSemaphore(2)
577 self._test_semaphore(sem)
578 # Currently fails on OS/X
579 #if HAVE_GETVALUE:
580 # self.assertRaises(ValueError, sem.release)
581 # self.assertReturnsIfImplemented(2, get_value, sem)
582
583 def test_timeout(self):
584 if self.TYPE != 'processes':
585 return
586
587 sem = self.Semaphore(0)
588 acquire = TimingWrapper(sem.acquire)
589
590 self.assertEqual(acquire(False), False)
591 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
592
593 self.assertEqual(acquire(False, None), False)
594 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
595
596 self.assertEqual(acquire(False, TIMEOUT1), False)
597 self.assertTimingAlmostEqual(acquire.elapsed, 0)
598
599 self.assertEqual(acquire(True, TIMEOUT2), False)
600 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
601
602 self.assertEqual(acquire(timeout=TIMEOUT3), False)
603 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
604
605
606class _TestCondition(BaseTestCase):
607
608 def f(self, cond, sleeping, woken, timeout=None):
609 cond.acquire()
610 sleeping.release()
611 cond.wait(timeout)
612 woken.release()
613 cond.release()
614
615 def check_invariant(self, cond):
616 # this is only supposed to succeed when there are no sleepers
617 if self.TYPE == 'processes':
618 try:
619 sleepers = (cond._sleeping_count.get_value() -
620 cond._woken_count.get_value())
621 self.assertEqual(sleepers, 0)
622 self.assertEqual(cond._wait_semaphore.get_value(), 0)
623 except NotImplementedError:
624 pass
625
626 def test_notify(self):
627 cond = self.Condition()
628 sleeping = self.Semaphore(0)
629 woken = self.Semaphore(0)
630
631 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000632 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000633 p.start()
634
635 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000636 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000637 p.start()
638
639 # wait for both children to start sleeping
640 sleeping.acquire()
641 sleeping.acquire()
642
643 # check no process/thread has woken up
644 time.sleep(DELTA)
645 self.assertReturnsIfImplemented(0, get_value, woken)
646
647 # wake up one process/thread
648 cond.acquire()
649 cond.notify()
650 cond.release()
651
652 # check one process/thread has woken up
653 time.sleep(DELTA)
654 self.assertReturnsIfImplemented(1, get_value, woken)
655
656 # wake up another
657 cond.acquire()
658 cond.notify()
659 cond.release()
660
661 # check other has woken up
662 time.sleep(DELTA)
663 self.assertReturnsIfImplemented(2, get_value, woken)
664
665 # check state is not mucked up
666 self.check_invariant(cond)
667 p.join()
668
669 def test_notify_all(self):
670 cond = self.Condition()
671 sleeping = self.Semaphore(0)
672 woken = self.Semaphore(0)
673
674 # start some threads/processes which will timeout
675 for i in range(3):
676 p = self.Process(target=self.f,
677 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000678 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000679 p.start()
680
681 t = threading.Thread(target=self.f,
682 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000683 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000684 t.start()
685
686 # wait for them all to sleep
687 for i in range(6):
688 sleeping.acquire()
689
690 # check they have all timed out
691 for i in range(6):
692 woken.acquire()
693 self.assertReturnsIfImplemented(0, get_value, woken)
694
695 # check state is not mucked up
696 self.check_invariant(cond)
697
698 # start some more threads/processes
699 for i in range(3):
700 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000701 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000702 p.start()
703
704 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000705 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000706 t.start()
707
708 # wait for them to all sleep
709 for i in range(6):
710 sleeping.acquire()
711
712 # check no process/thread has woken up
713 time.sleep(DELTA)
714 self.assertReturnsIfImplemented(0, get_value, woken)
715
716 # wake them all up
717 cond.acquire()
718 cond.notify_all()
719 cond.release()
720
721 # check they have all woken
722 time.sleep(DELTA)
723 self.assertReturnsIfImplemented(6, get_value, woken)
724
725 # check state is not mucked up
726 self.check_invariant(cond)
727
728 def test_timeout(self):
729 cond = self.Condition()
730 wait = TimingWrapper(cond.wait)
731 cond.acquire()
732 res = wait(TIMEOUT1)
733 cond.release()
734 self.assertEqual(res, None)
735 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
736
737
738class _TestEvent(BaseTestCase):
739
740 def _test_event(self, event):
741 time.sleep(TIMEOUT2)
742 event.set()
743
744 def test_event(self):
745 event = self.Event()
746 wait = TimingWrapper(event.wait)
747
748 # Removed temporaily, due to API shear, this does not
749 # work with threading._Event objects. is_set == isSet
750 #self.assertEqual(event.is_set(), False)
751
752 self.assertEqual(wait(0.0), None)
753 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
754 self.assertEqual(wait(TIMEOUT1), None)
755 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
756
757 event.set()
758
759 # See note above on the API differences
760 # self.assertEqual(event.is_set(), True)
761 self.assertEqual(wait(), None)
762 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
763 self.assertEqual(wait(TIMEOUT1), None)
764 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
765 # self.assertEqual(event.is_set(), True)
766
767 event.clear()
768
769 #self.assertEqual(event.is_set(), False)
770
771 self.Process(target=self._test_event, args=(event,)).start()
772 self.assertEqual(wait(), None)
773
774#
775#
776#
777
778class _TestValue(BaseTestCase):
779
780 codes_values = [
781 ('i', 4343, 24234),
782 ('d', 3.625, -4.25),
783 ('h', -232, 234),
784 ('c', latin('x'), latin('y'))
785 ]
786
787 def _test(self, values):
788 for sv, cv in zip(values, self.codes_values):
789 sv.value = cv[2]
790
791
792 def test_value(self, raw=False):
793 if self.TYPE != 'processes':
794 return
795
796 if raw:
797 values = [self.RawValue(code, value)
798 for code, value, _ in self.codes_values]
799 else:
800 values = [self.Value(code, value)
801 for code, value, _ in self.codes_values]
802
803 for sv, cv in zip(values, self.codes_values):
804 self.assertEqual(sv.value, cv[1])
805
806 proc = self.Process(target=self._test, args=(values,))
807 proc.start()
808 proc.join()
809
810 for sv, cv in zip(values, self.codes_values):
811 self.assertEqual(sv.value, cv[2])
812
813 def test_rawvalue(self):
814 self.test_value(raw=True)
815
816 def test_getobj_getlock(self):
817 if self.TYPE != 'processes':
818 return
819
820 val1 = self.Value('i', 5)
821 lock1 = val1.get_lock()
822 obj1 = val1.get_obj()
823
824 val2 = self.Value('i', 5, lock=None)
825 lock2 = val2.get_lock()
826 obj2 = val2.get_obj()
827
828 lock = self.Lock()
829 val3 = self.Value('i', 5, lock=lock)
830 lock3 = val3.get_lock()
831 obj3 = val3.get_obj()
832 self.assertEqual(lock, lock3)
833
Jesse Nollerb0516a62009-01-18 03:11:38 +0000834 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000835 self.assertFalse(hasattr(arr4, 'get_lock'))
836 self.assertFalse(hasattr(arr4, 'get_obj'))
837
Jesse Nollerb0516a62009-01-18 03:11:38 +0000838 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
839
840 arr5 = self.RawValue('i', 5)
841 self.assertFalse(hasattr(arr5, 'get_lock'))
842 self.assertFalse(hasattr(arr5, 'get_obj'))
843
Benjamin Petersone711caf2008-06-11 16:44:04 +0000844
845class _TestArray(BaseTestCase):
846
847 def f(self, seq):
848 for i in range(1, len(seq)):
849 seq[i] += seq[i-1]
850
851 def test_array(self, raw=False):
852 if self.TYPE != 'processes':
853 return
854
855 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
856 if raw:
857 arr = self.RawArray('i', seq)
858 else:
859 arr = self.Array('i', seq)
860
861 self.assertEqual(len(arr), len(seq))
862 self.assertEqual(arr[3], seq[3])
863 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
864
865 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
866
867 self.assertEqual(list(arr[:]), seq)
868
869 self.f(seq)
870
871 p = self.Process(target=self.f, args=(arr,))
872 p.start()
873 p.join()
874
875 self.assertEqual(list(arr[:]), seq)
876
877 def test_rawarray(self):
878 self.test_array(raw=True)
879
880 def test_getobj_getlock_obj(self):
881 if self.TYPE != 'processes':
882 return
883
884 arr1 = self.Array('i', list(range(10)))
885 lock1 = arr1.get_lock()
886 obj1 = arr1.get_obj()
887
888 arr2 = self.Array('i', list(range(10)), lock=None)
889 lock2 = arr2.get_lock()
890 obj2 = arr2.get_obj()
891
892 lock = self.Lock()
893 arr3 = self.Array('i', list(range(10)), lock=lock)
894 lock3 = arr3.get_lock()
895 obj3 = arr3.get_obj()
896 self.assertEqual(lock, lock3)
897
Jesse Nollerb0516a62009-01-18 03:11:38 +0000898 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000899 self.assertFalse(hasattr(arr4, 'get_lock'))
900 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +0000901 self.assertRaises(AttributeError,
902 self.Array, 'i', range(10), lock='notalock')
903
904 arr5 = self.RawArray('i', range(10))
905 self.assertFalse(hasattr(arr5, 'get_lock'))
906 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000907
908#
909#
910#
911
912class _TestContainers(BaseTestCase):
913
914 ALLOWED_TYPES = ('manager',)
915
916 def test_list(self):
917 a = self.list(list(range(10)))
918 self.assertEqual(a[:], list(range(10)))
919
920 b = self.list()
921 self.assertEqual(b[:], [])
922
923 b.extend(list(range(5)))
924 self.assertEqual(b[:], list(range(5)))
925
926 self.assertEqual(b[2], 2)
927 self.assertEqual(b[2:10], [2,3,4])
928
929 b *= 2
930 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
931
932 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
933
934 self.assertEqual(a[:], list(range(10)))
935
936 d = [a, b]
937 e = self.list(d)
938 self.assertEqual(
939 e[:],
940 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
941 )
942
943 f = self.list([a])
944 a.append('hello')
945 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
946
947 def test_dict(self):
948 d = self.dict()
949 indices = list(range(65, 70))
950 for i in indices:
951 d[i] = chr(i)
952 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
953 self.assertEqual(sorted(d.keys()), indices)
954 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
955 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
956
957 def test_namespace(self):
958 n = self.Namespace()
959 n.name = 'Bob'
960 n.job = 'Builder'
961 n._hidden = 'hidden'
962 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
963 del n.job
964 self.assertEqual(str(n), "Namespace(name='Bob')")
965 self.assertTrue(hasattr(n, 'name'))
966 self.assertTrue(not hasattr(n, 'job'))
967
968#
969#
970#
971
972def sqr(x, wait=0.0):
973 time.sleep(wait)
974 return x*x
Benjamin Petersone711caf2008-06-11 16:44:04 +0000975class _TestPool(BaseTestCase):
976
977 def test_apply(self):
978 papply = self.pool.apply
979 self.assertEqual(papply(sqr, (5,)), sqr(5))
980 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
981
982 def test_map(self):
983 pmap = self.pool.map
984 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
985 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
986 list(map(sqr, list(range(100)))))
987
988 def test_async(self):
989 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
990 get = TimingWrapper(res.get)
991 self.assertEqual(get(), 49)
992 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
993
994 def test_async_timeout(self):
995 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
996 get = TimingWrapper(res.get)
997 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
998 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
999
1000 def test_imap(self):
1001 it = self.pool.imap(sqr, list(range(10)))
1002 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1003
1004 it = self.pool.imap(sqr, list(range(10)))
1005 for i in range(10):
1006 self.assertEqual(next(it), i*i)
1007 self.assertRaises(StopIteration, it.__next__)
1008
1009 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1010 for i in range(1000):
1011 self.assertEqual(next(it), i*i)
1012 self.assertRaises(StopIteration, it.__next__)
1013
1014 def test_imap_unordered(self):
1015 it = self.pool.imap_unordered(sqr, list(range(1000)))
1016 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1017
1018 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1019 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1020
1021 def test_make_pool(self):
1022 p = multiprocessing.Pool(3)
1023 self.assertEqual(3, len(p._pool))
1024 p.close()
1025 p.join()
1026
1027 def test_terminate(self):
1028 if self.TYPE == 'manager':
1029 # On Unix a forked process increfs each shared object to
1030 # which its parent process held a reference. If the
1031 # forked process gets terminated then there is likely to
1032 # be a reference leak. So to prevent
1033 # _TestZZZNumberOfObjects from failing we skip this test
1034 # when using a manager.
1035 return
1036
1037 result = self.pool.map_async(
1038 time.sleep, [0.1 for i in range(10000)], chunksize=1
1039 )
1040 self.pool.terminate()
1041 join = TimingWrapper(self.pool.join)
1042 join()
1043 self.assertTrue(join.elapsed < 0.2)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001044#
1045# Test that manager has expected number of shared objects left
1046#
1047
1048class _TestZZZNumberOfObjects(BaseTestCase):
1049 # Because test cases are sorted alphabetically, this one will get
1050 # run after all the other tests for the manager. It tests that
1051 # there have been no "reference leaks" for the manager's shared
1052 # objects. Note the comment in _TestPool.test_terminate().
1053 ALLOWED_TYPES = ('manager',)
1054
1055 def test_number_of_objects(self):
1056 EXPECTED_NUMBER = 1 # the pool object is still alive
1057 multiprocessing.active_children() # discard dead process objs
1058 gc.collect() # do garbage collection
1059 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001060 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001061 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001062 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001063 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001064
1065 self.assertEqual(refs, EXPECTED_NUMBER)
1066
1067#
1068# Test of creating a customized manager class
1069#
1070
1071from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1072
1073class FooBar(object):
1074 def f(self):
1075 return 'f()'
1076 def g(self):
1077 raise ValueError
1078 def _h(self):
1079 return '_h()'
1080
1081def baz():
1082 for i in range(10):
1083 yield i*i
1084
1085class IteratorProxy(BaseProxy):
1086 _exposed_ = ('next', '__next__')
1087 def __iter__(self):
1088 return self
1089 def __next__(self):
1090 return self._callmethod('next')
1091 def __next__(self):
1092 return self._callmethod('__next__')
1093
1094class MyManager(BaseManager):
1095 pass
1096
1097MyManager.register('Foo', callable=FooBar)
1098MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1099MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1100
1101
1102class _TestMyManager(BaseTestCase):
1103
1104 ALLOWED_TYPES = ('manager',)
1105
1106 def test_mymanager(self):
1107 manager = MyManager()
1108 manager.start()
1109
1110 foo = manager.Foo()
1111 bar = manager.Bar()
1112 baz = manager.baz()
1113
1114 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1115 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1116
1117 self.assertEqual(foo_methods, ['f', 'g'])
1118 self.assertEqual(bar_methods, ['f', '_h'])
1119
1120 self.assertEqual(foo.f(), 'f()')
1121 self.assertRaises(ValueError, foo.g)
1122 self.assertEqual(foo._callmethod('f'), 'f()')
1123 self.assertRaises(RemoteError, foo._callmethod, '_h')
1124
1125 self.assertEqual(bar.f(), 'f()')
1126 self.assertEqual(bar._h(), '_h()')
1127 self.assertEqual(bar._callmethod('f'), 'f()')
1128 self.assertEqual(bar._callmethod('_h'), '_h()')
1129
1130 self.assertEqual(list(baz), [i*i for i in range(10)])
1131
1132 manager.shutdown()
1133
1134#
1135# Test of connecting to a remote server and using xmlrpclib for serialization
1136#
1137
1138_queue = pyqueue.Queue()
1139def get_queue():
1140 return _queue
1141
1142class QueueManager(BaseManager):
1143 '''manager class used by server process'''
1144QueueManager.register('get_queue', callable=get_queue)
1145
1146class QueueManager2(BaseManager):
1147 '''manager class which specifies the same interface as QueueManager'''
1148QueueManager2.register('get_queue')
1149
1150
1151SERIALIZER = 'xmlrpclib'
1152
1153class _TestRemoteManager(BaseTestCase):
1154
1155 ALLOWED_TYPES = ('manager',)
1156
1157 def _putter(self, address, authkey):
1158 manager = QueueManager2(
1159 address=address, authkey=authkey, serializer=SERIALIZER
1160 )
1161 manager.connect()
1162 queue = manager.get_queue()
1163 queue.put(('hello world', None, True, 2.25))
1164
1165 def test_remote(self):
1166 authkey = os.urandom(32)
1167
1168 manager = QueueManager(
1169 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1170 )
1171 manager.start()
1172
1173 p = self.Process(target=self._putter, args=(manager.address, authkey))
1174 p.start()
1175
1176 manager2 = QueueManager2(
1177 address=manager.address, authkey=authkey, serializer=SERIALIZER
1178 )
1179 manager2.connect()
1180 queue = manager2.get_queue()
1181
1182 # Note that xmlrpclib will deserialize object as a list not a tuple
1183 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1184
1185 # Because we are using xmlrpclib for serialization instead of
1186 # pickle this will cause a serialization error.
1187 self.assertRaises(Exception, queue.put, time.sleep)
1188
1189 # Make queue finalizer run before the server is stopped
1190 del queue
1191 manager.shutdown()
1192
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001193class _TestManagerRestart(BaseTestCase):
1194
1195 def _putter(self, address, authkey):
1196 manager = QueueManager(
1197 address=address, authkey=authkey, serializer=SERIALIZER)
1198 manager.connect()
1199 queue = manager.get_queue()
1200 queue.put('hello world')
1201
1202 def test_rapid_restart(self):
1203 authkey = os.urandom(32)
1204 manager = QueueManager(
1205 address=('localhost', 9999), authkey=authkey, serializer=SERIALIZER)
1206 manager.start()
1207
1208 p = self.Process(target=self._putter, args=(manager.address, authkey))
1209 p.start()
1210 queue = manager.get_queue()
1211 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00001212 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001213 manager.shutdown()
1214 manager = QueueManager(
1215 address=('localhost', 9999), authkey=authkey, serializer=SERIALIZER)
1216 manager.start()
Jesse Noller35d1f002009-03-30 22:59:27 +00001217 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001218
Benjamin Petersone711caf2008-06-11 16:44:04 +00001219#
1220#
1221#
1222
1223SENTINEL = latin('')
1224
1225class _TestConnection(BaseTestCase):
1226
1227 ALLOWED_TYPES = ('processes', 'threads')
1228
1229 def _echo(self, conn):
1230 for msg in iter(conn.recv_bytes, SENTINEL):
1231 conn.send_bytes(msg)
1232 conn.close()
1233
1234 def test_connection(self):
1235 conn, child_conn = self.Pipe()
1236
1237 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001238 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001239 p.start()
1240
1241 seq = [1, 2.25, None]
1242 msg = latin('hello world')
1243 longmsg = msg * 10
1244 arr = array.array('i', list(range(4)))
1245
1246 if self.TYPE == 'processes':
1247 self.assertEqual(type(conn.fileno()), int)
1248
1249 self.assertEqual(conn.send(seq), None)
1250 self.assertEqual(conn.recv(), seq)
1251
1252 self.assertEqual(conn.send_bytes(msg), None)
1253 self.assertEqual(conn.recv_bytes(), msg)
1254
1255 if self.TYPE == 'processes':
1256 buffer = array.array('i', [0]*10)
1257 expected = list(arr) + [0] * (10 - len(arr))
1258 self.assertEqual(conn.send_bytes(arr), None)
1259 self.assertEqual(conn.recv_bytes_into(buffer),
1260 len(arr) * buffer.itemsize)
1261 self.assertEqual(list(buffer), expected)
1262
1263 buffer = array.array('i', [0]*10)
1264 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1265 self.assertEqual(conn.send_bytes(arr), None)
1266 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1267 len(arr) * buffer.itemsize)
1268 self.assertEqual(list(buffer), expected)
1269
1270 buffer = bytearray(latin(' ' * 40))
1271 self.assertEqual(conn.send_bytes(longmsg), None)
1272 try:
1273 res = conn.recv_bytes_into(buffer)
1274 except multiprocessing.BufferTooShort as e:
1275 self.assertEqual(e.args, (longmsg,))
1276 else:
1277 self.fail('expected BufferTooShort, got %s' % res)
1278
1279 poll = TimingWrapper(conn.poll)
1280
1281 self.assertEqual(poll(), False)
1282 self.assertTimingAlmostEqual(poll.elapsed, 0)
1283
1284 self.assertEqual(poll(TIMEOUT1), False)
1285 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1286
1287 conn.send(None)
1288
1289 self.assertEqual(poll(TIMEOUT1), True)
1290 self.assertTimingAlmostEqual(poll.elapsed, 0)
1291
1292 self.assertEqual(conn.recv(), None)
1293
1294 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1295 conn.send_bytes(really_big_msg)
1296 self.assertEqual(conn.recv_bytes(), really_big_msg)
1297
1298 conn.send_bytes(SENTINEL) # tell child to quit
1299 child_conn.close()
1300
1301 if self.TYPE == 'processes':
1302 self.assertEqual(conn.readable, True)
1303 self.assertEqual(conn.writable, True)
1304 self.assertRaises(EOFError, conn.recv)
1305 self.assertRaises(EOFError, conn.recv_bytes)
1306
1307 p.join()
1308
1309 def test_duplex_false(self):
1310 reader, writer = self.Pipe(duplex=False)
1311 self.assertEqual(writer.send(1), None)
1312 self.assertEqual(reader.recv(), 1)
1313 if self.TYPE == 'processes':
1314 self.assertEqual(reader.readable, True)
1315 self.assertEqual(reader.writable, False)
1316 self.assertEqual(writer.readable, False)
1317 self.assertEqual(writer.writable, True)
1318 self.assertRaises(IOError, reader.send, 2)
1319 self.assertRaises(IOError, writer.recv)
1320 self.assertRaises(IOError, writer.poll)
1321
1322 def test_spawn_close(self):
1323 # We test that a pipe connection can be closed by parent
1324 # process immediately after child is spawned. On Windows this
1325 # would have sometimes failed on old versions because
1326 # child_conn would be closed before the child got a chance to
1327 # duplicate it.
1328 conn, child_conn = self.Pipe()
1329
1330 p = self.Process(target=self._echo, args=(child_conn,))
1331 p.start()
1332 child_conn.close() # this might complete before child initializes
1333
1334 msg = latin('hello')
1335 conn.send_bytes(msg)
1336 self.assertEqual(conn.recv_bytes(), msg)
1337
1338 conn.send_bytes(SENTINEL)
1339 conn.close()
1340 p.join()
1341
1342 def test_sendbytes(self):
1343 if self.TYPE != 'processes':
1344 return
1345
1346 msg = latin('abcdefghijklmnopqrstuvwxyz')
1347 a, b = self.Pipe()
1348
1349 a.send_bytes(msg)
1350 self.assertEqual(b.recv_bytes(), msg)
1351
1352 a.send_bytes(msg, 5)
1353 self.assertEqual(b.recv_bytes(), msg[5:])
1354
1355 a.send_bytes(msg, 7, 8)
1356 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1357
1358 a.send_bytes(msg, 26)
1359 self.assertEqual(b.recv_bytes(), latin(''))
1360
1361 a.send_bytes(msg, 26, 0)
1362 self.assertEqual(b.recv_bytes(), latin(''))
1363
1364 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1365
1366 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1367
1368 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1369
1370 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1371
1372 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1373
Benjamin Petersone711caf2008-06-11 16:44:04 +00001374class _TestListenerClient(BaseTestCase):
1375
1376 ALLOWED_TYPES = ('processes', 'threads')
1377
1378 def _test(self, address):
1379 conn = self.connection.Client(address)
1380 conn.send('hello')
1381 conn.close()
1382
1383 def test_listener_client(self):
1384 for family in self.connection.families:
1385 l = self.connection.Listener(family=family)
1386 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001387 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001388 p.start()
1389 conn = l.accept()
1390 self.assertEqual(conn.recv(), 'hello')
1391 p.join()
1392 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001393#
1394# Test of sending connection and socket objects between processes
1395#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001396"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001397class _TestPicklingConnections(BaseTestCase):
1398
1399 ALLOWED_TYPES = ('processes',)
1400
1401 def _listener(self, conn, families):
1402 for fam in families:
1403 l = self.connection.Listener(family=fam)
1404 conn.send(l.address)
1405 new_conn = l.accept()
1406 conn.send(new_conn)
1407
1408 if self.TYPE == 'processes':
1409 l = socket.socket()
1410 l.bind(('localhost', 0))
1411 conn.send(l.getsockname())
1412 l.listen(1)
1413 new_conn, addr = l.accept()
1414 conn.send(new_conn)
1415
1416 conn.recv()
1417
1418 def _remote(self, conn):
1419 for (address, msg) in iter(conn.recv, None):
1420 client = self.connection.Client(address)
1421 client.send(msg.upper())
1422 client.close()
1423
1424 if self.TYPE == 'processes':
1425 address, msg = conn.recv()
1426 client = socket.socket()
1427 client.connect(address)
1428 client.sendall(msg.upper())
1429 client.close()
1430
1431 conn.close()
1432
1433 def test_pickling(self):
1434 try:
1435 multiprocessing.allow_connection_pickling()
1436 except ImportError:
1437 return
1438
1439 families = self.connection.families
1440
1441 lconn, lconn0 = self.Pipe()
1442 lp = self.Process(target=self._listener, args=(lconn0, families))
1443 lp.start()
1444 lconn0.close()
1445
1446 rconn, rconn0 = self.Pipe()
1447 rp = self.Process(target=self._remote, args=(rconn0,))
1448 rp.start()
1449 rconn0.close()
1450
1451 for fam in families:
1452 msg = ('This connection uses family %s' % fam).encode('ascii')
1453 address = lconn.recv()
1454 rconn.send((address, msg))
1455 new_conn = lconn.recv()
1456 self.assertEqual(new_conn.recv(), msg.upper())
1457
1458 rconn.send(None)
1459
1460 if self.TYPE == 'processes':
1461 msg = latin('This connection uses a normal socket')
1462 address = lconn.recv()
1463 rconn.send((address, msg))
1464 if hasattr(socket, 'fromfd'):
1465 new_conn = lconn.recv()
1466 self.assertEqual(new_conn.recv(100), msg.upper())
1467 else:
1468 # XXX On Windows with Py2.6 need to backport fromfd()
1469 discard = lconn.recv_bytes()
1470
1471 lconn.send(None)
1472
1473 rconn.close()
1474 lconn.close()
1475
1476 lp.join()
1477 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001478"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001479#
1480#
1481#
1482
1483class _TestHeap(BaseTestCase):
1484
1485 ALLOWED_TYPES = ('processes',)
1486
1487 def test_heap(self):
1488 iterations = 5000
1489 maxblocks = 50
1490 blocks = []
1491
1492 # create and destroy lots of blocks of different sizes
1493 for i in range(iterations):
1494 size = int(random.lognormvariate(0, 1) * 1000)
1495 b = multiprocessing.heap.BufferWrapper(size)
1496 blocks.append(b)
1497 if len(blocks) > maxblocks:
1498 i = random.randrange(maxblocks)
1499 del blocks[i]
1500
1501 # get the heap object
1502 heap = multiprocessing.heap.BufferWrapper._heap
1503
1504 # verify the state of the heap
1505 all = []
1506 occupied = 0
1507 for L in list(heap._len_to_seq.values()):
1508 for arena, start, stop in L:
1509 all.append((heap._arenas.index(arena), start, stop,
1510 stop-start, 'free'))
1511 for arena, start, stop in heap._allocated_blocks:
1512 all.append((heap._arenas.index(arena), start, stop,
1513 stop-start, 'occupied'))
1514 occupied += (stop-start)
1515
1516 all.sort()
1517
1518 for i in range(len(all)-1):
1519 (arena, start, stop) = all[i][:3]
1520 (narena, nstart, nstop) = all[i+1][:3]
1521 self.assertTrue((arena != narena and nstart == 0) or
1522 (stop == nstart))
1523
1524#
1525#
1526#
1527
1528try:
1529 from ctypes import Structure, Value, copy, c_int, c_double
1530except ImportError:
1531 Structure = object
1532 c_int = c_double = None
1533
1534class _Foo(Structure):
1535 _fields_ = [
1536 ('x', c_int),
1537 ('y', c_double)
1538 ]
1539
1540class _TestSharedCTypes(BaseTestCase):
1541
1542 ALLOWED_TYPES = ('processes',)
1543
1544 def _double(self, x, y, foo, arr, string):
1545 x.value *= 2
1546 y.value *= 2
1547 foo.x *= 2
1548 foo.y *= 2
1549 string.value *= 2
1550 for i in range(len(arr)):
1551 arr[i] *= 2
1552
1553 def test_sharedctypes(self, lock=False):
1554 if c_int is None:
1555 return
1556
1557 x = Value('i', 7, lock=lock)
1558 y = Value(ctypes.c_double, 1.0/3.0, lock=lock)
1559 foo = Value(_Foo, 3, 2, lock=lock)
1560 arr = Array('d', list(range(10)), lock=lock)
1561 string = Array('c', 20, lock=lock)
1562 string.value = 'hello'
1563
1564 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1565 p.start()
1566 p.join()
1567
1568 self.assertEqual(x.value, 14)
1569 self.assertAlmostEqual(y.value, 2.0/3.0)
1570 self.assertEqual(foo.x, 6)
1571 self.assertAlmostEqual(foo.y, 4.0)
1572 for i in range(10):
1573 self.assertAlmostEqual(arr[i], i*2)
1574 self.assertEqual(string.value, latin('hellohello'))
1575
1576 def test_synchronize(self):
1577 self.test_sharedctypes(lock=True)
1578
1579 def test_copy(self):
1580 if c_int is None:
1581 return
1582
1583 foo = _Foo(2, 5.0)
1584 bar = copy(foo)
1585 foo.x = 0
1586 foo.y = 0
1587 self.assertEqual(bar.x, 2)
1588 self.assertAlmostEqual(bar.y, 5.0)
1589
1590#
1591#
1592#
1593
1594class _TestFinalize(BaseTestCase):
1595
1596 ALLOWED_TYPES = ('processes',)
1597
1598 def _test_finalize(self, conn):
1599 class Foo(object):
1600 pass
1601
1602 a = Foo()
1603 util.Finalize(a, conn.send, args=('a',))
1604 del a # triggers callback for a
1605
1606 b = Foo()
1607 close_b = util.Finalize(b, conn.send, args=('b',))
1608 close_b() # triggers callback for b
1609 close_b() # does nothing because callback has already been called
1610 del b # does nothing because callback has already been called
1611
1612 c = Foo()
1613 util.Finalize(c, conn.send, args=('c',))
1614
1615 d10 = Foo()
1616 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1617
1618 d01 = Foo()
1619 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1620 d02 = Foo()
1621 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1622 d03 = Foo()
1623 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1624
1625 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1626
1627 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1628
1629 # call mutliprocessing's cleanup function then exit process without
1630 # garbage collecting locals
1631 util._exit_function()
1632 conn.close()
1633 os._exit(0)
1634
1635 def test_finalize(self):
1636 conn, child_conn = self.Pipe()
1637
1638 p = self.Process(target=self._test_finalize, args=(child_conn,))
1639 p.start()
1640 p.join()
1641
1642 result = [obj for obj in iter(conn.recv, 'STOP')]
1643 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1644
1645#
1646# Test that from ... import * works for each module
1647#
1648
1649class _TestImportStar(BaseTestCase):
1650
1651 ALLOWED_TYPES = ('processes',)
1652
1653 def test_import(self):
1654 modules = (
1655 'multiprocessing', 'multiprocessing.connection',
1656 'multiprocessing.heap', 'multiprocessing.managers',
1657 'multiprocessing.pool', 'multiprocessing.process',
1658 'multiprocessing.reduction', 'multiprocessing.sharedctypes',
1659 'multiprocessing.synchronize', 'multiprocessing.util'
1660 )
1661
1662 for name in modules:
1663 __import__(name)
1664 mod = sys.modules[name]
1665
1666 for attr in getattr(mod, '__all__', ()):
1667 self.assertTrue(
1668 hasattr(mod, attr),
1669 '%r does not have attribute %r' % (mod, attr)
1670 )
1671
1672#
1673# Quick test that logging works -- does not test logging output
1674#
1675
1676class _TestLogging(BaseTestCase):
1677
1678 ALLOWED_TYPES = ('processes',)
1679
1680 def test_enable_logging(self):
1681 logger = multiprocessing.get_logger()
1682 logger.setLevel(util.SUBWARNING)
1683 self.assertTrue(logger is not None)
1684 logger.debug('this will not be printed')
1685 logger.info('nor will this')
1686 logger.setLevel(LOG_LEVEL)
1687
1688 def _test_level(self, conn):
1689 logger = multiprocessing.get_logger()
1690 conn.send(logger.getEffectiveLevel())
1691
1692 def test_level(self):
1693 LEVEL1 = 32
1694 LEVEL2 = 37
1695
1696 logger = multiprocessing.get_logger()
1697 root_logger = logging.getLogger()
1698 root_level = root_logger.level
1699
1700 reader, writer = multiprocessing.Pipe(duplex=False)
1701
1702 logger.setLevel(LEVEL1)
1703 self.Process(target=self._test_level, args=(writer,)).start()
1704 self.assertEqual(LEVEL1, reader.recv())
1705
1706 logger.setLevel(logging.NOTSET)
1707 root_logger.setLevel(LEVEL2)
1708 self.Process(target=self._test_level, args=(writer,)).start()
1709 self.assertEqual(LEVEL2, reader.recv())
1710
1711 root_logger.setLevel(root_level)
1712 logger.setLevel(level=LOG_LEVEL)
1713
1714#
Jesse Noller6214edd2009-01-19 16:23:53 +00001715# Test to verify handle verification, see issue 3321
1716#
1717
1718class TestInvalidHandle(unittest.TestCase):
1719
1720 def test_invalid_handles(self):
1721 if WIN32:
1722 return
1723 conn = _multiprocessing.Connection(44977608)
1724 self.assertRaises(IOError, conn.poll)
1725 self.assertRaises(IOError, _multiprocessing.Connection, -1)
1726#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001727# Functions used to create test cases from the base ones in this module
1728#
1729
1730def get_attributes(Source, names):
1731 d = {}
1732 for name in names:
1733 obj = getattr(Source, name)
1734 if type(obj) == type(get_attributes):
1735 obj = staticmethod(obj)
1736 d[name] = obj
1737 return d
1738
1739def create_test_cases(Mixin, type):
1740 result = {}
1741 glob = globals()
1742 Type = type[0].upper() + type[1:]
1743
1744 for name in list(glob.keys()):
1745 if name.startswith('_Test'):
1746 base = glob[name]
1747 if type in base.ALLOWED_TYPES:
1748 newname = 'With' + Type + name[1:]
1749 class Temp(base, unittest.TestCase, Mixin):
1750 pass
1751 result[newname] = Temp
1752 Temp.__name__ = newname
1753 Temp.__module__ = Mixin.__module__
1754 return result
1755
1756#
1757# Create test cases
1758#
1759
1760class ProcessesMixin(object):
1761 TYPE = 'processes'
1762 Process = multiprocessing.Process
1763 locals().update(get_attributes(multiprocessing, (
1764 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1765 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1766 'RawArray', 'current_process', 'active_children', 'Pipe',
1767 'connection', 'JoinableQueue'
1768 )))
1769
1770testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1771globals().update(testcases_processes)
1772
1773
1774class ManagerMixin(object):
1775 TYPE = 'manager'
1776 Process = multiprocessing.Process
1777 manager = object.__new__(multiprocessing.managers.SyncManager)
1778 locals().update(get_attributes(manager, (
1779 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1780 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1781 'Namespace', 'JoinableQueue'
1782 )))
1783
1784testcases_manager = create_test_cases(ManagerMixin, type='manager')
1785globals().update(testcases_manager)
1786
1787
1788class ThreadsMixin(object):
1789 TYPE = 'threads'
1790 Process = multiprocessing.dummy.Process
1791 locals().update(get_attributes(multiprocessing.dummy, (
1792 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1793 'Condition', 'Event', 'Value', 'Array', 'current_process',
1794 'active_children', 'Pipe', 'connection', 'dict', 'list',
1795 'Namespace', 'JoinableQueue'
1796 )))
1797
1798testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1799globals().update(testcases_threads)
1800
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001801class OtherTest(unittest.TestCase):
1802 # TODO: add more tests for deliver/answer challenge.
1803 def test_deliver_challenge_auth_failure(self):
1804 class _FakeConnection(object):
1805 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00001806 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001807 def send_bytes(self, data):
1808 pass
1809 self.assertRaises(multiprocessing.AuthenticationError,
1810 multiprocessing.connection.deliver_challenge,
1811 _FakeConnection(), b'abc')
1812
1813 def test_answer_challenge_auth_failure(self):
1814 class _FakeConnection(object):
1815 def __init__(self):
1816 self.count = 0
1817 def recv_bytes(self, size):
1818 self.count += 1
1819 if self.count == 1:
1820 return multiprocessing.connection.CHALLENGE
1821 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00001822 return b'something bogus'
1823 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001824 def send_bytes(self, data):
1825 pass
1826 self.assertRaises(multiprocessing.AuthenticationError,
1827 multiprocessing.connection.answer_challenge,
1828 _FakeConnection(), b'abc')
1829
Jesse Noller6214edd2009-01-19 16:23:53 +00001830testcases_other = [OtherTest, TestInvalidHandle]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001831
Benjamin Petersone711caf2008-06-11 16:44:04 +00001832#
1833#
1834#
1835
1836def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00001837 if sys.platform.startswith("linux"):
1838 try:
1839 lock = multiprocessing.RLock()
1840 except OSError:
Amaury Forgeot d'Arc620626b2008-06-19 22:03:50 +00001841 from test.support import TestSkipped
Benjamin Petersone549ead2009-03-28 21:42:05 +00001842 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00001843
Benjamin Petersone711caf2008-06-11 16:44:04 +00001844 if run is None:
1845 from test.support import run_unittest as run
1846
1847 util.get_temp_dir() # creates temp directory for use by all processes
1848
1849 multiprocessing.get_logger().setLevel(LOG_LEVEL)
1850
Benjamin Peterson41181742008-07-02 20:22:54 +00001851 ProcessesMixin.pool = multiprocessing.Pool(4)
1852 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
1853 ManagerMixin.manager.__init__()
1854 ManagerMixin.manager.start()
1855 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001856
1857 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00001858 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
1859 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001860 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
1861 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00001862 )
1863
1864 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
1865 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
1866 run(suite)
1867
Benjamin Peterson41181742008-07-02 20:22:54 +00001868 ThreadsMixin.pool.terminate()
1869 ProcessesMixin.pool.terminate()
1870 ManagerMixin.pool.terminate()
1871 ManagerMixin.manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001872
Benjamin Peterson41181742008-07-02 20:22:54 +00001873 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00001874
1875def main():
1876 test_main(unittest.TextTestRunner(verbosity=2).run)
1877
1878if __name__ == '__main__':
1879 main()