#!/usr/bin/env python

#
# Unit tests for the multiprocessing package
#
6
7import unittest
8import threading
9import queue as pyqueue
10import time
11import sys
12import os
13import gc
14import signal
15import array
16import copy
17import socket
18import random
19import logging
20
21import multiprocessing.dummy
22import multiprocessing.connection
23import multiprocessing.managers
24import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000025import multiprocessing.pool
26import _multiprocessing
27
28from multiprocessing import util
29
30#
31#
32#
33
def latin(s):
    """Return the text *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000036
Benjamin Petersone711caf2008-06-11 16:44:04 +000037#
38# Constants
39#
40
# Log level constant for the test run (presumably applied to the
# multiprocessing logger by the test driver -- confirm against the caller).
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.WARNING

# Short pause, in seconds, that tests sleep for to let children make progress.
DELTA = 0.1

# Making CHECK_TIMINGS true makes the tests take a lot longer and can
# sometimes cause some non-serious failures, because some calls block a
# bit longer than expected.
CHECK_TIMINGS = False

# Timeouts (seconds) handed to the blocking calls under test; distinct
# values are only used when timings are actually being checked.
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# True unless _multiprocessing exposes a truthy HAVE_BROKEN_SEM_GETVALUE.
HAVE_GETVALUE = not getattr(
    _multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False
)
56
57#
58# Creates a wrapper for a function which records the time it takes to finish
59#
60
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    ``elapsed`` is ``None`` until the wrapper has been called; afterwards
    it holds the duration in seconds (measured with ``time.time()``) of
    the most recent call, whether that call returned or raised.
    """

    def __init__(self, func):
        self.elapsed = None
        self.func = func

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            return self.func(*args, **kwds)
        finally:
            # Runs on both the return and the exception path.
            self.elapsed = time.time() - started
73
74#
75# Base class for test cases
76#
77
class BaseTestCase(object):
    """Mixin with assertion helpers shared by the test case classes.

    It relies on ``assertEqual``/``assertAlmostEqual`` being supplied by
    the class it is combined with.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Assert *a* is close to *b*, but only when CHECK_TIMINGS is set."""
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert ``func(*args) == value`` unless it is not implemented.

        A ``NotImplementedError`` raised by *func* is silently accepted.
        """
        try:
            result = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, result)
93
94#
95# Return the value of a semaphore
96#
97
def get_value(self):
    """Return the current value of the semaphore-like object *self*.

    Tries, in order, a ``get_value()`` method, a ``_Semaphore__value``
    attribute and a ``_value`` attribute.

    Raises NotImplementedError if none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    try:
        return self._Semaphore__value
    except AttributeError:
        pass
    try:
        return self._value
    except AttributeError:
        raise NotImplementedError
109
110#
111# Testcases
112#
113
class _TestProcess(BaseTestCase):
    """Tests of the Process life cycle: start, join, terminate, children.

    ``self.Process``, ``self.Queue``, ``self.Pipe``, ``self.TYPE``,
    ``self.current_process`` and ``self.active_children`` are expected to
    be supplied by the class this mixin is combined with.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # Attributes of the current process object; skipped for threads.
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.get_authkey()

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.is_daemon())
        self.assertTrue(isinstance(authkey, bytes))
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.get_ident(), os.getpid())
        self.assertEqual(current.get_exitcode(), None)

    def _test(self, q, *args, **kwds):
        """Child target: report args, kwargs, name (and authkey/pid) on *q*."""
        current = self.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.get_name())
        if self.TYPE != 'threads':
            q.put(bytes(current.get_authkey()))
            q.put(current.pid)

    def test_process(self):
        # Check process state before start, while running and after join.
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.set_daemon(True)
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.get_authkey(), current.get_authkey())
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.is_daemon(), True)
        self.assertTrue(p not in self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.get_exitcode(), None)

        p.start()

        self.assertEqual(p.get_exitcode(), None)
        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())

        # The child echoes back everything after the queue argument.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.get_name())
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.get_authkey())
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.get_exitcode(), 0)
        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

    def _test_terminate(self):
        """Child target: sleep long enough that it has to be terminated."""
        time.sleep(1000)

    def test_terminate(self):
        # terminate() must stop a sleeping child promptly; not for threads.
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.set_daemon(True)
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())
        self.assertEqual(p.get_exitcode(), None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

        p.join()

        # XXX sometimes get p.get_exitcode() == 0 on Windows ...
        #self.assertEqual(p.get_exitcode(), -signal.SIGTERM)

    def test_cpu_count(self):
        # cpu_count() may be unimplemented; fall back to a single CPU.
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        # A process is listed by active_children() only while it runs.
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertTrue(p not in self.active_children())

        p.start()
        self.assertTrue(p in self.active_children())

        p.join()
        self.assertTrue(p not in self.active_children())

    def _test_recursion(self, wconn, id):
        """Send *id* on *wconn*, then spawn two children one level deeper.

        Each child is joined before the next is started, so ids arrive in
        depth-first order. Recursion stops once *id* has two elements.
        """
        # NOTE(review): this import is unused here; kept in case the child
        # relies on its import side effects -- confirm before removing.
        from multiprocessing import forking
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = self.Process(
                    target=self._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        # Processes started from child processes must all report back.
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
259
260#
261#
262#
263
264class _UpperCaser(multiprocessing.Process):
265
266 def __init__(self):
267 multiprocessing.Process.__init__(self)
268 self.child_conn, self.parent_conn = multiprocessing.Pipe()
269
270 def run(self):
271 self.parent_conn.close()
272 for s in iter(self.child_conn.recv, None):
273 self.child_conn.send(s.upper())
274 self.child_conn.close()
275
276 def submit(self, s):
277 assert type(s) is str
278 self.parent_conn.send(s)
279 return self.parent_conn.recv()
280
281 def stop(self):
282 self.parent_conn.send(None)
283 self.parent_conn.close()
284 self.child_conn.close()
285
class _TestSubclassingProcess(BaseTestCase):
    """Check that a Process subclass with its own run() works."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        # Round-trip two strings through an _UpperCaser child process.
        worker = _UpperCaser()
        worker.start()
        self.assertEqual(worker.submit('hello'), 'HELLO')
        self.assertEqual(worker.submit('world'), 'WORLD')
        worker.stop()
        worker.join()
