blob: 29323a5e97d03c2695b0dd43443917f19ffbd4dc [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import Queue
10import time
11import sys
12import os
13import gc
14import signal
15import array
16import copy
17import socket
18import random
19import logging
20
Jesse Noller37040cd2008-09-30 00:15:45 +000021
22# Work around broken sem_open implementations
23try:
24 import multiprocessing.synchronize
25except ImportError, e:
26 from test.test_support import TestSkipped
27 raise TestSkipped(e)
28
Benjamin Petersondfd79492008-06-13 19:13:39 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000033import multiprocessing.pool
34import _multiprocessing
35
36from multiprocessing import util
37
38#
39#
40#
41
# Alias for the type used to build the single-character values of the
# 'c' typecode entry in _TestValue.codes_values.
latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000043
Benjamin Petersondfd79492008-06-13 19:13:39 +000044#
45# Constants
46#
47
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.WARNING

DELTA = 0.1

# Setting CHECK_TIMINGS to True makes the tests take a lot longer and can
# sometimes cause some non-serious failures, because some calls block a
# bit longer than expected.
CHECK_TIMINGS = False

if CHECK_TIMINGS:
    TIMEOUT1 = 0.82
    TIMEOUT2 = 0.35
    TIMEOUT3 = 1.4
else:
    TIMEOUT1 = 0.1
    TIMEOUT2 = 0.1
    TIMEOUT3 = 0.1

# True unless _multiprocessing reports HAVE_BROKEN_SEM_GETVALUE.
_BROKEN_GETVALUE = getattr(_multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False)
HAVE_GETVALUE = not _BROKEN_GETVALUE

WIN32 = sys.platform == "win32"
65
Benjamin Petersondfd79492008-06-13 19:13:39 +000066#
67# Creates a wrapper for a function which records the time it takes to finish
68#
69
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    ``elapsed`` is ``None`` until the first call; afterwards it holds the
    duration in seconds (measured with ``time.time()``) of the most recent
    call, whether that call returned or raised.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Recorded on both the normal and the exceptional path.
            self.elapsed = time.time() - started
        return result
82
83#
84# Base class for test cases
85#
86
class BaseTestCase(object):
    """Mixin providing assertion helpers for the test-case classes in this file.

    It relies on ``assertEqual`` / ``assertAlmostEqual`` being supplied by
    whatever class it is combined with.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timings are compared (to one decimal place) only when the
        # module-level CHECK_TIMINGS switch is on; otherwise do nothing.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Assert func(*args) == value, unless func raises
        # NotImplementedError, in which case the check is skipped.
        try:
            actual = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, actual)
102
103#
104# Return the value of a semaphore
105#
106
def get_value(self):
    """Return the current value of the semaphore-like object *self*.

    Tries, in order, ``self.get_value()``, the ``_Semaphore__value``
    attribute and the ``_value`` attribute, moving on whenever an
    AttributeError is raised.  Raises NotImplementedError if none works.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass

    try:
        return self._Semaphore__value
    except AttributeError:
        pass

    try:
        return self._value
    except AttributeError:
        raise NotImplementedError
118
119#
120# Testcases
121#
122
class _TestProcess(BaseTestCase):
    """Checks Process attributes, start/join/terminate and nested children."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # These attribute checks are skipped for the 'threads' type.
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertTrue(isinstance(authkey, bytes))
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def _test(self, q, *args, **kwds):
        # Child side of test_process: report back, through q, the
        # arguments received and the child's own name/authkey/pid.
        current = self.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if self.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # State before start().
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertTrue(p not in self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # State while running.
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())

        # Values are read in the same order _test() puts them.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # State after join().
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

    def _test_terminate(self):
        # Child side of test_terminate: sleep until terminated.
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            # Fall back to 1 so the type/range checks below still run.
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertTrue(p not in self.active_children())

        p.start()
        self.assertTrue(p in self.active_children())

        p.join()
        self.assertTrue(p not in self.active_children())

    def _test_recursion(self, wconn, id):
        # Send this node's id, then (while the id has fewer than two
        # elements) run two children one after another, each extending
        # the id by its index.
        from multiprocessing import forking
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = self.Process(
                    target=self._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        # Give pending messages time to arrive before draining the pipe.
        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
268
269#
270#
271#
272
273class _UpperCaser(multiprocessing.Process):
274
275 def __init__(self):
276 multiprocessing.Process.__init__(self)
277 self.child_conn, self.parent_conn = multiprocessing.Pipe()
278
279 def run(self):
280 self.parent_conn.close()
281 for s in iter(self.child_conn.recv, None):
282 self.child_conn.send(s.upper())
283 self.child_conn.close()
284
285 def submit(self, s):
286 assert type(s) is str
287 self.parent_conn.send(s)
288 return self.parent_conn.recv()
289
290 def stop(self):
291 self.parent_conn.send(None)
292 self.parent_conn.close()
293 self.child_conn.close()
294
class _TestSubclassingProcess(BaseTestCase):
    """Round-trips strings through an _UpperCaser child process."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.start()
        for text, expected in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), expected)
        worker.stop()
        worker.join()
