blob: 3681c60d50d0c8677fa95b37a0e1adea6e27ad0a [file] [log] [blame]
Georg Brandl86b2fb92008-07-16 03:43:04 +00001#!/usr/bin/env python
2
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import queue as pyqueue
10import time
11import sys
12import os
13import gc
14import signal
15import array
16import copy
17import socket
18import random
19import logging
20
Benjamin Petersone5384b02008-10-04 22:00:42 +000021
22# Work around broken sem_open implementations
23try:
24 import multiprocessing.synchronize
25except ImportError as e:
26 from test.test_support import TestSkipped
27 raise TestSkipped(e)
28
Benjamin Petersone711caf2008-06-11 16:44:04 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000033import multiprocessing.pool
34import _multiprocessing
35
36from multiprocessing import util
37
38#
39#
40#
41
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000042def latin(s):
43 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000044
Benjamin Petersone711caf2008-06-11 16:44:04 +000045#
46# Constants
47#
48
49LOG_LEVEL = util.SUBWARNING
50#LOG_LEVEL = logging.WARNING
51
52DELTA = 0.1
53CHECK_TIMINGS = False # making true makes tests take a lot longer
54 # and can sometimes cause some non-serious
55 # failures because some calls block a bit
56 # longer than expected
57if CHECK_TIMINGS:
58 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
59else:
60 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
61
62HAVE_GETVALUE = not getattr(_multiprocessing,
63 'HAVE_BROKEN_SEM_GETVALUE', False)
64
Jesse Noller6214edd2009-01-19 16:23:53 +000065WIN32 = (sys.platform == "win32")
66
Benjamin Petersone711caf2008-06-11 16:44:04 +000067#
68# Creates a wrapper for a function which records the time it takes to finish
69#
70
71class TimingWrapper(object):
72
73 def __init__(self, func):
74 self.func = func
75 self.elapsed = None
76
77 def __call__(self, *args, **kwds):
78 t = time.time()
79 try:
80 return self.func(*args, **kwds)
81 finally:
82 self.elapsed = time.time() - t
83
84#
85# Base class for test cases
86#
87
88class BaseTestCase(object):
89
90 ALLOWED_TYPES = ('processes', 'manager', 'threads')
91
92 def assertTimingAlmostEqual(self, a, b):
93 if CHECK_TIMINGS:
94 self.assertAlmostEqual(a, b, 1)
95
96 def assertReturnsIfImplemented(self, value, func, *args):
97 try:
98 res = func(*args)
99 except NotImplementedError:
100 pass
101 else:
102 return self.assertEqual(value, res)
103
104#
105# Return the value of a semaphore
106#
107
108def get_value(self):
109 try:
110 return self.get_value()
111 except AttributeError:
112 try:
113 return self._Semaphore__value
114 except AttributeError:
115 try:
116 return self._value
117 except AttributeError:
118 raise NotImplementedError
119
120#
121# Testcases
122#
123
124class _TestProcess(BaseTestCase):
125
126 ALLOWED_TYPES = ('processes', 'threads')
127
128 def test_current(self):
129 if self.TYPE == 'threads':
130 return
131
132 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000133 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000134
135 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000136 self.assertTrue(not current.daemon)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000137 self.assertTrue(isinstance(authkey, bytes))
138 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000139 self.assertEqual(current.ident, os.getpid())
140 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000141
142 def _test(self, q, *args, **kwds):
143 current = self.current_process()
144 q.put(args)
145 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000146 q.put(current.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000147 if self.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000148 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000149 q.put(current.pid)
150
151 def test_process(self):
152 q = self.Queue(1)
153 e = self.Event()
154 args = (q, 1, 2)
155 kwargs = {'hello':23, 'bye':2.54}
156 name = 'SomeProcess'
157 p = self.Process(
158 target=self._test, args=args, kwargs=kwargs, name=name
159 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000160 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000161 current = self.current_process()
162
163 if self.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000164 self.assertEquals(p.authkey, current.authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000165 self.assertEquals(p.is_alive(), False)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000166 self.assertEquals(p.daemon, True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000167 self.assertTrue(p not in self.active_children())
168 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000169 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000170
171 p.start()
172
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000173 self.assertEquals(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000174 self.assertEquals(p.is_alive(), True)
175 self.assertTrue(p in self.active_children())
176
177 self.assertEquals(q.get(), args[1:])
178 self.assertEquals(q.get(), kwargs)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000179 self.assertEquals(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000180 if self.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000181 self.assertEquals(q.get(), current.authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000182 self.assertEquals(q.get(), p.pid)
183
184 p.join()
185
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000186 self.assertEquals(p.exitcode, 0)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000187 self.assertEquals(p.is_alive(), False)
188 self.assertTrue(p not in self.active_children())
189
190 def _test_terminate(self):
191 time.sleep(1000)
192
193 def test_terminate(self):
194 if self.TYPE == 'threads':
195 return
196
197 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000198 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000199 p.start()
200
201 self.assertEqual(p.is_alive(), True)
202 self.assertTrue(p in self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000203 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000204
205 p.terminate()
206
207 join = TimingWrapper(p.join)
208 self.assertEqual(join(), None)
209 self.assertTimingAlmostEqual(join.elapsed, 0.0)
210
211 self.assertEqual(p.is_alive(), False)
212 self.assertTrue(p not in self.active_children())
213
214 p.join()
215
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000216 # XXX sometimes get p.exitcode == 0 on Windows ...
217 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000218
219 def test_cpu_count(self):
220 try:
221 cpus = multiprocessing.cpu_count()
222 except NotImplementedError:
223 cpus = 1
224 self.assertTrue(type(cpus) is int)
225 self.assertTrue(cpus >= 1)
226
227 def test_active_children(self):
228 self.assertEqual(type(self.active_children()), list)
229
230 p = self.Process(target=time.sleep, args=(DELTA,))
231 self.assertTrue(p not in self.active_children())
232
233 p.start()
234 self.assertTrue(p in self.active_children())
235
236 p.join()
237 self.assertTrue(p not in self.active_children())
238
239 def _test_recursion(self, wconn, id):
240 from multiprocessing import forking
241 wconn.send(id)
242 if len(id) < 2:
243 for i in range(2):
244 p = self.Process(
245 target=self._test_recursion, args=(wconn, id+[i])
246 )
247 p.start()
248 p.join()
249
250 def test_recursion(self):
251 rconn, wconn = self.Pipe(duplex=False)
252 self._test_recursion(wconn, [])
253
254 time.sleep(DELTA)
255 result = []
256 while rconn.poll():
257 result.append(rconn.recv())
258
259 expected = [
260 [],
261 [0],
262 [0, 0],
263 [0, 1],
264 [1],
265 [1, 0],
266 [1, 1]
267 ]
268 self.assertEqual(result, expected)
269
270#
271#
272#
273
274class _UpperCaser(multiprocessing.Process):
275
276 def __init__(self):
277 multiprocessing.Process.__init__(self)
278 self.child_conn, self.parent_conn = multiprocessing.Pipe()
279
280 def run(self):
281 self.parent_conn.close()
282 for s in iter(self.child_conn.recv, None):
283 self.child_conn.send(s.upper())
284 self.child_conn.close()
285
286 def submit(self, s):
287 assert type(s) is str
288 self.parent_conn.send(s)
289 return self.parent_conn.recv()
290
291 def stop(self):
292 self.parent_conn.send(None)
293 self.parent_conn.close()
294 self.child_conn.close()
295
296class _TestSubclassingProcess(BaseTestCase):
297
298 ALLOWED_TYPES = ('processes',)
299
300 def test_subclassing(self):
301 uppercaser = _UpperCaser()
302 uppercaser.start()
303 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
304 self.assertEqual(uppercaser.submit('world'), 'WORLD')
305 uppercaser.stop()
306 uppercaser.join()
307
308#
309#
310#
311
312def queue_empty(q):
313 if hasattr(q, 'empty'):
314 return q.empty()
315 else:
316 return q.qsize() == 0
317
318def queue_full(q, maxsize):
319 if hasattr(q, 'full'):
320 return q.full()
321 else:
322 return q.qsize() == maxsize
323
324
325class _TestQueue(BaseTestCase):
326
327
328 def _test_put(self, queue, child_can_start, parent_can_continue):
329 child_can_start.wait()
330 for i in range(6):
331 queue.get()
332 parent_can_continue.set()
333
334 def test_put(self):
335 MAXSIZE = 6
336 queue = self.Queue(maxsize=MAXSIZE)
337 child_can_start = self.Event()
338 parent_can_continue = self.Event()
339
340 proc = self.Process(
341 target=self._test_put,
342 args=(queue, child_can_start, parent_can_continue)
343 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000344 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000345 proc.start()
346
347 self.assertEqual(queue_empty(queue), True)
348 self.assertEqual(queue_full(queue, MAXSIZE), False)
349
350 queue.put(1)
351 queue.put(2, True)
352 queue.put(3, True, None)
353 queue.put(4, False)
354 queue.put(5, False, None)
355 queue.put_nowait(6)
356
357 # the values may be in buffer but not yet in pipe so sleep a bit
358 time.sleep(DELTA)
359
360 self.assertEqual(queue_empty(queue), False)
361 self.assertEqual(queue_full(queue, MAXSIZE), True)
362
363 put = TimingWrapper(queue.put)
364 put_nowait = TimingWrapper(queue.put_nowait)
365
366 self.assertRaises(pyqueue.Full, put, 7, False)
367 self.assertTimingAlmostEqual(put.elapsed, 0)
368
369 self.assertRaises(pyqueue.Full, put, 7, False, None)
370 self.assertTimingAlmostEqual(put.elapsed, 0)
371
372 self.assertRaises(pyqueue.Full, put_nowait, 7)
373 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
374
375 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
376 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
377
378 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
379 self.assertTimingAlmostEqual(put.elapsed, 0)
380
381 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
382 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
383
384 child_can_start.set()
385 parent_can_continue.wait()
386
387 self.assertEqual(queue_empty(queue), True)
388 self.assertEqual(queue_full(queue, MAXSIZE), False)
389
390 proc.join()
391
392 def _test_get(self, queue, child_can_start, parent_can_continue):
393 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000394 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000395 queue.put(2)
396 queue.put(3)
397 queue.put(4)
398 queue.put(5)
399 parent_can_continue.set()
400
401 def test_get(self):
402 queue = self.Queue()
403 child_can_start = self.Event()
404 parent_can_continue = self.Event()
405
406 proc = self.Process(
407 target=self._test_get,
408 args=(queue, child_can_start, parent_can_continue)
409 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000410 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000411 proc.start()
412
413 self.assertEqual(queue_empty(queue), True)
414
415 child_can_start.set()
416 parent_can_continue.wait()
417
418 time.sleep(DELTA)
419 self.assertEqual(queue_empty(queue), False)
420
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000421 # Hangs unexpectedly, remove for now
422 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000423 self.assertEqual(queue.get(True, None), 2)
424 self.assertEqual(queue.get(True), 3)
425 self.assertEqual(queue.get(timeout=1), 4)
426 self.assertEqual(queue.get_nowait(), 5)
427
428 self.assertEqual(queue_empty(queue), True)
429
430 get = TimingWrapper(queue.get)
431 get_nowait = TimingWrapper(queue.get_nowait)
432
433 self.assertRaises(pyqueue.Empty, get, False)
434 self.assertTimingAlmostEqual(get.elapsed, 0)
435
436 self.assertRaises(pyqueue.Empty, get, False, None)
437 self.assertTimingAlmostEqual(get.elapsed, 0)
438
439 self.assertRaises(pyqueue.Empty, get_nowait)
440 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
441
442 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
443 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
444
445 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
446 self.assertTimingAlmostEqual(get.elapsed, 0)
447
448 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
449 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
450
451 proc.join()
452
453 def _test_fork(self, queue):
454 for i in range(10, 20):
455 queue.put(i)
456 # note that at this point the items may only be buffered, so the
457 # process cannot shutdown until the feeder thread has finished
458 # pushing items onto the pipe.
459
460 def test_fork(self):
461 # Old versions of Queue would fail to create a new feeder
462 # thread for a forked process if the original process had its
463 # own feeder thread. This test checks that this no longer
464 # happens.
465
466 queue = self.Queue()
467
468 # put items on queue so that main process starts a feeder thread
469 for i in range(10):
470 queue.put(i)
471
472 # wait to make sure thread starts before we fork a new process
473 time.sleep(DELTA)
474
475 # fork process
476 p = self.Process(target=self._test_fork, args=(queue,))
477 p.start()
478
479 # check that all expected items are in the queue
480 for i in range(20):
481 self.assertEqual(queue.get(), i)
482 self.assertRaises(pyqueue.Empty, queue.get, False)
483
484 p.join()
485
486 def test_qsize(self):
487 q = self.Queue()
488 try:
489 self.assertEqual(q.qsize(), 0)
490 except NotImplementedError:
491 return
492 q.put(1)
493 self.assertEqual(q.qsize(), 1)
494 q.put(5)
495 self.assertEqual(q.qsize(), 2)
496 q.get()
497 self.assertEqual(q.qsize(), 1)
498 q.get()
499 self.assertEqual(q.qsize(), 0)
500
501 def _test_task_done(self, q):
502 for obj in iter(q.get, None):
503 time.sleep(DELTA)
504 q.task_done()
505
506 def test_task_done(self):
507 queue = self.JoinableQueue()
508
509 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
510 return
511
512 workers = [self.Process(target=self._test_task_done, args=(queue,))
513 for i in range(4)]
514
515 for p in workers:
516 p.start()
517
518 for i in range(10):
519 queue.put(i)
520
521 queue.join()
522
523 for p in workers:
524 queue.put(None)
525
526 for p in workers:
527 p.join()
528
529#
530#
531#
532
533class _TestLock(BaseTestCase):
534
535 def test_lock(self):
536 lock = self.Lock()
537 self.assertEqual(lock.acquire(), True)
538 self.assertEqual(lock.acquire(False), False)
539 self.assertEqual(lock.release(), None)
540 self.assertRaises((ValueError, threading.ThreadError), lock.release)
541
542 def test_rlock(self):
543 lock = self.RLock()
544 self.assertEqual(lock.acquire(), True)
545 self.assertEqual(lock.acquire(), True)
546 self.assertEqual(lock.acquire(), True)
547 self.assertEqual(lock.release(), None)
548 self.assertEqual(lock.release(), None)
549 self.assertEqual(lock.release(), None)
550 self.assertRaises((AssertionError, RuntimeError), lock.release)
551
552
553class _TestSemaphore(BaseTestCase):
554
555 def _test_semaphore(self, sem):
556 self.assertReturnsIfImplemented(2, get_value, sem)
557 self.assertEqual(sem.acquire(), True)
558 self.assertReturnsIfImplemented(1, get_value, sem)
559 self.assertEqual(sem.acquire(), True)
560 self.assertReturnsIfImplemented(0, get_value, sem)
561 self.assertEqual(sem.acquire(False), False)
562 self.assertReturnsIfImplemented(0, get_value, sem)
563 self.assertEqual(sem.release(), None)
564 self.assertReturnsIfImplemented(1, get_value, sem)
565 self.assertEqual(sem.release(), None)
566 self.assertReturnsIfImplemented(2, get_value, sem)
567
568 def test_semaphore(self):
569 sem = self.Semaphore(2)
570 self._test_semaphore(sem)
571 self.assertEqual(sem.release(), None)
572 self.assertReturnsIfImplemented(3, get_value, sem)
573 self.assertEqual(sem.release(), None)
574 self.assertReturnsIfImplemented(4, get_value, sem)
575
576 def test_bounded_semaphore(self):
577 sem = self.BoundedSemaphore(2)
578 self._test_semaphore(sem)
579 # Currently fails on OS/X
580 #if HAVE_GETVALUE:
581 # self.assertRaises(ValueError, sem.release)
582 # self.assertReturnsIfImplemented(2, get_value, sem)
583
584 def test_timeout(self):
585 if self.TYPE != 'processes':
586 return
587
588 sem = self.Semaphore(0)
589 acquire = TimingWrapper(sem.acquire)
590
591 self.assertEqual(acquire(False), False)
592 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
593
594 self.assertEqual(acquire(False, None), False)
595 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
596
597 self.assertEqual(acquire(False, TIMEOUT1), False)
598 self.assertTimingAlmostEqual(acquire.elapsed, 0)
599
600 self.assertEqual(acquire(True, TIMEOUT2), False)
601 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
602
603 self.assertEqual(acquire(timeout=TIMEOUT3), False)
604 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
605
606
607class _TestCondition(BaseTestCase):
608
609 def f(self, cond, sleeping, woken, timeout=None):
610 cond.acquire()
611 sleeping.release()
612 cond.wait(timeout)
613 woken.release()
614 cond.release()
615
616 def check_invariant(self, cond):
617 # this is only supposed to succeed when there are no sleepers
618 if self.TYPE == 'processes':
619 try:
620 sleepers = (cond._sleeping_count.get_value() -
621 cond._woken_count.get_value())
622 self.assertEqual(sleepers, 0)
623 self.assertEqual(cond._wait_semaphore.get_value(), 0)
624 except NotImplementedError:
625 pass
626
627 def test_notify(self):
628 cond = self.Condition()
629 sleeping = self.Semaphore(0)
630 woken = self.Semaphore(0)
631
632 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000633 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000634 p.start()
635
636 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000637 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000638 p.start()
639
640 # wait for both children to start sleeping
641 sleeping.acquire()
642 sleeping.acquire()
643
644 # check no process/thread has woken up
645 time.sleep(DELTA)
646 self.assertReturnsIfImplemented(0, get_value, woken)
647
648 # wake up one process/thread
649 cond.acquire()
650 cond.notify()
651 cond.release()
652
653 # check one process/thread has woken up
654 time.sleep(DELTA)
655 self.assertReturnsIfImplemented(1, get_value, woken)
656
657 # wake up another
658 cond.acquire()
659 cond.notify()
660 cond.release()
661
662 # check other has woken up
663 time.sleep(DELTA)
664 self.assertReturnsIfImplemented(2, get_value, woken)
665
666 # check state is not mucked up
667 self.check_invariant(cond)
668 p.join()
669
670 def test_notify_all(self):
671 cond = self.Condition()
672 sleeping = self.Semaphore(0)
673 woken = self.Semaphore(0)
674
675 # start some threads/processes which will timeout
676 for i in range(3):
677 p = self.Process(target=self.f,
678 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000679 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000680 p.start()
681
682 t = threading.Thread(target=self.f,
683 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000684 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000685 t.start()
686
687 # wait for them all to sleep
688 for i in range(6):
689 sleeping.acquire()
690
691 # check they have all timed out
692 for i in range(6):
693 woken.acquire()
694 self.assertReturnsIfImplemented(0, get_value, woken)
695
696 # check state is not mucked up
697 self.check_invariant(cond)
698
699 # start some more threads/processes
700 for i in range(3):
701 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000702 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000703 p.start()
704
705 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000706 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000707 t.start()
708
709 # wait for them to all sleep
710 for i in range(6):
711 sleeping.acquire()
712
713 # check no process/thread has woken up
714 time.sleep(DELTA)
715 self.assertReturnsIfImplemented(0, get_value, woken)
716
717 # wake them all up
718 cond.acquire()
719 cond.notify_all()
720 cond.release()
721
722 # check they have all woken
723 time.sleep(DELTA)
724 self.assertReturnsIfImplemented(6, get_value, woken)
725
726 # check state is not mucked up
727 self.check_invariant(cond)
728
729 def test_timeout(self):
730 cond = self.Condition()
731 wait = TimingWrapper(cond.wait)
732 cond.acquire()
733 res = wait(TIMEOUT1)
734 cond.release()
735 self.assertEqual(res, None)
736 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
737
738
739class _TestEvent(BaseTestCase):
740
741 def _test_event(self, event):
742 time.sleep(TIMEOUT2)
743 event.set()
744
745 def test_event(self):
746 event = self.Event()
747 wait = TimingWrapper(event.wait)
748
749 # Removed temporaily, due to API shear, this does not
750 # work with threading._Event objects. is_set == isSet
751 #self.assertEqual(event.is_set(), False)
752
753 self.assertEqual(wait(0.0), None)
754 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
755 self.assertEqual(wait(TIMEOUT1), None)
756 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
757
758 event.set()
759
760 # See note above on the API differences
761 # self.assertEqual(event.is_set(), True)
762 self.assertEqual(wait(), None)
763 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
764 self.assertEqual(wait(TIMEOUT1), None)
765 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
766 # self.assertEqual(event.is_set(), True)
767
768 event.clear()
769
770 #self.assertEqual(event.is_set(), False)
771
772 self.Process(target=self._test_event, args=(event,)).start()
773 self.assertEqual(wait(), None)
774
775#
776#
777#
778
779class _TestValue(BaseTestCase):
780
781 codes_values = [
782 ('i', 4343, 24234),
783 ('d', 3.625, -4.25),
784 ('h', -232, 234),
785 ('c', latin('x'), latin('y'))
786 ]
787
788 def _test(self, values):
789 for sv, cv in zip(values, self.codes_values):
790 sv.value = cv[2]
791
792
793 def test_value(self, raw=False):
794 if self.TYPE != 'processes':
795 return
796
797 if raw:
798 values = [self.RawValue(code, value)
799 for code, value, _ in self.codes_values]
800 else:
801 values = [self.Value(code, value)
802 for code, value, _ in self.codes_values]
803
804 for sv, cv in zip(values, self.codes_values):
805 self.assertEqual(sv.value, cv[1])
806
807 proc = self.Process(target=self._test, args=(values,))
808 proc.start()
809 proc.join()
810
811 for sv, cv in zip(values, self.codes_values):
812 self.assertEqual(sv.value, cv[2])
813
814 def test_rawvalue(self):
815 self.test_value(raw=True)
816
817 def test_getobj_getlock(self):
818 if self.TYPE != 'processes':
819 return
820
821 val1 = self.Value('i', 5)
822 lock1 = val1.get_lock()
823 obj1 = val1.get_obj()
824
825 val2 = self.Value('i', 5, lock=None)
826 lock2 = val2.get_lock()
827 obj2 = val2.get_obj()
828
829 lock = self.Lock()
830 val3 = self.Value('i', 5, lock=lock)
831 lock3 = val3.get_lock()
832 obj3 = val3.get_obj()
833 self.assertEqual(lock, lock3)
834
Jesse Nollerb0516a62009-01-18 03:11:38 +0000835 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000836 self.assertFalse(hasattr(arr4, 'get_lock'))
837 self.assertFalse(hasattr(arr4, 'get_obj'))
838
Jesse Nollerb0516a62009-01-18 03:11:38 +0000839 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
840
841 arr5 = self.RawValue('i', 5)
842 self.assertFalse(hasattr(arr5, 'get_lock'))
843 self.assertFalse(hasattr(arr5, 'get_obj'))
844
Benjamin Petersone711caf2008-06-11 16:44:04 +0000845
846class _TestArray(BaseTestCase):
847
848 def f(self, seq):
849 for i in range(1, len(seq)):
850 seq[i] += seq[i-1]
851
852 def test_array(self, raw=False):
853 if self.TYPE != 'processes':
854 return
855
856 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
857 if raw:
858 arr = self.RawArray('i', seq)
859 else:
860 arr = self.Array('i', seq)
861
862 self.assertEqual(len(arr), len(seq))
863 self.assertEqual(arr[3], seq[3])
864 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
865
866 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
867
868 self.assertEqual(list(arr[:]), seq)
869
870 self.f(seq)
871
872 p = self.Process(target=self.f, args=(arr,))
873 p.start()
874 p.join()
875
876 self.assertEqual(list(arr[:]), seq)
877
878 def test_rawarray(self):
879 self.test_array(raw=True)
880
881 def test_getobj_getlock_obj(self):
882 if self.TYPE != 'processes':
883 return
884
885 arr1 = self.Array('i', list(range(10)))
886 lock1 = arr1.get_lock()
887 obj1 = arr1.get_obj()
888
889 arr2 = self.Array('i', list(range(10)), lock=None)
890 lock2 = arr2.get_lock()
891 obj2 = arr2.get_obj()
892
893 lock = self.Lock()
894 arr3 = self.Array('i', list(range(10)), lock=lock)
895 lock3 = arr3.get_lock()
896 obj3 = arr3.get_obj()
897 self.assertEqual(lock, lock3)
898
Jesse Nollerb0516a62009-01-18 03:11:38 +0000899 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000900 self.assertFalse(hasattr(arr4, 'get_lock'))
901 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +0000902 self.assertRaises(AttributeError,
903 self.Array, 'i', range(10), lock='notalock')
904
905 arr5 = self.RawArray('i', range(10))
906 self.assertFalse(hasattr(arr5, 'get_lock'))
907 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000908
909#
910#
911#
912
913class _TestContainers(BaseTestCase):
914
915 ALLOWED_TYPES = ('manager',)
916
917 def test_list(self):
918 a = self.list(list(range(10)))
919 self.assertEqual(a[:], list(range(10)))
920
921 b = self.list()
922 self.assertEqual(b[:], [])
923
924 b.extend(list(range(5)))
925 self.assertEqual(b[:], list(range(5)))
926
927 self.assertEqual(b[2], 2)
928 self.assertEqual(b[2:10], [2,3,4])
929
930 b *= 2
931 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
932
933 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
934
935 self.assertEqual(a[:], list(range(10)))
936
937 d = [a, b]
938 e = self.list(d)
939 self.assertEqual(
940 e[:],
941 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
942 )
943
944 f = self.list([a])
945 a.append('hello')
946 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
947
948 def test_dict(self):
949 d = self.dict()
950 indices = list(range(65, 70))
951 for i in indices:
952 d[i] = chr(i)
953 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
954 self.assertEqual(sorted(d.keys()), indices)
955 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
956 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
957
958 def test_namespace(self):
959 n = self.Namespace()
960 n.name = 'Bob'
961 n.job = 'Builder'
962 n._hidden = 'hidden'
963 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
964 del n.job
965 self.assertEqual(str(n), "Namespace(name='Bob')")
966 self.assertTrue(hasattr(n, 'name'))
967 self.assertTrue(not hasattr(n, 'job'))
968
969#
970#
971#
972
973def sqr(x, wait=0.0):
974 time.sleep(wait)
975 return x*x
Benjamin Petersone711caf2008-06-11 16:44:04 +0000976class _TestPool(BaseTestCase):
977
978 def test_apply(self):
979 papply = self.pool.apply
980 self.assertEqual(papply(sqr, (5,)), sqr(5))
981 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
982
983 def test_map(self):
984 pmap = self.pool.map
985 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
986 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
987 list(map(sqr, list(range(100)))))
988
989 def test_async(self):
990 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
991 get = TimingWrapper(res.get)
992 self.assertEqual(get(), 49)
993 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
994
995 def test_async_timeout(self):
996 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
997 get = TimingWrapper(res.get)
998 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
999 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1000
1001 def test_imap(self):
1002 it = self.pool.imap(sqr, list(range(10)))
1003 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1004
1005 it = self.pool.imap(sqr, list(range(10)))
1006 for i in range(10):
1007 self.assertEqual(next(it), i*i)
1008 self.assertRaises(StopIteration, it.__next__)
1009
1010 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1011 for i in range(1000):
1012 self.assertEqual(next(it), i*i)
1013 self.assertRaises(StopIteration, it.__next__)
1014
1015 def test_imap_unordered(self):
1016 it = self.pool.imap_unordered(sqr, list(range(1000)))
1017 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1018
1019 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1020 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1021
1022 def test_make_pool(self):
1023 p = multiprocessing.Pool(3)
1024 self.assertEqual(3, len(p._pool))
1025 p.close()
1026 p.join()
1027
1028 def test_terminate(self):
1029 if self.TYPE == 'manager':
1030 # On Unix a forked process increfs each shared object to
1031 # which its parent process held a reference. If the
1032 # forked process gets terminated then there is likely to
1033 # be a reference leak. So to prevent
1034 # _TestZZZNumberOfObjects from failing we skip this test
1035 # when using a manager.
1036 return
1037
1038 result = self.pool.map_async(
1039 time.sleep, [0.1 for i in range(10000)], chunksize=1
1040 )
1041 self.pool.terminate()
1042 join = TimingWrapper(self.pool.join)
1043 join()
1044 self.assertTrue(join.elapsed < 0.2)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001045#
1046# Test that manager has expected number of shared objects left
1047#
1048
1049class _TestZZZNumberOfObjects(BaseTestCase):
1050 # Because test cases are sorted alphabetically, this one will get
1051 # run after all the other tests for the manager. It tests that
1052 # there have been no "reference leaks" for the manager's shared
1053 # objects. Note the comment in _TestPool.test_terminate().
1054 ALLOWED_TYPES = ('manager',)
1055
1056 def test_number_of_objects(self):
1057 EXPECTED_NUMBER = 1 # the pool object is still alive
1058 multiprocessing.active_children() # discard dead process objs
1059 gc.collect() # do garbage collection
1060 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001061 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001062 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001063 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001064 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001065
1066 self.assertEqual(refs, EXPECTED_NUMBER)
1067
1068#
1069# Test of creating a customized manager class
1070#
1071
1072from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1073
1074class FooBar(object):
1075 def f(self):
1076 return 'f()'
1077 def g(self):
1078 raise ValueError
1079 def _h(self):
1080 return '_h()'
1081
1082def baz():
1083 for i in range(10):
1084 yield i*i
1085
1086class IteratorProxy(BaseProxy):
1087 _exposed_ = ('next', '__next__')
1088 def __iter__(self):
1089 return self
1090 def __next__(self):
1091 return self._callmethod('next')
1092 def __next__(self):
1093 return self._callmethod('__next__')
1094
1095class MyManager(BaseManager):
1096 pass
1097
1098MyManager.register('Foo', callable=FooBar)
1099MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1100MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1101
1102
1103class _TestMyManager(BaseTestCase):
1104
1105 ALLOWED_TYPES = ('manager',)
1106
1107 def test_mymanager(self):
1108 manager = MyManager()
1109 manager.start()
1110
1111 foo = manager.Foo()
1112 bar = manager.Bar()
1113 baz = manager.baz()
1114
1115 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1116 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1117
1118 self.assertEqual(foo_methods, ['f', 'g'])
1119 self.assertEqual(bar_methods, ['f', '_h'])
1120
1121 self.assertEqual(foo.f(), 'f()')
1122 self.assertRaises(ValueError, foo.g)
1123 self.assertEqual(foo._callmethod('f'), 'f()')
1124 self.assertRaises(RemoteError, foo._callmethod, '_h')
1125
1126 self.assertEqual(bar.f(), 'f()')
1127 self.assertEqual(bar._h(), '_h()')
1128 self.assertEqual(bar._callmethod('f'), 'f()')
1129 self.assertEqual(bar._callmethod('_h'), '_h()')
1130
1131 self.assertEqual(list(baz), [i*i for i in range(10)])
1132
1133 manager.shutdown()
1134
1135#
1136# Test of connecting to a remote server and using xmlrpclib for serialization
1137#
1138
1139_queue = pyqueue.Queue()
1140def get_queue():
1141 return _queue
1142
1143class QueueManager(BaseManager):
1144 '''manager class used by server process'''
1145QueueManager.register('get_queue', callable=get_queue)
1146
1147class QueueManager2(BaseManager):
1148 '''manager class which specifies the same interface as QueueManager'''
1149QueueManager2.register('get_queue')
1150
1151
1152SERIALIZER = 'xmlrpclib'
1153
1154class _TestRemoteManager(BaseTestCase):
1155
1156 ALLOWED_TYPES = ('manager',)
1157
1158 def _putter(self, address, authkey):
1159 manager = QueueManager2(
1160 address=address, authkey=authkey, serializer=SERIALIZER
1161 )
1162 manager.connect()
1163 queue = manager.get_queue()
1164 queue.put(('hello world', None, True, 2.25))
1165
1166 def test_remote(self):
1167 authkey = os.urandom(32)
1168
1169 manager = QueueManager(
1170 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1171 )
1172 manager.start()
1173
1174 p = self.Process(target=self._putter, args=(manager.address, authkey))
1175 p.start()
1176
1177 manager2 = QueueManager2(
1178 address=manager.address, authkey=authkey, serializer=SERIALIZER
1179 )
1180 manager2.connect()
1181 queue = manager2.get_queue()
1182
1183 # Note that xmlrpclib will deserialize object as a list not a tuple
1184 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1185
1186 # Because we are using xmlrpclib for serialization instead of
1187 # pickle this will cause a serialization error.
1188 self.assertRaises(Exception, queue.put, time.sleep)
1189
1190 # Make queue finalizer run before the server is stopped
1191 del queue
1192 manager.shutdown()
1193
1194#
1195#
1196#
1197
1198SENTINEL = latin('')
1199
1200class _TestConnection(BaseTestCase):
1201
1202 ALLOWED_TYPES = ('processes', 'threads')
1203
1204 def _echo(self, conn):
1205 for msg in iter(conn.recv_bytes, SENTINEL):
1206 conn.send_bytes(msg)
1207 conn.close()
1208
1209 def test_connection(self):
1210 conn, child_conn = self.Pipe()
1211
1212 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001213 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001214 p.start()
1215
1216 seq = [1, 2.25, None]
1217 msg = latin('hello world')
1218 longmsg = msg * 10
1219 arr = array.array('i', list(range(4)))
1220
1221 if self.TYPE == 'processes':
1222 self.assertEqual(type(conn.fileno()), int)
1223
1224 self.assertEqual(conn.send(seq), None)
1225 self.assertEqual(conn.recv(), seq)
1226
1227 self.assertEqual(conn.send_bytes(msg), None)
1228 self.assertEqual(conn.recv_bytes(), msg)
1229
1230 if self.TYPE == 'processes':
1231 buffer = array.array('i', [0]*10)
1232 expected = list(arr) + [0] * (10 - len(arr))
1233 self.assertEqual(conn.send_bytes(arr), None)
1234 self.assertEqual(conn.recv_bytes_into(buffer),
1235 len(arr) * buffer.itemsize)
1236 self.assertEqual(list(buffer), expected)
1237
1238 buffer = array.array('i', [0]*10)
1239 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1240 self.assertEqual(conn.send_bytes(arr), None)
1241 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1242 len(arr) * buffer.itemsize)
1243 self.assertEqual(list(buffer), expected)
1244
1245 buffer = bytearray(latin(' ' * 40))
1246 self.assertEqual(conn.send_bytes(longmsg), None)
1247 try:
1248 res = conn.recv_bytes_into(buffer)
1249 except multiprocessing.BufferTooShort as e:
1250 self.assertEqual(e.args, (longmsg,))
1251 else:
1252 self.fail('expected BufferTooShort, got %s' % res)
1253
1254 poll = TimingWrapper(conn.poll)
1255
1256 self.assertEqual(poll(), False)
1257 self.assertTimingAlmostEqual(poll.elapsed, 0)
1258
1259 self.assertEqual(poll(TIMEOUT1), False)
1260 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1261
1262 conn.send(None)
1263
1264 self.assertEqual(poll(TIMEOUT1), True)
1265 self.assertTimingAlmostEqual(poll.elapsed, 0)
1266
1267 self.assertEqual(conn.recv(), None)
1268
1269 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1270 conn.send_bytes(really_big_msg)
1271 self.assertEqual(conn.recv_bytes(), really_big_msg)
1272
1273 conn.send_bytes(SENTINEL) # tell child to quit
1274 child_conn.close()
1275
1276 if self.TYPE == 'processes':
1277 self.assertEqual(conn.readable, True)
1278 self.assertEqual(conn.writable, True)
1279 self.assertRaises(EOFError, conn.recv)
1280 self.assertRaises(EOFError, conn.recv_bytes)
1281
1282 p.join()
1283
1284 def test_duplex_false(self):
1285 reader, writer = self.Pipe(duplex=False)
1286 self.assertEqual(writer.send(1), None)
1287 self.assertEqual(reader.recv(), 1)
1288 if self.TYPE == 'processes':
1289 self.assertEqual(reader.readable, True)
1290 self.assertEqual(reader.writable, False)
1291 self.assertEqual(writer.readable, False)
1292 self.assertEqual(writer.writable, True)
1293 self.assertRaises(IOError, reader.send, 2)
1294 self.assertRaises(IOError, writer.recv)
1295 self.assertRaises(IOError, writer.poll)
1296
1297 def test_spawn_close(self):
1298 # We test that a pipe connection can be closed by parent
1299 # process immediately after child is spawned. On Windows this
1300 # would have sometimes failed on old versions because
1301 # child_conn would be closed before the child got a chance to
1302 # duplicate it.
1303 conn, child_conn = self.Pipe()
1304
1305 p = self.Process(target=self._echo, args=(child_conn,))
1306 p.start()
1307 child_conn.close() # this might complete before child initializes
1308
1309 msg = latin('hello')
1310 conn.send_bytes(msg)
1311 self.assertEqual(conn.recv_bytes(), msg)
1312
1313 conn.send_bytes(SENTINEL)
1314 conn.close()
1315 p.join()
1316
1317 def test_sendbytes(self):
1318 if self.TYPE != 'processes':
1319 return
1320
1321 msg = latin('abcdefghijklmnopqrstuvwxyz')
1322 a, b = self.Pipe()
1323
1324 a.send_bytes(msg)
1325 self.assertEqual(b.recv_bytes(), msg)
1326
1327 a.send_bytes(msg, 5)
1328 self.assertEqual(b.recv_bytes(), msg[5:])
1329
1330 a.send_bytes(msg, 7, 8)
1331 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1332
1333 a.send_bytes(msg, 26)
1334 self.assertEqual(b.recv_bytes(), latin(''))
1335
1336 a.send_bytes(msg, 26, 0)
1337 self.assertEqual(b.recv_bytes(), latin(''))
1338
1339 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1340
1341 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1342
1343 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1344
1345 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1346
1347 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1348
Benjamin Petersone711caf2008-06-11 16:44:04 +00001349class _TestListenerClient(BaseTestCase):
1350
1351 ALLOWED_TYPES = ('processes', 'threads')
1352
1353 def _test(self, address):
1354 conn = self.connection.Client(address)
1355 conn.send('hello')
1356 conn.close()
1357
1358 def test_listener_client(self):
1359 for family in self.connection.families:
1360 l = self.connection.Listener(family=family)
1361 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001362 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001363 p.start()
1364 conn = l.accept()
1365 self.assertEqual(conn.recv(), 'hello')
1366 p.join()
1367 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001368#
1369# Test of sending connection and socket objects between processes
1370#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001371"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001372class _TestPicklingConnections(BaseTestCase):
1373
1374 ALLOWED_TYPES = ('processes',)
1375
1376 def _listener(self, conn, families):
1377 for fam in families:
1378 l = self.connection.Listener(family=fam)
1379 conn.send(l.address)
1380 new_conn = l.accept()
1381 conn.send(new_conn)
1382
1383 if self.TYPE == 'processes':
1384 l = socket.socket()
1385 l.bind(('localhost', 0))
1386 conn.send(l.getsockname())
1387 l.listen(1)
1388 new_conn, addr = l.accept()
1389 conn.send(new_conn)
1390
1391 conn.recv()
1392
1393 def _remote(self, conn):
1394 for (address, msg) in iter(conn.recv, None):
1395 client = self.connection.Client(address)
1396 client.send(msg.upper())
1397 client.close()
1398
1399 if self.TYPE == 'processes':
1400 address, msg = conn.recv()
1401 client = socket.socket()
1402 client.connect(address)
1403 client.sendall(msg.upper())
1404 client.close()
1405
1406 conn.close()
1407
1408 def test_pickling(self):
1409 try:
1410 multiprocessing.allow_connection_pickling()
1411 except ImportError:
1412 return
1413
1414 families = self.connection.families
1415
1416 lconn, lconn0 = self.Pipe()
1417 lp = self.Process(target=self._listener, args=(lconn0, families))
1418 lp.start()
1419 lconn0.close()
1420
1421 rconn, rconn0 = self.Pipe()
1422 rp = self.Process(target=self._remote, args=(rconn0,))
1423 rp.start()
1424 rconn0.close()
1425
1426 for fam in families:
1427 msg = ('This connection uses family %s' % fam).encode('ascii')
1428 address = lconn.recv()
1429 rconn.send((address, msg))
1430 new_conn = lconn.recv()
1431 self.assertEqual(new_conn.recv(), msg.upper())
1432
1433 rconn.send(None)
1434
1435 if self.TYPE == 'processes':
1436 msg = latin('This connection uses a normal socket')
1437 address = lconn.recv()
1438 rconn.send((address, msg))
1439 if hasattr(socket, 'fromfd'):
1440 new_conn = lconn.recv()
1441 self.assertEqual(new_conn.recv(100), msg.upper())
1442 else:
1443 # XXX On Windows with Py2.6 need to backport fromfd()
1444 discard = lconn.recv_bytes()
1445
1446 lconn.send(None)
1447
1448 rconn.close()
1449 lconn.close()
1450
1451 lp.join()
1452 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001453"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001454#
1455#
1456#
1457
1458class _TestHeap(BaseTestCase):
1459
1460 ALLOWED_TYPES = ('processes',)
1461
1462 def test_heap(self):
1463 iterations = 5000
1464 maxblocks = 50
1465 blocks = []
1466
1467 # create and destroy lots of blocks of different sizes
1468 for i in range(iterations):
1469 size = int(random.lognormvariate(0, 1) * 1000)
1470 b = multiprocessing.heap.BufferWrapper(size)
1471 blocks.append(b)
1472 if len(blocks) > maxblocks:
1473 i = random.randrange(maxblocks)
1474 del blocks[i]
1475
1476 # get the heap object
1477 heap = multiprocessing.heap.BufferWrapper._heap
1478
1479 # verify the state of the heap
1480 all = []
1481 occupied = 0
1482 for L in list(heap._len_to_seq.values()):
1483 for arena, start, stop in L:
1484 all.append((heap._arenas.index(arena), start, stop,
1485 stop-start, 'free'))
1486 for arena, start, stop in heap._allocated_blocks:
1487 all.append((heap._arenas.index(arena), start, stop,
1488 stop-start, 'occupied'))
1489 occupied += (stop-start)
1490
1491 all.sort()
1492
1493 for i in range(len(all)-1):
1494 (arena, start, stop) = all[i][:3]
1495 (narena, nstart, nstop) = all[i+1][:3]
1496 self.assertTrue((arena != narena and nstart == 0) or
1497 (stop == nstart))
1498
1499#
1500#
1501#
1502
1503try:
1504 from ctypes import Structure, Value, copy, c_int, c_double
1505except ImportError:
1506 Structure = object
1507 c_int = c_double = None
1508
1509class _Foo(Structure):
1510 _fields_ = [
1511 ('x', c_int),
1512 ('y', c_double)
1513 ]
1514
1515class _TestSharedCTypes(BaseTestCase):
1516
1517 ALLOWED_TYPES = ('processes',)
1518
1519 def _double(self, x, y, foo, arr, string):
1520 x.value *= 2
1521 y.value *= 2
1522 foo.x *= 2
1523 foo.y *= 2
1524 string.value *= 2
1525 for i in range(len(arr)):
1526 arr[i] *= 2
1527
1528 def test_sharedctypes(self, lock=False):
1529 if c_int is None:
1530 return
1531
1532 x = Value('i', 7, lock=lock)
1533 y = Value(ctypes.c_double, 1.0/3.0, lock=lock)
1534 foo = Value(_Foo, 3, 2, lock=lock)
1535 arr = Array('d', list(range(10)), lock=lock)
1536 string = Array('c', 20, lock=lock)
1537 string.value = 'hello'
1538
1539 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1540 p.start()
1541 p.join()
1542
1543 self.assertEqual(x.value, 14)
1544 self.assertAlmostEqual(y.value, 2.0/3.0)
1545 self.assertEqual(foo.x, 6)
1546 self.assertAlmostEqual(foo.y, 4.0)
1547 for i in range(10):
1548 self.assertAlmostEqual(arr[i], i*2)
1549 self.assertEqual(string.value, latin('hellohello'))
1550
1551 def test_synchronize(self):
1552 self.test_sharedctypes(lock=True)
1553
1554 def test_copy(self):
1555 if c_int is None:
1556 return
1557
1558 foo = _Foo(2, 5.0)
1559 bar = copy(foo)
1560 foo.x = 0
1561 foo.y = 0
1562 self.assertEqual(bar.x, 2)
1563 self.assertAlmostEqual(bar.y, 5.0)
1564
1565#
1566#
1567#
1568
1569class _TestFinalize(BaseTestCase):
1570
1571 ALLOWED_TYPES = ('processes',)
1572
1573 def _test_finalize(self, conn):
1574 class Foo(object):
1575 pass
1576
1577 a = Foo()
1578 util.Finalize(a, conn.send, args=('a',))
1579 del a # triggers callback for a
1580
1581 b = Foo()
1582 close_b = util.Finalize(b, conn.send, args=('b',))
1583 close_b() # triggers callback for b
1584 close_b() # does nothing because callback has already been called
1585 del b # does nothing because callback has already been called
1586
1587 c = Foo()
1588 util.Finalize(c, conn.send, args=('c',))
1589
1590 d10 = Foo()
1591 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1592
1593 d01 = Foo()
1594 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1595 d02 = Foo()
1596 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1597 d03 = Foo()
1598 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1599
1600 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1601
1602 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1603
1604 # call mutliprocessing's cleanup function then exit process without
1605 # garbage collecting locals
1606 util._exit_function()
1607 conn.close()
1608 os._exit(0)
1609
1610 def test_finalize(self):
1611 conn, child_conn = self.Pipe()
1612
1613 p = self.Process(target=self._test_finalize, args=(child_conn,))
1614 p.start()
1615 p.join()
1616
1617 result = [obj for obj in iter(conn.recv, 'STOP')]
1618 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1619
1620#
1621# Test that from ... import * works for each module
1622#
1623
1624class _TestImportStar(BaseTestCase):
1625
1626 ALLOWED_TYPES = ('processes',)
1627
1628 def test_import(self):
1629 modules = (
1630 'multiprocessing', 'multiprocessing.connection',
1631 'multiprocessing.heap', 'multiprocessing.managers',
1632 'multiprocessing.pool', 'multiprocessing.process',
1633 'multiprocessing.reduction', 'multiprocessing.sharedctypes',
1634 'multiprocessing.synchronize', 'multiprocessing.util'
1635 )
1636
1637 for name in modules:
1638 __import__(name)
1639 mod = sys.modules[name]
1640
1641 for attr in getattr(mod, '__all__', ()):
1642 self.assertTrue(
1643 hasattr(mod, attr),
1644 '%r does not have attribute %r' % (mod, attr)
1645 )
1646
1647#
1648# Quick test that logging works -- does not test logging output
1649#
1650
1651class _TestLogging(BaseTestCase):
1652
1653 ALLOWED_TYPES = ('processes',)
1654
1655 def test_enable_logging(self):
1656 logger = multiprocessing.get_logger()
1657 logger.setLevel(util.SUBWARNING)
1658 self.assertTrue(logger is not None)
1659 logger.debug('this will not be printed')
1660 logger.info('nor will this')
1661 logger.setLevel(LOG_LEVEL)
1662
1663 def _test_level(self, conn):
1664 logger = multiprocessing.get_logger()
1665 conn.send(logger.getEffectiveLevel())
1666
1667 def test_level(self):
1668 LEVEL1 = 32
1669 LEVEL2 = 37
1670
1671 logger = multiprocessing.get_logger()
1672 root_logger = logging.getLogger()
1673 root_level = root_logger.level
1674
1675 reader, writer = multiprocessing.Pipe(duplex=False)
1676
1677 logger.setLevel(LEVEL1)
1678 self.Process(target=self._test_level, args=(writer,)).start()
1679 self.assertEqual(LEVEL1, reader.recv())
1680
1681 logger.setLevel(logging.NOTSET)
1682 root_logger.setLevel(LEVEL2)
1683 self.Process(target=self._test_level, args=(writer,)).start()
1684 self.assertEqual(LEVEL2, reader.recv())
1685
1686 root_logger.setLevel(root_level)
1687 logger.setLevel(level=LOG_LEVEL)
1688
1689#
Jesse Noller6214edd2009-01-19 16:23:53 +00001690# Test to verify handle verification, see issue 3321
1691#
1692
1693class TestInvalidHandle(unittest.TestCase):
1694
1695 def test_invalid_handles(self):
1696 if WIN32:
1697 return
1698 conn = _multiprocessing.Connection(44977608)
1699 self.assertRaises(IOError, conn.poll)
1700 self.assertRaises(IOError, _multiprocessing.Connection, -1)
1701#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001702# Functions used to create test cases from the base ones in this module
1703#
1704
1705def get_attributes(Source, names):
1706 d = {}
1707 for name in names:
1708 obj = getattr(Source, name)
1709 if type(obj) == type(get_attributes):
1710 obj = staticmethod(obj)
1711 d[name] = obj
1712 return d
1713
1714def create_test_cases(Mixin, type):
1715 result = {}
1716 glob = globals()
1717 Type = type[0].upper() + type[1:]
1718
1719 for name in list(glob.keys()):
1720 if name.startswith('_Test'):
1721 base = glob[name]
1722 if type in base.ALLOWED_TYPES:
1723 newname = 'With' + Type + name[1:]
1724 class Temp(base, unittest.TestCase, Mixin):
1725 pass
1726 result[newname] = Temp
1727 Temp.__name__ = newname
1728 Temp.__module__ = Mixin.__module__
1729 return result
1730
1731#
1732# Create test cases
1733#
1734
1735class ProcessesMixin(object):
1736 TYPE = 'processes'
1737 Process = multiprocessing.Process
1738 locals().update(get_attributes(multiprocessing, (
1739 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1740 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1741 'RawArray', 'current_process', 'active_children', 'Pipe',
1742 'connection', 'JoinableQueue'
1743 )))
1744
1745testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1746globals().update(testcases_processes)
1747
1748
1749class ManagerMixin(object):
1750 TYPE = 'manager'
1751 Process = multiprocessing.Process
1752 manager = object.__new__(multiprocessing.managers.SyncManager)
1753 locals().update(get_attributes(manager, (
1754 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1755 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1756 'Namespace', 'JoinableQueue'
1757 )))
1758
1759testcases_manager = create_test_cases(ManagerMixin, type='manager')
1760globals().update(testcases_manager)
1761
1762
1763class ThreadsMixin(object):
1764 TYPE = 'threads'
1765 Process = multiprocessing.dummy.Process
1766 locals().update(get_attributes(multiprocessing.dummy, (
1767 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1768 'Condition', 'Event', 'Value', 'Array', 'current_process',
1769 'active_children', 'Pipe', 'connection', 'dict', 'list',
1770 'Namespace', 'JoinableQueue'
1771 )))
1772
1773testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1774globals().update(testcases_threads)
1775
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001776class OtherTest(unittest.TestCase):
1777 # TODO: add more tests for deliver/answer challenge.
1778 def test_deliver_challenge_auth_failure(self):
1779 class _FakeConnection(object):
1780 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00001781 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001782 def send_bytes(self, data):
1783 pass
1784 self.assertRaises(multiprocessing.AuthenticationError,
1785 multiprocessing.connection.deliver_challenge,
1786 _FakeConnection(), b'abc')
1787
1788 def test_answer_challenge_auth_failure(self):
1789 class _FakeConnection(object):
1790 def __init__(self):
1791 self.count = 0
1792 def recv_bytes(self, size):
1793 self.count += 1
1794 if self.count == 1:
1795 return multiprocessing.connection.CHALLENGE
1796 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00001797 return b'something bogus'
1798 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001799 def send_bytes(self, data):
1800 pass
1801 self.assertRaises(multiprocessing.AuthenticationError,
1802 multiprocessing.connection.answer_challenge,
1803 _FakeConnection(), b'abc')
1804
Jesse Noller6214edd2009-01-19 16:23:53 +00001805testcases_other = [OtherTest, TestInvalidHandle]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001806
Benjamin Petersone711caf2008-06-11 16:44:04 +00001807#
1808#
1809#
1810
1811def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00001812 if sys.platform.startswith("linux"):
1813 try:
1814 lock = multiprocessing.RLock()
1815 except OSError:
Amaury Forgeot d'Arc620626b2008-06-19 22:03:50 +00001816 from test.support import TestSkipped
Jesse Nollerd00df3c2008-06-18 14:22:48 +00001817 raise TestSkipped("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00001818
Benjamin Petersone711caf2008-06-11 16:44:04 +00001819 if run is None:
1820 from test.support import run_unittest as run
1821
1822 util.get_temp_dir() # creates temp directory for use by all processes
1823
1824 multiprocessing.get_logger().setLevel(LOG_LEVEL)
1825
Benjamin Peterson41181742008-07-02 20:22:54 +00001826 ProcessesMixin.pool = multiprocessing.Pool(4)
1827 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
1828 ManagerMixin.manager.__init__()
1829 ManagerMixin.manager.start()
1830 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001831
1832 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00001833 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
1834 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001835 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
1836 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00001837 )
1838
1839 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
1840 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
1841 run(suite)
1842
Benjamin Peterson41181742008-07-02 20:22:54 +00001843 ThreadsMixin.pool.terminate()
1844 ProcessesMixin.pool.terminate()
1845 ManagerMixin.pool.terminate()
1846 ManagerMixin.manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001847
Benjamin Peterson41181742008-07-02 20:22:54 +00001848 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00001849
1850def main():
1851 test_main(unittest.TextTestRunner(verbosity=2).run)
1852
1853if __name__ == '__main__':
1854 main()