blob: 7582cc3cf471662985209b8032690dd84cbe17fa [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import Queue
10import time
11import sys
12import os
13import gc
14import signal
15import array
16import copy
17import socket
18import random
19import logging
R. David Murray17438dc2009-07-21 17:02:14 +000020from StringIO import StringIO
R. David Murraya657b0c2009-12-14 22:18:57 +000021from test import test_support
Benjamin Petersondfd79492008-06-13 19:13:39 +000022
Jesse Noller37040cd2008-09-30 00:15:45 +000023
24# Work around broken sem_open implementations
try:
    import multiprocessing.synchronize
except ImportError as e:
    # The synchronize module could not be imported, so none of the tests
    # in this file can run: report the whole module as skipped, passing
    # the original ImportError along as the reason.
    from test.test_support import TestSkipped
    raise TestSkipped(e)
30
Benjamin Petersondfd79492008-06-13 19:13:39 +000031import multiprocessing.dummy
32import multiprocessing.connection
33import multiprocessing.managers
34import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000035import multiprocessing.pool
36import _multiprocessing
37
38from multiprocessing import util
39
40#
41#
42#
43
# Alias for str; used to build the one-character sample values for the
# 'c' typecode in _TestValue.codes_values.
latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000045
Benjamin Petersondfd79492008-06-13 19:13:39 +000046#
47# Constants
48#
49
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.WARNING

# Short pause (in seconds) the tests sleep for when they need to give
# another process/thread a moment to make progress.
DELTA = 0.1

# Timing assertions (see BaseTestCase.assertTimingAlmostEqual) are only
# enforced when this is true.  Making it true makes the tests take a lot
# longer and can sometimes cause some non-serious failures because some
# calls block a bit longer than expected.
CHECK_TIMINGS = False

# Use distinct, longer timeouts only when the timings are actually checked.
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# True unless the C extension flags sem_getvalue() as broken.
HAVE_GETVALUE = not getattr(
    _multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False
)

WIN32 = sys.platform == "win32"
67
Benjamin Petersondfd79492008-06-13 19:13:39 +000068#
69# Creates a wrapper for a function which records the time it takes to finish
70#
71
class TimingWrapper(object):
    """Callable wrapper that remembers how long the last call took.

    Calling the wrapper forwards all arguments to ``func`` and returns
    (or propagates the exception from) that call.  Afterwards ``elapsed``
    holds the wall-clock duration of the call in seconds; it is ``None``
    until the first call.
    """

    def __init__(self, func):
        self.elapsed = None
        self.func = func

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Record the duration even when func raises.
            self.elapsed = time.time() - started
        return result
84
85#
86# Base class for test cases
87#
88
class BaseTestCase(object):
    """Mixin with helpers shared by the test case classes in this file.

    It relies on the assertion methods (assertEqual, assertAlmostEqual)
    being provided by whatever class it is combined with.
    """

    # Flavours a test case may be run under; subclasses narrow this.
    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timings are only compared (to one decimal place) when the
        # module-level CHECK_TIMINGS switch is on; otherwise do nothing.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Check func(*args) == value, but silently accept a func that
        # raises NotImplementedError.
        try:
            res = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, res)