306
307#
308#
309#
310
def queue_empty(q):
    """Return whether *q* is empty.

    Uses ``q.empty()`` when the object has it, otherwise compares
    ``q.qsize()`` with zero.
    """
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
316
def queue_full(q, maxsize):
    """Return whether *q* is full.

    Uses ``q.full()`` when the object has it, otherwise compares
    ``q.qsize()`` with *maxsize*.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
322
323
class _TestQueue(BaseTestCase):
    # Exercises put/get/qsize on self.Queue and task_done/join on
    # self.JoinableQueue, using helper processes created via self.Process.


    def _test_put(self, queue, child_can_start, parent_can_continue):
        # Child side of test_put: once released by the parent, drain the
        # six items and signal the parent that it can go on.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue to MAXSIZE using each calling form of put().
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Every further put must raise Queue.Full.  The elapsed-time
        # checks only take effect when CHECK_TIMINGS is enabled.
        self.assertRaises(Queue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        # Let the child drain the queue, then wait for it to finish.
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    def _test_get(self, queue, child_can_start, parent_can_continue):
        # Child side of test_get: once released, put four items and
        # signal the parent.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        # Pause before checking that the child's items are visible.
        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        # Retrieve the items using each calling form of get().
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Every further get must raise Queue.Empty.  The elapsed-time
        # checks only take effect when CHECK_TIMINGS is enabled.
        self.assertRaises(Queue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    def _test_fork(self, queue):
        # Child side of test_fork: append the items 10..19.
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(Queue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # qsize() is not available here; nothing further to check.
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    def _test_task_done(self, q):
        # Worker side of test_task_done: acknowledge each item after a
        # short delay; a None item ends the loop.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        # Skip when running on an interpreter older than 2.5 whose queue
        # object has no task_done().
        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            return

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in xrange(4)]

        for p in workers:
            p.start()

        for i in xrange(10):
            queue.put(i)

        # Blocks until the workers have called task_done() for every item.
        queue.join()

        # One None sentinel per worker so each worker's loop terminates.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
527
528#
529#
530#
531
class _TestLock(BaseTestCase):
    """Acquire/release behaviour of self.Lock and self.RLock."""

    def test_lock(self):
        mutex = self.Lock()
        self.assertEqual(mutex.acquire(), True)
        # A second, non-blocking acquire must fail while the lock is held.
        self.assertEqual(mutex.acquire(False), False)
        self.assertEqual(mutex.release(), None)
        # Releasing a lock that is not held must raise.
        self.assertRaises((ValueError, threading.ThreadError), mutex.release)

    def test_rlock(self):
        mutex = self.RLock()
        depth = 3
        for _ in range(depth):
            self.assertEqual(mutex.acquire(), True)
        for _ in range(depth):
            self.assertEqual(mutex.release(), None)
        # One release more than acquires must raise.
        self.assertRaises((AssertionError, RuntimeError), mutex.release)

    def test_lock_context(self):
        mutex = self.Lock()
        with mutex:
            pass
554
Benjamin Petersondfd79492008-06-13 19:13:39 +0000555
class _TestSemaphore(BaseTestCase):
    """Counting behaviour and acquire() timeouts of semaphores."""

    def _test_semaphore(self, sem):
        # Expects a semaphore whose value is 2; takes it down to 0, checks
        # that a non-blocking acquire then fails, and brings it back to 2.
        check_value = self.assertReturnsIfImplemented

        check_value(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check_value(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check_value(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        check_value(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        check_value(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        check_value(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # Two further releases take the value to 3 and then 4.
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected elapsed time); every
        # call must return False because the semaphore's value is 0.
        cases = [
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
            ]
        for args, kwds, expected_elapsed in cases:
            self.assertEqual(acquire(*args, **kwds), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected_elapsed)
608
609
class _TestCondition(BaseTestCase):
    # Tests notify(), notify_all() and wait() timeouts of self.Condition
    # using a mix of self.Process workers and threading.Thread workers.

    def f(self, cond, sleeping, woken, timeout=None):
        # Worker body: announce via `sleeping` that it is about to wait,
        # wait on the condition (optionally with a timeout), then announce
        # via `woken` that wait() has returned.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                # get_value() is unavailable; skip the check.
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        p = self.Process(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()

        # NOTE: `p` is rebound here, so from now on it refers to the
        # thread; the process started above is a daemon and is not joined.
        p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        p.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        # (three iterations, one process and one thread each: six waiters)
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for i in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        # (this time without a timeout, so they wait until notified)
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        # With nobody notifying, wait(TIMEOUT1) is expected to return None;
        # its duration is only checked when CHECK_TIMINGS is enabled.
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
740
741
class _TestEvent(BaseTestCase):
    """Tests Event.wait() before and after set(), and across a helper."""

    def _test_event(self, event):
        # Helper side of test_event: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Removed temporarily, due to API shear, this does not
        # work with threading._Event objects. is_set == isSet
        #self.assertEqual(event.is_set(), False)

        # Event not set: wait() returns None, after the timeout if any.
        self.assertEqual(wait(0.0), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        # self.assertEqual(event.is_set(), True)
        # Event set: wait() returns at once, with or without a timeout.
        self.assertEqual(wait(), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # Keep a handle on the helper so it can be joined; previously it
        # was started anonymously and never reaped.
        p = self.Process(target=self._test_event, args=(event,))
        p.start()
        self.assertEqual(wait(), None)
        p.join()
777
778#
779#
780#
781
class _TestValue(BaseTestCase):
    """Tests of self.Value / self.RawValue (only run for 'processes')."""

    # Each entry is (typecode, initial value, value assigned by _test()).
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def _test(self, values):
        # Child side of test_value: overwrite every shared value with the
        # third element of its codes_values entry.
        for shared, spec in zip(values, self.codes_values):
            shared.value = spec[2]


    def test_value(self, raw=False):
        if self.TYPE != 'processes':
            return

        make = self.RawValue if raw else self.Value
        values = [make(spec[0], spec[1]) for spec in self.codes_values]

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        child = self.Process(target=self._test, args=(values,))
        child.start()
        child.join()

        # The child's assignments must be visible in this process.
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        if self.TYPE != 'processes':
            return

        # With the default lock argument and with lock=None, get_lock()
        # and get_obj() must both be callable.
        for kwds in ({}, {'lock': None}):
            wrapped = self.Value('i', 5, **kwds)
            wrapped.get_lock()
            wrapped.get_obj()

        # An explicitly supplied lock is the one handed back by get_lock().
        lock = self.Lock()
        guarded = self.Value('i', 5, lock=lock)
        returned_lock = guarded.get_lock()
        guarded.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False gives an object without get_lock/get_obj.
        unlocked = self.Value('i', 5, lock=False)
        for name in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, name))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue objects have no get_lock/get_obj either.
        raw_value = self.RawValue('i', 5)
        for name in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw_value, name))
847
Benjamin Petersondfd79492008-06-13 19:13:39 +0000848
class _TestArray(BaseTestCase):
    """Tests of self.Array / self.RawArray (only run for 'processes')."""

    def f(self, seq):
        # Turn seq, in place, into its running (prefix) sums.
        for pos in range(1, len(seq)):
            seq[pos] += seq[pos - 1]

    def test_array(self, raw=False):
        if self.TYPE != 'processes':
            return

        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make = self.RawArray if raw else self.Array
        arr = make('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Apply the same slice assignment to the shared array and the list.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Run f() on the list here and on the shared array in a child;
        # both must end up with the same contents.
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    def test_rawarray(self):
        self.test_array(raw=True)

    def test_getobj_getlock_obj(self):
        if self.TYPE != 'processes':
            return

        # With the default lock argument and with lock=None, get_lock()
        # and get_obj() must both be callable.
        for kwds in ({}, {'lock': None}):
            wrapped = self.Array('i', range(10), **kwds)
            wrapped.get_lock()
            wrapped.get_obj()

        # An explicitly supplied lock is the one handed back by get_lock().
        lock = self.Lock()
        guarded = self.Array('i', range(10), lock=lock)
        returned_lock = guarded.get_lock()
        guarded.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False gives an object without get_lock/get_obj.
        unlocked = self.Array('i', range(10), lock=False)
        for name in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, name))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # RawArray objects have no get_lock/get_obj either.
        raw_array = self.RawArray('i', range(10))
        for name in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw_array, name))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000911
912#
913#
914#
915
class _TestContainers(BaseTestCase):
    """Tests of the manager-provided list, dict and Namespace objects."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        digits = self.list(range(10))
        self.assertEqual(digits[:], range(10))

        small = self.list()
        self.assertEqual(small[:], [])

        small.extend(range(5))
        self.assertEqual(small[:], range(5))

        self.assertEqual(small[2], 2)
        self.assertEqual(small[2:10], [2,3,4])

        small *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(small[:], doubled)

        self.assertEqual(small + [5, 6], doubled + [5, 6])

        # The first list must be unaffected by the operations above.
        self.assertEqual(digits[:], range(10))

        # A shared list built from two shared lists.
        nested = self.list([digits, small])
        self.assertEqual(
            nested[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], doubled]
            )

        # A later append to `digits` must show up through the outer list.
        outer = self.list([digits])
        digits.append('hello')
        self.assertEqual(outer[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        shared = self.dict()
        indices = range(65, 70)
        letters = [chr(i) for i in indices]
        pairs = [(i, chr(i)) for i in indices]

        for i in indices:
            shared[i] = chr(i)

        self.assertEqual(shared.copy(), dict(pairs))
        self.assertEqual(sorted(shared.keys()), indices)
        self.assertEqual(sorted(shared.values()), letters)
        self.assertEqual(sorted(shared.items()), pairs)

    def test_namespace(self):
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        # Only `name` is expected in the string form at this point.
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertTrue(not hasattr(ns, 'job'))
971
972#
973#
974#
975
def sqr(x, wait=0.0):
    """Sleep for *wait* seconds, then return *x* multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    """Tests of the pool API exposed as self.pool."""

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         map(sqr, range(100)))

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps 0.2s longer than the get() timeout, so get()
        # must raise TimeoutError.
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), map(sqr, range(10)))

        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

    def test_imap_unordered(self):
        # Results may arrive in any order, so compare after sorting.
        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), map(sqr, range(1000)))

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), map(sqr, range(1000)))

    def test_make_pool(self):
        p = multiprocessing.Pool(3)
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            # Shut the pool down even when the assertion fails, so its
            # worker processes are not left behind.
            p.close()
            p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        result = self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        # After terminate(), join() must return quickly even though most
        # of the submitted work was never run.
        self.assertTrue(join.elapsed < 0.2)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001048#
1049# Test that manager has expected number of shared objects left
1050#
1051
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive

        # Drop dead process objects and run a collection before counting.
        multiprocessing.active_children()
        gc.collect()

        live = self.manager._number_of_objects()
        if live != EXPECTED_NUMBER:
            # Dump the manager's debug info to help diagnose the failure.
            print(self.manager._debug_info())

        self.assertEqual(live, EXPECTED_NUMBER)
1068
1069#
1070# Test of creating a customized manager class
1071#
1072
1073from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1074
class FooBar(object):
    """Simple referent registered with MyManager as 'Foo' and 'Bar'.

    It offers one ordinary method, one method that raises, and one
    underscore-prefixed method.
    """

    def f(self):
        """Return the literal string 'f()'."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError()

    def _h(self):
        """Return the literal string '_h()'."""
        return '_h()'
1082
def baz():
    """Generate the squares of the integers 0 through 9, in order."""
    for n in range(10):
        yield n * n
1086
class IteratorProxy(BaseProxy):
    """Proxy that can be iterated by forwarding each step to the referent.

    Both spellings of the iterator-advance method ('next' and '__next__')
    are exposed, and each is forwarded under its own name.
    """

    _exposed_ = ('next', '__next__')

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def next(self):
        return self._callmethod('next')

    def __next__(self):
        return self._callmethod('__next__')
1095
class MyManager(BaseManager):
    """Customised manager used by _TestMyManager; adds nothing itself."""


# 'Foo' uses the default exposure, 'Bar' exposes exactly f and _h, and
# 'baz' serves a generator through IteratorProxy.
MyManager.register('Foo', FooBar)
MyManager.register('Bar', FooBar, exposed=('f', '_h'))
MyManager.register('baz', baz, proxytype=IteratorProxy)
1102
1103
class _TestMyManager(BaseTestCase):
    # Exercises the customised MyManager: which methods each registered
    # type exposes, how calls and exceptions travel through the proxies,
    # and iteration over a proxied generator.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            squares = manager.baz()

            candidates = ('f', 'g', '_h')
            foo_methods = [name for name in candidates if hasattr(foo, name)]
            bar_methods = [name for name in candidates if hasattr(bar, name)]

            # 'Foo' was registered without an explicit ``exposed`` list;
            # 'Bar' was registered with exposed=('f', '_h').
            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            # _h is not exposed on 'Foo', so the call is refused remotely.
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(squares), [i*i for i in range(10)])
        finally:
            # Stop the server process even when an assertion above fails,
            # so a failing run does not leave it behind.
            manager.shutdown()
1135
#
# Test of connecting to a remote server and using xmlrpclib for serialization
#
1139
# Single module-level queue handed out by get_queue().
_queue = Queue.Queue()


def get_queue():
    """Return the module-level queue; every call returns the same object."""
    return _queue
1143
class QueueManager(BaseManager):
    '''manager class used by server process'''


# The server side supplies the callable that produces the shared queue.
QueueManager.register('get_queue', get_queue)
1147
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''


# Registered without a callable: only the name 'get_queue' is declared.
QueueManager2.register('get_queue')
1151
1152
# Serializer name passed to every manager in the remote-manager tests.
SERIALIZER = 'xmlrpclib'


class _TestRemoteManager(BaseTestCase):
    # Starts a QueueManager server, then talks to it through independent
    # QueueManager2 clients (one in a child process, one in this process)
    # using xmlrpclib instead of pickle for serialization.

    ALLOWED_TYPES = ('manager',)

    def _putter(self, address, authkey):
        # Runs in the child: connect to the server and enqueue one tuple.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        # Port 0 requests any free port; manager.address is read back
        # after start() and handed to the clients.
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.start()

            manager2 = QueueManager2(
                address=manager.address, authkey=authkey,
                serializer=SERIALIZER
                )
            manager2.connect()
            queue = manager2.get_queue()

            # Note that xmlrpclib will deserialize object as a list not a
            # tuple
            self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

            # Because we are using xmlrpclib for serialization instead of
            # pickle this will cause a serialization error.
            self.assertRaises(Exception, queue.put, time.sleep)

            # The item has arrived, so the putter is done; reap it rather
            # than leaving the child unjoined.
            p.join()

            # Make queue finalizer run before the server is stopped
            del queue
        finally:
            # Stop the server process even when an assertion above fails.
            manager.shutdown()
1194
class _TestManagerRestart(BaseTestCase):
    # Checks that a manager can be shut down and a new one started
    # immediately afterwards on the very same address.

    def _putter(self, address, authkey):
        # Runs in the child: connect to the server and enqueue a string.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)

        # Ask for any free port instead of a fixed one, which could already
        # be in use on the machine running the tests.  The address actually
        # bound is read back from the started manager and reused below.
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        manager.start()
        try:
            addr = manager.address

            p = self.Process(target=self._putter, args=(addr, authkey))
            p.start()
            queue = manager.get_queue()
            self.assertEqual(queue.get(), 'hello world')

            # The item has arrived, so the putter is done; reap it.
            p.join()
            # Drop the proxy before its server goes away.
            del queue
        finally:
            manager.shutdown()

        # Restart straight away on the same (host, port).
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Nollerb48cfa62009-03-30 16:19:10 +00001220
Benjamin Petersondfd79492008-06-13 19:13:39 +00001221#
1222#
1223#
1224
# Empty byte string: tells the echo loop in _TestConnection._echo to stop.
SENTINEL = latin('')


class _TestConnection(BaseTestCase):
    # Tests of Pipe() connections: object and byte transfer, receiving
    # into buffers, polling, one-way pipes and send_bytes() slicing.

    ALLOWED_TYPES = ('processes', 'threads')

    def _echo(self, conn):
        # Send every received byte message straight back until SENTINEL
        # arrives, then close this end.
        while True:
            data = conn.recv_bytes()
            if data == SENTINEL:
                break
            conn.send_bytes(data)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        echo = self.Process(target=self._echo, args=(child_conn,))
        echo.daemon = True
        echo.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', range(4))
        with_processes = (self.TYPE == 'processes')

        if with_processes:
            self.assertEqual(type(conn.fileno()), int)

        # Picklable objects make the round trip unchanged.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        # So do raw byte strings.
        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if with_processes:
            # Receive into the start of a zeroed ten-item array.
            target = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(target),
                             len(arr) * target.itemsize)
            self.assertEqual(list(target), expected)

            # Receive at an offset of three items.
            target = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(
                conn.recv_bytes_into(target, 3 * target.itemsize),
                len(arr) * target.itemsize)
            self.assertEqual(list(target), expected)

            # A 40-byte buffer is too small for longmsg; the exception
            # carries the complete message.
            target = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(target)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        timed_poll = TimingWrapper(conn.poll)

        self.assertEqual(timed_poll(), False)
        self.assertTimingAlmostEqual(timed_poll.elapsed, 0)

        self.assertEqual(timed_poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(timed_poll.elapsed, TIMEOUT1)

        conn.send(None)

        self.assertEqual(timed_poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(timed_poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)          # tell child to quit
        child_conn.close()

        if with_processes:
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        echo.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE != 'processes':
            return

        # Each end of a one-way pipe reports a single direction and
        # refuses to be used in the other.
        self.assertEqual(reader.readable, True)
        self.assertEqual(reader.writable, False)
        self.assertEqual(writer.readable, False)
        self.assertEqual(writer.writable, True)
        self.assertRaises(IOError, reader.send, 2)
        self.assertRaises(IOError, writer.recv)
        self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        echo = self.Process(target=self._echo, args=(child_conn,))
        echo.start()
        child_conn.close()    # this might complete before child initializes

        greeting = latin('hello')
        conn.send_bytes(greeting)
        self.assertEqual(conn.recv_bytes(), greeting)

        conn.send_bytes(SENTINEL)
        conn.close()
        echo.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        # (extra send_bytes arguments, bytes expected at the other end)
        accepted = (
            ((), msg),
            ((5,), msg[5:]),
            ((7, 8), msg[7:7+8]),
            ((26,), latin('')),
            ((26, 0), latin('')),
            )
        for extra, expected in accepted:
            a.send_bytes(msg, *extra)
            self.assertEqual(b.recv_bytes(), expected)

        # Offset/size combinations that fall outside the 26-byte message,
        # or are negative, are rejected.
        rejected = ((27,), (22, 5), (26, 1), (-1,), (4, -1))
        for extra in rejected:
            self.assertRaises(ValueError, a.send_bytes, msg, *extra)
1375
class _TestListenerClient(BaseTestCase):
    # For every address family the connection module offers, a client in
    # another process/thread connects to a Listener and sends one message.

    ALLOWED_TYPES = ('processes', 'threads')

    def _test(self, address):
        # Child side: connect, say hello, hang up.
        client = self.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)

            child = self.Process(target=self._test,
                                 args=(listener.address,))
            child.daemon = True
            child.start()

            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')

            child.join()
            listener.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001395#
1396# Test of sending connection and socket objects between processes
1397#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001398"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001399class _TestPicklingConnections(BaseTestCase):
1400
1401 ALLOWED_TYPES = ('processes',)
1402
1403 def _listener(self, conn, families):
1404 for fam in families:
1405 l = self.connection.Listener(family=fam)
1406 conn.send(l.address)
1407 new_conn = l.accept()
1408 conn.send(new_conn)
1409
1410 if self.TYPE == 'processes':
1411 l = socket.socket()
1412 l.bind(('localhost', 0))
1413 conn.send(l.getsockname())
1414 l.listen(1)
1415 new_conn, addr = l.accept()
1416 conn.send(new_conn)
1417
1418 conn.recv()
1419
1420 def _remote(self, conn):
1421 for (address, msg) in iter(conn.recv, None):
1422 client = self.connection.Client(address)
1423 client.send(msg.upper())
1424 client.close()
1425
1426 if self.TYPE == 'processes':
1427 address, msg = conn.recv()
1428 client = socket.socket()
1429 client.connect(address)
1430 client.sendall(msg.upper())
1431 client.close()
1432
1433 conn.close()
1434
1435 def test_pickling(self):
1436 try:
1437 multiprocessing.allow_connection_pickling()
1438 except ImportError:
1439 return
1440
1441 families = self.connection.families
1442
1443 lconn, lconn0 = self.Pipe()
1444 lp = self.Process(target=self._listener, args=(lconn0, families))
1445 lp.start()
1446 lconn0.close()
1447
1448 rconn, rconn0 = self.Pipe()
1449 rp = self.Process(target=self._remote, args=(rconn0,))
1450 rp.start()
1451 rconn0.close()
1452
1453 for fam in families:
1454 msg = ('This connection uses family %s' % fam).encode('ascii')
1455 address = lconn.recv()
1456 rconn.send((address, msg))
1457 new_conn = lconn.recv()
1458 self.assertEqual(new_conn.recv(), msg.upper())
1459
1460 rconn.send(None)
1461
1462 if self.TYPE == 'processes':
1463 msg = latin('This connection uses a normal socket')
1464 address = lconn.recv()
1465 rconn.send((address, msg))
1466 if hasattr(socket, 'fromfd'):
1467 new_conn = lconn.recv()
1468 self.assertEqual(new_conn.recv(100), msg.upper())
1469 else:
1470 # XXX On Windows with Py2.6 need to backport fromfd()
1471 discard = lconn.recv_bytes()
1472
1473 lconn.send(None)
1474
1475 rconn.close()
1476 lconn.close()
1477
1478 lp.join()
1479 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001480"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001481#
1482#
1483#
1484
class _TestHeap(BaseTestCase):
    # Stress test of multiprocessing.heap: allocate and free many blocks
    # of random size, then check that the heap's bookkeeping describes a
    # gap-free, non-overlapping layout.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in xrange(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            wrapper = multiprocessing.heap.BufferWrapper(size)
            blocks.append(wrapper)
            if len(blocks) > maxblocks:
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # Describe every free and every allocated block as
        # (arena index, start, stop, length, state).
        free = [
            (heap._arenas.index(arena), start, stop, stop - start, 'free')
            for seq in heap._len_to_seq.values()
            for arena, start, stop in seq
            ]
        used = [
            (heap._arenas.index(arena), start, stop, stop - start, 'occupied')
            for arena, start, stop in heap._allocated_blocks
            ]
        occupied = sum(entry[3] for entry in used)

        layout = sorted(free + used)

        # Each block must either end where the next one starts, or be
        # followed by a block that opens a different arena at offset 0.
        for current, following in zip(layout, layout[1:]):
            arena, start, stop = current[:3]
            narena, nstart, nstop = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1525
1526#
1527#
1528#
1529
# ctypes provides Structure and the C types, but not Value or copy; the
# shared-memory copy() helper lives in multiprocessing.sharedctypes.
# Asking ctypes for all of them made this import fail every time, so the
# tests below always returned early.  copy() is bound to a private alias
# so that the standard ``copy`` module imported at the top of this file
# keeps its name.
try:
    from ctypes import Structure, c_int, c_double
    from multiprocessing.sharedctypes import copy as _sharedctypes_copy
except ImportError:
    Structure = object
    c_int = c_double = None
    _sharedctypes_copy = None


class _Foo(Structure):
    # Two-field structure used as a shared value in _TestSharedCTypes.
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]