297
298#
299#
300#
301
def queue_empty(q):
    """Return whether *q* is empty.

    Uses ``q.empty()`` when available, otherwise compares ``q.qsize()``
    with zero.
    """
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
307
def queue_full(q, maxsize):
    """Return whether *q* is full.

    Uses ``q.full()`` when available, otherwise compares ``q.qsize()``
    with *maxsize*.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
313
314
class _TestQueue(BaseTestCase):
    """Tests of put/get/qsize/task_done behaviour of ``self.Queue``."""

    def _test_put(self, queue, child_can_start, parent_can_continue):
        """Child target for test_put: drain six items once told to start."""
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        # Fill a bounded queue, check every put() variant raises Full,
        # then let the child drain it.
        MAXSIZE = 6
        q = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        child = self.Process(
            target=self._test_put,
            args=(q, child_can_start, parent_can_continue)
            )
        child.set_daemon(True)
        child.start()

        self.assertEqual(queue_empty(q), True)
        self.assertEqual(queue_full(q, MAXSIZE), False)

        # Exercise each calling convention of put().
        q.put(1)
        q.put(2, True)
        q.put(3, True, None)
        q.put(4, False)
        q.put(5, False, None)
        q.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(q), False)
        self.assertEqual(queue_full(q, MAXSIZE), True)

        put = TimingWrapper(q.put)
        put_nowait = TimingWrapper(q.put_nowait)

        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(q), True)
        self.assertEqual(queue_full(q, MAXSIZE), False)

        child.join()

    def _test_get(self, queue, child_can_start, parent_can_continue):
        """Child target for test_get: put 2..5 on *queue* once told to."""
        child_can_start.wait()
        #queue.put(1)
        for item in (2, 3, 4, 5):
            queue.put(item)
        parent_can_continue.set()

    def test_get(self):
        # Read back the child's items with every get() variant, then check
        # that each variant raises Empty on the drained queue.
        q = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        child = self.Process(
            target=self._test_get,
            args=(q, child_can_start, parent_can_continue)
            )
        child.set_daemon(True)
        child.start()

        self.assertEqual(queue_empty(q), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(q), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(q.get(True, None), 2)
        self.assertEqual(q.get(True), 3)
        self.assertEqual(q.get(timeout=1), 4)
        self.assertEqual(q.get_nowait(), 5)

        self.assertEqual(queue_empty(q), True)

        get = TimingWrapper(q.get)
        get_nowait = TimingWrapper(q.get_nowait)

        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        child.join()

    def _test_fork(self, queue):
        """Child target for test_fork: put 10..19 on *queue*."""
        for value in range(10, 20):
            queue.put(value)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        q = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for value in range(10):
            q.put(value)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        child = self.Process(target=self._test_fork, args=(q,))
        child.start()

        # check that all expected items are in the queue
        for expected in range(20):
            self.assertEqual(q.get(), expected)
        self.assertRaises(pyqueue.Empty, q.get, False)

        child.join()

    def test_qsize(self):
        # qsize() tracks puts and gets; skipped where it is unimplemented.
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    def _test_task_done(self, q):
        """Worker target: mark each item done after a short sleep.

        Stops when None is received.
        """
        for _ in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        # join() on a JoinableQueue returns once every item is marked done.
        q = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(q, 'task_done'):
            return

        workers = []
        for _ in range(4):
            workers.append(
                self.Process(target=self._test_task_done, args=(q,))
                )

        for worker in workers:
            worker.start()

        for item in range(10):
            q.put(item)

        q.join()

        # One None per worker tells each of them to exit.
        for _ in workers:
            q.put(None)

        for worker in workers:
            worker.join()
518
519#
520#
521#
522
class _TestLock(BaseTestCase):
    """Tests of ``self.Lock`` and ``self.RLock``."""

    def test_lock(self):
        # A held Lock cannot be re-acquired; releasing an unheld one raises.
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        # An RLock can be acquired repeatedly and needs matching releases.
        rlock = self.RLock()
        for _ in range(3):
            self.assertEqual(rlock.acquire(), True)
        for _ in range(3):
            self.assertEqual(rlock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), rlock.release)
540 self.assertRaises((AssertionError, RuntimeError), lock.release)
541
542
class _TestSemaphore(BaseTestCase):
    """Tests of ``self.Semaphore`` and ``self.BoundedSemaphore``."""

    def _test_semaphore(self, sem):
        """Walk *sem* (initial value 2) down to 0 and back up to 2.

        The value checks are skipped where get_value is unimplemented.
        """
        check_value = self.assertReturnsIfImplemented
        check_value(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check_value(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check_value(0, get_value, sem)
        # Exhausted: a non-blocking acquire must fail and change nothing.
        self.assertEqual(sem.acquire(False), False)
        check_value(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        check_value(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        check_value(2, get_value, sem)

    def test_semaphore(self):
        # A plain Semaphore may be released past its initial value.
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        # acquire() honours block/timeout arguments; 'processes' type only.
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
595
596
class _TestCondition(BaseTestCase):
    """Tests of ``self.Condition``: notify, notify_all and wait timeouts."""

    def f(self, cond, sleeping, woken, timeout=None):
        """Waiter target: wait on *cond*, signalling progress on semaphores.

        Releases *sleeping* just before waiting and *woken* once the wait
        has returned (by notification or by *timeout*).
        """
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE != 'processes':
            return
        try:
            sleepers = (cond._sleeping_count.get_value() -
                        cond._woken_count.get_value())
            self.assertEqual(sleepers, 0)
            self.assertEqual(cond._wait_semaphore.get_value(), 0)
        except NotImplementedError:
            pass

    def test_notify(self):
        # notify() wakes exactly one waiter per call.
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)
        waiter_args = (cond, sleeping, woken)

        proc = self.Process(target=self.f, args=waiter_args)
        proc.set_daemon(True)
        proc.start()

        thread = threading.Thread(target=self.f, args=waiter_args)
        thread.set_daemon(True)
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        # Only the thread is joined here; the process is left as a daemon.
        thread.join()

    def test_notify_all(self):
        # Waiters with a timeout wake by themselves; notify_all() wakes
        # every remaining waiter.
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        timed_args = (cond, sleeping, woken, TIMEOUT1)
        for _ in range(3):
            proc = self.Process(target=self.f, args=timed_args)
            proc.set_daemon(True)
            proc.start()

            thread = threading.Thread(target=self.f, args=timed_args)
            thread.set_daemon(True)
            thread.start()

        # wait for them all to sleep
        for _ in range(6):
            sleeping.acquire()

        # check they have all timed out
        for _ in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        untimed_args = (cond, sleeping, woken)
        for _ in range(3):
            proc = self.Process(target=self.f, args=untimed_args)
            proc.set_daemon(True)
            proc.start()

            thread = threading.Thread(target=self.f, args=untimed_args)
            thread.set_daemon(True)
            thread.start()

        # wait for them to all sleep
        for _ in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        # wait() with a timeout returns None after roughly that long.
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        outcome = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(outcome, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
727
728
class _TestEvent(BaseTestCase):
    """Tests of ``self.Event``: wait() before and after set()/clear()."""

    def _test_event(self, event):
        """Child target: set *event* after sleeping for TIMEOUT2 seconds."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Removed temporarily, due to API shear, this does not
        # work with threading._Event objects. is_set == isSet
        #self.assertEqual(event.is_set(), False)

        self.assertEqual(wait(0.0), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        # self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # Keep a handle on the child so it can be joined below instead of
        # being left running after the test returns.
        child = self.Process(target=self._test_event, args=(event,))
        child.start()
        self.assertEqual(wait(), None)
        child.join()
764
765#
766#
767#
768
class _TestValue(BaseTestCase):
    """Tests of shared ``Value``/``RawValue`` objects ('processes' only)."""

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def _test(self, values):
        """Child target: overwrite each shared value with its new value."""
        for shared, spec in zip(values, self.codes_values):
            shared.value = spec[2]

    def test_value(self, raw=False):
        # Values written by a child process must be visible in the parent.
        if self.TYPE != 'processes':
            return

        factory = self.RawValue if raw else self.Value
        values = [factory(code, value)
                  for code, value, _ in self.codes_values]

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        child = self.Process(target=self._test, args=(values,))
        child.start()
        child.join()

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # get_lock()/get_obj() exist on Value but not on RawValue.
        if self.TYPE != 'processes':
            return

        # Default lock.
        val1 = self.Value('i', 5)
        lock1 = val1.get_lock()
        obj1 = val1.get_obj()

        # Explicit lock=None.
        val2 = self.Value('i', 5, lock=None)
        lock2 = val2.get_lock()
        obj2 = val2.get_obj()

        # A caller-supplied lock must be handed back by get_lock().
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        obj3 = val3.get_obj()
        self.assertEqual(lock, lock3)

        arr4 = self.RawValue('i', 5)
        for missing in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr4, missing))