104
105#
106# Return the value of a semaphore
107#
108
def get_value(self):
    """Return the current counter value of a semaphore-like object.

    Tries, in order, a ``get_value()`` method, then the attributes
    ``_Semaphore__value`` and ``_value``.  Raises NotImplementedError
    when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
120
121#
122# Testcases
123#
124
class _TestProcess(BaseTestCase):
    # Tests for Process objects: attributes of the current process, the
    # start/join lifecycle, terminate(), cpu_count(), active_children()
    # and processes that start further processes.

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # Not applicable to the thread-backed flavour.
        if self.TYPE == 'threads':
            return

        me = self.current_process()
        key = me.authkey

        self.assertTrue(me.is_alive())
        self.assertFalse(me.daemon)
        self.assertTrue(isinstance(key, bytes))
        self.assertTrue(len(key) > 0)
        self.assertEqual(me.ident, os.getpid())
        self.assertEqual(me.exitcode, None)

    def _test(self, q, *args, **kwds):
        # Child side of test_process: report what we were called with and
        # who we are, one queue item at a time.
        me = self.current_process()
        q.put(args)
        q.put(kwds)
        q.put(me.name)
        if self.TYPE == 'threads':
            return
        q.put(bytes(me.authkey))
        q.put(me.pid)

    def test_process(self):
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        child = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        child.daemon = True
        parent = self.current_process()
        has_identity = self.TYPE != 'threads'

        # state before start()
        if has_identity:
            self.assertEqual(child.authkey, parent.authkey)
        self.assertEqual(child.is_alive(), False)
        self.assertEqual(child.daemon, True)
        self.assertTrue(child not in self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(child.exitcode, None)

        child.start()

        # state while running
        self.assertEqual(child.exitcode, None)
        self.assertEqual(child.is_alive(), True)
        self.assertTrue(child in self.active_children())

        # what the child reported back (q itself is not echoed)
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), child.name)
        if has_identity:
            self.assertEqual(q.get(), parent.authkey)
            self.assertEqual(q.get(), child.pid)

        child.join()

        # state after a clean exit
        self.assertEqual(child.exitcode, 0)
        self.assertEqual(child.is_alive(), False)
        self.assertTrue(child not in self.active_children())

    def _test_terminate(self):
        # Sleep long enough that only terminate() can end this process.
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            return

        victim = self.Process(target=self._test_terminate)
        victim.daemon = True
        victim.start()

        self.assertEqual(victim.is_alive(), True)
        self.assertTrue(victim in self.active_children())
        self.assertEqual(victim.exitcode, None)

        victim.terminate()

        join = TimingWrapper(victim.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(victim.is_alive(), False)
        self.assertTrue(victim not in self.active_children())

        victim.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        # Platforms that cannot report a count are treated as having one.
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        sleeper = self.Process(target=time.sleep, args=(DELTA,))
        self.assertTrue(sleeper not in self.active_children())

        sleeper.start()
        self.assertTrue(sleeper in self.active_children())

        sleeper.join()
        self.assertTrue(sleeper not in self.active_children())

    def _test_recursion(self, wconn, id):
        # Send our own id, then (down to depth two) run one child after
        # another, each with our id extended by its index.
        from multiprocessing import forking
        wconn.send(id)
        if len(id) >= 2:
            return
        for index in range(2):
            child = self.Process(
                target=self._test_recursion, args=(wconn, id+[index])
                )
            child.start()
            child.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        received = []
        while rconn.poll():
            received.append(rconn.recv())

        # Children are joined before their next sibling starts, so the
        # ids arrive in depth-first order.
        self.assertEqual(
            received,
            [[], [0], [0, 0], [0, 1], [1], [1, 0], [1, 1]]
            )
270
271#
272#
273#
274
class _UpperCaser(multiprocessing.Process):
    """Process subclass that upper-cases strings sent to it over a pipe.

    The parent uses submit() to send a string and read back the reply,
    and stop() to send the ``None`` sentinel that ends the child's loop.
    """

    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.child_conn, self.parent_conn = multiprocessing.Pipe()

    def run(self):
        # Runs in the child: the parent's end is not needed here.
        self.parent_conn.close()
        while True:
            request = self.child_conn.recv()
            if request is None:
                break
            self.child_conn.send(request.upper())
        self.child_conn.close()

    def submit(self, s):
        assert type(s) is str
        self.parent_conn.send(s)
        reply = self.parent_conn.recv()
        return reply

    def stop(self):
        # Tell the child to finish, then drop both ends in this process.
        self.parent_conn.send(None)
        self.parent_conn.close()
        self.child_conn.close()
296
class _TestSubclassingProcess(BaseTestCase):
    # Checks that a user-defined Process subclass (_UpperCaser) works.

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.start()
        for text, shouted in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), shouted)
        worker.stop()
        worker.join()
308
309#
310#
311#
312
def queue_empty(q):
    """Return whether *q* is empty.

    Uses ``q.empty()`` when the queue has one, otherwise compares
    ``q.qsize()`` with zero.
    """
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
318
def queue_full(q, maxsize):
    """Return whether *q* is full.

    Uses ``q.full()`` when the queue has one, otherwise compares
    ``q.qsize()`` with *maxsize*.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
324
325
class _TestQueue(BaseTestCase):
    # Tests for Queue/JoinableQueue: put and get in their blocking,
    # non-blocking and timed forms, qsize(), use of a queue from a child
    # started after the parent already used it, and task_done()/join().

    def _test_put(self, queue, child_can_start, parent_can_continue):
        # Child side of test_put: once released, drain the six queued
        # items and then let the parent carry on.
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue, going through each calling form of put once.
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Every further put must raise Queue.Full; only the blocking
        # forms with a timeout are expected to take any time.
        full_cases = [
            (put, (7, False), {}, 0),
            (put, (7, False, None), {}, 0),
            (put_nowait, (7,), {}, 0),
            (put, (7, True, TIMEOUT1), {}, TIMEOUT1),
            (put, (7, False, TIMEOUT2), {}, 0),
            (put, (7, True), {'timeout': TIMEOUT3}, TIMEOUT3),
        ]
        for attempt, args, kwds, expected_wait in full_cases:
            self.assertRaises(Queue.Full, attempt, *args, **kwds)
            self.assertTimingAlmostEqual(attempt.elapsed, expected_wait)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    def _test_get(self, queue, child_can_start, parent_can_continue):
        # Child side of test_get: once released, queue 2..5 and then let
        # the parent carry on.
        child_can_start.wait()
        #queue.put(1)
        for item in (2, 3, 4, 5):
            queue.put(item)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        # Fetch the items back, going through each calling form of get.
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Every further get must raise Queue.Empty; only the blocking
        # forms with a timeout are expected to take any time.
        empty_cases = [
            (get, (False,), {}, 0),
            (get, (False, None), {}, 0),
            (get_nowait, (), {}, 0),
            (get, (True, TIMEOUT1), {}, TIMEOUT1),
            (get, (False, TIMEOUT2), {}, 0),
            (get, (), {'timeout': TIMEOUT3}, TIMEOUT3),
        ]
        for attempt, args, kwds, expected_wait in empty_cases:
            self.assertRaises(Queue.Empty, attempt, *args, **kwds)
            self.assertTimingAlmostEqual(attempt.elapsed, expected_wait)

        proc.join()

    def _test_fork(self, queue):
        for item in range(10, 20):
            queue.put(item)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for item in range(10):
            queue.put(item)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        child = self.Process(target=self._test_fork, args=(queue,))
        child.start()

        # check that all expected items are in the queue
        for expected in range(20):
            self.assertEqual(queue.get(), expected)
        self.assertRaises(Queue.Empty, queue.get, False)

        child.join()

    def test_qsize(self):
        q = self.Queue()
        # Nothing to test where qsize() is not implemented.
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    def _test_task_done(self, q):
        # Worker: acknowledge each item after a short pause; a None item
        # ends the loop (and is not acknowledged).
        while True:
            obj = q.get()
            if obj is None:
                break
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            return

        workers = []
        for _ in xrange(4):
            workers.append(
                self.Process(target=self._test_task_done, args=(queue,))
                )

        for worker in workers:
            worker.start()

        for item in xrange(10):
            queue.put(item)

        queue.join()

        # one stop sentinel per worker
        for worker in workers:
            queue.put(None)

        for worker in workers:
            worker.join()