class _TestSharedCTypes(BaseTestCase):
    # Tests of shared ctypes objects: values, structures and arrays are
    # doubled in a child process and the result is checked in the parent.

    ALLOWED_TYPES = ('processes',)

    def _double(self, x, y, foo, arr, string):
        # Runs in the child: double everything in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for i in range(len(arr)):
            arr[i] *= 2

    def test_sharedctypes(self, lock=False):
        # ``lock`` is passed unchanged to every Value()/Array() call;
        # test_synchronize() re-runs this test with lock=True.
        if c_int is None:
            return

        # Value and Array come from the mixin (copied from the
        # multiprocessing package for the 'processes' flavour).
        x = self.Value('i', 7, lock=lock)
        y = self.Value(c_double, 1.0/3.0, lock=lock)
        foo = self.Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
        p.start()
        p.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        if c_int is None:
            return

        # The copy must keep its values after the original is changed.
        foo = _Foo(2, 5.0)
        bar = _sharedctypes_copy(foo)
        foo.x = 0
        foo.y = 0
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)
1591
1592#
1593#
1594#
1595
class _TestFinalize(BaseTestCase):
    # Tests util.Finalize: callbacks fire when the object dies or when the
    # finalizer is called explicitly, and the exit-time callbacks arrive
    # in the order asserted in test_finalize().

    ALLOWED_TYPES = ('processes',)

    def _test_finalize(self, conn):
        # Runs in the child.  Each callback sends its tag through ``conn``.
        class Token(object):
            pass

        a = Token()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Token()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # Registered without an exit priority; 'c' is absent from the
        # result the parent expects.
        c = Token()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Token()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers that share exit priority 0.
        d01 = Token()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Token()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Token()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        # Finalizers that are not tied to any object.
        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.start()
        child.join()

        # Read tags until the 'STOP' marker.
        received = list(iter(conn.recv, 'STOP'))
        self.assertEqual(received,
                         ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1646
#
# Test that from ... import * works for each module
#
1650
class _TestImportStar(BaseTestCase):
    # Every name a multiprocessing module lists in __all__ must exist in
    # that module; otherwise "from module import *" would fail.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        submodules = (
            'connection', 'heap', 'managers', 'pool', 'process',
            'reduction', 'sharedctypes', 'synchronize', 'util',
            )
        modules = ('multiprocessing',) + tuple(
            'multiprocessing.' + sub for sub in submodules)

        for modname in modules:
            __import__(modname)
            # __import__ returns the top-level package, so fetch the
            # module itself from sys.modules.
            module = sys.modules[modname]

            # Modules that define no __all__ have nothing to check.
            for attr in getattr(module, '__all__', ()):
                self.assertTrue(
                    hasattr(module, attr),
                    '%r does not have attribute %r' % (module, attr)
                    )