828
829
class _TestArray(BaseTestCase):
    """Tests of shared ``Array``/``RawArray`` objects ('processes' only)."""

    def f(self, seq):
        """Replace *seq* in place with its running (prefix) sums."""
        for index in range(1, len(seq)):
            seq[index] += seq[index - 1]

    def test_array(self, raw=False):
        # Indexing, slicing and slice assignment on a shared array, plus
        # visibility in the parent of changes made by a child process.
        if self.TYPE != 'processes':
            return

        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        factory = self.RawArray if raw else self.Array
        arr = factory('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        # Apply the same transformation locally and in a child process.
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    def test_rawarray(self):
        self.test_array(raw=True)

    def test_getobj_getlock_obj(self):
        # get_lock()/get_obj() exist on Array but not on RawArray.
        if self.TYPE != 'processes':
            return

        # Default lock.
        arr1 = self.Array('i', list(range(10)))
        lock1 = arr1.get_lock()
        obj1 = arr1.get_obj()

        # Explicit lock=None.
        arr2 = self.Array('i', list(range(10)), lock=None)
        lock2 = arr2.get_lock()
        obj2 = arr2.get_obj()

        # A caller-supplied lock must be handed back by get_lock().
        lock = self.Lock()
        arr3 = self.Array('i', list(range(10)), lock=lock)
        lock3 = arr3.get_lock()
        obj3 = arr3.get_obj()
        self.assertEqual(lock, lock3)

        arr4 = self.RawArray('i', list(range(10)))
        for missing in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr4, missing))