529
530#
531#
532#
533
class _TestLock(BaseTestCase):
    # Tests for Lock and RLock: return values of acquire/release, the
    # error raised by releasing once too often, and use in a with block.

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # already held, so a non-blocking acquire must fail
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        DEPTH = 3
        lock = self.RLock()
        for _ in range(DEPTH):
            self.assertEqual(lock.acquire(), True)
        for _ in range(DEPTH):
            self.assertEqual(lock.release(), None)
        # one release more than there were acquires
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        lock = self.Lock()
        with lock:
            pass
556
Benjamin Petersondfd79492008-06-13 19:13:39 +0000557
class _TestSemaphore(BaseTestCase):
    # Tests for Semaphore and BoundedSemaphore: counter bookkeeping
    # across acquire/release, and the timeout forms of acquire.

    def _test_semaphore(self, sem):
        # Expects a semaphore whose counter starts at 2 and leaves it at
        # 2 again.  Each step is (acquire/release, args, expected return,
        # expected counter afterwards).
        self.assertReturnsIfImplemented(2, get_value, sem)
        steps = [
            (sem.acquire, (), True, 1),
            (sem.acquire, (), True, 0),
            (sem.acquire, (False,), False, 0),
            (sem.release, (), None, 1),
            (sem.release, (), None, 2),
        ]
        for operation, args, outcome, counter in steps:
            self.assertEqual(operation(*args), outcome)
            self.assertReturnsIfImplemented(counter, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # a plain semaphore may be released beyond its initial value
        for counter in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(counter, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # The counter is zero, so every acquire fails; only the blocking
        # forms with a timeout are expected to take any time.
        cases = [
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
        ]
        for args, kwds, expected_wait in cases:
            self.assertEqual(acquire(*args, **kwds), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected_wait)
610
611
class _TestCondition(BaseTestCase):
    # Tests for Condition: notify(), notify_all() and wait() timeouts,
    # with both processes and threads waiting on the same condition.

    def f(self, cond, sleeping, woken, timeout=None):
        # Waiter body: announce via `sleeping` that we are about to
        # wait, wait on the condition, then announce via `woken` that
        # the wait has returned.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE != 'processes':
            return
        try:
            sleepers = (cond._sleeping_count.get_value() -
                        cond._woken_count.get_value())
            self.assertEqual(sleepers, 0)
            self.assertEqual(cond._wait_semaphore.get_value(), 0)
        except NotImplementedError:
            pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)
        waiter_args = (cond, sleeping, woken)

        proc = self.Process(target=self.f, args=waiter_args)
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=waiter_args)
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake the waiters up one at a time, checking after each
        # notify that exactly one more has woken up
        for expected_woken in (1, 2):
            cond.acquire()
            cond.notify()
            cond.release()

            time.sleep(DELTA)
            self.assertReturnsIfImplemented(expected_woken, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        thread.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        timed_args = (cond, sleeping, woken, TIMEOUT1)
        for _ in range(3):
            proc = self.Process(target=self.f, args=timed_args)
            proc.daemon = True
            proc.start()

            thread = threading.Thread(target=self.f, args=timed_args)
            thread.daemon = True
            thread.start()

        # wait for them all to sleep
        for _ in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for _ in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        untimed_args = (cond, sleeping, woken)
        for _ in range(3):
            proc = self.Process(target=self.f, args=untimed_args)
            proc.daemon = True
            proc.start()

            thread = threading.Thread(target=self.f, args=untimed_args)
            thread.daemon = True
            thread.start()

        # wait for them to all sleep
        for _ in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        outcome = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(outcome, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
742
743
class _TestEvent(BaseTestCase):
    # Tests for Event: wait() with and without a timeout, before and
    # after set()/clear(), and being woken by another process.

    def _test_event(self, event):
        # Child side of test_event: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Removed temporarily, due to API shear, this does not
        # work with threading._Event objects. is_set == isSet
        #self.assertEqual(event.is_set(), False)

        # Not set yet: a timed wait returns after (at most) the timeout.
        self.assertEqual(wait(0.0), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        # self.assertEqual(event.is_set(), True)
        # Set: every wait returns immediately.
        self.assertEqual(wait(), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # Keep a reference to the child so it can be joined; otherwise it
        # is left behind unreaped once this test returns.
        child = self.Process(target=self._test_event, args=(event,))
        child.start()
        try:
            self.assertEqual(wait(), None)
        finally:
            child.join()
779
780#
781#
782#
783
class _TestValue(BaseTestCase):
    # Tests for Value/RawValue: values written by a child process are
    # seen by the parent, and the lock= argument controls whether the
    # get_lock()/get_obj() accessors exist.

    # (typecode, initial value, value the child assigns)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def _test(self, values):
        # Child side of test_value: overwrite each shared value with the
        # third entry of its codes_values row.
        for shared, row in zip(values, self.codes_values):
            shared.value = row[2]


    def test_value(self, raw=False):
        if self.TYPE != 'processes':
            return

        make = self.RawValue if raw else self.Value
        values = [make(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, (_, initial, _) in zip(values, self.codes_values):
            self.assertEqual(shared.value, initial)

        proc = self.Process(target=self._test, args=(values,))
        proc.start()
        proc.join()

        for shared, (_, _, updated) in zip(values, self.codes_values):
            self.assertEqual(shared.value, updated)

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        if self.TYPE != 'processes':
            return

        # default lock and lock=None: both accessors must be callable
        val1 = self.Value('i', 5)
        lock1 = val1.get_lock()
        obj1 = val1.get_obj()

        val2 = self.Value('i', 5, lock=None)
        lock2 = val2.get_lock()
        obj2 = val2.get_obj()

        # an explicit lock is the one handed back by get_lock()
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        obj3 = val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False: no accessors at all
        arr4 = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr4, accessor))

        # something that is not a lock is rejected
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # raw values never have the accessors
        arr5 = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr5, accessor))
849
Benjamin Petersondfd79492008-06-13 19:13:39 +0000850
class _TestArray(BaseTestCase):
    # Tests for Array/RawArray: indexing, slicing, modification by a
    # child process, and the effect of the lock= argument on the
    # get_lock()/get_obj() accessors.

    def f(self, seq):
        # Turn seq into its running sums, in place.
        for pos in range(1, len(seq)):
            seq[pos] += seq[pos-1]

    def test_array(self, raw=False):
        if self.TYPE != 'processes':
            return

        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make = self.RawArray if raw else self.Array
        arr = make('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # slice assignment from an array.array, mirrored in the list
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # apply f to the list here and to the shared array in a child;
        # both must end up with the same contents
        self.f(seq)

        proc = self.Process(target=self.f, args=(arr,))
        proc.start()
        proc.join()

        self.assertEqual(list(arr[:]), seq)

    def test_rawarray(self):
        self.test_array(raw=True)

    def test_getobj_getlock_obj(self):
        if self.TYPE != 'processes':
            return

        # default lock and lock=None: both accessors must be callable
        arr1 = self.Array('i', range(10))
        lock1 = arr1.get_lock()
        obj1 = arr1.get_obj()

        arr2 = self.Array('i', range(10), lock=None)
        lock2 = arr2.get_lock()
        obj2 = arr2.get_obj()

        # an explicit lock is the one handed back by get_lock()
        lock = self.Lock()
        arr3 = self.Array('i', range(10), lock=lock)
        lock3 = arr3.get_lock()
        obj3 = arr3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False: no accessors at all
        arr4 = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr4, accessor))

        # something that is not a lock is rejected
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # raw arrays never have the accessors
        arr5 = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr5, accessor))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000913