1673
#
# Quick test that logging works -- does not test logging output
#
1677
class _TestLogging(BaseTestCase):
    # Quick checks of the multiprocessing logger: it can be obtained and
    # used, and a child process sees the effective level set by its parent.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertTrue(logger is not None)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    def _test_level(self, conn):
        # Runs in the child: report the logger's effective level.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _level_seen_by_child(self, reader, writer):
        # Start a child that reports its effective log level through
        # ``writer`` and return what arrives on ``reader``.  The child is
        # joined before returning so it does not outlive the test.
        child = self.Process(target=self._test_level, args=(writer,))
        child.start()
        try:
            return reader.recv()
        finally:
            child.join()

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        try:
            # A level set on the multiprocessing logger is seen directly.
            logger.setLevel(LEVEL1)
            self.assertEqual(
                LEVEL1, self._level_seen_by_child(reader, writer))

            # With NOTSET on the multiprocessing logger, the child reports
            # the root logger's level.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self.assertEqual(
                LEVEL2, self._level_seen_by_child(reader, writer))
        finally:
            # Restore both loggers even when an assertion fails, so later
            # tests do not run with the levels changed here.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
1715
#
# Test to verify handle verification, see issue 3321
#
1719
class TestInvalidHandle(unittest.TestCase):
    # Connection objects built on bogus handles must fail with IOError.
    # The check is not performed on Windows.

    def test_invalid_handles(self):
        if not WIN32:
            # An arbitrary large number that is not an open descriptor.
            bogus = _multiprocessing.Connection(44977608)
            self.assertRaises(IOError, bogus.poll)
            # A negative handle is rejected at construction time.
            self.assertRaises(IOError, _multiprocessing.Connection, -1)