886
887#
888#
889#
890
class _TestContainers(BaseTestCase):
    """Tests of the manager's shared list, dict and Namespace objects."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        first = self.list(list(range(10)))
        self.assertEqual(first[:], list(range(10)))

        second = self.list()
        self.assertEqual(second[:], [])

        second.extend(list(range(5)))
        self.assertEqual(second[:], list(range(5)))

        self.assertEqual(second[2], 2)
        self.assertEqual(second[2:10], [2,3,4])

        second *= 2
        self.assertEqual(second[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(second + [5, 6],
                         [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        self.assertEqual(first[:], list(range(10)))

        # A shared list built from shared lists.
        pair = [first, second]
        nested = self.list(pair)
        self.assertEqual(
            nested[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A later change to the inner list shows through the outer one.
        wrapper = self.list([first])
        first.append('hello')
        self.assertEqual(wrapper[:],
                         [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        shared = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            shared[code] = chr(code)
        self.assertEqual(shared.copy(),
                         dict((code, chr(code)) for code in indices))
        self.assertEqual(sorted(shared.keys()), indices)
        self.assertEqual(sorted(shared.values()),
                         [chr(code) for code in indices])
        self.assertEqual(sorted(shared.items()),
                         [(code, chr(code)) for code in indices])

    def test_namespace(self):
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        # The expected str() lists 'name' only: 'job' was deleted and the
        # underscore-prefixed attribute does not appear.
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertTrue(not hasattr(ns, 'job'))
946
947#
948#
949#
950
def sqr(x, wait=0.0):
    """Return ``x*x`` after sleeping for *wait* seconds."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    """Tests of ``self.pool``: apply, map, async calls, imap, terminate."""

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))),
                         [sqr(n) for n in range(10)])
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         [sqr(n) for n in range(100)])

    def test_async(self):
        # get() blocks until the delayed result is available.
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The job sleeps longer than the timeout passed to get().
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        results = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(results), [sqr(n) for n in range(10)])

        results = self.pool.imap(sqr, list(range(10)))
        for n in range(10):
            self.assertEqual(next(results), n*n)
        self.assertRaises(StopIteration, results.__next__)

        results = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for n in range(1000):
            self.assertEqual(next(results), n*n)
        self.assertRaises(StopIteration, results.__next__)

    def test_imap_unordered(self):
        # Results may arrive in any order, so compare them sorted.
        expected = [sqr(n) for n in range(1000)]

        results = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(results), expected)

        results = self.pool.imap_unordered(sqr, list(range(1000)),
                                           chunksize=53)
        self.assertEqual(sorted(results), expected)

    def test_make_pool(self):
        pool = multiprocessing.Pool(3)
        self.assertEqual(3, len(pool._pool))
        pool.close()
        pool.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # Queue far more work than can finish, then terminate at once.
        result = self.pool.map_async(
            time.sleep, [0.1 for _ in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertTrue(join.elapsed < 0.2)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001023#
1024# Test that manager has expected number of shared objects left
1025#
1026
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        remaining = self.manager._number_of_objects()
        if remaining != EXPECTED_NUMBER:
            # Print the manager's debug info to help diagnose the leak.
            print(self.manager._debug_info())

        self.assertEqual(remaining, EXPECTED_NUMBER)
1043
1044#
1045# Test of creating a customized manager class
1046#
1047
1048from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1049
class FooBar(object):
    """Plain class registered with MyManager under two typeids."""

    def f(self):
        """Return the string 'f()'."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError()

    def _h(self):
        """Underscore-prefixed method; returns the string '_h()'."""
        return '_h()'
