blob: 105a7433c03065c97eaee05704c46d0e3d29ea3d [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import Queue
10import time
11import sys
12import os
13import gc
14import signal
15import array
16import copy
17import socket
18import random
19import logging
R. David Murray3db8a342009-03-30 23:05:48 +000020import test_support
Benjamin Petersondfd79492008-06-13 19:13:39 +000021
Jesse Noller37040cd2008-09-30 00:15:45 +000022
R. David Murray3db8a342009-03-30 23:05:48 +000023_multiprocessing = test_support.import_module('_multiprocessing')
24
Jesse Noller37040cd2008-09-30 00:15:45 +000025# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000026test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000027
Benjamin Petersondfd79492008-06-13 19:13:39 +000028import multiprocessing.dummy
29import multiprocessing.connection
30import multiprocessing.managers
31import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000032import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000033
34from multiprocessing import util
35
36#
37#
38#
39
Benjamin Petersone79edf52008-07-13 18:34:58 +000040latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000041
Benjamin Petersondfd79492008-06-13 19:13:39 +000042#
43# Constants
44#
45
46LOG_LEVEL = util.SUBWARNING
47#LOG_LEVEL = logging.WARNING
48
49DELTA = 0.1
50CHECK_TIMINGS = False # making true makes tests take a lot longer
51 # and can sometimes cause some non-serious
52 # failures because some calls block a bit
53 # longer than expected
54if CHECK_TIMINGS:
55 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
56else:
57 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
58
59HAVE_GETVALUE = not getattr(_multiprocessing,
60 'HAVE_BROKEN_SEM_GETVALUE', False)
61
Jesse Noller9a5b2ad2009-01-19 15:12:22 +000062WIN32 = (sys.platform == "win32")
63
Benjamin Petersondfd79492008-06-13 19:13:39 +000064#
65# Creates a wrapper for a function which records the time it takes to finish
66#
67
68class TimingWrapper(object):
69
70 def __init__(self, func):
71 self.func = func
72 self.elapsed = None
73
74 def __call__(self, *args, **kwds):
75 t = time.time()
76 try:
77 return self.func(*args, **kwds)
78 finally:
79 self.elapsed = time.time() - t
80
81#
82# Base class for test cases
83#
84
85class BaseTestCase(object):
86
87 ALLOWED_TYPES = ('processes', 'manager', 'threads')
88
89 def assertTimingAlmostEqual(self, a, b):
90 if CHECK_TIMINGS:
91 self.assertAlmostEqual(a, b, 1)
92
93 def assertReturnsIfImplemented(self, value, func, *args):
94 try:
95 res = func(*args)
96 except NotImplementedError:
97 pass
98 else:
99 return self.assertEqual(value, res)
100
101#
102# Return the value of a semaphore
103#
104
105def get_value(self):
106 try:
107 return self.get_value()
108 except AttributeError:
109 try:
110 return self._Semaphore__value
111 except AttributeError:
112 try:
113 return self._value
114 except AttributeError:
115 raise NotImplementedError
116
117#
118# Testcases
119#
120
121class _TestProcess(BaseTestCase):
122
123 ALLOWED_TYPES = ('processes', 'threads')
124
125 def test_current(self):
126 if self.TYPE == 'threads':
127 return
128
129 current = self.current_process()
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000130 authkey = current.authkey
Benjamin Petersondfd79492008-06-13 19:13:39 +0000131
132 self.assertTrue(current.is_alive())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000133 self.assertTrue(not current.daemon)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000134 self.assertTrue(isinstance(authkey, bytes))
135 self.assertTrue(len(authkey) > 0)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000136 self.assertEqual(current.ident, os.getpid())
137 self.assertEqual(current.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000138
139 def _test(self, q, *args, **kwds):
140 current = self.current_process()
141 q.put(args)
142 q.put(kwds)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000143 q.put(current.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000144 if self.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000145 q.put(bytes(current.authkey))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000146 q.put(current.pid)
147
148 def test_process(self):
149 q = self.Queue(1)
150 e = self.Event()
151 args = (q, 1, 2)
152 kwargs = {'hello':23, 'bye':2.54}
153 name = 'SomeProcess'
154 p = self.Process(
155 target=self._test, args=args, kwargs=kwargs, name=name
156 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000157 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000158 current = self.current_process()
159
160 if self.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000161 self.assertEquals(p.authkey, current.authkey)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000162 self.assertEquals(p.is_alive(), False)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000163 self.assertEquals(p.daemon, True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000164 self.assertTrue(p not in self.active_children())
165 self.assertTrue(type(self.active_children()) is list)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000166 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000167
168 p.start()
169
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000170 self.assertEquals(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000171 self.assertEquals(p.is_alive(), True)
172 self.assertTrue(p in self.active_children())
173
174 self.assertEquals(q.get(), args[1:])
175 self.assertEquals(q.get(), kwargs)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000176 self.assertEquals(q.get(), p.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000177 if self.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000178 self.assertEquals(q.get(), current.authkey)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000179 self.assertEquals(q.get(), p.pid)
180
181 p.join()
182
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000183 self.assertEquals(p.exitcode, 0)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000184 self.assertEquals(p.is_alive(), False)
185 self.assertTrue(p not in self.active_children())
186
187 def _test_terminate(self):
188 time.sleep(1000)
189
190 def test_terminate(self):
191 if self.TYPE == 'threads':
192 return
193
194 p = self.Process(target=self._test_terminate)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000195 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000196 p.start()
197
198 self.assertEqual(p.is_alive(), True)
199 self.assertTrue(p in self.active_children())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000200 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000201
202 p.terminate()
203
204 join = TimingWrapper(p.join)
205 self.assertEqual(join(), None)
206 self.assertTimingAlmostEqual(join.elapsed, 0.0)
207
208 self.assertEqual(p.is_alive(), False)
209 self.assertTrue(p not in self.active_children())
210
211 p.join()
212
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000213 # XXX sometimes get p.exitcode == 0 on Windows ...
214 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000215
216 def test_cpu_count(self):
217 try:
218 cpus = multiprocessing.cpu_count()
219 except NotImplementedError:
220 cpus = 1
221 self.assertTrue(type(cpus) is int)
222 self.assertTrue(cpus >= 1)
223
224 def test_active_children(self):
225 self.assertEqual(type(self.active_children()), list)
226
227 p = self.Process(target=time.sleep, args=(DELTA,))
228 self.assertTrue(p not in self.active_children())
229
230 p.start()
231 self.assertTrue(p in self.active_children())
232
233 p.join()
234 self.assertTrue(p not in self.active_children())
235
236 def _test_recursion(self, wconn, id):
237 from multiprocessing import forking
238 wconn.send(id)
239 if len(id) < 2:
240 for i in range(2):
241 p = self.Process(
242 target=self._test_recursion, args=(wconn, id+[i])
243 )
244 p.start()
245 p.join()
246
247 def test_recursion(self):
248 rconn, wconn = self.Pipe(duplex=False)
249 self._test_recursion(wconn, [])
250
251 time.sleep(DELTA)
252 result = []
253 while rconn.poll():
254 result.append(rconn.recv())
255
256 expected = [
257 [],
258 [0],
259 [0, 0],
260 [0, 1],
261 [1],
262 [1, 0],
263 [1, 1]
264 ]
265 self.assertEqual(result, expected)
266
267#
268#
269#
270
271class _UpperCaser(multiprocessing.Process):
272
273 def __init__(self):
274 multiprocessing.Process.__init__(self)
275 self.child_conn, self.parent_conn = multiprocessing.Pipe()
276
277 def run(self):
278 self.parent_conn.close()
279 for s in iter(self.child_conn.recv, None):
280 self.child_conn.send(s.upper())
281 self.child_conn.close()
282
283 def submit(self, s):
284 assert type(s) is str
285 self.parent_conn.send(s)
286 return self.parent_conn.recv()
287
288 def stop(self):
289 self.parent_conn.send(None)
290 self.parent_conn.close()
291 self.child_conn.close()
292
293class _TestSubclassingProcess(BaseTestCase):
294
295 ALLOWED_TYPES = ('processes',)
296
297 def test_subclassing(self):
298 uppercaser = _UpperCaser()
299 uppercaser.start()
300 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
301 self.assertEqual(uppercaser.submit('world'), 'WORLD')
302 uppercaser.stop()
303 uppercaser.join()
304
305#
306#
307#
308
309def queue_empty(q):
310 if hasattr(q, 'empty'):
311 return q.empty()
312 else:
313 return q.qsize() == 0
314
315def queue_full(q, maxsize):
316 if hasattr(q, 'full'):
317 return q.full()
318 else:
319 return q.qsize() == maxsize
320
321
322class _TestQueue(BaseTestCase):
323
324
325 def _test_put(self, queue, child_can_start, parent_can_continue):
326 child_can_start.wait()
327 for i in range(6):
328 queue.get()
329 parent_can_continue.set()
330
331 def test_put(self):
332 MAXSIZE = 6
333 queue = self.Queue(maxsize=MAXSIZE)
334 child_can_start = self.Event()
335 parent_can_continue = self.Event()
336
337 proc = self.Process(
338 target=self._test_put,
339 args=(queue, child_can_start, parent_can_continue)
340 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000341 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000342 proc.start()
343
344 self.assertEqual(queue_empty(queue), True)
345 self.assertEqual(queue_full(queue, MAXSIZE), False)
346
347 queue.put(1)
348 queue.put(2, True)
349 queue.put(3, True, None)
350 queue.put(4, False)
351 queue.put(5, False, None)
352 queue.put_nowait(6)
353
354 # the values may be in buffer but not yet in pipe so sleep a bit
355 time.sleep(DELTA)
356
357 self.assertEqual(queue_empty(queue), False)
358 self.assertEqual(queue_full(queue, MAXSIZE), True)
359
360 put = TimingWrapper(queue.put)
361 put_nowait = TimingWrapper(queue.put_nowait)
362
363 self.assertRaises(Queue.Full, put, 7, False)
364 self.assertTimingAlmostEqual(put.elapsed, 0)
365
366 self.assertRaises(Queue.Full, put, 7, False, None)
367 self.assertTimingAlmostEqual(put.elapsed, 0)
368
369 self.assertRaises(Queue.Full, put_nowait, 7)
370 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
371
372 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
373 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
374
375 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
376 self.assertTimingAlmostEqual(put.elapsed, 0)
377
378 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
379 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
380
381 child_can_start.set()
382 parent_can_continue.wait()
383
384 self.assertEqual(queue_empty(queue), True)
385 self.assertEqual(queue_full(queue, MAXSIZE), False)
386
387 proc.join()
388
389 def _test_get(self, queue, child_can_start, parent_can_continue):
390 child_can_start.wait()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000391 #queue.put(1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000392 queue.put(2)
393 queue.put(3)
394 queue.put(4)
395 queue.put(5)
396 parent_can_continue.set()
397
398 def test_get(self):
399 queue = self.Queue()
400 child_can_start = self.Event()
401 parent_can_continue = self.Event()
402
403 proc = self.Process(
404 target=self._test_get,
405 args=(queue, child_can_start, parent_can_continue)
406 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000407 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000408 proc.start()
409
410 self.assertEqual(queue_empty(queue), True)
411
412 child_can_start.set()
413 parent_can_continue.wait()
414
415 time.sleep(DELTA)
416 self.assertEqual(queue_empty(queue), False)
417
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000418 # Hangs unexpectedly, remove for now
419 #self.assertEqual(queue.get(), 1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000420 self.assertEqual(queue.get(True, None), 2)
421 self.assertEqual(queue.get(True), 3)
422 self.assertEqual(queue.get(timeout=1), 4)
423 self.assertEqual(queue.get_nowait(), 5)
424
425 self.assertEqual(queue_empty(queue), True)
426
427 get = TimingWrapper(queue.get)
428 get_nowait = TimingWrapper(queue.get_nowait)
429
430 self.assertRaises(Queue.Empty, get, False)
431 self.assertTimingAlmostEqual(get.elapsed, 0)
432
433 self.assertRaises(Queue.Empty, get, False, None)
434 self.assertTimingAlmostEqual(get.elapsed, 0)
435
436 self.assertRaises(Queue.Empty, get_nowait)
437 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
438
439 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
440 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
441
442 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
443 self.assertTimingAlmostEqual(get.elapsed, 0)
444
445 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
446 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
447
448 proc.join()
449
450 def _test_fork(self, queue):
451 for i in range(10, 20):
452 queue.put(i)
453 # note that at this point the items may only be buffered, so the
454 # process cannot shutdown until the feeder thread has finished
455 # pushing items onto the pipe.
456
457 def test_fork(self):
458 # Old versions of Queue would fail to create a new feeder
459 # thread for a forked process if the original process had its
460 # own feeder thread. This test checks that this no longer
461 # happens.
462
463 queue = self.Queue()
464
465 # put items on queue so that main process starts a feeder thread
466 for i in range(10):
467 queue.put(i)
468
469 # wait to make sure thread starts before we fork a new process
470 time.sleep(DELTA)
471
472 # fork process
473 p = self.Process(target=self._test_fork, args=(queue,))
474 p.start()
475
476 # check that all expected items are in the queue
477 for i in range(20):
478 self.assertEqual(queue.get(), i)
479 self.assertRaises(Queue.Empty, queue.get, False)
480
481 p.join()
482
483 def test_qsize(self):
484 q = self.Queue()
485 try:
486 self.assertEqual(q.qsize(), 0)
487 except NotImplementedError:
488 return
489 q.put(1)
490 self.assertEqual(q.qsize(), 1)
491 q.put(5)
492 self.assertEqual(q.qsize(), 2)
493 q.get()
494 self.assertEqual(q.qsize(), 1)
495 q.get()
496 self.assertEqual(q.qsize(), 0)
497
498 def _test_task_done(self, q):
499 for obj in iter(q.get, None):
500 time.sleep(DELTA)
501 q.task_done()
502
503 def test_task_done(self):
504 queue = self.JoinableQueue()
505
506 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
507 return
508
509 workers = [self.Process(target=self._test_task_done, args=(queue,))
510 for i in xrange(4)]
511
512 for p in workers:
513 p.start()
514
515 for i in xrange(10):
516 queue.put(i)
517
518 queue.join()
519
520 for p in workers:
521 queue.put(None)
522
523 for p in workers:
524 p.join()
525
526#
527#
528#
529
530class _TestLock(BaseTestCase):
531
532 def test_lock(self):
533 lock = self.Lock()
534 self.assertEqual(lock.acquire(), True)
535 self.assertEqual(lock.acquire(False), False)
536 self.assertEqual(lock.release(), None)
537 self.assertRaises((ValueError, threading.ThreadError), lock.release)
538
539 def test_rlock(self):
540 lock = self.RLock()
541 self.assertEqual(lock.acquire(), True)
542 self.assertEqual(lock.acquire(), True)
543 self.assertEqual(lock.acquire(), True)
544 self.assertEqual(lock.release(), None)
545 self.assertEqual(lock.release(), None)
546 self.assertEqual(lock.release(), None)
547 self.assertRaises((AssertionError, RuntimeError), lock.release)
548
549
550class _TestSemaphore(BaseTestCase):
551
552 def _test_semaphore(self, sem):
553 self.assertReturnsIfImplemented(2, get_value, sem)
554 self.assertEqual(sem.acquire(), True)
555 self.assertReturnsIfImplemented(1, get_value, sem)
556 self.assertEqual(sem.acquire(), True)
557 self.assertReturnsIfImplemented(0, get_value, sem)
558 self.assertEqual(sem.acquire(False), False)
559 self.assertReturnsIfImplemented(0, get_value, sem)
560 self.assertEqual(sem.release(), None)
561 self.assertReturnsIfImplemented(1, get_value, sem)
562 self.assertEqual(sem.release(), None)
563 self.assertReturnsIfImplemented(2, get_value, sem)
564
565 def test_semaphore(self):
566 sem = self.Semaphore(2)
567 self._test_semaphore(sem)
568 self.assertEqual(sem.release(), None)
569 self.assertReturnsIfImplemented(3, get_value, sem)
570 self.assertEqual(sem.release(), None)
571 self.assertReturnsIfImplemented(4, get_value, sem)
572
573 def test_bounded_semaphore(self):
574 sem = self.BoundedSemaphore(2)
575 self._test_semaphore(sem)
576 # Currently fails on OS/X
577 #if HAVE_GETVALUE:
578 # self.assertRaises(ValueError, sem.release)
579 # self.assertReturnsIfImplemented(2, get_value, sem)
580
581 def test_timeout(self):
582 if self.TYPE != 'processes':
583 return
584
585 sem = self.Semaphore(0)
586 acquire = TimingWrapper(sem.acquire)
587
588 self.assertEqual(acquire(False), False)
589 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
590
591 self.assertEqual(acquire(False, None), False)
592 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
593
594 self.assertEqual(acquire(False, TIMEOUT1), False)
595 self.assertTimingAlmostEqual(acquire.elapsed, 0)
596
597 self.assertEqual(acquire(True, TIMEOUT2), False)
598 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
599
600 self.assertEqual(acquire(timeout=TIMEOUT3), False)
601 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
602
603
604class _TestCondition(BaseTestCase):
605
606 def f(self, cond, sleeping, woken, timeout=None):
607 cond.acquire()
608 sleeping.release()
609 cond.wait(timeout)
610 woken.release()
611 cond.release()
612
613 def check_invariant(self, cond):
614 # this is only supposed to succeed when there are no sleepers
615 if self.TYPE == 'processes':
616 try:
617 sleepers = (cond._sleeping_count.get_value() -
618 cond._woken_count.get_value())
619 self.assertEqual(sleepers, 0)
620 self.assertEqual(cond._wait_semaphore.get_value(), 0)
621 except NotImplementedError:
622 pass
623
624 def test_notify(self):
625 cond = self.Condition()
626 sleeping = self.Semaphore(0)
627 woken = self.Semaphore(0)
628
629 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000630 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000631 p.start()
632
633 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000634 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000635 p.start()
636
637 # wait for both children to start sleeping
638 sleeping.acquire()
639 sleeping.acquire()
640
641 # check no process/thread has woken up
642 time.sleep(DELTA)
643 self.assertReturnsIfImplemented(0, get_value, woken)
644
645 # wake up one process/thread
646 cond.acquire()
647 cond.notify()
648 cond.release()
649
650 # check one process/thread has woken up
651 time.sleep(DELTA)
652 self.assertReturnsIfImplemented(1, get_value, woken)
653
654 # wake up another
655 cond.acquire()
656 cond.notify()
657 cond.release()
658
659 # check other has woken up
660 time.sleep(DELTA)
661 self.assertReturnsIfImplemented(2, get_value, woken)
662
663 # check state is not mucked up
664 self.check_invariant(cond)
665 p.join()
666
667 def test_notify_all(self):
668 cond = self.Condition()
669 sleeping = self.Semaphore(0)
670 woken = self.Semaphore(0)
671
672 # start some threads/processes which will timeout
673 for i in range(3):
674 p = self.Process(target=self.f,
675 args=(cond, sleeping, woken, TIMEOUT1))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000676 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000677 p.start()
678
679 t = threading.Thread(target=self.f,
680 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000681 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000682 t.start()
683
684 # wait for them all to sleep
685 for i in xrange(6):
686 sleeping.acquire()
687
688 # check they have all timed out
689 for i in xrange(6):
690 woken.acquire()
691 self.assertReturnsIfImplemented(0, get_value, woken)
692
693 # check state is not mucked up
694 self.check_invariant(cond)
695
696 # start some more threads/processes
697 for i in range(3):
698 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000699 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000700 p.start()
701
702 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000703 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000704 t.start()
705
706 # wait for them to all sleep
707 for i in xrange(6):
708 sleeping.acquire()
709
710 # check no process/thread has woken up
711 time.sleep(DELTA)
712 self.assertReturnsIfImplemented(0, get_value, woken)
713
714 # wake them all up
715 cond.acquire()
716 cond.notify_all()
717 cond.release()
718
719 # check they have all woken
720 time.sleep(DELTA)
721 self.assertReturnsIfImplemented(6, get_value, woken)
722
723 # check state is not mucked up
724 self.check_invariant(cond)
725
726 def test_timeout(self):
727 cond = self.Condition()
728 wait = TimingWrapper(cond.wait)
729 cond.acquire()
730 res = wait(TIMEOUT1)
731 cond.release()
732 self.assertEqual(res, None)
733 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
734
735
736class _TestEvent(BaseTestCase):
737
738 def _test_event(self, event):
739 time.sleep(TIMEOUT2)
740 event.set()
741
742 def test_event(self):
743 event = self.Event()
744 wait = TimingWrapper(event.wait)
745
746 # Removed temporaily, due to API shear, this does not
747 # work with threading._Event objects. is_set == isSet
748 #self.assertEqual(event.is_set(), False)
749
750 self.assertEqual(wait(0.0), None)
751 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
752 self.assertEqual(wait(TIMEOUT1), None)
753 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
754
755 event.set()
756
757 # See note above on the API differences
758 # self.assertEqual(event.is_set(), True)
759 self.assertEqual(wait(), None)
760 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
761 self.assertEqual(wait(TIMEOUT1), None)
762 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
763 # self.assertEqual(event.is_set(), True)
764
765 event.clear()
766
767 #self.assertEqual(event.is_set(), False)
768
769 self.Process(target=self._test_event, args=(event,)).start()
770 self.assertEqual(wait(), None)
771
772#
773#
774#
775
776class _TestValue(BaseTestCase):
777
778 codes_values = [
779 ('i', 4343, 24234),
780 ('d', 3.625, -4.25),
781 ('h', -232, 234),
782 ('c', latin('x'), latin('y'))
783 ]
784
785 def _test(self, values):
786 for sv, cv in zip(values, self.codes_values):
787 sv.value = cv[2]
788
789
790 def test_value(self, raw=False):
791 if self.TYPE != 'processes':
792 return
793
794 if raw:
795 values = [self.RawValue(code, value)
796 for code, value, _ in self.codes_values]
797 else:
798 values = [self.Value(code, value)
799 for code, value, _ in self.codes_values]
800
801 for sv, cv in zip(values, self.codes_values):
802 self.assertEqual(sv.value, cv[1])
803
804 proc = self.Process(target=self._test, args=(values,))
805 proc.start()
806 proc.join()
807
808 for sv, cv in zip(values, self.codes_values):
809 self.assertEqual(sv.value, cv[2])
810
811 def test_rawvalue(self):
812 self.test_value(raw=True)
813
814 def test_getobj_getlock(self):
815 if self.TYPE != 'processes':
816 return
817
818 val1 = self.Value('i', 5)
819 lock1 = val1.get_lock()
820 obj1 = val1.get_obj()
821
822 val2 = self.Value('i', 5, lock=None)
823 lock2 = val2.get_lock()
824 obj2 = val2.get_obj()
825
826 lock = self.Lock()
827 val3 = self.Value('i', 5, lock=lock)
828 lock3 = val3.get_lock()
829 obj3 = val3.get_obj()
830 self.assertEqual(lock, lock3)
831
Jesse Noller6ab22152009-01-18 02:45:38 +0000832 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000833 self.assertFalse(hasattr(arr4, 'get_lock'))
834 self.assertFalse(hasattr(arr4, 'get_obj'))
835
Jesse Noller6ab22152009-01-18 02:45:38 +0000836 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
837
838 arr5 = self.RawValue('i', 5)
839 self.assertFalse(hasattr(arr5, 'get_lock'))
840 self.assertFalse(hasattr(arr5, 'get_obj'))
841
Benjamin Petersondfd79492008-06-13 19:13:39 +0000842
843class _TestArray(BaseTestCase):
844
845 def f(self, seq):
846 for i in range(1, len(seq)):
847 seq[i] += seq[i-1]
848
849 def test_array(self, raw=False):
850 if self.TYPE != 'processes':
851 return
852
853 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
854 if raw:
855 arr = self.RawArray('i', seq)
856 else:
857 arr = self.Array('i', seq)
858
859 self.assertEqual(len(arr), len(seq))
860 self.assertEqual(arr[3], seq[3])
861 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
862
863 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
864
865 self.assertEqual(list(arr[:]), seq)
866
867 self.f(seq)
868
869 p = self.Process(target=self.f, args=(arr,))
870 p.start()
871 p.join()
872
873 self.assertEqual(list(arr[:]), seq)
874
875 def test_rawarray(self):
876 self.test_array(raw=True)
877
878 def test_getobj_getlock_obj(self):
879 if self.TYPE != 'processes':
880 return
881
882 arr1 = self.Array('i', range(10))
883 lock1 = arr1.get_lock()
884 obj1 = arr1.get_obj()
885
886 arr2 = self.Array('i', range(10), lock=None)
887 lock2 = arr2.get_lock()
888 obj2 = arr2.get_obj()
889
890 lock = self.Lock()
891 arr3 = self.Array('i', range(10), lock=lock)
892 lock3 = arr3.get_lock()
893 obj3 = arr3.get_obj()
894 self.assertEqual(lock, lock3)
895
Jesse Noller6ab22152009-01-18 02:45:38 +0000896 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000897 self.assertFalse(hasattr(arr4, 'get_lock'))
898 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Noller6ab22152009-01-18 02:45:38 +0000899 self.assertRaises(AttributeError,
900 self.Array, 'i', range(10), lock='notalock')
901
902 arr5 = self.RawArray('i', range(10))
903 self.assertFalse(hasattr(arr5, 'get_lock'))
904 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000905
906#
907#
908#
909
910class _TestContainers(BaseTestCase):
911
912 ALLOWED_TYPES = ('manager',)
913
914 def test_list(self):
915 a = self.list(range(10))
916 self.assertEqual(a[:], range(10))
917
918 b = self.list()
919 self.assertEqual(b[:], [])
920
921 b.extend(range(5))
922 self.assertEqual(b[:], range(5))
923
924 self.assertEqual(b[2], 2)
925 self.assertEqual(b[2:10], [2,3,4])
926
927 b *= 2
928 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
929
930 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
931
932 self.assertEqual(a[:], range(10))
933
934 d = [a, b]
935 e = self.list(d)
936 self.assertEqual(
937 e[:],
938 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
939 )
940
941 f = self.list([a])
942 a.append('hello')
943 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
944
945 def test_dict(self):
946 d = self.dict()
947 indices = range(65, 70)
948 for i in indices:
949 d[i] = chr(i)
950 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
951 self.assertEqual(sorted(d.keys()), indices)
952 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
953 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
954
955 def test_namespace(self):
956 n = self.Namespace()
957 n.name = 'Bob'
958 n.job = 'Builder'
959 n._hidden = 'hidden'
960 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
961 del n.job
962 self.assertEqual(str(n), "Namespace(name='Bob')")
963 self.assertTrue(hasattr(n, 'name'))
964 self.assertTrue(not hasattr(n, 'job'))
965
966#
967#
968#
969
970def sqr(x, wait=0.0):
971 time.sleep(wait)
972 return x*x
Benjamin Petersondfd79492008-06-13 19:13:39 +0000973class _TestPool(BaseTestCase):
974
975 def test_apply(self):
976 papply = self.pool.apply
977 self.assertEqual(papply(sqr, (5,)), sqr(5))
978 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
979
980 def test_map(self):
981 pmap = self.pool.map
982 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
983 self.assertEqual(pmap(sqr, range(100), chunksize=20),
984 map(sqr, range(100)))
985
986 def test_async(self):
987 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
988 get = TimingWrapper(res.get)
989 self.assertEqual(get(), 49)
990 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
991
992 def test_async_timeout(self):
993 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
994 get = TimingWrapper(res.get)
995 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
996 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
997
998 def test_imap(self):
999 it = self.pool.imap(sqr, range(10))
1000 self.assertEqual(list(it), map(sqr, range(10)))
1001
1002 it = self.pool.imap(sqr, range(10))
1003 for i in range(10):
1004 self.assertEqual(it.next(), i*i)
1005 self.assertRaises(StopIteration, it.next)
1006
1007 it = self.pool.imap(sqr, range(1000), chunksize=100)
1008 for i in range(1000):
1009 self.assertEqual(it.next(), i*i)
1010 self.assertRaises(StopIteration, it.next)
1011
1012 def test_imap_unordered(self):
1013 it = self.pool.imap_unordered(sqr, range(1000))
1014 self.assertEqual(sorted(it), map(sqr, range(1000)))
1015
1016 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1017 self.assertEqual(sorted(it), map(sqr, range(1000)))
1018
1019 def test_make_pool(self):
1020 p = multiprocessing.Pool(3)
1021 self.assertEqual(3, len(p._pool))
1022 p.close()
1023 p.join()
1024
1025 def test_terminate(self):
1026 if self.TYPE == 'manager':
1027 # On Unix a forked process increfs each shared object to
1028 # which its parent process held a reference. If the
1029 # forked process gets terminated then there is likely to
1030 # be a reference leak. So to prevent
1031 # _TestZZZNumberOfObjects from failing we skip this test
1032 # when using a manager.
1033 return
1034
1035 result = self.pool.map_async(
1036 time.sleep, [0.1 for i in range(10000)], chunksize=1
1037 )
1038 self.pool.terminate()
1039 join = TimingWrapper(self.pool.join)
1040 join()
1041 self.assertTrue(join.elapsed < 0.2)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001042#
1043# Test that manager has expected number of shared objects left
1044#
1045
1046class _TestZZZNumberOfObjects(BaseTestCase):
1047 # Because test cases are sorted alphabetically, this one will get
1048 # run after all the other tests for the manager. It tests that
1049 # there have been no "reference leaks" for the manager's shared
1050 # objects. Note the comment in _TestPool.test_terminate().
1051 ALLOWED_TYPES = ('manager',)
1052
1053 def test_number_of_objects(self):
1054 EXPECTED_NUMBER = 1 # the pool object is still alive
1055 multiprocessing.active_children() # discard dead process objs
1056 gc.collect() # do garbage collection
1057 refs = self.manager._number_of_objects()
Jesse Noller7314b382009-01-21 02:08:17 +00001058 debug_info = self.manager._debug_info()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001059 if refs != EXPECTED_NUMBER:
Jesse Noller7fb96402008-07-17 21:01:05 +00001060 print self.manager._debug_info()
Jesse Noller7314b382009-01-21 02:08:17 +00001061 print debug_info
Benjamin Petersondfd79492008-06-13 19:13:39 +00001062
1063 self.assertEqual(refs, EXPECTED_NUMBER)
1064
1065#
1066# Test of creating a customized manager class
1067#
1068
1069from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1070
1071class FooBar(object):
1072 def f(self):
1073 return 'f()'
1074 def g(self):
1075 raise ValueError
1076 def _h(self):
1077 return '_h()'
1078
1079def baz():
1080 for i in xrange(10):
1081 yield i*i
1082
1083class IteratorProxy(BaseProxy):
1084 _exposed_ = ('next', '__next__')
1085 def __iter__(self):
1086 return self
1087 def next(self):
1088 return self._callmethod('next')
1089 def __next__(self):
1090 return self._callmethod('__next__')
1091
1092class MyManager(BaseManager):
1093 pass
1094
1095MyManager.register('Foo', callable=FooBar)
1096MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1097MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1098
1099
1100class _TestMyManager(BaseTestCase):
1101
1102 ALLOWED_TYPES = ('manager',)
1103
1104 def test_mymanager(self):
1105 manager = MyManager()
1106 manager.start()
1107
1108 foo = manager.Foo()
1109 bar = manager.Bar()
1110 baz = manager.baz()
1111
1112 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1113 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1114
1115 self.assertEqual(foo_methods, ['f', 'g'])
1116 self.assertEqual(bar_methods, ['f', '_h'])
1117
1118 self.assertEqual(foo.f(), 'f()')
1119 self.assertRaises(ValueError, foo.g)
1120 self.assertEqual(foo._callmethod('f'), 'f()')
1121 self.assertRaises(RemoteError, foo._callmethod, '_h')
1122
1123 self.assertEqual(bar.f(), 'f()')
1124 self.assertEqual(bar._h(), '_h()')
1125 self.assertEqual(bar._callmethod('f'), 'f()')
1126 self.assertEqual(bar._callmethod('_h'), '_h()')
1127
1128 self.assertEqual(list(baz), [i*i for i in range(10)])
1129
1130 manager.shutdown()
1131
1132#
1133# Test of connecting to a remote server and using xmlrpclib for serialization
1134#
1135
1136_queue = Queue.Queue()
1137def get_queue():
1138 return _queue
1139
1140class QueueManager(BaseManager):
1141 '''manager class used by server process'''
1142QueueManager.register('get_queue', callable=get_queue)
1143
1144class QueueManager2(BaseManager):
1145 '''manager class which specifies the same interface as QueueManager'''
1146QueueManager2.register('get_queue')
1147
1148
1149SERIALIZER = 'xmlrpclib'
1150
1151class _TestRemoteManager(BaseTestCase):
1152
1153 ALLOWED_TYPES = ('manager',)
1154
1155 def _putter(self, address, authkey):
1156 manager = QueueManager2(
1157 address=address, authkey=authkey, serializer=SERIALIZER
1158 )
1159 manager.connect()
1160 queue = manager.get_queue()
1161 queue.put(('hello world', None, True, 2.25))
1162
1163 def test_remote(self):
1164 authkey = os.urandom(32)
1165
1166 manager = QueueManager(
1167 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1168 )
1169 manager.start()
1170
1171 p = self.Process(target=self._putter, args=(manager.address, authkey))
1172 p.start()
1173
1174 manager2 = QueueManager2(
1175 address=manager.address, authkey=authkey, serializer=SERIALIZER
1176 )
1177 manager2.connect()
1178 queue = manager2.get_queue()
1179
1180 # Note that xmlrpclib will deserialize object as a list not a tuple
1181 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1182
1183 # Because we are using xmlrpclib for serialization instead of
1184 # pickle this will cause a serialization error.
1185 self.assertRaises(Exception, queue.put, time.sleep)
1186
1187 # Make queue finalizer run before the server is stopped
1188 del queue
1189 manager.shutdown()
1190
Jesse Noller459a6482009-03-30 15:50:42 +00001191class _TestManagerRestart(BaseTestCase):
1192
1193 def _putter(self, address, authkey):
1194 manager = QueueManager(
1195 address=address, authkey=authkey, serializer=SERIALIZER)
1196 manager.connect()
1197 queue = manager.get_queue()
1198 queue.put('hello world')
1199
1200 def test_rapid_restart(self):
1201 authkey = os.urandom(32)
1202 manager = QueueManager(
1203 address=('localhost', 9999), authkey=authkey, serializer=SERIALIZER)
1204 manager.start()
1205
1206 p = self.Process(target=self._putter, args=(manager.address, authkey))
1207 p.start()
1208 queue = manager.get_queue()
1209 self.assertEqual(queue.get(), 'hello world')
Jesse Noller019ce772009-03-30 21:53:29 +00001210 del queue
Jesse Noller459a6482009-03-30 15:50:42 +00001211 manager.shutdown()
1212 manager = QueueManager(
1213 address=('localhost', 9999), authkey=authkey, serializer=SERIALIZER)
1214 manager.start()
Jesse Noller019ce772009-03-30 21:53:29 +00001215 manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001216
Benjamin Petersondfd79492008-06-13 19:13:39 +00001217#
1218#
1219#
1220
1221SENTINEL = latin('')
1222
1223class _TestConnection(BaseTestCase):
1224
1225 ALLOWED_TYPES = ('processes', 'threads')
1226
1227 def _echo(self, conn):
1228 for msg in iter(conn.recv_bytes, SENTINEL):
1229 conn.send_bytes(msg)
1230 conn.close()
1231
1232 def test_connection(self):
1233 conn, child_conn = self.Pipe()
1234
1235 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001236 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001237 p.start()
1238
1239 seq = [1, 2.25, None]
1240 msg = latin('hello world')
1241 longmsg = msg * 10
1242 arr = array.array('i', range(4))
1243
1244 if self.TYPE == 'processes':
1245 self.assertEqual(type(conn.fileno()), int)
1246
1247 self.assertEqual(conn.send(seq), None)
1248 self.assertEqual(conn.recv(), seq)
1249
1250 self.assertEqual(conn.send_bytes(msg), None)
1251 self.assertEqual(conn.recv_bytes(), msg)
1252
1253 if self.TYPE == 'processes':
1254 buffer = array.array('i', [0]*10)
1255 expected = list(arr) + [0] * (10 - len(arr))
1256 self.assertEqual(conn.send_bytes(arr), None)
1257 self.assertEqual(conn.recv_bytes_into(buffer),
1258 len(arr) * buffer.itemsize)
1259 self.assertEqual(list(buffer), expected)
1260
1261 buffer = array.array('i', [0]*10)
1262 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1263 self.assertEqual(conn.send_bytes(arr), None)
1264 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1265 len(arr) * buffer.itemsize)
1266 self.assertEqual(list(buffer), expected)
1267
1268 buffer = bytearray(latin(' ' * 40))
1269 self.assertEqual(conn.send_bytes(longmsg), None)
1270 try:
1271 res = conn.recv_bytes_into(buffer)
1272 except multiprocessing.BufferTooShort, e:
1273 self.assertEqual(e.args, (longmsg,))
1274 else:
1275 self.fail('expected BufferTooShort, got %s' % res)
1276
1277 poll = TimingWrapper(conn.poll)
1278
1279 self.assertEqual(poll(), False)
1280 self.assertTimingAlmostEqual(poll.elapsed, 0)
1281
1282 self.assertEqual(poll(TIMEOUT1), False)
1283 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1284
1285 conn.send(None)
1286
1287 self.assertEqual(poll(TIMEOUT1), True)
1288 self.assertTimingAlmostEqual(poll.elapsed, 0)
1289
1290 self.assertEqual(conn.recv(), None)
1291
1292 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1293 conn.send_bytes(really_big_msg)
1294 self.assertEqual(conn.recv_bytes(), really_big_msg)
1295
1296 conn.send_bytes(SENTINEL) # tell child to quit
1297 child_conn.close()
1298
1299 if self.TYPE == 'processes':
1300 self.assertEqual(conn.readable, True)
1301 self.assertEqual(conn.writable, True)
1302 self.assertRaises(EOFError, conn.recv)
1303 self.assertRaises(EOFError, conn.recv_bytes)
1304
1305 p.join()
1306
1307 def test_duplex_false(self):
1308 reader, writer = self.Pipe(duplex=False)
1309 self.assertEqual(writer.send(1), None)
1310 self.assertEqual(reader.recv(), 1)
1311 if self.TYPE == 'processes':
1312 self.assertEqual(reader.readable, True)
1313 self.assertEqual(reader.writable, False)
1314 self.assertEqual(writer.readable, False)
1315 self.assertEqual(writer.writable, True)
1316 self.assertRaises(IOError, reader.send, 2)
1317 self.assertRaises(IOError, writer.recv)
1318 self.assertRaises(IOError, writer.poll)
1319
1320 def test_spawn_close(self):
1321 # We test that a pipe connection can be closed by parent
1322 # process immediately after child is spawned. On Windows this
1323 # would have sometimes failed on old versions because
1324 # child_conn would be closed before the child got a chance to
1325 # duplicate it.
1326 conn, child_conn = self.Pipe()
1327
1328 p = self.Process(target=self._echo, args=(child_conn,))
1329 p.start()
1330 child_conn.close() # this might complete before child initializes
1331
1332 msg = latin('hello')
1333 conn.send_bytes(msg)
1334 self.assertEqual(conn.recv_bytes(), msg)
1335
1336 conn.send_bytes(SENTINEL)
1337 conn.close()
1338 p.join()
1339
1340 def test_sendbytes(self):
1341 if self.TYPE != 'processes':
1342 return
1343
1344 msg = latin('abcdefghijklmnopqrstuvwxyz')
1345 a, b = self.Pipe()
1346
1347 a.send_bytes(msg)
1348 self.assertEqual(b.recv_bytes(), msg)
1349
1350 a.send_bytes(msg, 5)
1351 self.assertEqual(b.recv_bytes(), msg[5:])
1352
1353 a.send_bytes(msg, 7, 8)
1354 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1355
1356 a.send_bytes(msg, 26)
1357 self.assertEqual(b.recv_bytes(), latin(''))
1358
1359 a.send_bytes(msg, 26, 0)
1360 self.assertEqual(b.recv_bytes(), latin(''))
1361
1362 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1363
1364 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1365
1366 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1367
1368 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1369
1370 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1371
Benjamin Petersondfd79492008-06-13 19:13:39 +00001372class _TestListenerClient(BaseTestCase):
1373
1374 ALLOWED_TYPES = ('processes', 'threads')
1375
1376 def _test(self, address):
1377 conn = self.connection.Client(address)
1378 conn.send('hello')
1379 conn.close()
1380
1381 def test_listener_client(self):
1382 for family in self.connection.families:
1383 l = self.connection.Listener(family=family)
1384 p = self.Process(target=self._test, args=(l.address,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001385 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001386 p.start()
1387 conn = l.accept()
1388 self.assertEqual(conn.recv(), 'hello')
1389 p.join()
1390 l.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001391#
1392# Test of sending connection and socket objects between processes
1393#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001394"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001395class _TestPicklingConnections(BaseTestCase):
1396
1397 ALLOWED_TYPES = ('processes',)
1398
1399 def _listener(self, conn, families):
1400 for fam in families:
1401 l = self.connection.Listener(family=fam)
1402 conn.send(l.address)
1403 new_conn = l.accept()
1404 conn.send(new_conn)
1405
1406 if self.TYPE == 'processes':
1407 l = socket.socket()
1408 l.bind(('localhost', 0))
1409 conn.send(l.getsockname())
1410 l.listen(1)
1411 new_conn, addr = l.accept()
1412 conn.send(new_conn)
1413
1414 conn.recv()
1415
1416 def _remote(self, conn):
1417 for (address, msg) in iter(conn.recv, None):
1418 client = self.connection.Client(address)
1419 client.send(msg.upper())
1420 client.close()
1421
1422 if self.TYPE == 'processes':
1423 address, msg = conn.recv()
1424 client = socket.socket()
1425 client.connect(address)
1426 client.sendall(msg.upper())
1427 client.close()
1428
1429 conn.close()
1430
1431 def test_pickling(self):
1432 try:
1433 multiprocessing.allow_connection_pickling()
1434 except ImportError:
1435 return
1436
1437 families = self.connection.families
1438
1439 lconn, lconn0 = self.Pipe()
1440 lp = self.Process(target=self._listener, args=(lconn0, families))
1441 lp.start()
1442 lconn0.close()
1443
1444 rconn, rconn0 = self.Pipe()
1445 rp = self.Process(target=self._remote, args=(rconn0,))
1446 rp.start()
1447 rconn0.close()
1448
1449 for fam in families:
1450 msg = ('This connection uses family %s' % fam).encode('ascii')
1451 address = lconn.recv()
1452 rconn.send((address, msg))
1453 new_conn = lconn.recv()
1454 self.assertEqual(new_conn.recv(), msg.upper())
1455
1456 rconn.send(None)
1457
1458 if self.TYPE == 'processes':
1459 msg = latin('This connection uses a normal socket')
1460 address = lconn.recv()
1461 rconn.send((address, msg))
1462 if hasattr(socket, 'fromfd'):
1463 new_conn = lconn.recv()
1464 self.assertEqual(new_conn.recv(100), msg.upper())
1465 else:
1466 # XXX On Windows with Py2.6 need to backport fromfd()
1467 discard = lconn.recv_bytes()
1468
1469 lconn.send(None)
1470
1471 rconn.close()
1472 lconn.close()
1473
1474 lp.join()
1475 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001476"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001477#
1478#
1479#
1480
1481class _TestHeap(BaseTestCase):
1482
1483 ALLOWED_TYPES = ('processes',)
1484
1485 def test_heap(self):
1486 iterations = 5000
1487 maxblocks = 50
1488 blocks = []
1489
1490 # create and destroy lots of blocks of different sizes
1491 for i in xrange(iterations):
1492 size = int(random.lognormvariate(0, 1) * 1000)
1493 b = multiprocessing.heap.BufferWrapper(size)
1494 blocks.append(b)
1495 if len(blocks) > maxblocks:
1496 i = random.randrange(maxblocks)
1497 del blocks[i]
1498
1499 # get the heap object
1500 heap = multiprocessing.heap.BufferWrapper._heap
1501
1502 # verify the state of the heap
1503 all = []
1504 occupied = 0
1505 for L in heap._len_to_seq.values():
1506 for arena, start, stop in L:
1507 all.append((heap._arenas.index(arena), start, stop,
1508 stop-start, 'free'))
1509 for arena, start, stop in heap._allocated_blocks:
1510 all.append((heap._arenas.index(arena), start, stop,
1511 stop-start, 'occupied'))
1512 occupied += (stop-start)
1513
1514 all.sort()
1515
1516 for i in range(len(all)-1):
1517 (arena, start, stop) = all[i][:3]
1518 (narena, nstart, nstop) = all[i+1][:3]
1519 self.assertTrue((arena != narena and nstart == 0) or
1520 (stop == nstart))
1521
1522#
1523#
1524#
1525
1526try:
1527 from ctypes import Structure, Value, copy, c_int, c_double
1528except ImportError:
1529 Structure = object
1530 c_int = c_double = None
1531
1532class _Foo(Structure):
1533 _fields_ = [
1534 ('x', c_int),
1535 ('y', c_double)
1536 ]
1537
1538class _TestSharedCTypes(BaseTestCase):
1539
1540 ALLOWED_TYPES = ('processes',)
1541
1542 def _double(self, x, y, foo, arr, string):
1543 x.value *= 2
1544 y.value *= 2
1545 foo.x *= 2
1546 foo.y *= 2
1547 string.value *= 2
1548 for i in range(len(arr)):
1549 arr[i] *= 2
1550
1551 def test_sharedctypes(self, lock=False):
1552 if c_int is None:
1553 return
1554
1555 x = Value('i', 7, lock=lock)
1556 y = Value(ctypes.c_double, 1.0/3.0, lock=lock)
1557 foo = Value(_Foo, 3, 2, lock=lock)
1558 arr = Array('d', range(10), lock=lock)
1559 string = Array('c', 20, lock=lock)
1560 string.value = 'hello'
1561
1562 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1563 p.start()
1564 p.join()
1565
1566 self.assertEqual(x.value, 14)
1567 self.assertAlmostEqual(y.value, 2.0/3.0)
1568 self.assertEqual(foo.x, 6)
1569 self.assertAlmostEqual(foo.y, 4.0)
1570 for i in range(10):
1571 self.assertAlmostEqual(arr[i], i*2)
1572 self.assertEqual(string.value, latin('hellohello'))
1573
1574 def test_synchronize(self):
1575 self.test_sharedctypes(lock=True)
1576
1577 def test_copy(self):
1578 if c_int is None:
1579 return
1580
1581 foo = _Foo(2, 5.0)
1582 bar = copy(foo)
1583 foo.x = 0
1584 foo.y = 0
1585 self.assertEqual(bar.x, 2)
1586 self.assertAlmostEqual(bar.y, 5.0)
1587
1588#
1589#
1590#
1591
1592class _TestFinalize(BaseTestCase):
1593
1594 ALLOWED_TYPES = ('processes',)
1595
1596 def _test_finalize(self, conn):
1597 class Foo(object):
1598 pass
1599
1600 a = Foo()
1601 util.Finalize(a, conn.send, args=('a',))
1602 del a # triggers callback for a
1603
1604 b = Foo()
1605 close_b = util.Finalize(b, conn.send, args=('b',))
1606 close_b() # triggers callback for b
1607 close_b() # does nothing because callback has already been called
1608 del b # does nothing because callback has already been called
1609
1610 c = Foo()
1611 util.Finalize(c, conn.send, args=('c',))
1612
1613 d10 = Foo()
1614 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1615
1616 d01 = Foo()
1617 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1618 d02 = Foo()
1619 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1620 d03 = Foo()
1621 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1622
1623 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1624
1625 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1626
1627 # call mutliprocessing's cleanup function then exit process without
1628 # garbage collecting locals
1629 util._exit_function()
1630 conn.close()
1631 os._exit(0)
1632
1633 def test_finalize(self):
1634 conn, child_conn = self.Pipe()
1635
1636 p = self.Process(target=self._test_finalize, args=(child_conn,))
1637 p.start()
1638 p.join()
1639
1640 result = [obj for obj in iter(conn.recv, 'STOP')]
1641 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1642
1643#
1644# Test that from ... import * works for each module
1645#
1646
1647class _TestImportStar(BaseTestCase):
1648
1649 ALLOWED_TYPES = ('processes',)
1650
1651 def test_import(self):
1652 modules = (
1653 'multiprocessing', 'multiprocessing.connection',
1654 'multiprocessing.heap', 'multiprocessing.managers',
1655 'multiprocessing.pool', 'multiprocessing.process',
1656 'multiprocessing.reduction', 'multiprocessing.sharedctypes',
1657 'multiprocessing.synchronize', 'multiprocessing.util'
1658 )
1659
1660 for name in modules:
1661 __import__(name)
1662 mod = sys.modules[name]
1663
1664 for attr in getattr(mod, '__all__', ()):
1665 self.assertTrue(
1666 hasattr(mod, attr),
1667 '%r does not have attribute %r' % (mod, attr)
1668 )
1669
1670#
1671# Quick test that logging works -- does not test logging output
1672#
1673
1674class _TestLogging(BaseTestCase):
1675
1676 ALLOWED_TYPES = ('processes',)
1677
1678 def test_enable_logging(self):
1679 logger = multiprocessing.get_logger()
1680 logger.setLevel(util.SUBWARNING)
1681 self.assertTrue(logger is not None)
1682 logger.debug('this will not be printed')
1683 logger.info('nor will this')
1684 logger.setLevel(LOG_LEVEL)
1685
1686 def _test_level(self, conn):
1687 logger = multiprocessing.get_logger()
1688 conn.send(logger.getEffectiveLevel())
1689
1690 def test_level(self):
1691 LEVEL1 = 32
1692 LEVEL2 = 37
1693
1694 logger = multiprocessing.get_logger()
1695 root_logger = logging.getLogger()
1696 root_level = root_logger.level
1697
1698 reader, writer = multiprocessing.Pipe(duplex=False)
1699
1700 logger.setLevel(LEVEL1)
1701 self.Process(target=self._test_level, args=(writer,)).start()
1702 self.assertEqual(LEVEL1, reader.recv())
1703
1704 logger.setLevel(logging.NOTSET)
1705 root_logger.setLevel(LEVEL2)
1706 self.Process(target=self._test_level, args=(writer,)).start()
1707 self.assertEqual(LEVEL2, reader.recv())
1708
1709 root_logger.setLevel(root_level)
1710 logger.setLevel(level=LOG_LEVEL)
1711
1712#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001713# Test to verify handle verification, see issue 3321
1714#
1715
1716class TestInvalidHandle(unittest.TestCase):
1717
1718 def test_invalid_handles(self):
1719 if WIN32:
1720 return
1721 conn = _multiprocessing.Connection(44977608)
1722 self.assertRaises(IOError, conn.poll)
1723 self.assertRaises(IOError, _multiprocessing.Connection, -1)
1724#
Benjamin Petersondfd79492008-06-13 19:13:39 +00001725# Functions used to create test cases from the base ones in this module
1726#
1727
1728def get_attributes(Source, names):
1729 d = {}
1730 for name in names:
1731 obj = getattr(Source, name)
1732 if type(obj) == type(get_attributes):
1733 obj = staticmethod(obj)
1734 d[name] = obj
1735 return d
1736
1737def create_test_cases(Mixin, type):
1738 result = {}
1739 glob = globals()
1740 Type = type[0].upper() + type[1:]
1741
1742 for name in glob.keys():
1743 if name.startswith('_Test'):
1744 base = glob[name]
1745 if type in base.ALLOWED_TYPES:
1746 newname = 'With' + Type + name[1:]
1747 class Temp(base, unittest.TestCase, Mixin):
1748 pass
1749 result[newname] = Temp
1750 Temp.__name__ = newname
1751 Temp.__module__ = Mixin.__module__
1752 return result
1753
1754#
1755# Create test cases
1756#
1757
1758class ProcessesMixin(object):
1759 TYPE = 'processes'
1760 Process = multiprocessing.Process
1761 locals().update(get_attributes(multiprocessing, (
1762 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1763 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1764 'RawArray', 'current_process', 'active_children', 'Pipe',
1765 'connection', 'JoinableQueue'
1766 )))
1767
1768testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1769globals().update(testcases_processes)
1770
1771
1772class ManagerMixin(object):
1773 TYPE = 'manager'
1774 Process = multiprocessing.Process
1775 manager = object.__new__(multiprocessing.managers.SyncManager)
1776 locals().update(get_attributes(manager, (
1777 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1778 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1779 'Namespace', 'JoinableQueue'
1780 )))
1781
1782testcases_manager = create_test_cases(ManagerMixin, type='manager')
1783globals().update(testcases_manager)
1784
1785
1786class ThreadsMixin(object):
1787 TYPE = 'threads'
1788 Process = multiprocessing.dummy.Process
1789 locals().update(get_attributes(multiprocessing.dummy, (
1790 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1791 'Condition', 'Event', 'Value', 'Array', 'current_process',
1792 'active_children', 'Pipe', 'connection', 'dict', 'list',
1793 'Namespace', 'JoinableQueue'
1794 )))
1795
1796testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1797globals().update(testcases_threads)
1798
Neal Norwitz0c519b32008-08-25 01:50:24 +00001799class OtherTest(unittest.TestCase):
1800 # TODO: add more tests for deliver/answer challenge.
1801 def test_deliver_challenge_auth_failure(self):
1802 class _FakeConnection(object):
1803 def recv_bytes(self, size):
Neal Norwitz2a7767a2008-08-25 03:03:25 +00001804 return b'something bogus'
Neal Norwitz0c519b32008-08-25 01:50:24 +00001805 def send_bytes(self, data):
1806 pass
1807 self.assertRaises(multiprocessing.AuthenticationError,
1808 multiprocessing.connection.deliver_challenge,
1809 _FakeConnection(), b'abc')
1810
1811 def test_answer_challenge_auth_failure(self):
1812 class _FakeConnection(object):
1813 def __init__(self):
1814 self.count = 0
1815 def recv_bytes(self, size):
1816 self.count += 1
1817 if self.count == 1:
1818 return multiprocessing.connection.CHALLENGE
1819 elif self.count == 2:
Neal Norwitz2a7767a2008-08-25 03:03:25 +00001820 return b'something bogus'
1821 return b''
Neal Norwitz0c519b32008-08-25 01:50:24 +00001822 def send_bytes(self, data):
1823 pass
1824 self.assertRaises(multiprocessing.AuthenticationError,
1825 multiprocessing.connection.answer_challenge,
1826 _FakeConnection(), b'abc')
1827
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001828testcases_other = [OtherTest, TestInvalidHandle]
Neal Norwitz0c519b32008-08-25 01:50:24 +00001829
Benjamin Petersondfd79492008-06-13 19:13:39 +00001830#
1831#
1832#
1833
1834def test_main(run=None):
Jesse Noller18623822008-06-18 13:29:52 +00001835 if sys.platform.startswith("linux"):
1836 try:
1837 lock = multiprocessing.RLock()
1838 except OSError:
Benjamin Peterson888a39b2009-03-26 20:48:25 +00001839 from test.test_support import SkipTest
Benjamin Petersonbec087f2009-03-26 21:10:30 +00001840 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Petersoned77f2e2008-06-17 22:40:44 +00001841
Benjamin Petersondfd79492008-06-13 19:13:39 +00001842 if run is None:
1843 from test.test_support import run_unittest as run
1844
1845 util.get_temp_dir() # creates temp directory for use by all processes
1846
1847 multiprocessing.get_logger().setLevel(LOG_LEVEL)
1848
Jesse Noller146b7ab2008-07-02 16:44:09 +00001849 ProcessesMixin.pool = multiprocessing.Pool(4)
1850 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
1851 ManagerMixin.manager.__init__()
1852 ManagerMixin.manager.start()
1853 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001854
1855 testcases = (
Jesse Noller146b7ab2008-07-02 16:44:09 +00001856 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
1857 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz0c519b32008-08-25 01:50:24 +00001858 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
1859 testcases_other
Benjamin Petersondfd79492008-06-13 19:13:39 +00001860 )
1861
1862 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
1863 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
1864 run(suite)
1865
Jesse Noller146b7ab2008-07-02 16:44:09 +00001866 ThreadsMixin.pool.terminate()
1867 ProcessesMixin.pool.terminate()
1868 ManagerMixin.pool.terminate()
1869 ManagerMixin.manager.shutdown()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001870
Jesse Noller146b7ab2008-07-02 16:44:09 +00001871 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00001872
1873def main():
1874 test_main(unittest.TextTestRunner(verbosity=2).run)
1875
1876if __name__ == '__main__':
1877 main()