914#
915#
916#
917
class _TestContainers(BaseTestCase):
    # Tests for the manager's shared list, dict and Namespace objects.

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        digits = self.list(range(10))
        self.assertEqual(digits[:], range(10))

        other = self.list()
        self.assertEqual(other[:], [])

        other.extend(range(5))
        self.assertEqual(other[:], range(5))

        self.assertEqual(other[2], 2)
        self.assertEqual(other[2:10], [2,3,4])

        other *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(other[:], doubled)

        self.assertEqual(other + [5, 6], doubled + [5, 6])

        # the first list was not affected by any of the above
        self.assertEqual(digits[:], range(10))

        # shared lists nested inside another shared list
        nested = self.list([digits, other])
        self.assertEqual(
            nested[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], doubled]
            )

        # a change to the inner list shows through the outer one
        wrapper = self.list([digits])
        digits.append('hello')
        self.assertEqual(
            wrapper[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']]
            )

    def test_dict(self):
        d = self.dict()
        indices = range(65, 70)
        for code in indices:
            d[code] = chr(code)
        pairs = [(code, chr(code)) for code in indices]
        self.assertEqual(d.copy(), dict(pairs))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [char for _, char in pairs])
        self.assertEqual(sorted(d.items()), pairs)

    def test_namespace(self):
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        # the underscore-prefixed attribute is not part of the str() form
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertFalse(hasattr(ns, 'job'))
972 self.assertTrue(not hasattr(n, 'job'))
973
974#
975#
976#
977
def sqr(x, wait=0.0):
    """Sleep for *wait* seconds, then return *x* multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    # Tests for Pool: apply, map, the async variants, imap,
    # imap_unordered, pool construction and terminate().

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         map(sqr, range(100)))

    def test_async(self):
        pending = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(pending.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # the task sleeps longer than we are prepared to wait for it
        pending = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(pending.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        results = self.pool.imap(sqr, range(10))
        self.assertEqual(list(results), map(sqr, range(10)))

        # step through by hand, without and then with a chunksize
        results = self.pool.imap(sqr, range(10))
        for n in range(10):
            self.assertEqual(results.next(), n*n)
        self.assertRaises(StopIteration, results.next)

        results = self.pool.imap(sqr, range(1000), chunksize=100)
        for n in range(1000):
            self.assertEqual(results.next(), n*n)
        self.assertRaises(StopIteration, results.next)

    def test_imap_unordered(self):
        # results may come back in any order, so compare them sorted
        results = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(results), map(sqr, range(1000)))

        results = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(results), map(sqr, range(1000)))

    def test_make_pool(self):
        pool = multiprocessing.Pool(3)
        self.assertEqual(3, len(pool._pool))
        pool.close()
        pool.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # queue far more work than could finish, then pull the plug
        result = self.pool.map_async(
            time.sleep, [0.1] * 10000, chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertTrue(join.elapsed < 0.2)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001050#
1051# Test that manager has expected number of shared objects left
1052#
1053
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        leaked = refs != EXPECTED_NUMBER
        if leaked:
            # dump the manager's debug info before the assertion fails
            print(self.manager._debug_info())

        self.assertEqual(refs, EXPECTED_NUMBER)
1070
1071#
1072# Test of creating a customized manager class
1073#
1074
1075from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1076
class FooBar(object):
    """Small class registered with MyManager as both 'Foo' and 'Bar'.

    It offers one public method that returns, one public method that
    raises, and one underscore-prefixed method.
    """

    def f(self):
        answer = 'f()'
        return answer

    def g(self):
        raise ValueError()

    def _h(self):
        answer = '_h()'
        return answer
1084
def baz():
    """Generate the squares of the integers 0 through 9, in order."""
    n = 0
    while n < 10:
        yield n * n
        n += 1
1088
class IteratorProxy(BaseProxy):
    """Proxy type that forwards the iterator protocol to its referent."""

    # Both spellings of the "advance" method are forwarded.
    _exposed_ = ('next', '__next__')

    def __iter__(self):
        # the proxy acts as its own iterator
        return self

    def next(self):
        item = self._callmethod('next')
        return item

    def __next__(self):
        item = self._callmethod('__next__')
        return item
1097
class MyManager(BaseManager):
    """Manager subclass used to test registration of custom types."""


# 'Foo' relies on the default choice of exposed methods, 'Bar' names its
# exposed methods explicitly, and 'baz' is served through a custom proxy.
MyManager.register('Foo', FooBar)
MyManager.register('Bar', FooBar, exposed=('f', '_h'))
MyManager.register('baz', baz, proxytype=IteratorProxy)
1104
1105
class _TestMyManager(BaseTestCase):
    """Exercise the types registered on MyManager."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        squares = manager.baz()

        def visible(proxy):
            # names from FooBar that show up as attributes of the proxy
            return [name for name in ('f', 'g', '_h') if hasattr(proxy, name)]

        self.assertEqual(visible(foo), ['f', 'g'])
        self.assertEqual(visible(bar), ['f', '_h'])

        # Foo: 'f' and 'g' are callable, '_h' is rejected by the server
        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        # Bar: both explicitly exposed methods work either way
        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        # baz: iterating the proxy yields the generator's values
        self.assertEqual(list(squares), [n*n for n in range(10)])

        manager.shutdown()