1057
def baz():
    """Generate the squares of 0 through 9."""
    for n in range(10):
        square = n * n
        yield square
1061
class IteratorProxy(BaseProxy):
    """Proxy that lets a remote iterator be iterated over locally."""

    # 'next' is kept in the exposed tuple for backward compatibility with
    # the original declaration; this proxy itself only calls '__next__'.
    _exposed_ = ('next', '__next__')

    def __iter__(self):
        return self

    # The original class defined __next__ twice; the first definition
    # (which called the remote 'next' method) was shadowed by this one
    # and could never run, so it has been removed.
    def __next__(self):
        """Return the next item by calling ``__next__`` on the referent."""
        return self._callmethod('__next__')
1070
class MyManager(BaseManager):
    """Customized manager used by _TestMyManager."""


# 'Foo' uses the default exposure; 'Bar' exposes exactly f() and _h().
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
# 'baz' hands out the generator through the custom IteratorProxy.
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1077
1078
class _TestMyManager(BaseTestCase):
    """Test of a customized manager class (MyManager)."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        # Work out which of the candidate methods each proxy exposes.
        candidates = ('f', 'g', '_h')
        foo_methods = [name for name in candidates if hasattr(foo, name)]
        bar_methods = [name for name in candidates if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        # '_h' is not exposed on foo, so calling it remotely must fail.
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(baz), [n*n for n in range(10)])

        manager.shutdown()
1110
1111#
1112# Test of connecting to a remote server and using xmlrpclib for serialization
1113#
1114
# The single queue object that get_queue() hands out.
_queue = pyqueue.Queue()


def get_queue():
    """Return the module-level queue."""
    return _queue


class QueueManager(BaseManager):
    """Manager class used by server process."""


QueueManager.register('get_queue', callable=get_queue)


class QueueManager2(BaseManager):
    """Manager class which specifies the same interface as QueueManager."""


# Registered without a callable: only the interface is declared here.
QueueManager2.register('get_queue')


# Serializer name passed to the managers in _TestRemoteManager.
SERIALIZER = 'xmlrpclib'
1129
class _TestRemoteManager(BaseTestCase):
    """Test of connecting to a running manager server from clients."""

    ALLOWED_TYPES = ('manager',)

    def _putter(self, address, authkey):
        """Child target: connect to the server and put one tuple on its queue."""
        client = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        remote_queue = client.get_queue()
        remote_queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        server = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        server.start()

        child = self.Process(
            target=self._putter, args=(server.address, authkey)
            )
        child.start()

        client = QueueManager2(
            address=server.address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        remote_queue = client.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(remote_queue.get(),
                         ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, remote_queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del remote_queue
        server.shutdown()
1169
1170#
1171#
1172#
1173
# Empty byte string; receiving it tells the _echo() loop to stop.
SENTINEL = latin('')

# Tests for the connection objects returned by self.Pipe().
class _TestConnection(BaseTestCase):

    ALLOWED_TYPES = ('processes', 'threads')

    def _echo(self, conn):
        """Send every received byte message straight back until SENTINEL
        arrives, then close the connection."""
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        # Exercise send/recv, send_bytes/recv_bytes, recv_bytes_into and
        # poll against a child that echoes back whatever bytes it receives.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.set_daemon(True)
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        # The child echoes at the byte level, so an object written with
        # send() comes back through recv() as an equal object.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zero-filled buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items into the buffer.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A 40-byte buffer cannot hold longmsg: BufferTooShort is
            # expected and must carry the complete message in its args.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        # Wrap poll() so the time each call takes can be checked.
        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # The echoed reply is expected to be pending now.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            # With the other end closed, further reads must raise EOFError.
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        # A one-way pipe: data flows from writer to reader only.
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            # Using an end in the wrong direction must raise IOError.
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        # Check the optional offset and size arguments of send_bytes().
        # Only run for the 'processes' flavour; other flavours return early.
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # Offset only: everything from index 5 onwards is sent.
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        # Offset and size: eight bytes starting at index 7 are sent.
        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the message length sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Offsets or sizes outside the message, or negative, are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1324
class _TestListenerClient(BaseTestCase):
    """Send one message from a Client to a Listener for each family."""

    ALLOWED_TYPES = ('processes', 'threads')

    def _test(self, address):
        """Child body: connect to *address*, send 'hello' and close."""
        client = self.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        # One listener/client round trip per family offered by the
        # connection module in use.
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test,
                                 args=(listener.address,))
            child.set_daemon(True)
            child.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            child.join()
            listener.close()
#
# Test of sending connection and socket objects between processes.
#
# NOTE: the test class that follows is currently disabled -- it is wrapped
# in a module-level string literal and is therefore never executed.
#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001347"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001348class _TestPicklingConnections(BaseTestCase):
1349
1350 ALLOWED_TYPES = ('processes',)
1351
1352 def _listener(self, conn, families):
1353 for fam in families:
1354 l = self.connection.Listener(family=fam)
1355 conn.send(l.address)
1356 new_conn = l.accept()
1357 conn.send(new_conn)
1358
1359 if self.TYPE == 'processes':
1360 l = socket.socket()
1361 l.bind(('localhost', 0))
1362 conn.send(l.getsockname())
1363 l.listen(1)
1364 new_conn, addr = l.accept()
1365 conn.send(new_conn)
1366
1367 conn.recv()
1368
1369 def _remote(self, conn):
1370 for (address, msg) in iter(conn.recv, None):
1371 client = self.connection.Client(address)
1372 client.send(msg.upper())
1373 client.close()
1374
1375 if self.TYPE == 'processes':
1376 address, msg = conn.recv()
1377 client = socket.socket()
1378 client.connect(address)
1379 client.sendall(msg.upper())
1380 client.close()
1381
1382 conn.close()
1383
1384 def test_pickling(self):
1385 try:
1386 multiprocessing.allow_connection_pickling()
1387 except ImportError:
1388 return
1389
1390 families = self.connection.families
1391
1392 lconn, lconn0 = self.Pipe()
1393 lp = self.Process(target=self._listener, args=(lconn0, families))
1394 lp.start()
1395 lconn0.close()
1396
1397 rconn, rconn0 = self.Pipe()
1398 rp = self.Process(target=self._remote, args=(rconn0,))
1399 rp.start()
1400 rconn0.close()
1401
1402 for fam in families:
1403 msg = ('This connection uses family %s' % fam).encode('ascii')
1404 address = lconn.recv()
1405 rconn.send((address, msg))
1406 new_conn = lconn.recv()
1407 self.assertEqual(new_conn.recv(), msg.upper())
1408
1409 rconn.send(None)
1410
1411 if self.TYPE == 'processes':
1412 msg = latin('This connection uses a normal socket')
1413 address = lconn.recv()
1414 rconn.send((address, msg))
1415 if hasattr(socket, 'fromfd'):
1416 new_conn = lconn.recv()
1417 self.assertEqual(new_conn.recv(100), msg.upper())
1418 else:
1419 # XXX On Windows with Py2.6 need to backport fromfd()
1420 discard = lconn.recv_bytes()
1421
1422 lconn.send(None)
1423
1424 rconn.close()
1425 lconn.close()
1426
1427 lp.join()
1428 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001429"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001430#
1431#
1432#
1433
class _TestHeap(BaseTestCase):
    """Consistency check of the heap used by multiprocessing.heap."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        # Allocate and free many randomly sized blocks, then check that the
        # free and occupied blocks recorded by the heap tile every arena
        # without gaps or overlaps.
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                # Drop a random older block; the newest one (index
                # maxblocks) is never selected by randrange(maxblocks).
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap: collect every known block as
        # (arena index, start, stop, length, state)
        segments = []
        for free_blocks in heap._len_to_seq.values():
            for arena, start, stop in free_blocks:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Each block must either end exactly where the next one starts, or
        # the next block must be the first block of a different arena.
        for current, following in zip(segments, segments[1:]):
            arena, _, stop = current[:3]
            narena, nstart, _ = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1474