#
# Functions used to create test cases from the base ones in this module
#
1731
def get_attributes(Source, names):
    """Return a dict mapping each name in *names* to ``Source.<name>``.

    Attributes that are plain Python functions are wrapped in
    ``staticmethod``; every other attribute is stored unchanged.
    """
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        value = getattr(Source, name)
        attributes[name] = (
            staticmethod(value) if isinstance(value, function_type)
            else value
            )
    return attributes
1740
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes for one flavour of the tests.

    Every module-level name starting with '_Test' whose ``ALLOWED_TYPES``
    contains *type* is combined with ``unittest.TestCase`` and *Mixin*.
    The new class is called 'With<Type><name without the leading
    underscore>' and reports *Mixin*'s module as its own.

    Returns a dict mapping the new class names to the new classes.
    """
    capitalized = type[0].upper() + type[1:]
    namespace = globals()
    cases = {}

    for name in list(namespace):
        if not name.startswith('_Test'):
            continue
        base = namespace[name]
        if type not in base.ALLOWED_TYPES:
            continue

        newname = 'With' + capitalized + name[1:]

        class Temp(base, unittest.TestCase, Mixin):
            pass

        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        cases[newname] = Temp

    return cases
1757
#
# Create test cases
#
1761
class ProcessesMixin(object):
    # Binds the generic '_Test*' classes to real processes and to the
    # primitives of the multiprocessing package itself.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed attributes into this class body.
    locals().update(get_attributes(multiprocessing, (
        'Queue',
        'Lock', 'RLock',
        'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array', 'RawValue', 'RawArray',
        'current_process', 'active_children',
        'Pipe', 'connection',
        'JoinableQueue',
        )))


# Generate the 'WithProcesses...' test cases and publish them as module
# globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1774
1775
class ManagerMixin(object):
    # Binds the generic '_Test*' classes to real processes and to the
    # primitives served by a SyncManager.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocate the manager without running __init__; test_main() calls
    # __init__() and start() on it before the tests run.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed manager attributes into this class body.
    locals().update(get_attributes(manager, (
        'Queue',
        'Lock', 'RLock',
        'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array',
        'list', 'dict', 'Namespace',
        'JoinableQueue',
        )))


# Generate the 'WithManager...' test cases and publish them as module
# globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1788
1789
class ThreadsMixin(object):
    # Binds the generic '_Test*' classes to the thread-backed
    # multiprocessing.dummy implementation.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed attributes into this class body.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue',
        'Lock', 'RLock',
        'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array',
        'current_process', 'active_children',
        'Pipe', 'connection',
        'dict', 'list', 'Namespace',
        'JoinableQueue',
        )))


# Generate the 'WithThreads...' test cases and publish them as module
# globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1802
class OtherTest(unittest.TestCase):
    # Authentication handshake failures, driven through fake connections
    # that return canned replies.
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        class _FakeConnection(object):
            # Ignores whatever is sent and always answers with bytes that
            # are not a valid response.
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.deliver_challenge,
                          _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        class _FakeConnection(object):
            # First reply is the challenge marker, second is a bogus
            # reply, and every later reply is empty.
            def __init__(self):
                self.replies = iter((multiprocessing.connection.CHALLENGE,
                                     b'something bogus'))

            def recv_bytes(self, size):
                return next(self.replies, b'')

            def send_bytes(self, data):
                pass

        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.answer_challenge,
                          _FakeConnection(), b'abc')


# Test cases that are not generated from the '_Test*' base classes.
testcases_other = [OtherTest, TestInvalidHandle]
Neal Norwitz0c519b32008-08-25 01:50:24 +00001833
Benjamin Petersondfd79492008-06-13 19:13:39 +00001834#
1835#
1836#
1837
def test_main(run=None):
    # Entry point used both by the regression-test driver and by main().
    #
    # ``run`` is a callable that receives the assembled TestSuite.  When
    # it is None, test.test_support.run_unittest is used.
    #
    # Raises TestSkipped on Linux when an RLock cannot be created.
    if sys.platform.startswith("linux"):
        try:
            multiprocessing.RLock()
        except OSError:
            from test.test_support import TestSkipped
            raise TestSkipped("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared fixtures: one pool per flavour, plus the manager that was
    # allocated uninitialised in ManagerMixin.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        by_name = lambda tc: tc.__name__
        testcases = (
            sorted(testcases_processes.values(), key=by_name) +
            sorted(testcases_threads.values(), key=by_name) +
            sorted(testcases_manager.values(), key=by_name) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        # The runner may raise (run_unittest does so when tests fail), so
        # tear down here; otherwise the pool workers and the manager
        # process would be left running.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00001876
def main():
    """Run the whole suite with a verbose text runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)


if __name__ == '__main__':
    main()