1137
1138#
1139# Test of connecting to a remote server and using xmlrpclib for serialization
1140#
1141
# The single queue object handed out by the server-side manager.
_queue = Queue.Queue()

def get_queue():
    """Return the module-level queue shared through QueueManager."""
    return _queue


class QueueManager(BaseManager):
    '''manager class used by server process'''

# the server side supplies the callable that produces the queue
QueueManager.register('get_queue', callable=get_queue)


class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''

# the client side only declares the typeid, without a callable
QueueManager2.register('get_queue')


SERIALIZER = 'xmlrpclib'
1156
class _TestRemoteManager(BaseTestCase):
    """Talk to a manager server over a socket using the SERIALIZER format."""

    ALLOWED_TYPES = ('manager',)

    def _putter(self, address, authkey):
        # Runs in a separate process: connect and enqueue one tuple.
        client = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        remote_queue = client.get_queue()
        remote_queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        server = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        server.start()

        producer = self.Process(target=self._putter,
                                args=(server.address, authkey))
        producer.start()

        client = QueueManager2(
            address=server.address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        queue = client.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        received = queue.get()
        self.assertEqual(received, ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        server.shutdown()
1196
class _TestManagerRestart(BaseTestCase):
    """Check that a manager can be restarted on an address used just before."""

    def _putter(self, address, authkey):
        # Runs in a separate process: connect and enqueue one string.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)

        # get_server() is only called to learn a concrete address.  The
        # Server it returns owns a bound listener that this test never
        # uses, so close it explicitly rather than relying on garbage
        # collection to release the address before it is rebound below.
        srvr = manager.get_server()
        addr = srvr.address
        srvr.listener.close()

        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # drop the proxy so its finalizer runs before the server stops
        del queue
        manager.shutdown()

        # a fresh manager must be able to start on the recorded address
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Nollerb48cfa62009-03-30 16:19:10 +00001223
Benjamin Petersondfd79492008-06-13 19:13:39 +00001224#
1225#
1226#
1227
# Empty message used to tell the echo child to stop.
SENTINEL = latin('')

class _TestConnection(BaseTestCase):
    """Tests for the connection objects returned by Pipe()."""

    ALLOWED_TYPES = ('processes', 'threads')

    def _echo(self, conn):
        # Send every received message straight back until SENTINEL arrives.
        while True:
            payload = conn.recv_bytes()
            if payload == SENTINEL:
                break
            conn.send_bytes(payload)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        echoer = self.Process(target=self._echo, args=(child_conn,))
        echoer.daemon = True
        echoer.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', range(4))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        # pickled objects round-trip through the echo child
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        # raw byte strings round-trip as well
        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # receive into the start of a writable buffer
            target = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(target),
                             len(arr) * target.itemsize)
            self.assertEqual(list(target), expected)

            # receive at an offset of three items
            target = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(
                conn.recv_bytes_into(target, 3 * target.itemsize),
                len(arr) * target.itemsize)
            self.assertEqual(list(target), expected)

            # a buffer that is too small must raise, carrying the message
            target = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(target)
            except multiprocessing.BufferTooShort as exc:
                self.assertEqual(exc.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        timed_poll = TimingWrapper(conn.poll)

        # nothing pending: poll() returns immediately ...
        self.assertEqual(timed_poll(), False)
        self.assertTimingAlmostEqual(timed_poll.elapsed, 0)

        # ... and waits out the whole timeout when one is given
        self.assertEqual(timed_poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(timed_poll.elapsed, TIMEOUT1)

        conn.send(None)

        # once the echo is on its way, poll() reports data
        self.assertEqual(timed_poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(timed_poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        echoer.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            # each end advertises, and enforces, a single direction
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        echoer = self.Process(target=self._echo, args=(child_conn,))
        echoer.start()
        child_conn.close()    # this might complete before child initializes

        greeting = latin('hello')
        conn.send_bytes(greeting)
        self.assertEqual(conn.recv_bytes(), greeting)

        conn.send_bytes(SENTINEL)
        conn.close()
        echoer.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        # (extra send_bytes() arguments, bytes expected at the other end)
        accepted = (
            ((), msg),
            ((5,), msg[5:]),
            ((7, 8), msg[7:7+8]),
            ((26,), latin('')),
            ((26, 0), latin('')),
            )
        for extra, expected in accepted:
            a.send_bytes(msg, *extra)
            self.assertEqual(b.recv_bytes(), expected)

        # offset/size combinations that fall outside the message
        rejected = ((27,), (22, 5), (26, 1), (-1,), (4, -1))
        for extra in rejected:
            self.assertRaises(ValueError, a.send_bytes, msg, *extra)
1378
class _TestListenerClient(BaseTestCase):
    """Connect a Client to a Listener for every available address family."""

    ALLOWED_TYPES = ('processes', 'threads')

    def _test(self, address):
        # Runs in the child: connect, send one greeting, disconnect.
        client = self.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test,
                                 args=(listener.address,))
            child.daemon = True
            child.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            child.join()
            listener.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001398#
1399# Test of sending connection and socket objects between processes
1400#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001401"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001402class _TestPicklingConnections(BaseTestCase):
1403
1404 ALLOWED_TYPES = ('processes',)
1405
1406 def _listener(self, conn, families):
1407 for fam in families:
1408 l = self.connection.Listener(family=fam)
1409 conn.send(l.address)
1410 new_conn = l.accept()
1411 conn.send(new_conn)
1412
1413 if self.TYPE == 'processes':
1414 l = socket.socket()
1415 l.bind(('localhost', 0))
1416 conn.send(l.getsockname())
1417 l.listen(1)
1418 new_conn, addr = l.accept()
1419 conn.send(new_conn)
1420
1421 conn.recv()
1422
1423 def _remote(self, conn):
1424 for (address, msg) in iter(conn.recv, None):
1425 client = self.connection.Client(address)
1426 client.send(msg.upper())
1427 client.close()
1428
1429 if self.TYPE == 'processes':
1430 address, msg = conn.recv()
1431 client = socket.socket()
1432 client.connect(address)
1433 client.sendall(msg.upper())
1434 client.close()
1435
1436 conn.close()
1437
1438 def test_pickling(self):
1439 try:
1440 multiprocessing.allow_connection_pickling()
1441 except ImportError:
1442 return
1443
1444 families = self.connection.families
1445
1446 lconn, lconn0 = self.Pipe()
1447 lp = self.Process(target=self._listener, args=(lconn0, families))
1448 lp.start()
1449 lconn0.close()
1450
1451 rconn, rconn0 = self.Pipe()
1452 rp = self.Process(target=self._remote, args=(rconn0,))
1453 rp.start()
1454 rconn0.close()
1455
1456 for fam in families:
1457 msg = ('This connection uses family %s' % fam).encode('ascii')
1458 address = lconn.recv()
1459 rconn.send((address, msg))
1460 new_conn = lconn.recv()
1461 self.assertEqual(new_conn.recv(), msg.upper())
1462
1463 rconn.send(None)
1464
1465 if self.TYPE == 'processes':
1466 msg = latin('This connection uses a normal socket')
1467 address = lconn.recv()
1468 rconn.send((address, msg))
1469 if hasattr(socket, 'fromfd'):
1470 new_conn = lconn.recv()
1471 self.assertEqual(new_conn.recv(100), msg.upper())
1472 else:
1473 # XXX On Windows with Py2.6 need to backport fromfd()
1474 discard = lconn.recv_bytes()
1475
1476 lconn.send(None)
1477
1478 rconn.close()
1479 lconn.close()
1480
1481 lp.join()
1482 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001483"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001484#
1485#
1486#
1487
class _TestHeap(BaseTestCase):
    """Stress the shared-memory heap and check its bookkeeping."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        live = []

        # create and destroy lots of blocks of different sizes
        for _ in xrange(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            live.append(multiprocessing.heap.BufferWrapper(size))
            if len(live) > maxblocks:
                del live[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # collect every free and every occupied segment as
        # (arena index, start, stop, length, state)
        segments = []
        for free_list in heap._len_to_seq.values():
            for arena, start, stop in free_list:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # consecutive segments must either be adjacent within one arena,
        # or the second must begin a different arena at offset zero
        for current, following in zip(segments, segments[1:]):
            arena, start, stop = current[:3]
            narena, nstart, nstop = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1528
1529#
1530#
1531#
1532
try:
    # Structure and the C scalar types live in ctypes, but Value() and
    # copy() are provided by multiprocessing.sharedctypes.  Asking ctypes
    # for them always raised ImportError, which left c_int set to None and
    # made every _TestSharedCTypes test return without running.
    # NOTE: this binds the name ``copy`` to the sharedctypes function, as
    # the original import also intended.
    from ctypes import Structure, c_int, c_double
    from multiprocessing.sharedctypes import Value, copy
except ImportError:
    # ctypes (or sharedctypes) is unavailable: fall back to placeholders
    # so that the definitions below still import; the tests check c_int.
    Structure = object
    c_int = c_double = None
1538
1539class _Foo(Structure):
1540 _fields_ = [
1541 ('x', c_int),
1542 ('y', c_double)
1543 ]
1544
class _TestSharedCTypes(BaseTestCase):
    """Share ctypes values, structures and arrays with a child process."""

    ALLOWED_TYPES = ('processes',)

    def _double(self, x, y, foo, arr, string):
        # Runs in the child: double every shared object in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] = arr[index] * 2

    def test_sharedctypes(self, lock=False):
        if c_int is None:
            return

        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = 'hello'

        child = self.Process(target=self._double,
                             args=(x, y, foo, arr, string))
        child.start()
        child.join()

        # the child's modifications must be visible in the parent
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # same scenario, but with lock-protected shared objects
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        if c_int is None:
            return

        original = _Foo(2, 5.0)
        duplicate = copy(original)
        # changing the original must not affect the copy
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1594
1595#
1596#
1597#
1598
class _TestFinalize(BaseTestCase):
    """Check the order in which util.Finalize callbacks are run."""

    ALLOWED_TYPES = ('processes',)

    def _test_finalize(self, conn):
        # Runs in the child; every callback reports its tag through conn.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # no exit priority given for this one
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # three finalizers sharing one exit priority
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        # finalizers that are not attached to any object
        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.start()
        child.join()

        # read tags until the 'STOP' marker sent by the last finalizer
        tags = list(iter(conn.recv, 'STOP'))
        self.assertEqual(tags, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1649
1650#
1651# Test that from ... import * works for each module
1652#
1653
class _TestImportStar(BaseTestCase):
    """Every name listed in a module's __all__ must actually exist."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = (
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.reduction', 'multiprocessing.sharedctypes',
            'multiprocessing.synchronize', 'multiprocessing.util'
            )

        for name in modules:
            __import__(name)
            module = sys.modules[name]
            # modules without __all__ have nothing to check
            exported = getattr(module, '__all__', ())

            for attr in exported:
                self.assertTrue(
                    hasattr(module, attr),
                    '%r does not have attribute %r' % (module, attr)
                    )
1676
1677#
1678# Quick test that logging works -- does not test logging output
1679#
1680
class _TestLogging(BaseTestCase):
    """Quick checks of the multiprocessing logger (output is not tested)."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        log = multiprocessing.get_logger()
        log.setLevel(util.SUBWARNING)
        self.assertTrue(log is not None)
        # per their own text, these messages are expected to be suppressed
        log.debug('this will not be printed')
        log.info('nor will this')
        log.setLevel(LOG_LEVEL)

    def _test_level(self, conn):
        # Runs in the child: report the logger's effective level.
        child_logger = multiprocessing.get_logger()
        conn.send(child_logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        saved_root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        def level_seen_by_child():
            # start a child and return the effective level it reports
            self.Process(target=self._test_level, args=(writer,)).start()
            return reader.recv()

        # a level set directly on the multiprocessing logger is inherited
        logger.setLevel(LEVEL1)
        self.assertEqual(LEVEL1, level_seen_by_child())

        # with NOTSET the child falls back to the root logger's level
        logger.setLevel(logging.NOTSET)
        root_logger.setLevel(LEVEL2)
        self.assertEqual(LEVEL2, level_seen_by_child())

        # restore the levels that were in force before the test
        root_logger.setLevel(saved_root_level)
        logger.setLevel(level=LOG_LEVEL)
1718
1719#
Jesse Nollere6bab482009-03-30 16:11:16 +00001720# Test to verify handle verification, see issue 3321
1721#
1722
class TestInvalidHandle(unittest.TestCase):
    """Connection objects must validate the handle they wrap (issue 3321)."""

    def test_invalid_handles(self):
        # the checks below are not run on Windows
        if not WIN32:
            bogus = _multiprocessing.Connection(44977608)
            self.assertRaises(IOError, bogus.poll)
            self.assertRaises(IOError, _multiprocessing.Connection, -1)
1731#
Benjamin Petersondfd79492008-06-13 19:13:39 +00001732# Functions used to create test cases from the base ones in this module
1733#
1734
def get_attributes(Source, names):
    """Return a dict mapping each of *names* to that attribute of *Source*.

    Plain Python functions are wrapped in staticmethod() so that they do
    not turn into bound methods when the dict is merged into a class
    namespace; every other object is stored unchanged.
    """
    function_type = type(get_attributes)
    collected = {}
    for name in names:
        value = getattr(Source, name)
        collected[name] = (staticmethod(value)
                           if type(value) == function_type else value)
    return collected
1743
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes for one flavour of the test suite.

    Every module-level object whose name starts with '_Test' and whose
    ALLOWED_TYPES contains *type* is combined with unittest.TestCase and
    *Mixin*.  Returns a dict mapping the generated class names (for
    example 'WithProcessesTestFoo' for '_TestFoo') to the new classes.
    """
    namespace = globals()
    flavour = type[0].upper() + type[1:]
    cases = {}

    for name in list(namespace.keys()):
        if not name.startswith('_Test'):
            continue
        base = namespace[name]
        if type not in base.ALLOWED_TYPES:
            continue
        newname = 'With' + flavour + name[1:]
        class Temp(base, unittest.TestCase, Mixin):
            pass
        cases[newname] = Temp
        # make the generated class report under its new name and under
        # the module in which the mixin is defined
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
    return cases
1760
1761#
1762# Create test cases
1763#
1764
class ProcessesMixin(object):
    # Mixin that makes the generic _Test* classes run against real
    # processes.  The named attributes of the multiprocessing package are
    # copied into the class namespace; get_attributes() wraps plain
    # functions in staticmethod() so that they are not bound to the test
    # instance when reached through ``self``.
    TYPE = 'processes'
    Process = multiprocessing.Process
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

# Generate the 'WithProcesses...' test classes and publish them as
# module-level names.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1777
1778
class ManagerMixin(object):
    # Mixin that makes the generic _Test* classes run against proxies
    # served by a SyncManager.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocate the manager without running __init__; test_main() calls
    # __init__() and start() on it before the tests are run.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the manager's factory attributes into the class namespace
    # (see get_attributes() for how each attribute is stored).
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))

# Generate the 'WithManager...' test classes and publish them as
# module-level names.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1791
1792
class ThreadsMixin(object):
    # Mixin that makes the generic _Test* classes run against
    # multiprocessing.dummy.  The named attributes of that module are
    # copied into the class namespace; get_attributes() wraps plain
    # functions in staticmethod().
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))

