blob: 719a2c42a86fbfc317ddedab207d9305e4fed6d4 [file] [log] [blame]
Georg Brandl86b2fb92008-07-16 03:43:04 +00001#!/usr/bin/env python
2
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import queue as pyqueue
10import time
R. David Murraya44c6b32009-07-29 15:40:30 +000011import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
15import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
Benjamin Petersone5384b02008-10-04 22:00:42 +000027
Benjamin Petersone711caf2008-06-11 16:44:04 +000028import multiprocessing.dummy
29import multiprocessing.connection
30import multiprocessing.managers
31import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000033
34from multiprocessing import util
35
Brian Curtin918616c2010-10-07 02:12:17 +000036try:
37 from multiprocessing.sharedctypes import Value, copy
38 HAS_SHAREDCTYPES = True
39except ImportError:
40 HAS_SHAREDCTYPES = False
41
Benjamin Petersone711caf2008-06-11 16:44:04 +000042#
43#
44#
45
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000046def latin(s):
47 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000048
Benjamin Petersone711caf2008-06-11 16:44:04 +000049#
50# Constants
51#
52
53LOG_LEVEL = util.SUBWARNING
54#LOG_LEVEL = logging.WARNING
55
56DELTA = 0.1
57CHECK_TIMINGS = False # making true makes tests take a lot longer
58 # and can sometimes cause some non-serious
59 # failures because some calls block a bit
60 # longer than expected
61if CHECK_TIMINGS:
62 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
63else:
64 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
65
66HAVE_GETVALUE = not getattr(_multiprocessing,
67 'HAVE_BROKEN_SEM_GETVALUE', False)
68
Jesse Noller6214edd2009-01-19 16:23:53 +000069WIN32 = (sys.platform == "win32")
70
Benjamin Petersone711caf2008-06-11 16:44:04 +000071#
Florent Xicluna9b0e9182010-03-28 11:42:38 +000072# Some tests require ctypes
73#
74
75try:
Florent Xiclunab4efb3d2010-08-14 18:24:40 +000076 from ctypes import Structure, c_int, c_double
Florent Xicluna9b0e9182010-03-28 11:42:38 +000077except ImportError:
78 Structure = object
79 c_int = c_double = None
80
81#
Benjamin Petersone711caf2008-06-11 16:44:04 +000082# Creates a wrapper for a function which records the time it takes to finish
83#
84
85class TimingWrapper(object):
86
87 def __init__(self, func):
88 self.func = func
89 self.elapsed = None
90
91 def __call__(self, *args, **kwds):
92 t = time.time()
93 try:
94 return self.func(*args, **kwds)
95 finally:
96 self.elapsed = time.time() - t
97
98#
99# Base class for test cases
100#
101
102class BaseTestCase(object):
103
104 ALLOWED_TYPES = ('processes', 'manager', 'threads')
105
106 def assertTimingAlmostEqual(self, a, b):
107 if CHECK_TIMINGS:
108 self.assertAlmostEqual(a, b, 1)
109
110 def assertReturnsIfImplemented(self, value, func, *args):
111 try:
112 res = func(*args)
113 except NotImplementedError:
114 pass
115 else:
116 return self.assertEqual(value, res)
117
118#
119# Return the value of a semaphore
120#
121
122def get_value(self):
123 try:
124 return self.get_value()
125 except AttributeError:
126 try:
127 return self._Semaphore__value
128 except AttributeError:
129 try:
130 return self._value
131 except AttributeError:
132 raise NotImplementedError
133
134#
135# Testcases
136#
137
138class _TestProcess(BaseTestCase):
139
140 ALLOWED_TYPES = ('processes', 'threads')
141
142 def test_current(self):
143 if self.TYPE == 'threads':
144 return
145
146 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000147 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000148
149 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000150 self.assertTrue(not current.daemon)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000151 self.assertTrue(isinstance(authkey, bytes))
152 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000153 self.assertEqual(current.ident, os.getpid())
154 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000155
156 def _test(self, q, *args, **kwds):
157 current = self.current_process()
158 q.put(args)
159 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000160 q.put(current.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000161 if self.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000162 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000163 q.put(current.pid)
164
165 def test_process(self):
166 q = self.Queue(1)
167 e = self.Event()
168 args = (q, 1, 2)
169 kwargs = {'hello':23, 'bye':2.54}
170 name = 'SomeProcess'
171 p = self.Process(
172 target=self._test, args=args, kwargs=kwargs, name=name
173 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000174 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000175 current = self.current_process()
176
177 if self.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000178 self.assertEquals(p.authkey, current.authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000179 self.assertEquals(p.is_alive(), False)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000180 self.assertEquals(p.daemon, True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000181 self.assertTrue(p not in self.active_children())
182 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000183 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000184
185 p.start()
186
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000187 self.assertEquals(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000188 self.assertEquals(p.is_alive(), True)
189 self.assertTrue(p in self.active_children())
190
191 self.assertEquals(q.get(), args[1:])
192 self.assertEquals(q.get(), kwargs)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000193 self.assertEquals(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000194 if self.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000195 self.assertEquals(q.get(), current.authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000196 self.assertEquals(q.get(), p.pid)
197
198 p.join()
199
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000200 self.assertEquals(p.exitcode, 0)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000201 self.assertEquals(p.is_alive(), False)
202 self.assertTrue(p not in self.active_children())
203
204 def _test_terminate(self):
205 time.sleep(1000)
206
207 def test_terminate(self):
208 if self.TYPE == 'threads':
209 return
210
211 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000212 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000213 p.start()
214
215 self.assertEqual(p.is_alive(), True)
216 self.assertTrue(p in self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000217 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000218
219 p.terminate()
220
221 join = TimingWrapper(p.join)
222 self.assertEqual(join(), None)
223 self.assertTimingAlmostEqual(join.elapsed, 0.0)
224
225 self.assertEqual(p.is_alive(), False)
226 self.assertTrue(p not in self.active_children())
227
228 p.join()
229
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000230 # XXX sometimes get p.exitcode == 0 on Windows ...
231 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000232
233 def test_cpu_count(self):
234 try:
235 cpus = multiprocessing.cpu_count()
236 except NotImplementedError:
237 cpus = 1
238 self.assertTrue(type(cpus) is int)
239 self.assertTrue(cpus >= 1)
240
241 def test_active_children(self):
242 self.assertEqual(type(self.active_children()), list)
243
244 p = self.Process(target=time.sleep, args=(DELTA,))
245 self.assertTrue(p not in self.active_children())
246
247 p.start()
248 self.assertTrue(p in self.active_children())
249
250 p.join()
251 self.assertTrue(p not in self.active_children())
252
253 def _test_recursion(self, wconn, id):
254 from multiprocessing import forking
255 wconn.send(id)
256 if len(id) < 2:
257 for i in range(2):
258 p = self.Process(
259 target=self._test_recursion, args=(wconn, id+[i])
260 )
261 p.start()
262 p.join()
263
264 def test_recursion(self):
265 rconn, wconn = self.Pipe(duplex=False)
266 self._test_recursion(wconn, [])
267
268 time.sleep(DELTA)
269 result = []
270 while rconn.poll():
271 result.append(rconn.recv())
272
273 expected = [
274 [],
275 [0],
276 [0, 0],
277 [0, 1],
278 [1],
279 [1, 0],
280 [1, 1]
281 ]
282 self.assertEqual(result, expected)
283
284#
285#
286#
287
288class _UpperCaser(multiprocessing.Process):
289
290 def __init__(self):
291 multiprocessing.Process.__init__(self)
292 self.child_conn, self.parent_conn = multiprocessing.Pipe()
293
294 def run(self):
295 self.parent_conn.close()
296 for s in iter(self.child_conn.recv, None):
297 self.child_conn.send(s.upper())
298 self.child_conn.close()
299
300 def submit(self, s):
301 assert type(s) is str
302 self.parent_conn.send(s)
303 return self.parent_conn.recv()
304
305 def stop(self):
306 self.parent_conn.send(None)
307 self.parent_conn.close()
308 self.child_conn.close()
309
310class _TestSubclassingProcess(BaseTestCase):
311
312 ALLOWED_TYPES = ('processes',)
313
314 def test_subclassing(self):
315 uppercaser = _UpperCaser()
316 uppercaser.start()
317 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
318 self.assertEqual(uppercaser.submit('world'), 'WORLD')
319 uppercaser.stop()
320 uppercaser.join()
321
322#
323#
324#
325
326def queue_empty(q):
327 if hasattr(q, 'empty'):
328 return q.empty()
329 else:
330 return q.qsize() == 0
331
332def queue_full(q, maxsize):
333 if hasattr(q, 'full'):
334 return q.full()
335 else:
336 return q.qsize() == maxsize
337
338
339class _TestQueue(BaseTestCase):
340
341
342 def _test_put(self, queue, child_can_start, parent_can_continue):
343 child_can_start.wait()
344 for i in range(6):
345 queue.get()
346 parent_can_continue.set()
347
348 def test_put(self):
349 MAXSIZE = 6
350 queue = self.Queue(maxsize=MAXSIZE)
351 child_can_start = self.Event()
352 parent_can_continue = self.Event()
353
354 proc = self.Process(
355 target=self._test_put,
356 args=(queue, child_can_start, parent_can_continue)
357 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000358 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000359 proc.start()
360
361 self.assertEqual(queue_empty(queue), True)
362 self.assertEqual(queue_full(queue, MAXSIZE), False)
363
364 queue.put(1)
365 queue.put(2, True)
366 queue.put(3, True, None)
367 queue.put(4, False)
368 queue.put(5, False, None)
369 queue.put_nowait(6)
370
371 # the values may be in buffer but not yet in pipe so sleep a bit
372 time.sleep(DELTA)
373
374 self.assertEqual(queue_empty(queue), False)
375 self.assertEqual(queue_full(queue, MAXSIZE), True)
376
377 put = TimingWrapper(queue.put)
378 put_nowait = TimingWrapper(queue.put_nowait)
379
380 self.assertRaises(pyqueue.Full, put, 7, False)
381 self.assertTimingAlmostEqual(put.elapsed, 0)
382
383 self.assertRaises(pyqueue.Full, put, 7, False, None)
384 self.assertTimingAlmostEqual(put.elapsed, 0)
385
386 self.assertRaises(pyqueue.Full, put_nowait, 7)
387 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
388
389 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
390 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
391
392 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
393 self.assertTimingAlmostEqual(put.elapsed, 0)
394
395 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
396 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
397
398 child_can_start.set()
399 parent_can_continue.wait()
400
401 self.assertEqual(queue_empty(queue), True)
402 self.assertEqual(queue_full(queue, MAXSIZE), False)
403
404 proc.join()
405
406 def _test_get(self, queue, child_can_start, parent_can_continue):
407 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000408 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000409 queue.put(2)
410 queue.put(3)
411 queue.put(4)
412 queue.put(5)
413 parent_can_continue.set()
414
415 def test_get(self):
416 queue = self.Queue()
417 child_can_start = self.Event()
418 parent_can_continue = self.Event()
419
420 proc = self.Process(
421 target=self._test_get,
422 args=(queue, child_can_start, parent_can_continue)
423 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000424 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000425 proc.start()
426
427 self.assertEqual(queue_empty(queue), True)
428
429 child_can_start.set()
430 parent_can_continue.wait()
431
432 time.sleep(DELTA)
433 self.assertEqual(queue_empty(queue), False)
434
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000435 # Hangs unexpectedly, remove for now
436 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000437 self.assertEqual(queue.get(True, None), 2)
438 self.assertEqual(queue.get(True), 3)
439 self.assertEqual(queue.get(timeout=1), 4)
440 self.assertEqual(queue.get_nowait(), 5)
441
442 self.assertEqual(queue_empty(queue), True)
443
444 get = TimingWrapper(queue.get)
445 get_nowait = TimingWrapper(queue.get_nowait)
446
447 self.assertRaises(pyqueue.Empty, get, False)
448 self.assertTimingAlmostEqual(get.elapsed, 0)
449
450 self.assertRaises(pyqueue.Empty, get, False, None)
451 self.assertTimingAlmostEqual(get.elapsed, 0)
452
453 self.assertRaises(pyqueue.Empty, get_nowait)
454 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
455
456 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
457 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
458
459 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
460 self.assertTimingAlmostEqual(get.elapsed, 0)
461
462 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
463 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
464
465 proc.join()
466
467 def _test_fork(self, queue):
468 for i in range(10, 20):
469 queue.put(i)
470 # note that at this point the items may only be buffered, so the
471 # process cannot shutdown until the feeder thread has finished
472 # pushing items onto the pipe.
473
474 def test_fork(self):
475 # Old versions of Queue would fail to create a new feeder
476 # thread for a forked process if the original process had its
477 # own feeder thread. This test checks that this no longer
478 # happens.
479
480 queue = self.Queue()
481
482 # put items on queue so that main process starts a feeder thread
483 for i in range(10):
484 queue.put(i)
485
486 # wait to make sure thread starts before we fork a new process
487 time.sleep(DELTA)
488
489 # fork process
490 p = self.Process(target=self._test_fork, args=(queue,))
491 p.start()
492
493 # check that all expected items are in the queue
494 for i in range(20):
495 self.assertEqual(queue.get(), i)
496 self.assertRaises(pyqueue.Empty, queue.get, False)
497
498 p.join()
499
500 def test_qsize(self):
501 q = self.Queue()
502 try:
503 self.assertEqual(q.qsize(), 0)
504 except NotImplementedError:
505 return
506 q.put(1)
507 self.assertEqual(q.qsize(), 1)
508 q.put(5)
509 self.assertEqual(q.qsize(), 2)
510 q.get()
511 self.assertEqual(q.qsize(), 1)
512 q.get()
513 self.assertEqual(q.qsize(), 0)
514
515 def _test_task_done(self, q):
516 for obj in iter(q.get, None):
517 time.sleep(DELTA)
518 q.task_done()
519
520 def test_task_done(self):
521 queue = self.JoinableQueue()
522
523 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xicluna9b0e9182010-03-28 11:42:38 +0000524 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000525
526 workers = [self.Process(target=self._test_task_done, args=(queue,))
527 for i in range(4)]
528
529 for p in workers:
530 p.start()
531
532 for i in range(10):
533 queue.put(i)
534
535 queue.join()
536
537 for p in workers:
538 queue.put(None)
539
540 for p in workers:
541 p.join()
542
543#
544#
545#
546
547class _TestLock(BaseTestCase):
548
549 def test_lock(self):
550 lock = self.Lock()
551 self.assertEqual(lock.acquire(), True)
552 self.assertEqual(lock.acquire(False), False)
553 self.assertEqual(lock.release(), None)
554 self.assertRaises((ValueError, threading.ThreadError), lock.release)
555
556 def test_rlock(self):
557 lock = self.RLock()
558 self.assertEqual(lock.acquire(), True)
559 self.assertEqual(lock.acquire(), True)
560 self.assertEqual(lock.acquire(), True)
561 self.assertEqual(lock.release(), None)
562 self.assertEqual(lock.release(), None)
563 self.assertEqual(lock.release(), None)
564 self.assertRaises((AssertionError, RuntimeError), lock.release)
565
Jesse Nollerf8d00852009-03-31 03:25:07 +0000566 def test_lock_context(self):
567 with self.Lock():
568 pass
569
Benjamin Petersone711caf2008-06-11 16:44:04 +0000570
571class _TestSemaphore(BaseTestCase):
572
573 def _test_semaphore(self, sem):
574 self.assertReturnsIfImplemented(2, get_value, sem)
575 self.assertEqual(sem.acquire(), True)
576 self.assertReturnsIfImplemented(1, get_value, sem)
577 self.assertEqual(sem.acquire(), True)
578 self.assertReturnsIfImplemented(0, get_value, sem)
579 self.assertEqual(sem.acquire(False), False)
580 self.assertReturnsIfImplemented(0, get_value, sem)
581 self.assertEqual(sem.release(), None)
582 self.assertReturnsIfImplemented(1, get_value, sem)
583 self.assertEqual(sem.release(), None)
584 self.assertReturnsIfImplemented(2, get_value, sem)
585
586 def test_semaphore(self):
587 sem = self.Semaphore(2)
588 self._test_semaphore(sem)
589 self.assertEqual(sem.release(), None)
590 self.assertReturnsIfImplemented(3, get_value, sem)
591 self.assertEqual(sem.release(), None)
592 self.assertReturnsIfImplemented(4, get_value, sem)
593
594 def test_bounded_semaphore(self):
595 sem = self.BoundedSemaphore(2)
596 self._test_semaphore(sem)
597 # Currently fails on OS/X
598 #if HAVE_GETVALUE:
599 # self.assertRaises(ValueError, sem.release)
600 # self.assertReturnsIfImplemented(2, get_value, sem)
601
602 def test_timeout(self):
603 if self.TYPE != 'processes':
604 return
605
606 sem = self.Semaphore(0)
607 acquire = TimingWrapper(sem.acquire)
608
609 self.assertEqual(acquire(False), False)
610 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
611
612 self.assertEqual(acquire(False, None), False)
613 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
614
615 self.assertEqual(acquire(False, TIMEOUT1), False)
616 self.assertTimingAlmostEqual(acquire.elapsed, 0)
617
618 self.assertEqual(acquire(True, TIMEOUT2), False)
619 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
620
621 self.assertEqual(acquire(timeout=TIMEOUT3), False)
622 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
623
624
625class _TestCondition(BaseTestCase):
626
627 def f(self, cond, sleeping, woken, timeout=None):
628 cond.acquire()
629 sleeping.release()
630 cond.wait(timeout)
631 woken.release()
632 cond.release()
633
634 def check_invariant(self, cond):
635 # this is only supposed to succeed when there are no sleepers
636 if self.TYPE == 'processes':
637 try:
638 sleepers = (cond._sleeping_count.get_value() -
639 cond._woken_count.get_value())
640 self.assertEqual(sleepers, 0)
641 self.assertEqual(cond._wait_semaphore.get_value(), 0)
642 except NotImplementedError:
643 pass
644
645 def test_notify(self):
646 cond = self.Condition()
647 sleeping = self.Semaphore(0)
648 woken = self.Semaphore(0)
649
650 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000651 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000652 p.start()
653
654 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000655 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000656 p.start()
657
658 # wait for both children to start sleeping
659 sleeping.acquire()
660 sleeping.acquire()
661
662 # check no process/thread has woken up
663 time.sleep(DELTA)
664 self.assertReturnsIfImplemented(0, get_value, woken)
665
666 # wake up one process/thread
667 cond.acquire()
668 cond.notify()
669 cond.release()
670
671 # check one process/thread has woken up
672 time.sleep(DELTA)
673 self.assertReturnsIfImplemented(1, get_value, woken)
674
675 # wake up another
676 cond.acquire()
677 cond.notify()
678 cond.release()
679
680 # check other has woken up
681 time.sleep(DELTA)
682 self.assertReturnsIfImplemented(2, get_value, woken)
683
684 # check state is not mucked up
685 self.check_invariant(cond)
686 p.join()
687
688 def test_notify_all(self):
689 cond = self.Condition()
690 sleeping = self.Semaphore(0)
691 woken = self.Semaphore(0)
692
693 # start some threads/processes which will timeout
694 for i in range(3):
695 p = self.Process(target=self.f,
696 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000697 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000698 p.start()
699
700 t = threading.Thread(target=self.f,
701 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000702 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000703 t.start()
704
705 # wait for them all to sleep
706 for i in range(6):
707 sleeping.acquire()
708
709 # check they have all timed out
710 for i in range(6):
711 woken.acquire()
712 self.assertReturnsIfImplemented(0, get_value, woken)
713
714 # check state is not mucked up
715 self.check_invariant(cond)
716
717 # start some more threads/processes
718 for i in range(3):
719 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000720 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000721 p.start()
722
723 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000724 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000725 t.start()
726
727 # wait for them to all sleep
728 for i in range(6):
729 sleeping.acquire()
730
731 # check no process/thread has woken up
732 time.sleep(DELTA)
733 self.assertReturnsIfImplemented(0, get_value, woken)
734
735 # wake them all up
736 cond.acquire()
737 cond.notify_all()
738 cond.release()
739
740 # check they have all woken
741 time.sleep(DELTA)
742 self.assertReturnsIfImplemented(6, get_value, woken)
743
744 # check state is not mucked up
745 self.check_invariant(cond)
746
747 def test_timeout(self):
748 cond = self.Condition()
749 wait = TimingWrapper(cond.wait)
750 cond.acquire()
751 res = wait(TIMEOUT1)
752 cond.release()
753 self.assertEqual(res, None)
754 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
755
756
757class _TestEvent(BaseTestCase):
758
759 def _test_event(self, event):
760 time.sleep(TIMEOUT2)
761 event.set()
762
763 def test_event(self):
764 event = self.Event()
765 wait = TimingWrapper(event.wait)
766
767 # Removed temporaily, due to API shear, this does not
768 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +0000769 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000770
Benjamin Peterson965ce872009-04-05 21:24:58 +0000771 # Removed, threading.Event.wait() will return the value of the __flag
772 # instead of None. API Shear with the semaphore backed mp.Event
773 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000774 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000775 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000776 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
777
778 event.set()
779
780 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +0000781 self.assertEqual(event.is_set(), True)
782 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000783 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000784 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000785 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
786 # self.assertEqual(event.is_set(), True)
787
788 event.clear()
789
790 #self.assertEqual(event.is_set(), False)
791
792 self.Process(target=self._test_event, args=(event,)).start()
Benjamin Peterson965ce872009-04-05 21:24:58 +0000793 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000794
795#
796#
797#
798
Brian Curtin918616c2010-10-07 02:12:17 +0000799@unittest.skipUnless(HAS_SHAREDCTYPES,
800 "requires multiprocessing.sharedctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000801class _TestValue(BaseTestCase):
802
Florent Xicluna9b0e9182010-03-28 11:42:38 +0000803 ALLOWED_TYPES = ('processes',)
804
Benjamin Petersone711caf2008-06-11 16:44:04 +0000805 codes_values = [
806 ('i', 4343, 24234),
807 ('d', 3.625, -4.25),
808 ('h', -232, 234),
809 ('c', latin('x'), latin('y'))
810 ]
811
812 def _test(self, values):
813 for sv, cv in zip(values, self.codes_values):
814 sv.value = cv[2]
815
816
817 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000818 if raw:
819 values = [self.RawValue(code, value)
820 for code, value, _ in self.codes_values]
821 else:
822 values = [self.Value(code, value)
823 for code, value, _ in self.codes_values]
824
825 for sv, cv in zip(values, self.codes_values):
826 self.assertEqual(sv.value, cv[1])
827
828 proc = self.Process(target=self._test, args=(values,))
829 proc.start()
830 proc.join()
831
832 for sv, cv in zip(values, self.codes_values):
833 self.assertEqual(sv.value, cv[2])
834
835 def test_rawvalue(self):
836 self.test_value(raw=True)
837
838 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000839 val1 = self.Value('i', 5)
840 lock1 = val1.get_lock()
841 obj1 = val1.get_obj()
842
843 val2 = self.Value('i', 5, lock=None)
844 lock2 = val2.get_lock()
845 obj2 = val2.get_obj()
846
847 lock = self.Lock()
848 val3 = self.Value('i', 5, lock=lock)
849 lock3 = val3.get_lock()
850 obj3 = val3.get_obj()
851 self.assertEqual(lock, lock3)
852
Jesse Nollerb0516a62009-01-18 03:11:38 +0000853 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000854 self.assertFalse(hasattr(arr4, 'get_lock'))
855 self.assertFalse(hasattr(arr4, 'get_obj'))
856
Jesse Nollerb0516a62009-01-18 03:11:38 +0000857 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
858
859 arr5 = self.RawValue('i', 5)
860 self.assertFalse(hasattr(arr5, 'get_lock'))
861 self.assertFalse(hasattr(arr5, 'get_obj'))
862
Benjamin Petersone711caf2008-06-11 16:44:04 +0000863
864class _TestArray(BaseTestCase):
865
Florent Xicluna9b0e9182010-03-28 11:42:38 +0000866 ALLOWED_TYPES = ('processes',)
867
Benjamin Petersone711caf2008-06-11 16:44:04 +0000868 def f(self, seq):
869 for i in range(1, len(seq)):
870 seq[i] += seq[i-1]
871
Florent Xicluna9b0e9182010-03-28 11:42:38 +0000872 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000873 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000874 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
875 if raw:
876 arr = self.RawArray('i', seq)
877 else:
878 arr = self.Array('i', seq)
879
880 self.assertEqual(len(arr), len(seq))
881 self.assertEqual(arr[3], seq[3])
882 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
883
884 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
885
886 self.assertEqual(list(arr[:]), seq)
887
888 self.f(seq)
889
890 p = self.Process(target=self.f, args=(arr,))
891 p.start()
892 p.join()
893
894 self.assertEqual(list(arr[:]), seq)
895
Florent Xicluna9b0e9182010-03-28 11:42:38 +0000896 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000897 def test_rawarray(self):
898 self.test_array(raw=True)
899
Florent Xicluna9b0e9182010-03-28 11:42:38 +0000900 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000901 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000902 arr1 = self.Array('i', list(range(10)))
903 lock1 = arr1.get_lock()
904 obj1 = arr1.get_obj()
905
906 arr2 = self.Array('i', list(range(10)), lock=None)
907 lock2 = arr2.get_lock()
908 obj2 = arr2.get_obj()
909
910 lock = self.Lock()
911 arr3 = self.Array('i', list(range(10)), lock=lock)
912 lock3 = arr3.get_lock()
913 obj3 = arr3.get_obj()
914 self.assertEqual(lock, lock3)
915
Jesse Nollerb0516a62009-01-18 03:11:38 +0000916 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000917 self.assertFalse(hasattr(arr4, 'get_lock'))
918 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +0000919 self.assertRaises(AttributeError,
920 self.Array, 'i', range(10), lock='notalock')
921
922 arr5 = self.RawArray('i', range(10))
923 self.assertFalse(hasattr(arr5, 'get_lock'))
924 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000925
926#
927#
928#
929
930class _TestContainers(BaseTestCase):
931
932 ALLOWED_TYPES = ('manager',)
933
934 def test_list(self):
935 a = self.list(list(range(10)))
936 self.assertEqual(a[:], list(range(10)))
937
938 b = self.list()
939 self.assertEqual(b[:], [])
940
941 b.extend(list(range(5)))
942 self.assertEqual(b[:], list(range(5)))
943
944 self.assertEqual(b[2], 2)
945 self.assertEqual(b[2:10], [2,3,4])
946
947 b *= 2
948 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
949
950 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
951
952 self.assertEqual(a[:], list(range(10)))
953
954 d = [a, b]
955 e = self.list(d)
956 self.assertEqual(
957 e[:],
958 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
959 )
960
961 f = self.list([a])
962 a.append('hello')
963 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
964
965 def test_dict(self):
966 d = self.dict()
967 indices = list(range(65, 70))
968 for i in indices:
969 d[i] = chr(i)
970 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
971 self.assertEqual(sorted(d.keys()), indices)
972 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
973 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
974
975 def test_namespace(self):
976 n = self.Namespace()
977 n.name = 'Bob'
978 n.job = 'Builder'
979 n._hidden = 'hidden'
980 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
981 del n.job
982 self.assertEqual(str(n), "Namespace(name='Bob')")
983 self.assertTrue(hasattr(n, 'name'))
984 self.assertTrue(not hasattr(n, 'job'))
985
986#
987#
988#
989
990def sqr(x, wait=0.0):
991 time.sleep(wait)
992 return x*x
Benjamin Petersone711caf2008-06-11 16:44:04 +0000993class _TestPool(BaseTestCase):
994
995 def test_apply(self):
996 papply = self.pool.apply
997 self.assertEqual(papply(sqr, (5,)), sqr(5))
998 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
999
1000 def test_map(self):
1001 pmap = self.pool.map
1002 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1003 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1004 list(map(sqr, list(range(100)))))
1005
Georg Brandld80344f2009-08-13 12:26:19 +00001006 def test_map_chunksize(self):
1007 try:
1008 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1009 except multiprocessing.TimeoutError:
1010 self.fail("pool.map_async with chunksize stalled on null list")
1011
Benjamin Petersone711caf2008-06-11 16:44:04 +00001012 def test_async(self):
1013 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1014 get = TimingWrapper(res.get)
1015 self.assertEqual(get(), 49)
1016 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1017
1018 def test_async_timeout(self):
1019 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1020 get = TimingWrapper(res.get)
1021 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1022 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1023
1024 def test_imap(self):
1025 it = self.pool.imap(sqr, list(range(10)))
1026 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1027
1028 it = self.pool.imap(sqr, list(range(10)))
1029 for i in range(10):
1030 self.assertEqual(next(it), i*i)
1031 self.assertRaises(StopIteration, it.__next__)
1032
1033 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1034 for i in range(1000):
1035 self.assertEqual(next(it), i*i)
1036 self.assertRaises(StopIteration, it.__next__)
1037
1038 def test_imap_unordered(self):
1039 it = self.pool.imap_unordered(sqr, list(range(1000)))
1040 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1041
1042 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1043 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1044
1045 def test_make_pool(self):
1046 p = multiprocessing.Pool(3)
1047 self.assertEqual(3, len(p._pool))
1048 p.close()
1049 p.join()
1050
1051 def test_terminate(self):
1052 if self.TYPE == 'manager':
1053 # On Unix a forked process increfs each shared object to
1054 # which its parent process held a reference. If the
1055 # forked process gets terminated then there is likely to
1056 # be a reference leak. So to prevent
1057 # _TestZZZNumberOfObjects from failing we skip this test
1058 # when using a manager.
1059 return
1060
1061 result = self.pool.map_async(
1062 time.sleep, [0.1 for i in range(10000)], chunksize=1
1063 )
1064 self.pool.terminate()
1065 join = TimingWrapper(self.pool.join)
1066 join()
1067 self.assertTrue(join.elapsed < 0.2)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001068#
1069# Test that manager has expected number of shared objects left
1070#
1071
1072class _TestZZZNumberOfObjects(BaseTestCase):
1073 # Because test cases are sorted alphabetically, this one will get
1074 # run after all the other tests for the manager. It tests that
1075 # there have been no "reference leaks" for the manager's shared
1076 # objects. Note the comment in _TestPool.test_terminate().
1077 ALLOWED_TYPES = ('manager',)
1078
1079 def test_number_of_objects(self):
1080 EXPECTED_NUMBER = 1 # the pool object is still alive
1081 multiprocessing.active_children() # discard dead process objs
1082 gc.collect() # do garbage collection
1083 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001084 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001085 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001086 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001087 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001088
1089 self.assertEqual(refs, EXPECTED_NUMBER)
1090
1091#
1092# Test of creating a customized manager class
1093#
1094
1095from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1096
1097class FooBar(object):
1098 def f(self):
1099 return 'f()'
1100 def g(self):
1101 raise ValueError
1102 def _h(self):
1103 return '_h()'
1104
1105def baz():
1106 for i in range(10):
1107 yield i*i
1108
1109class IteratorProxy(BaseProxy):
Florent Xiclunab4efb3d2010-08-14 18:24:40 +00001110 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001111 def __iter__(self):
1112 return self
1113 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001114 return self._callmethod('__next__')
1115
1116class MyManager(BaseManager):
1117 pass
1118
1119MyManager.register('Foo', callable=FooBar)
1120MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1121MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1122
1123
1124class _TestMyManager(BaseTestCase):
1125
1126 ALLOWED_TYPES = ('manager',)
1127
1128 def test_mymanager(self):
1129 manager = MyManager()
1130 manager.start()
1131
1132 foo = manager.Foo()
1133 bar = manager.Bar()
1134 baz = manager.baz()
1135
1136 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1137 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1138
1139 self.assertEqual(foo_methods, ['f', 'g'])
1140 self.assertEqual(bar_methods, ['f', '_h'])
1141
1142 self.assertEqual(foo.f(), 'f()')
1143 self.assertRaises(ValueError, foo.g)
1144 self.assertEqual(foo._callmethod('f'), 'f()')
1145 self.assertRaises(RemoteError, foo._callmethod, '_h')
1146
1147 self.assertEqual(bar.f(), 'f()')
1148 self.assertEqual(bar._h(), '_h()')
1149 self.assertEqual(bar._callmethod('f'), 'f()')
1150 self.assertEqual(bar._callmethod('_h'), '_h()')
1151
1152 self.assertEqual(list(baz), [i*i for i in range(10)])
1153
1154 manager.shutdown()
1155
1156#
1157# Test of connecting to a remote server and using xmlrpclib for serialization
1158#
1159
1160_queue = pyqueue.Queue()
1161def get_queue():
1162 return _queue
1163
1164class QueueManager(BaseManager):
1165 '''manager class used by server process'''
1166QueueManager.register('get_queue', callable=get_queue)
1167
1168class QueueManager2(BaseManager):
1169 '''manager class which specifies the same interface as QueueManager'''
1170QueueManager2.register('get_queue')
1171
1172
1173SERIALIZER = 'xmlrpclib'
1174
1175class _TestRemoteManager(BaseTestCase):
1176
1177 ALLOWED_TYPES = ('manager',)
1178
1179 def _putter(self, address, authkey):
1180 manager = QueueManager2(
1181 address=address, authkey=authkey, serializer=SERIALIZER
1182 )
1183 manager.connect()
1184 queue = manager.get_queue()
1185 queue.put(('hello world', None, True, 2.25))
1186
1187 def test_remote(self):
1188 authkey = os.urandom(32)
1189
1190 manager = QueueManager(
1191 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1192 )
1193 manager.start()
1194
1195 p = self.Process(target=self._putter, args=(manager.address, authkey))
1196 p.start()
1197
1198 manager2 = QueueManager2(
1199 address=manager.address, authkey=authkey, serializer=SERIALIZER
1200 )
1201 manager2.connect()
1202 queue = manager2.get_queue()
1203
1204 # Note that xmlrpclib will deserialize object as a list not a tuple
1205 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1206
1207 # Because we are using xmlrpclib for serialization instead of
1208 # pickle this will cause a serialization error.
1209 self.assertRaises(Exception, queue.put, time.sleep)
1210
1211 # Make queue finalizer run before the server is stopped
1212 del queue
1213 manager.shutdown()
1214
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001215class _TestManagerRestart(BaseTestCase):
1216
1217 def _putter(self, address, authkey):
1218 manager = QueueManager(
1219 address=address, authkey=authkey, serializer=SERIALIZER)
1220 manager.connect()
1221 queue = manager.get_queue()
1222 queue.put('hello world')
1223
1224 def test_rapid_restart(self):
1225 authkey = os.urandom(32)
1226 manager = QueueManager(
Antoine Pitroua751c3f2010-04-30 23:23:38 +00001227 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
1228 addr = manager.get_server().address
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001229 manager.start()
1230
1231 p = self.Process(target=self._putter, args=(manager.address, authkey))
1232 p.start()
1233 queue = manager.get_queue()
1234 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00001235 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001236 manager.shutdown()
1237 manager = QueueManager(
Antoine Pitroua751c3f2010-04-30 23:23:38 +00001238 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001239 manager.start()
Jesse Noller35d1f002009-03-30 22:59:27 +00001240 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001241
Benjamin Petersone711caf2008-06-11 16:44:04 +00001242#
1243#
1244#
1245
1246SENTINEL = latin('')
1247
1248class _TestConnection(BaseTestCase):
1249
1250 ALLOWED_TYPES = ('processes', 'threads')
1251
1252 def _echo(self, conn):
1253 for msg in iter(conn.recv_bytes, SENTINEL):
1254 conn.send_bytes(msg)
1255 conn.close()
1256
1257 def test_connection(self):
1258 conn, child_conn = self.Pipe()
1259
1260 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001261 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001262 p.start()
1263
1264 seq = [1, 2.25, None]
1265 msg = latin('hello world')
1266 longmsg = msg * 10
1267 arr = array.array('i', list(range(4)))
1268
1269 if self.TYPE == 'processes':
1270 self.assertEqual(type(conn.fileno()), int)
1271
1272 self.assertEqual(conn.send(seq), None)
1273 self.assertEqual(conn.recv(), seq)
1274
1275 self.assertEqual(conn.send_bytes(msg), None)
1276 self.assertEqual(conn.recv_bytes(), msg)
1277
1278 if self.TYPE == 'processes':
1279 buffer = array.array('i', [0]*10)
1280 expected = list(arr) + [0] * (10 - len(arr))
1281 self.assertEqual(conn.send_bytes(arr), None)
1282 self.assertEqual(conn.recv_bytes_into(buffer),
1283 len(arr) * buffer.itemsize)
1284 self.assertEqual(list(buffer), expected)
1285
1286 buffer = array.array('i', [0]*10)
1287 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1288 self.assertEqual(conn.send_bytes(arr), None)
1289 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1290 len(arr) * buffer.itemsize)
1291 self.assertEqual(list(buffer), expected)
1292
1293 buffer = bytearray(latin(' ' * 40))
1294 self.assertEqual(conn.send_bytes(longmsg), None)
1295 try:
1296 res = conn.recv_bytes_into(buffer)
1297 except multiprocessing.BufferTooShort as e:
1298 self.assertEqual(e.args, (longmsg,))
1299 else:
1300 self.fail('expected BufferTooShort, got %s' % res)
1301
1302 poll = TimingWrapper(conn.poll)
1303
1304 self.assertEqual(poll(), False)
1305 self.assertTimingAlmostEqual(poll.elapsed, 0)
1306
1307 self.assertEqual(poll(TIMEOUT1), False)
1308 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1309
1310 conn.send(None)
1311
1312 self.assertEqual(poll(TIMEOUT1), True)
1313 self.assertTimingAlmostEqual(poll.elapsed, 0)
1314
1315 self.assertEqual(conn.recv(), None)
1316
1317 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1318 conn.send_bytes(really_big_msg)
1319 self.assertEqual(conn.recv_bytes(), really_big_msg)
1320
1321 conn.send_bytes(SENTINEL) # tell child to quit
1322 child_conn.close()
1323
1324 if self.TYPE == 'processes':
1325 self.assertEqual(conn.readable, True)
1326 self.assertEqual(conn.writable, True)
1327 self.assertRaises(EOFError, conn.recv)
1328 self.assertRaises(EOFError, conn.recv_bytes)
1329
1330 p.join()
1331
1332 def test_duplex_false(self):
1333 reader, writer = self.Pipe(duplex=False)
1334 self.assertEqual(writer.send(1), None)
1335 self.assertEqual(reader.recv(), 1)
1336 if self.TYPE == 'processes':
1337 self.assertEqual(reader.readable, True)
1338 self.assertEqual(reader.writable, False)
1339 self.assertEqual(writer.readable, False)
1340 self.assertEqual(writer.writable, True)
1341 self.assertRaises(IOError, reader.send, 2)
1342 self.assertRaises(IOError, writer.recv)
1343 self.assertRaises(IOError, writer.poll)
1344
1345 def test_spawn_close(self):
1346 # We test that a pipe connection can be closed by parent
1347 # process immediately after child is spawned. On Windows this
1348 # would have sometimes failed on old versions because
1349 # child_conn would be closed before the child got a chance to
1350 # duplicate it.
1351 conn, child_conn = self.Pipe()
1352
1353 p = self.Process(target=self._echo, args=(child_conn,))
1354 p.start()
1355 child_conn.close() # this might complete before child initializes
1356
1357 msg = latin('hello')
1358 conn.send_bytes(msg)
1359 self.assertEqual(conn.recv_bytes(), msg)
1360
1361 conn.send_bytes(SENTINEL)
1362 conn.close()
1363 p.join()
1364
1365 def test_sendbytes(self):
1366 if self.TYPE != 'processes':
1367 return
1368
1369 msg = latin('abcdefghijklmnopqrstuvwxyz')
1370 a, b = self.Pipe()
1371
1372 a.send_bytes(msg)
1373 self.assertEqual(b.recv_bytes(), msg)
1374
1375 a.send_bytes(msg, 5)
1376 self.assertEqual(b.recv_bytes(), msg[5:])
1377
1378 a.send_bytes(msg, 7, 8)
1379 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1380
1381 a.send_bytes(msg, 26)
1382 self.assertEqual(b.recv_bytes(), latin(''))
1383
1384 a.send_bytes(msg, 26, 0)
1385 self.assertEqual(b.recv_bytes(), latin(''))
1386
1387 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1388
1389 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1390
1391 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1392
1393 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1394
1395 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1396
Benjamin Petersone711caf2008-06-11 16:44:04 +00001397class _TestListenerClient(BaseTestCase):
1398
1399 ALLOWED_TYPES = ('processes', 'threads')
1400
1401 def _test(self, address):
1402 conn = self.connection.Client(address)
1403 conn.send('hello')
1404 conn.close()
1405
1406 def test_listener_client(self):
1407 for family in self.connection.families:
1408 l = self.connection.Listener(family=family)
1409 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001410 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001411 p.start()
1412 conn = l.accept()
1413 self.assertEqual(conn.recv(), 'hello')
1414 p.join()
1415 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001416#
1417# Test of sending connection and socket objects between processes
1418#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001419"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001420class _TestPicklingConnections(BaseTestCase):
1421
1422 ALLOWED_TYPES = ('processes',)
1423
1424 def _listener(self, conn, families):
1425 for fam in families:
1426 l = self.connection.Listener(family=fam)
1427 conn.send(l.address)
1428 new_conn = l.accept()
1429 conn.send(new_conn)
1430
1431 if self.TYPE == 'processes':
1432 l = socket.socket()
1433 l.bind(('localhost', 0))
1434 conn.send(l.getsockname())
1435 l.listen(1)
1436 new_conn, addr = l.accept()
1437 conn.send(new_conn)
1438
1439 conn.recv()
1440
1441 def _remote(self, conn):
1442 for (address, msg) in iter(conn.recv, None):
1443 client = self.connection.Client(address)
1444 client.send(msg.upper())
1445 client.close()
1446
1447 if self.TYPE == 'processes':
1448 address, msg = conn.recv()
1449 client = socket.socket()
1450 client.connect(address)
1451 client.sendall(msg.upper())
1452 client.close()
1453
1454 conn.close()
1455
1456 def test_pickling(self):
1457 try:
1458 multiprocessing.allow_connection_pickling()
1459 except ImportError:
1460 return
1461
1462 families = self.connection.families
1463
1464 lconn, lconn0 = self.Pipe()
1465 lp = self.Process(target=self._listener, args=(lconn0, families))
1466 lp.start()
1467 lconn0.close()
1468
1469 rconn, rconn0 = self.Pipe()
1470 rp = self.Process(target=self._remote, args=(rconn0,))
1471 rp.start()
1472 rconn0.close()
1473
1474 for fam in families:
1475 msg = ('This connection uses family %s' % fam).encode('ascii')
1476 address = lconn.recv()
1477 rconn.send((address, msg))
1478 new_conn = lconn.recv()
1479 self.assertEqual(new_conn.recv(), msg.upper())
1480
1481 rconn.send(None)
1482
1483 if self.TYPE == 'processes':
1484 msg = latin('This connection uses a normal socket')
1485 address = lconn.recv()
1486 rconn.send((address, msg))
1487 if hasattr(socket, 'fromfd'):
1488 new_conn = lconn.recv()
1489 self.assertEqual(new_conn.recv(100), msg.upper())
1490 else:
1491 # XXX On Windows with Py2.6 need to backport fromfd()
1492 discard = lconn.recv_bytes()
1493
1494 lconn.send(None)
1495
1496 rconn.close()
1497 lconn.close()
1498
1499 lp.join()
1500 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001501"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001502#
1503#
1504#
1505
1506class _TestHeap(BaseTestCase):
1507
1508 ALLOWED_TYPES = ('processes',)
1509
1510 def test_heap(self):
1511 iterations = 5000
1512 maxblocks = 50
1513 blocks = []
1514
1515 # create and destroy lots of blocks of different sizes
1516 for i in range(iterations):
1517 size = int(random.lognormvariate(0, 1) * 1000)
1518 b = multiprocessing.heap.BufferWrapper(size)
1519 blocks.append(b)
1520 if len(blocks) > maxblocks:
1521 i = random.randrange(maxblocks)
1522 del blocks[i]
1523
1524 # get the heap object
1525 heap = multiprocessing.heap.BufferWrapper._heap
1526
1527 # verify the state of the heap
1528 all = []
1529 occupied = 0
1530 for L in list(heap._len_to_seq.values()):
1531 for arena, start, stop in L:
1532 all.append((heap._arenas.index(arena), start, stop,
1533 stop-start, 'free'))
1534 for arena, start, stop in heap._allocated_blocks:
1535 all.append((heap._arenas.index(arena), start, stop,
1536 stop-start, 'occupied'))
1537 occupied += (stop-start)
1538
1539 all.sort()
1540
1541 for i in range(len(all)-1):
1542 (arena, start, stop) = all[i][:3]
1543 (narena, nstart, nstop) = all[i+1][:3]
1544 self.assertTrue((arena != narena and nstart == 0) or
1545 (stop == nstart))
1546
1547#
1548#
1549#
1550
Benjamin Petersone711caf2008-06-11 16:44:04 +00001551class _Foo(Structure):
1552 _fields_ = [
1553 ('x', c_int),
1554 ('y', c_double)
1555 ]
1556
Brian Curtin918616c2010-10-07 02:12:17 +00001557@unittest.skipUnless(HAS_SHAREDCTYPES,
1558 "requires multiprocessing.sharedctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001559class _TestSharedCTypes(BaseTestCase):
1560
1561 ALLOWED_TYPES = ('processes',)
1562
1563 def _double(self, x, y, foo, arr, string):
1564 x.value *= 2
1565 y.value *= 2
1566 foo.x *= 2
1567 foo.y *= 2
1568 string.value *= 2
1569 for i in range(len(arr)):
1570 arr[i] *= 2
1571
1572 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001573 x = Value('i', 7, lock=lock)
Brian Curtin918616c2010-10-07 02:12:17 +00001574 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001575 foo = Value(_Foo, 3, 2, lock=lock)
Brian Curtin918616c2010-10-07 02:12:17 +00001576 arr = self.Array('d', list(range(10)), lock=lock)
1577 string = self.Array('c', 20, lock=lock)
1578 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001579
1580 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1581 p.start()
1582 p.join()
1583
1584 self.assertEqual(x.value, 14)
1585 self.assertAlmostEqual(y.value, 2.0/3.0)
1586 self.assertEqual(foo.x, 6)
1587 self.assertAlmostEqual(foo.y, 4.0)
1588 for i in range(10):
1589 self.assertAlmostEqual(arr[i], i*2)
1590 self.assertEqual(string.value, latin('hellohello'))
1591
1592 def test_synchronize(self):
1593 self.test_sharedctypes(lock=True)
1594
1595 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001596 foo = _Foo(2, 5.0)
Brian Curtin918616c2010-10-07 02:12:17 +00001597 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001598 foo.x = 0
1599 foo.y = 0
1600 self.assertEqual(bar.x, 2)
1601 self.assertAlmostEqual(bar.y, 5.0)
1602
1603#
1604#
1605#
1606
1607class _TestFinalize(BaseTestCase):
1608
1609 ALLOWED_TYPES = ('processes',)
1610
1611 def _test_finalize(self, conn):
1612 class Foo(object):
1613 pass
1614
1615 a = Foo()
1616 util.Finalize(a, conn.send, args=('a',))
1617 del a # triggers callback for a
1618
1619 b = Foo()
1620 close_b = util.Finalize(b, conn.send, args=('b',))
1621 close_b() # triggers callback for b
1622 close_b() # does nothing because callback has already been called
1623 del b # does nothing because callback has already been called
1624
1625 c = Foo()
1626 util.Finalize(c, conn.send, args=('c',))
1627
1628 d10 = Foo()
1629 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1630
1631 d01 = Foo()
1632 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1633 d02 = Foo()
1634 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1635 d03 = Foo()
1636 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1637
1638 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1639
1640 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1641
1642 # call mutliprocessing's cleanup function then exit process without
1643 # garbage collecting locals
1644 util._exit_function()
1645 conn.close()
1646 os._exit(0)
1647
1648 def test_finalize(self):
1649 conn, child_conn = self.Pipe()
1650
1651 p = self.Process(target=self._test_finalize, args=(child_conn,))
1652 p.start()
1653 p.join()
1654
1655 result = [obj for obj in iter(conn.recv, 'STOP')]
1656 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1657
1658#
1659# Test that from ... import * works for each module
1660#
1661
1662class _TestImportStar(BaseTestCase):
1663
1664 ALLOWED_TYPES = ('processes',)
1665
1666 def test_import(self):
Florent Xicluna9b0e9182010-03-28 11:42:38 +00001667 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00001668 'multiprocessing', 'multiprocessing.connection',
1669 'multiprocessing.heap', 'multiprocessing.managers',
1670 'multiprocessing.pool', 'multiprocessing.process',
Florent Xicluna9b0e9182010-03-28 11:42:38 +00001671 'multiprocessing.reduction',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001672 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xicluna9b0e9182010-03-28 11:42:38 +00001673 ]
1674
1675 if c_int is not None:
1676 # This module requires _ctypes
1677 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001678
1679 for name in modules:
1680 __import__(name)
1681 mod = sys.modules[name]
1682
1683 for attr in getattr(mod, '__all__', ()):
1684 self.assertTrue(
1685 hasattr(mod, attr),
1686 '%r does not have attribute %r' % (mod, attr)
1687 )
1688
1689#
1690# Quick test that logging works -- does not test logging output
1691#
1692
1693class _TestLogging(BaseTestCase):
1694
1695 ALLOWED_TYPES = ('processes',)
1696
1697 def test_enable_logging(self):
1698 logger = multiprocessing.get_logger()
1699 logger.setLevel(util.SUBWARNING)
1700 self.assertTrue(logger is not None)
1701 logger.debug('this will not be printed')
1702 logger.info('nor will this')
1703 logger.setLevel(LOG_LEVEL)
1704
1705 def _test_level(self, conn):
1706 logger = multiprocessing.get_logger()
1707 conn.send(logger.getEffectiveLevel())
1708
1709 def test_level(self):
1710 LEVEL1 = 32
1711 LEVEL2 = 37
1712
1713 logger = multiprocessing.get_logger()
1714 root_logger = logging.getLogger()
1715 root_level = root_logger.level
1716
1717 reader, writer = multiprocessing.Pipe(duplex=False)
1718
1719 logger.setLevel(LEVEL1)
1720 self.Process(target=self._test_level, args=(writer,)).start()
1721 self.assertEqual(LEVEL1, reader.recv())
1722
1723 logger.setLevel(logging.NOTSET)
1724 root_logger.setLevel(LEVEL2)
1725 self.Process(target=self._test_level, args=(writer,)).start()
1726 self.assertEqual(LEVEL2, reader.recv())
1727
1728 root_logger.setLevel(root_level)
1729 logger.setLevel(level=LOG_LEVEL)
1730
1731#
Jesse Noller6214edd2009-01-19 16:23:53 +00001732# Test to verify handle verification, see issue 3321
1733#
1734
1735class TestInvalidHandle(unittest.TestCase):
1736
Florent Xicluna9b0e9182010-03-28 11:42:38 +00001737 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00001738 def test_invalid_handles(self):
Jesse Noller6214edd2009-01-19 16:23:53 +00001739 conn = _multiprocessing.Connection(44977608)
1740 self.assertRaises(IOError, conn.poll)
1741 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna9b0e9182010-03-28 11:42:38 +00001742
Jesse Noller6214edd2009-01-19 16:23:53 +00001743#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001744# Functions used to create test cases from the base ones in this module
1745#
1746
1747def get_attributes(Source, names):
1748 d = {}
1749 for name in names:
1750 obj = getattr(Source, name)
1751 if type(obj) == type(get_attributes):
1752 obj = staticmethod(obj)
1753 d[name] = obj
1754 return d
1755
1756def create_test_cases(Mixin, type):
1757 result = {}
1758 glob = globals()
Florent Xicluna9b0e9182010-03-28 11:42:38 +00001759 Type = type.capitalize()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001760
1761 for name in list(glob.keys()):
1762 if name.startswith('_Test'):
1763 base = glob[name]
1764 if type in base.ALLOWED_TYPES:
1765 newname = 'With' + Type + name[1:]
1766 class Temp(base, unittest.TestCase, Mixin):
1767 pass
1768 result[newname] = Temp
1769 Temp.__name__ = newname
1770 Temp.__module__ = Mixin.__module__
1771 return result
1772
1773#
1774# Create test cases
1775#
1776
1777class ProcessesMixin(object):
1778 TYPE = 'processes'
1779 Process = multiprocessing.Process
1780 locals().update(get_attributes(multiprocessing, (
1781 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1782 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1783 'RawArray', 'current_process', 'active_children', 'Pipe',
1784 'connection', 'JoinableQueue'
1785 )))
1786
1787testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1788globals().update(testcases_processes)
1789
1790
1791class ManagerMixin(object):
1792 TYPE = 'manager'
1793 Process = multiprocessing.Process
1794 manager = object.__new__(multiprocessing.managers.SyncManager)
1795 locals().update(get_attributes(manager, (
1796 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1797 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1798 'Namespace', 'JoinableQueue'
1799 )))
1800
1801testcases_manager = create_test_cases(ManagerMixin, type='manager')
1802globals().update(testcases_manager)
1803
1804
1805class ThreadsMixin(object):
1806 TYPE = 'threads'
1807 Process = multiprocessing.dummy.Process
1808 locals().update(get_attributes(multiprocessing.dummy, (
1809 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1810 'Condition', 'Event', 'Value', 'Array', 'current_process',
1811 'active_children', 'Pipe', 'connection', 'dict', 'list',
1812 'Namespace', 'JoinableQueue'
1813 )))
1814
1815testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1816globals().update(testcases_threads)
1817
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001818class OtherTest(unittest.TestCase):
1819 # TODO: add more tests for deliver/answer challenge.
1820 def test_deliver_challenge_auth_failure(self):
1821 class _FakeConnection(object):
1822 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00001823 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001824 def send_bytes(self, data):
1825 pass
1826 self.assertRaises(multiprocessing.AuthenticationError,
1827 multiprocessing.connection.deliver_challenge,
1828 _FakeConnection(), b'abc')
1829
1830 def test_answer_challenge_auth_failure(self):
1831 class _FakeConnection(object):
1832 def __init__(self):
1833 self.count = 0
1834 def recv_bytes(self, size):
1835 self.count += 1
1836 if self.count == 1:
1837 return multiprocessing.connection.CHALLENGE
1838 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00001839 return b'something bogus'
1840 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001841 def send_bytes(self, data):
1842 pass
1843 self.assertRaises(multiprocessing.AuthenticationError,
1844 multiprocessing.connection.answer_challenge,
1845 _FakeConnection(), b'abc')
1846
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00001847#
1848# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1849#
1850
1851def initializer(ns):
1852 ns.test += 1
1853
1854class TestInitializers(unittest.TestCase):
1855 def setUp(self):
1856 self.mgr = multiprocessing.Manager()
1857 self.ns = self.mgr.Namespace()
1858 self.ns.test = 0
1859
1860 def tearDown(self):
1861 self.mgr.shutdown()
1862
1863 def test_manager_initializer(self):
1864 m = multiprocessing.managers.SyncManager()
1865 self.assertRaises(TypeError, m.start, 1)
1866 m.start(initializer, (self.ns,))
1867 self.assertEqual(self.ns.test, 1)
1868 m.shutdown()
1869
1870 def test_pool_initializer(self):
1871 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
1872 p = multiprocessing.Pool(1, initializer, (self.ns,))
1873 p.close()
1874 p.join()
1875 self.assertEqual(self.ns.test, 1)
1876
R. David Murraya44c6b32009-07-29 15:40:30 +00001877#
1878# Issue 5155, 5313, 5331: Test process in processes
1879# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1880#
1881
1882def _ThisSubProcess(q):
1883 try:
1884 item = q.get(block=False)
1885 except pyqueue.Empty:
1886 pass
1887
1888def _TestProcess(q):
1889 queue = multiprocessing.Queue()
1890 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
1891 subProc.start()
1892 subProc.join()
1893
1894def _afunc(x):
1895 return x*x
1896
1897def pool_in_process():
1898 pool = multiprocessing.Pool(processes=4)
1899 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
1900
1901class _file_like(object):
1902 def __init__(self, delegate):
1903 self._delegate = delegate
1904 self._pid = None
1905
1906 @property
1907 def cache(self):
1908 pid = os.getpid()
1909 # There are no race conditions since fork keeps only the running thread
1910 if pid != self._pid:
1911 self._pid = pid
1912 self._cache = []
1913 return self._cache
1914
1915 def write(self, data):
1916 self.cache.append(data)
1917
1918 def flush(self):
1919 self._delegate.write(''.join(self.cache))
1920 self._cache = []
1921
1922class TestStdinBadfiledescriptor(unittest.TestCase):
1923
1924 def test_queue_in_process(self):
1925 queue = multiprocessing.Queue()
1926 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
1927 proc.start()
1928 proc.join()
1929
1930 def test_pool_in_process(self):
1931 p = multiprocessing.Process(target=pool_in_process)
1932 p.start()
1933 p.join()
1934
1935 def test_flushing(self):
1936 sio = io.StringIO()
1937 flike = _file_like(sio)
1938 flike.write('foo')
1939 proc = multiprocessing.Process(target=lambda: flike.flush())
1940 flike.flush()
1941 assert sio.getvalue() == 'foo'
1942
1943testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
1944 TestStdinBadfiledescriptor]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001945
Benjamin Petersone711caf2008-06-11 16:44:04 +00001946#
1947#
1948#
1949
1950def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00001951 if sys.platform.startswith("linux"):
1952 try:
1953 lock = multiprocessing.RLock()
1954 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00001955 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00001956
Benjamin Petersone711caf2008-06-11 16:44:04 +00001957 if run is None:
1958 from test.support import run_unittest as run
1959
1960 util.get_temp_dir() # creates temp directory for use by all processes
1961
1962 multiprocessing.get_logger().setLevel(LOG_LEVEL)
1963
Benjamin Peterson41181742008-07-02 20:22:54 +00001964 ProcessesMixin.pool = multiprocessing.Pool(4)
1965 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
1966 ManagerMixin.manager.__init__()
1967 ManagerMixin.manager.start()
1968 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001969
1970 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00001971 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
1972 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001973 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
1974 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00001975 )
1976
1977 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
1978 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
1979 run(suite)
1980
Benjamin Peterson41181742008-07-02 20:22:54 +00001981 ThreadsMixin.pool.terminate()
1982 ProcessesMixin.pool.terminate()
1983 ManagerMixin.pool.terminate()
1984 ManagerMixin.manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001985
Benjamin Peterson41181742008-07-02 20:22:54 +00001986 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00001987
1988def main():
1989 test_main(unittest.TextTestRunner(verbosity=2).run)
1990
1991if __name__ == '__main__':
1992 main()