1475#
1476#
1477#
1478
# ctypes is optional.  When it (or multiprocessing.sharedctypes, which needs
# it) cannot be imported, c_int is left as None and the tests below return
# early.  copy() lives in multiprocessing.sharedctypes, not in ctypes; it is
# imported under an alias so the standard `copy` module imported at the top
# of this file is not shadowed.
try:
    from ctypes import Structure, c_int, c_double
    from multiprocessing.sharedctypes import copy as _sharedctypes_copy
except ImportError:
    Structure = object
    c_int = c_double = None

class _Foo(Structure):
    """Two-field structure used as a shared value in the tests below."""
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]

class _TestSharedCTypes(BaseTestCase):
    """Tests for ctypes objects shared with a child process."""

    ALLOWED_TYPES = ('processes',)

    def _double(self, x, y, foo, arr, string):
        """Child body: double every shared object in place."""
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for i in range(len(arr)):
            arr[i] *= 2

    def test_sharedctypes(self, lock=False):
        # Create shared values and arrays, let a child double them, and
        # check that the parent observes the doubled contents.
        # *lock* is forwarded to every Value()/Array() call.
        if c_int is None:
            return

        x = self.Value('i', 7, lock=lock)
        y = self.Value(c_double, 1.0/3.0, lock=lock)
        foo = self.Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        # A 'c' array holds bytes, so the initial value must be bytes too.
        string.value = latin('hello')

        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
        p.start()
        p.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario as test_sharedctypes, with lock=True.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # A copy must keep its field values when the original is changed.
        if c_int is None:
            return

        foo = _Foo(2, 5.0)
        bar = _sharedctypes_copy(foo)
        foo.x = 0
        foo.y = 0
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)
1540
1541#
1542#
1543#
1544
class _TestFinalize(BaseTestCase):
    """Tests for util.Finalize callbacks."""

    ALLOWED_TYPES = ('processes',)

    def _test_finalize(self, conn):
        """Child body: register finalizers that report through *conn*,
        run multiprocessing's exit handling and leave the process."""
        class Foo(object):
            pass

        first = Foo()
        util.Finalize(first, conn.send, args=('a',))
        del first                # triggers callback for first

        second = Foo()
        finalize_second = util.Finalize(second, conn.send, args=('b',))
        finalize_second()        # triggers callback for second
        finalize_second()        # does nothing: callback already called
        del second               # does nothing: callback already called

        # Registered without an exitpriority; test_finalize expects no 'c'.
        unprioritised = Foo()
        util.Finalize(unprioritised, conn.send, args=('c',))

        priority_one = Foo()
        util.Finalize(priority_one, conn.send, args=('d10',), exitpriority=1)

        # Three objects sharing exitpriority 0, registered in this order.
        zero_a = Foo()
        util.Finalize(zero_a, conn.send, args=('d01',), exitpriority=0)
        zero_b = Foo()
        util.Finalize(zero_b, conn.send, args=('d02',), exitpriority=0)
        zero_c = Foo()
        util.Finalize(zero_c, conn.send, args=('d03',), exitpriority=0)

        # Finalizers that are not attached to any object.
        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        # Run _test_finalize in a child and check which callbacks were
        # reported, and in which order, before the 'STOP' marker.
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.start()
        child.join()

        reported = list(iter(conn.recv, 'STOP'))
        self.assertEqual(reported,
                         ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1595
1596#
1597# Test that from ... import * works for each module
1598#
1599
class _TestImportStar(BaseTestCase):
    """Check that every name listed in a module's __all__ exists."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        # Import each module and verify that all names it advertises in
        # __all__ (if it defines one) are real attributes of the module.
        module_names = (
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.reduction', 'multiprocessing.sharedctypes',
            'multiprocessing.synchronize', 'multiprocessing.util'
            )

        for module_name in module_names:
            __import__(module_name)
            module = sys.modules[module_name]
            exported = getattr(module, '__all__', ())

            for attr in exported:
                message = '%r does not have attribute %r' % (module, attr)
                self.assertTrue(hasattr(module, attr), message)
1622
1623#
1624# Quick test that logging works -- does not test logging output
1625#
1626
class _TestLogging(BaseTestCase):
    """Quick checks of the multiprocessing logger (output is not tested)."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        # Obtaining and using the logger must work; nothing is asserted
        # about what, if anything, gets printed.
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertTrue(logger is not None)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    def _test_level(self, conn):
        """Child body: report the logger's effective level through *conn*."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        # A child process must see the same effective log level as the
        # parent, whether it is set on the multiprocessing logger itself or
        # inherited from the root logger.
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        # The finally clause restores both logger levels and closes the
        # pipe even when an assertion fails, so later tests are not run
        # with the levels left over from this one.
        try:
            logger.setLevel(LEVEL1)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            level = reader.recv()
            p.join()                # reap the child once it has reported
            self.assertEqual(LEVEL1, level)

            # With NOTSET on the multiprocessing logger, the effective
            # level is expected to come from the root logger.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            level = reader.recv()
            p.join()
            self.assertEqual(LEVEL2, level)
        finally:
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
1664
1665#
1666# Functions used to create test cases from the base ones in this module
1667#
1668
def get_attributes(Source, names):
    """Return a dict mapping each name in *names* to that attribute of
    *Source*.

    Attributes that are plain Python functions are wrapped in
    staticmethod(); every other attribute is stored unchanged.
    """
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        value = getattr(Source, name)
        attributes[name] = (staticmethod(value)
                            if isinstance(value, function_type) else value)
    return attributes
1677
def create_test_cases(Mixin, type):
    """Build concrete unittest classes from the module's `_Test*` bases.

    Every module global whose name starts with '_Test' and whose
    ALLOWED_TYPES contains *type* is combined with unittest.TestCase and
    *Mixin* into a new class named 'With<Type><name without the leading
    underscore>'.  Returns a dict mapping the new names to the classes.
    """
    type_label = type[0].upper() + type[1:]
    namespace = globals()
    cases = {}

    # Iterate over a snapshot of the names currently defined in the module.
    for name in list(namespace):
        if not name.startswith('_Test'):
            continue
        base = namespace[name]
        if type not in base.ALLOWED_TYPES:
            continue

        newname = 'With' + type_label + name[1:]

        class Temp(base, unittest.TestCase, Mixin):
            pass

        cases[newname] = Temp
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__

    return cases
1694
1695#
1696# Create test cases
1697#
1698
# Mixin supplying the 'processes' flavour: the names used by the tests are
# taken from the multiprocessing package itself.
class ProcessesMixin(object):
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed multiprocessing attributes into the class namespace;
    # get_attributes() wraps plain functions in staticmethod() first.
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

# Build the concrete test classes for this flavour and publish them as
# module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1711
1712
# Mixin supplying the 'manager' flavour: the names used by the tests are
# attributes of a shared SyncManager instance.
class ManagerMixin(object):
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocated without running __init__; test_main() calls __init__() and
    # start() on this object before the tests run.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed attributes of the manager object into the class
    # namespace.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
       'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))

# Build the concrete test classes for this flavour and publish them as
# module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1725
1726
# Mixin supplying the 'threads' flavour: the names used by the tests are
# taken from multiprocessing.dummy.
class ThreadsMixin(object):
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed multiprocessing.dummy attributes into the class
    # namespace; get_attributes() wraps plain functions in staticmethod().
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))

# Build the concrete test classes for this flavour and publish them as
# module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1739
1740#
1741#
1742#
1743
def test_main(run=None):
    """Set up the shared pools and manager, run every generated test case
    and tear the shared resources down again.

    run -- callable that receives the assembled unittest.TestSuite; when
           None, test.support.run_unittest is used.

    Raises test.support.TestSkipped on Linux when an RLock cannot be
    created (see issue 3111).
    """
    if sys.platform.startswith("linux"):
        try:
            # Only creation is probed; the lock itself is not needed.
            multiprocessing.RLock()
        except OSError:
            from test.support import TestSkipped
            raise TestSkipped("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    # The pools and the manager must be torn down even when running the
    # suite raises, otherwise worker and server processes are leaked.
    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__)
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00001781
def main():
    """Run the whole suite through a verbose text test runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

# Run the suite when this file is executed as a script.
if __name__ == '__main__':
    main()