# Generate the 'WithThreads...' test classes and publish them as
# module-level names.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1805
class OtherTest(unittest.TestCase):
    """Authentication handshake tests that need no real connection."""

    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        class _FakeConnection(object):
            # always answers the challenge with garbage
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.deliver_challenge,
                          _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        class _FakeConnection(object):
            # first read: the challenge marker; second read: garbage;
            # every later read: an empty byte string
            def __init__(self):
                self.count = 0

            def recv_bytes(self, size):
                self.count += 1
                if self.count > 2:
                    return b''
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                return b'something bogus'

            def send_bytes(self, data):
                pass

        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.answer_challenge,
                          _FakeConnection(), b'abc')
1834
R. David Murray17438dc2009-07-21 17:02:14 +00001835#
# Issue 5155, 5313, 5331: Test process in processes
# Verifies os.close(sys.stdin.fileno()) vs. sys.stdin.close() behavior
1838#
1839
1840def _ThisSubProcess(q):
1841 try:
1842 item = q.get(block=False)
1843 except Queue.Empty:
1844 pass
1845
def _TestProcess(q):
    """Start a nested process that polls a fresh queue, and wait for it.

    The argument *q* is accepted but not used.
    """
    # NOTE: despite the '_Test' prefix this is a plain function.  It is
    # defined after the create_test_cases() calls in this module, which
    # scan globals() for '_Test*' names and expect ALLOWED_TYPES on them.
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_ThisSubProcess,
                                    args=(inner_queue,))
    child.start()
    child.join()
1851
1852def _afunc(x):
1853 return x*x
1854
def pool_in_process():
    """Create a four-worker pool and map _afunc over a short list.

    Used as a Process target; the mapped result is not returned.
    """
    workers = multiprocessing.Pool(processes=4)
    inputs = [1, 2, 3, 4, 5, 6, 7]
    x = workers.map(_afunc, inputs)
1858
1859class _file_like(object):
1860 def __init__(self, delegate):
1861 self._delegate = delegate
1862 self._pid = None
1863
1864 @property
1865 def cache(self):
1866 pid = os.getpid()
1867 # There are no race conditions since fork keeps only the running thread
1868 if pid != self._pid:
1869 self._pid = pid
1870 self._cache = []
1871 return self._cache
1872
1873 def write(self, data):
1874 self.cache.append(data)
1875
1876 def flush(self):
1877 self._delegate.write(''.join(self.cache))
1878 self._cache = []
1879
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Processes started from within processes (issues 5155, 5313, 5331)."""

    def test_queue_in_process(self):
        # a child process that itself starts a process using a Queue
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # a child process that itself creates a Pool
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        # data written to the wrapper must reach the delegate on flush()
        sio = StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        flike.flush()
        # assertEqual rather than a bare assert: the check survives
        # ``python -O`` and reports both values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
1900
1901testcases_other = [OtherTest, TestInvalidHandle, TestStdinBadfiledescriptor]
Neal Norwitz0c519b32008-08-25 01:50:24 +00001902
Benjamin Petersondfd79492008-06-13 19:13:39 +00001903#
1904#
1905#
1906
def test_main(run=None):
    """Build the complete suite and run it, with shared pools and manager.

    *run* is a callable that accepts a TestSuite; it defaults to
    test.test_support.run_unittest.  Raises TestSkipped on Linux when an
    RLock cannot be created.
    """
    if sys.platform.startswith("linux"):
        try:
            # probe only: the lock itself is not used
            lock = multiprocessing.RLock()
        except OSError:
            from test.test_support import TestSkipped
            raise TestSkipped("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Resources shared by all the generated test classes.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        with test_support._check_py3k_warnings(
                (".+__(get|set)slice__ has been removed", DeprecationWarning)):
            run(suite)
    finally:
        # The runner may raise (run_unittest does so when tests fail), so
        # the teardown lives in ``finally``; otherwise the worker and
        # manager processes would be left running.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00001947
def main():
    """Run the whole suite through a verbose text test runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)
1950
1951if __name__ == '__main__':
1952 main()