blob: 869cf3bc8aa5299f7d5729edc58e09af88bfc132 [file] [log] [blame]
Georg Brandl86b2fb92008-07-16 03:43:04 +00001#!/usr/bin/env python
2
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import queue as pyqueue
10import time
R. David Murraya44c6b32009-07-29 15:40:30 +000011import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
15import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
Benjamin Petersone5384b02008-10-04 22:00:42 +000027
Benjamin Petersone711caf2008-06-11 16:44:04 +000028import multiprocessing.dummy
29import multiprocessing.connection
30import multiprocessing.managers
31import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000033
34from multiprocessing import util
35
Brian Curtin918616c2010-10-07 02:12:17 +000036try:
37 from multiprocessing.sharedctypes import Value, copy
38 HAS_SHAREDCTYPES = True
39except ImportError:
40 HAS_SHAREDCTYPES = False
41
Benjamin Petersone711caf2008-06-11 16:44:04 +000042#
43#
44#
45
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000046def latin(s):
47 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000048
Benjamin Petersone711caf2008-06-11 16:44:04 +000049#
50# Constants
51#
52
53LOG_LEVEL = util.SUBWARNING
54#LOG_LEVEL = logging.WARNING
55
56DELTA = 0.1
57CHECK_TIMINGS = False # making true makes tests take a lot longer
58 # and can sometimes cause some non-serious
59 # failures because some calls block a bit
60 # longer than expected
61if CHECK_TIMINGS:
62 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
63else:
64 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
65
66HAVE_GETVALUE = not getattr(_multiprocessing,
67 'HAVE_BROKEN_SEM_GETVALUE', False)
68
Jesse Noller6214edd2009-01-19 16:23:53 +000069WIN32 = (sys.platform == "win32")
70
Benjamin Petersone711caf2008-06-11 16:44:04 +000071#
Florent Xicluna9b0e9182010-03-28 11:42:38 +000072# Some tests require ctypes
73#
74
75try:
Florent Xiclunab4efb3d2010-08-14 18:24:40 +000076 from ctypes import Structure, c_int, c_double
Florent Xicluna9b0e9182010-03-28 11:42:38 +000077except ImportError:
78 Structure = object
79 c_int = c_double = None
80
81#
Benjamin Petersone711caf2008-06-11 16:44:04 +000082# Creates a wrapper for a function which records the time it takes to finish
83#
84
85class TimingWrapper(object):
86
87 def __init__(self, func):
88 self.func = func
89 self.elapsed = None
90
91 def __call__(self, *args, **kwds):
92 t = time.time()
93 try:
94 return self.func(*args, **kwds)
95 finally:
96 self.elapsed = time.time() - t
97
98#
99# Base class for test cases
100#
101
102class BaseTestCase(object):
103
104 ALLOWED_TYPES = ('processes', 'manager', 'threads')
105
106 def assertTimingAlmostEqual(self, a, b):
107 if CHECK_TIMINGS:
108 self.assertAlmostEqual(a, b, 1)
109
110 def assertReturnsIfImplemented(self, value, func, *args):
111 try:
112 res = func(*args)
113 except NotImplementedError:
114 pass
115 else:
116 return self.assertEqual(value, res)
117
118#
119# Return the value of a semaphore
120#
121
122def get_value(self):
123 try:
124 return self.get_value()
125 except AttributeError:
126 try:
127 return self._Semaphore__value
128 except AttributeError:
129 try:
130 return self._value
131 except AttributeError:
132 raise NotImplementedError
133
134#
135# Testcases
136#
137
138class _TestProcess(BaseTestCase):
139
140 ALLOWED_TYPES = ('processes', 'threads')
141
142 def test_current(self):
143 if self.TYPE == 'threads':
144 return
145
146 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000147 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000148
149 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000150 self.assertTrue(not current.daemon)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000151 self.assertTrue(isinstance(authkey, bytes))
152 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000153 self.assertEqual(current.ident, os.getpid())
154 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000155
156 def _test(self, q, *args, **kwds):
157 current = self.current_process()
158 q.put(args)
159 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000160 q.put(current.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000161 if self.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000162 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000163 q.put(current.pid)
164
165 def test_process(self):
166 q = self.Queue(1)
167 e = self.Event()
168 args = (q, 1, 2)
169 kwargs = {'hello':23, 'bye':2.54}
170 name = 'SomeProcess'
171 p = self.Process(
172 target=self._test, args=args, kwargs=kwargs, name=name
173 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000174 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000175 current = self.current_process()
176
177 if self.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000178 self.assertEquals(p.authkey, current.authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000179 self.assertEquals(p.is_alive(), False)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000180 self.assertEquals(p.daemon, True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000181 self.assertTrue(p not in self.active_children())
182 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000183 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000184
185 p.start()
186
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000187 self.assertEquals(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000188 self.assertEquals(p.is_alive(), True)
189 self.assertTrue(p in self.active_children())
190
191 self.assertEquals(q.get(), args[1:])
192 self.assertEquals(q.get(), kwargs)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000193 self.assertEquals(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000194 if self.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000195 self.assertEquals(q.get(), current.authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000196 self.assertEquals(q.get(), p.pid)
197
198 p.join()
199
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000200 self.assertEquals(p.exitcode, 0)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000201 self.assertEquals(p.is_alive(), False)
202 self.assertTrue(p not in self.active_children())
203
204 def _test_terminate(self):
205 time.sleep(1000)
206
207 def test_terminate(self):
208 if self.TYPE == 'threads':
209 return
210
211 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000212 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000213 p.start()
214
215 self.assertEqual(p.is_alive(), True)
216 self.assertTrue(p in self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000217 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000218
219 p.terminate()
220
221 join = TimingWrapper(p.join)
222 self.assertEqual(join(), None)
223 self.assertTimingAlmostEqual(join.elapsed, 0.0)
224
225 self.assertEqual(p.is_alive(), False)
226 self.assertTrue(p not in self.active_children())
227
228 p.join()
229
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000230 # XXX sometimes get p.exitcode == 0 on Windows ...
231 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000232
233 def test_cpu_count(self):
234 try:
235 cpus = multiprocessing.cpu_count()
236 except NotImplementedError:
237 cpus = 1
238 self.assertTrue(type(cpus) is int)
239 self.assertTrue(cpus >= 1)
240
241 def test_active_children(self):
242 self.assertEqual(type(self.active_children()), list)
243
244 p = self.Process(target=time.sleep, args=(DELTA,))
245 self.assertTrue(p not in self.active_children())
246
247 p.start()
248 self.assertTrue(p in self.active_children())
249
250 p.join()
251 self.assertTrue(p not in self.active_children())
252
253 def _test_recursion(self, wconn, id):
254 from multiprocessing import forking
255 wconn.send(id)
256 if len(id) < 2:
257 for i in range(2):
258 p = self.Process(
259 target=self._test_recursion, args=(wconn, id+[i])
260 )
261 p.start()
262 p.join()
263
264 def test_recursion(self):
265 rconn, wconn = self.Pipe(duplex=False)
266 self._test_recursion(wconn, [])
267
268 time.sleep(DELTA)
269 result = []
270 while rconn.poll():
271 result.append(rconn.recv())
272
273 expected = [
274 [],
275 [0],
276 [0, 0],
277 [0, 1],
278 [1],
279 [1, 0],
280 [1, 1]
281 ]
282 self.assertEqual(result, expected)
283
284#
285#
286#
287
288class _UpperCaser(multiprocessing.Process):
289
290 def __init__(self):
291 multiprocessing.Process.__init__(self)
292 self.child_conn, self.parent_conn = multiprocessing.Pipe()
293
294 def run(self):
295 self.parent_conn.close()
296 for s in iter(self.child_conn.recv, None):
297 self.child_conn.send(s.upper())
298 self.child_conn.close()
299
300 def submit(self, s):
301 assert type(s) is str
302 self.parent_conn.send(s)
303 return self.parent_conn.recv()
304
305 def stop(self):
306 self.parent_conn.send(None)
307 self.parent_conn.close()
308 self.child_conn.close()
309
310class _TestSubclassingProcess(BaseTestCase):
311
312 ALLOWED_TYPES = ('processes',)
313
314 def test_subclassing(self):
315 uppercaser = _UpperCaser()
316 uppercaser.start()
317 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
318 self.assertEqual(uppercaser.submit('world'), 'WORLD')
319 uppercaser.stop()
320 uppercaser.join()
321
322#
323#
324#
325
326def queue_empty(q):
327 if hasattr(q, 'empty'):
328 return q.empty()
329 else:
330 return q.qsize() == 0
331
332def queue_full(q, maxsize):
333 if hasattr(q, 'full'):
334 return q.full()
335 else:
336 return q.qsize() == maxsize
337
338
339class _TestQueue(BaseTestCase):
340
341
342 def _test_put(self, queue, child_can_start, parent_can_continue):
343 child_can_start.wait()
344 for i in range(6):
345 queue.get()
346 parent_can_continue.set()
347
348 def test_put(self):
349 MAXSIZE = 6
350 queue = self.Queue(maxsize=MAXSIZE)
351 child_can_start = self.Event()
352 parent_can_continue = self.Event()
353
354 proc = self.Process(
355 target=self._test_put,
356 args=(queue, child_can_start, parent_can_continue)
357 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000358 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000359 proc.start()
360
361 self.assertEqual(queue_empty(queue), True)
362 self.assertEqual(queue_full(queue, MAXSIZE), False)
363
364 queue.put(1)
365 queue.put(2, True)
366 queue.put(3, True, None)
367 queue.put(4, False)
368 queue.put(5, False, None)
369 queue.put_nowait(6)
370
371 # the values may be in buffer but not yet in pipe so sleep a bit
372 time.sleep(DELTA)
373
374 self.assertEqual(queue_empty(queue), False)
375 self.assertEqual(queue_full(queue, MAXSIZE), True)
376
377 put = TimingWrapper(queue.put)
378 put_nowait = TimingWrapper(queue.put_nowait)
379
380 self.assertRaises(pyqueue.Full, put, 7, False)
381 self.assertTimingAlmostEqual(put.elapsed, 0)
382
383 self.assertRaises(pyqueue.Full, put, 7, False, None)
384 self.assertTimingAlmostEqual(put.elapsed, 0)
385
386 self.assertRaises(pyqueue.Full, put_nowait, 7)
387 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
388
389 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
390 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
391
392 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
393 self.assertTimingAlmostEqual(put.elapsed, 0)
394
395 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
396 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
397
398 child_can_start.set()
399 parent_can_continue.wait()
400
401 self.assertEqual(queue_empty(queue), True)
402 self.assertEqual(queue_full(queue, MAXSIZE), False)
403
404 proc.join()
405
406 def _test_get(self, queue, child_can_start, parent_can_continue):
407 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000408 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000409 queue.put(2)
410 queue.put(3)
411 queue.put(4)
412 queue.put(5)
413 parent_can_continue.set()
414
415 def test_get(self):
416 queue = self.Queue()
417 child_can_start = self.Event()
418 parent_can_continue = self.Event()
419
420 proc = self.Process(
421 target=self._test_get,
422 args=(queue, child_can_start, parent_can_continue)
423 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000424 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000425 proc.start()
426
427 self.assertEqual(queue_empty(queue), True)
428
429 child_can_start.set()
430 parent_can_continue.wait()
431
432 time.sleep(DELTA)
433 self.assertEqual(queue_empty(queue), False)
434
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000435 # Hangs unexpectedly, remove for now
436 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000437 self.assertEqual(queue.get(True, None), 2)
438 self.assertEqual(queue.get(True), 3)
439 self.assertEqual(queue.get(timeout=1), 4)
440 self.assertEqual(queue.get_nowait(), 5)
441
442 self.assertEqual(queue_empty(queue), True)
443
444 get = TimingWrapper(queue.get)
445 get_nowait = TimingWrapper(queue.get_nowait)
446
447 self.assertRaises(pyqueue.Empty, get, False)
448 self.assertTimingAlmostEqual(get.elapsed, 0)
449
450 self.assertRaises(pyqueue.Empty, get, False, None)
451 self.assertTimingAlmostEqual(get.elapsed, 0)
452
453 self.assertRaises(pyqueue.Empty, get_nowait)
454 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
455
456 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
457 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
458
459 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
460 self.assertTimingAlmostEqual(get.elapsed, 0)
461
462 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
463 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
464
465 proc.join()
466
467 def _test_fork(self, queue):
468 for i in range(10, 20):
469 queue.put(i)
470 # note that at this point the items may only be buffered, so the
471 # process cannot shutdown until the feeder thread has finished
472 # pushing items onto the pipe.
473
474 def test_fork(self):
475 # Old versions of Queue would fail to create a new feeder
476 # thread for a forked process if the original process had its
477 # own feeder thread. This test checks that this no longer
478 # happens.
479
480 queue = self.Queue()
481
482 # put items on queue so that main process starts a feeder thread
483 for i in range(10):
484 queue.put(i)
485
486 # wait to make sure thread starts before we fork a new process
487 time.sleep(DELTA)
488
489 # fork process
490 p = self.Process(target=self._test_fork, args=(queue,))
491 p.start()
492
493 # check that all expected items are in the queue
494 for i in range(20):
495 self.assertEqual(queue.get(), i)
496 self.assertRaises(pyqueue.Empty, queue.get, False)
497
498 p.join()
499
500 def test_qsize(self):
501 q = self.Queue()
502 try:
503 self.assertEqual(q.qsize(), 0)
504 except NotImplementedError:
505 return
506 q.put(1)
507 self.assertEqual(q.qsize(), 1)
508 q.put(5)
509 self.assertEqual(q.qsize(), 2)
510 q.get()
511 self.assertEqual(q.qsize(), 1)
512 q.get()
513 self.assertEqual(q.qsize(), 0)
514
515 def _test_task_done(self, q):
516 for obj in iter(q.get, None):
517 time.sleep(DELTA)
518 q.task_done()
519
520 def test_task_done(self):
521 queue = self.JoinableQueue()
522
523 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xicluna9b0e9182010-03-28 11:42:38 +0000524 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000525
526 workers = [self.Process(target=self._test_task_done, args=(queue,))
527 for i in range(4)]
528
529 for p in workers:
530 p.start()
531
532 for i in range(10):
533 queue.put(i)
534
535 queue.join()
536
537 for p in workers:
538 queue.put(None)
539
540 for p in workers:
541 p.join()
542
543#
544#
545#
546
547class _TestLock(BaseTestCase):
548
549 def test_lock(self):
550 lock = self.Lock()
551 self.assertEqual(lock.acquire(), True)
552 self.assertEqual(lock.acquire(False), False)
553 self.assertEqual(lock.release(), None)
554 self.assertRaises((ValueError, threading.ThreadError), lock.release)
555
556 def test_rlock(self):
557 lock = self.RLock()
558 self.assertEqual(lock.acquire(), True)
559 self.assertEqual(lock.acquire(), True)
560 self.assertEqual(lock.acquire(), True)
561 self.assertEqual(lock.release(), None)
562 self.assertEqual(lock.release(), None)
563 self.assertEqual(lock.release(), None)
564 self.assertRaises((AssertionError, RuntimeError), lock.release)
565
Jesse Nollerf8d00852009-03-31 03:25:07 +0000566 def test_lock_context(self):
567 with self.Lock():
568 pass
569
Benjamin Petersone711caf2008-06-11 16:44:04 +0000570
571class _TestSemaphore(BaseTestCase):
572
573 def _test_semaphore(self, sem):
574 self.assertReturnsIfImplemented(2, get_value, sem)
575 self.assertEqual(sem.acquire(), True)
576 self.assertReturnsIfImplemented(1, get_value, sem)
577 self.assertEqual(sem.acquire(), True)
578 self.assertReturnsIfImplemented(0, get_value, sem)
579 self.assertEqual(sem.acquire(False), False)
580 self.assertReturnsIfImplemented(0, get_value, sem)
581 self.assertEqual(sem.release(), None)
582 self.assertReturnsIfImplemented(1, get_value, sem)
583 self.assertEqual(sem.release(), None)
584 self.assertReturnsIfImplemented(2, get_value, sem)
585
586 def test_semaphore(self):
587 sem = self.Semaphore(2)
588 self._test_semaphore(sem)
589 self.assertEqual(sem.release(), None)
590 self.assertReturnsIfImplemented(3, get_value, sem)
591 self.assertEqual(sem.release(), None)
592 self.assertReturnsIfImplemented(4, get_value, sem)
593
594 def test_bounded_semaphore(self):
595 sem = self.BoundedSemaphore(2)
596 self._test_semaphore(sem)
597 # Currently fails on OS/X
598 #if HAVE_GETVALUE:
599 # self.assertRaises(ValueError, sem.release)
600 # self.assertReturnsIfImplemented(2, get_value, sem)
601
602 def test_timeout(self):
603 if self.TYPE != 'processes':
604 return
605
606 sem = self.Semaphore(0)
607 acquire = TimingWrapper(sem.acquire)
608
609 self.assertEqual(acquire(False), False)
610 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
611
612 self.assertEqual(acquire(False, None), False)
613 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
614
615 self.assertEqual(acquire(False, TIMEOUT1), False)
616 self.assertTimingAlmostEqual(acquire.elapsed, 0)
617
618 self.assertEqual(acquire(True, TIMEOUT2), False)
619 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
620
621 self.assertEqual(acquire(timeout=TIMEOUT3), False)
622 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
623
624
625class _TestCondition(BaseTestCase):
626
627 def f(self, cond, sleeping, woken, timeout=None):
628 cond.acquire()
629 sleeping.release()
630 cond.wait(timeout)
631 woken.release()
632 cond.release()
633
634 def check_invariant(self, cond):
635 # this is only supposed to succeed when there are no sleepers
636 if self.TYPE == 'processes':
637 try:
638 sleepers = (cond._sleeping_count.get_value() -
639 cond._woken_count.get_value())
640 self.assertEqual(sleepers, 0)
641 self.assertEqual(cond._wait_semaphore.get_value(), 0)
642 except NotImplementedError:
643 pass
644
645 def test_notify(self):
646 cond = self.Condition()
647 sleeping = self.Semaphore(0)
648 woken = self.Semaphore(0)
649
650 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000651 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000652 p.start()
653
654 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000655 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000656 p.start()
657
658 # wait for both children to start sleeping
659 sleeping.acquire()
660 sleeping.acquire()
661
662 # check no process/thread has woken up
663 time.sleep(DELTA)
664 self.assertReturnsIfImplemented(0, get_value, woken)
665
666 # wake up one process/thread
667 cond.acquire()
668 cond.notify()
669 cond.release()
670
671 # check one process/thread has woken up
672 time.sleep(DELTA)
673 self.assertReturnsIfImplemented(1, get_value, woken)
674
675 # wake up another
676 cond.acquire()
677 cond.notify()
678 cond.release()
679
680 # check other has woken up
681 time.sleep(DELTA)
682 self.assertReturnsIfImplemented(2, get_value, woken)
683
684 # check state is not mucked up
685 self.check_invariant(cond)
686 p.join()
687
688 def test_notify_all(self):
689 cond = self.Condition()
690 sleeping = self.Semaphore(0)
691 woken = self.Semaphore(0)
692
693 # start some threads/processes which will timeout
694 for i in range(3):
695 p = self.Process(target=self.f,
696 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000697 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000698 p.start()
699
700 t = threading.Thread(target=self.f,
701 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000702 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000703 t.start()
704
705 # wait for them all to sleep
706 for i in range(6):
707 sleeping.acquire()
708
709 # check they have all timed out
710 for i in range(6):
711 woken.acquire()
712 self.assertReturnsIfImplemented(0, get_value, woken)
713
714 # check state is not mucked up
715 self.check_invariant(cond)
716
717 # start some more threads/processes
718 for i in range(3):
719 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000720 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000721 p.start()
722
723 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000724 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000725 t.start()
726
727 # wait for them to all sleep
728 for i in range(6):
729 sleeping.acquire()
730
731 # check no process/thread has woken up
732 time.sleep(DELTA)
733 self.assertReturnsIfImplemented(0, get_value, woken)
734
735 # wake them all up
736 cond.acquire()
737 cond.notify_all()
738 cond.release()
739
740 # check they have all woken
741 time.sleep(DELTA)
742 self.assertReturnsIfImplemented(6, get_value, woken)
743
744 # check state is not mucked up
745 self.check_invariant(cond)
746
747 def test_timeout(self):
748 cond = self.Condition()
749 wait = TimingWrapper(cond.wait)
750 cond.acquire()
751 res = wait(TIMEOUT1)
752 cond.release()
753 self.assertEqual(res, None)
754 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
755
756
757class _TestEvent(BaseTestCase):
758
759 def _test_event(self, event):
760 time.sleep(TIMEOUT2)
761 event.set()
762
763 def test_event(self):
764 event = self.Event()
765 wait = TimingWrapper(event.wait)
766
767 # Removed temporaily, due to API shear, this does not
768 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +0000769 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000770
Benjamin Peterson965ce872009-04-05 21:24:58 +0000771 # Removed, threading.Event.wait() will return the value of the __flag
772 # instead of None. API Shear with the semaphore backed mp.Event
773 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000774 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000775 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000776 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
777
778 event.set()
779
780 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +0000781 self.assertEqual(event.is_set(), True)
782 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000783 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000784 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000785 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
786 # self.assertEqual(event.is_set(), True)
787
788 event.clear()
789
790 #self.assertEqual(event.is_set(), False)
791
792 self.Process(target=self._test_event, args=(event,)).start()
Benjamin Peterson965ce872009-04-05 21:24:58 +0000793 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000794
795#
796#
797#
798
Brian Curtin918616c2010-10-07 02:12:17 +0000799@unittest.skipUnless(HAS_SHAREDCTYPES,
800 "requires multiprocessing.sharedctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000801class _TestValue(BaseTestCase):
802
Florent Xicluna9b0e9182010-03-28 11:42:38 +0000803 ALLOWED_TYPES = ('processes',)
804
Benjamin Petersone711caf2008-06-11 16:44:04 +0000805 codes_values = [
806 ('i', 4343, 24234),
807 ('d', 3.625, -4.25),
808 ('h', -232, 234),
809 ('c', latin('x'), latin('y'))
810 ]
811
812 def _test(self, values):
813 for sv, cv in zip(values, self.codes_values):
814 sv.value = cv[2]
815
816
817 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000818 if raw:
819 values = [self.RawValue(code, value)
820 for code, value, _ in self.codes_values]
821 else:
822 values = [self.Value(code, value)
823 for code, value, _ in self.codes_values]
824
825 for sv, cv in zip(values, self.codes_values):
826 self.assertEqual(sv.value, cv[1])
827
828 proc = self.Process(target=self._test, args=(values,))
829 proc.start()
830 proc.join()
831
832 for sv, cv in zip(values, self.codes_values):
833 self.assertEqual(sv.value, cv[2])
834
835 def test_rawvalue(self):
836 self.test_value(raw=True)
837
838 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000839 val1 = self.Value('i', 5)
840 lock1 = val1.get_lock()
841 obj1 = val1.get_obj()
842
843 val2 = self.Value('i', 5, lock=None)
844 lock2 = val2.get_lock()
845 obj2 = val2.get_obj()
846
847 lock = self.Lock()
848 val3 = self.Value('i', 5, lock=lock)
849 lock3 = val3.get_lock()
850 obj3 = val3.get_obj()
851 self.assertEqual(lock, lock3)
852
Jesse Nollerb0516a62009-01-18 03:11:38 +0000853 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000854 self.assertFalse(hasattr(arr4, 'get_lock'))
855 self.assertFalse(hasattr(arr4, 'get_obj'))
856
Jesse Nollerb0516a62009-01-18 03:11:38 +0000857 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
858
859 arr5 = self.RawValue('i', 5)
860 self.assertFalse(hasattr(arr5, 'get_lock'))
861 self.assertFalse(hasattr(arr5, 'get_obj'))
862
Benjamin Petersone711caf2008-06-11 16:44:04 +0000863
864class _TestArray(BaseTestCase):
865
Florent Xicluna9b0e9182010-03-28 11:42:38 +0000866 ALLOWED_TYPES = ('processes',)
867
Benjamin Petersone711caf2008-06-11 16:44:04 +0000868 def f(self, seq):
869 for i in range(1, len(seq)):
870 seq[i] += seq[i-1]
871
Florent Xicluna9b0e9182010-03-28 11:42:38 +0000872 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000873 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000874 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
875 if raw:
876 arr = self.RawArray('i', seq)
877 else:
878 arr = self.Array('i', seq)
879
880 self.assertEqual(len(arr), len(seq))
881 self.assertEqual(arr[3], seq[3])
882 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
883
884 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
885
886 self.assertEqual(list(arr[:]), seq)
887
888 self.f(seq)
889
890 p = self.Process(target=self.f, args=(arr,))
891 p.start()
892 p.join()
893
894 self.assertEqual(list(arr[:]), seq)
895
Florent Xicluna9b0e9182010-03-28 11:42:38 +0000896 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000897 def test_rawarray(self):
898 self.test_array(raw=True)
899
Florent Xicluna9b0e9182010-03-28 11:42:38 +0000900 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000901 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000902 arr1 = self.Array('i', list(range(10)))
903 lock1 = arr1.get_lock()
904 obj1 = arr1.get_obj()
905
906 arr2 = self.Array('i', list(range(10)), lock=None)
907 lock2 = arr2.get_lock()
908 obj2 = arr2.get_obj()
909
910 lock = self.Lock()
911 arr3 = self.Array('i', list(range(10)), lock=lock)
912 lock3 = arr3.get_lock()
913 obj3 = arr3.get_obj()
914 self.assertEqual(lock, lock3)
915
Jesse Nollerb0516a62009-01-18 03:11:38 +0000916 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000917 self.assertFalse(hasattr(arr4, 'get_lock'))
918 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +0000919 self.assertRaises(AttributeError,
920 self.Array, 'i', range(10), lock='notalock')
921
922 arr5 = self.RawArray('i', range(10))
923 self.assertFalse(hasattr(arr5, 'get_lock'))
924 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000925
926#
927#
928#
929
930class _TestContainers(BaseTestCase):
931
932 ALLOWED_TYPES = ('manager',)
933
934 def test_list(self):
935 a = self.list(list(range(10)))
936 self.assertEqual(a[:], list(range(10)))
937
938 b = self.list()
939 self.assertEqual(b[:], [])
940
941 b.extend(list(range(5)))
942 self.assertEqual(b[:], list(range(5)))
943
944 self.assertEqual(b[2], 2)
945 self.assertEqual(b[2:10], [2,3,4])
946
947 b *= 2
948 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
949
950 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
951
952 self.assertEqual(a[:], list(range(10)))
953
954 d = [a, b]
955 e = self.list(d)
956 self.assertEqual(
957 e[:],
958 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
959 )
960
961 f = self.list([a])
962 a.append('hello')
963 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
964
965 def test_dict(self):
966 d = self.dict()
967 indices = list(range(65, 70))
968 for i in indices:
969 d[i] = chr(i)
970 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
971 self.assertEqual(sorted(d.keys()), indices)
972 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
973 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
974
975 def test_namespace(self):
976 n = self.Namespace()
977 n.name = 'Bob'
978 n.job = 'Builder'
979 n._hidden = 'hidden'
980 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
981 del n.job
982 self.assertEqual(str(n), "Namespace(name='Bob')")
983 self.assertTrue(hasattr(n, 'name'))
984 self.assertTrue(not hasattr(n, 'job'))
985
986#
987#
988#
989
990def sqr(x, wait=0.0):
991 time.sleep(wait)
992 return x*x
Benjamin Petersone711caf2008-06-11 16:44:04 +0000993class _TestPool(BaseTestCase):
994
995 def test_apply(self):
996 papply = self.pool.apply
997 self.assertEqual(papply(sqr, (5,)), sqr(5))
998 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
999
1000 def test_map(self):
1001 pmap = self.pool.map
1002 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1003 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1004 list(map(sqr, list(range(100)))))
1005
Georg Brandld80344f2009-08-13 12:26:19 +00001006 def test_map_chunksize(self):
1007 try:
1008 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1009 except multiprocessing.TimeoutError:
1010 self.fail("pool.map_async with chunksize stalled on null list")
1011
Benjamin Petersone711caf2008-06-11 16:44:04 +00001012 def test_async(self):
1013 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1014 get = TimingWrapper(res.get)
1015 self.assertEqual(get(), 49)
1016 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1017
1018 def test_async_timeout(self):
1019 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1020 get = TimingWrapper(res.get)
1021 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1022 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1023
1024 def test_imap(self):
1025 it = self.pool.imap(sqr, list(range(10)))
1026 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1027
1028 it = self.pool.imap(sqr, list(range(10)))
1029 for i in range(10):
1030 self.assertEqual(next(it), i*i)
1031 self.assertRaises(StopIteration, it.__next__)
1032
1033 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1034 for i in range(1000):
1035 self.assertEqual(next(it), i*i)
1036 self.assertRaises(StopIteration, it.__next__)
1037
1038 def test_imap_unordered(self):
1039 it = self.pool.imap_unordered(sqr, list(range(1000)))
1040 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1041
1042 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1043 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1044
1045 def test_make_pool(self):
1046 p = multiprocessing.Pool(3)
1047 self.assertEqual(3, len(p._pool))
1048 p.close()
1049 p.join()
1050
1051 def test_terminate(self):
1052 if self.TYPE == 'manager':
1053 # On Unix a forked process increfs each shared object to
1054 # which its parent process held a reference. If the
1055 # forked process gets terminated then there is likely to
1056 # be a reference leak. So to prevent
1057 # _TestZZZNumberOfObjects from failing we skip this test
1058 # when using a manager.
1059 return
1060
1061 result = self.pool.map_async(
1062 time.sleep, [0.1 for i in range(10000)], chunksize=1
1063 )
1064 self.pool.terminate()
1065 join = TimingWrapper(self.pool.join)
1066 join()
1067 self.assertTrue(join.elapsed < 0.2)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001068#
1069# Test that manager has expected number of shared objects left
1070#
1071
1072class _TestZZZNumberOfObjects(BaseTestCase):
1073 # Because test cases are sorted alphabetically, this one will get
1074 # run after all the other tests for the manager. It tests that
1075 # there have been no "reference leaks" for the manager's shared
1076 # objects. Note the comment in _TestPool.test_terminate().
1077 ALLOWED_TYPES = ('manager',)
1078
1079 def test_number_of_objects(self):
1080 EXPECTED_NUMBER = 1 # the pool object is still alive
1081 multiprocessing.active_children() # discard dead process objs
1082 gc.collect() # do garbage collection
1083 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001084 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001085 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001086 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001087 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001088
1089 self.assertEqual(refs, EXPECTED_NUMBER)
1090
1091#
1092# Test of creating a customized manager class
1093#
1094
1095from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1096
1097class FooBar(object):
1098 def f(self):
1099 return 'f()'
1100 def g(self):
1101 raise ValueError
1102 def _h(self):
1103 return '_h()'
1104
1105def baz():
1106 for i in range(10):
1107 yield i*i
1108
1109class IteratorProxy(BaseProxy):
Florent Xiclunab4efb3d2010-08-14 18:24:40 +00001110 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001111 def __iter__(self):
1112 return self
1113 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001114 return self._callmethod('__next__')
1115
1116class MyManager(BaseManager):
1117 pass
1118
1119MyManager.register('Foo', callable=FooBar)
1120MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1121MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1122
1123
1124class _TestMyManager(BaseTestCase):
1125
1126 ALLOWED_TYPES = ('manager',)
1127
1128 def test_mymanager(self):
1129 manager = MyManager()
1130 manager.start()
1131
1132 foo = manager.Foo()
1133 bar = manager.Bar()
1134 baz = manager.baz()
1135
1136 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1137 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1138
1139 self.assertEqual(foo_methods, ['f', 'g'])
1140 self.assertEqual(bar_methods, ['f', '_h'])
1141
1142 self.assertEqual(foo.f(), 'f()')
1143 self.assertRaises(ValueError, foo.g)
1144 self.assertEqual(foo._callmethod('f'), 'f()')
1145 self.assertRaises(RemoteError, foo._callmethod, '_h')
1146
1147 self.assertEqual(bar.f(), 'f()')
1148 self.assertEqual(bar._h(), '_h()')
1149 self.assertEqual(bar._callmethod('f'), 'f()')
1150 self.assertEqual(bar._callmethod('_h'), '_h()')
1151
1152 self.assertEqual(list(baz), [i*i for i in range(10)])
1153
1154 manager.shutdown()
1155
1156#
1157# Test of connecting to a remote server and using xmlrpclib for serialization
1158#
1159
1160_queue = pyqueue.Queue()
1161def get_queue():
1162 return _queue
1163
1164class QueueManager(BaseManager):
1165 '''manager class used by server process'''
1166QueueManager.register('get_queue', callable=get_queue)
1167
1168class QueueManager2(BaseManager):
1169 '''manager class which specifies the same interface as QueueManager'''
1170QueueManager2.register('get_queue')
1171
1172
1173SERIALIZER = 'xmlrpclib'
1174
1175class _TestRemoteManager(BaseTestCase):
1176
1177 ALLOWED_TYPES = ('manager',)
1178
1179 def _putter(self, address, authkey):
1180 manager = QueueManager2(
1181 address=address, authkey=authkey, serializer=SERIALIZER
1182 )
1183 manager.connect()
1184 queue = manager.get_queue()
1185 queue.put(('hello world', None, True, 2.25))
1186
1187 def test_remote(self):
1188 authkey = os.urandom(32)
1189
1190 manager = QueueManager(
1191 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1192 )
1193 manager.start()
1194
1195 p = self.Process(target=self._putter, args=(manager.address, authkey))
1196 p.start()
1197
1198 manager2 = QueueManager2(
1199 address=manager.address, authkey=authkey, serializer=SERIALIZER
1200 )
1201 manager2.connect()
1202 queue = manager2.get_queue()
1203
1204 # Note that xmlrpclib will deserialize object as a list not a tuple
1205 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1206
1207 # Because we are using xmlrpclib for serialization instead of
1208 # pickle this will cause a serialization error.
1209 self.assertRaises(Exception, queue.put, time.sleep)
1210
1211 # Make queue finalizer run before the server is stopped
1212 del queue
1213 manager.shutdown()
1214
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001215class _TestManagerRestart(BaseTestCase):
1216
1217 def _putter(self, address, authkey):
1218 manager = QueueManager(
1219 address=address, authkey=authkey, serializer=SERIALIZER)
1220 manager.connect()
1221 queue = manager.get_queue()
1222 queue.put('hello world')
1223
1224 def test_rapid_restart(self):
1225 authkey = os.urandom(32)
1226 manager = QueueManager(
Antoine Pitroua751c3f2010-04-30 23:23:38 +00001227 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin9e2fadc2010-11-01 05:12:34 +00001228 srvr = manager.get_server()
1229 addr = srvr.address
1230 # Close the connection.Listener socket which gets opened as a part
1231 # of manager.get_server(). It's not needed for the test.
1232 srvr.listener.close()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001233 manager.start()
1234
1235 p = self.Process(target=self._putter, args=(manager.address, authkey))
1236 p.start()
1237 queue = manager.get_queue()
1238 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00001239 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001240 manager.shutdown()
1241 manager = QueueManager(
Antoine Pitroua751c3f2010-04-30 23:23:38 +00001242 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001243 manager.start()
Jesse Noller35d1f002009-03-30 22:59:27 +00001244 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001245
Benjamin Petersone711caf2008-06-11 16:44:04 +00001246#
1247#
1248#
1249
1250SENTINEL = latin('')
1251
1252class _TestConnection(BaseTestCase):
1253
1254 ALLOWED_TYPES = ('processes', 'threads')
1255
1256 def _echo(self, conn):
1257 for msg in iter(conn.recv_bytes, SENTINEL):
1258 conn.send_bytes(msg)
1259 conn.close()
1260
1261 def test_connection(self):
1262 conn, child_conn = self.Pipe()
1263
1264 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001265 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001266 p.start()
1267
1268 seq = [1, 2.25, None]
1269 msg = latin('hello world')
1270 longmsg = msg * 10
1271 arr = array.array('i', list(range(4)))
1272
1273 if self.TYPE == 'processes':
1274 self.assertEqual(type(conn.fileno()), int)
1275
1276 self.assertEqual(conn.send(seq), None)
1277 self.assertEqual(conn.recv(), seq)
1278
1279 self.assertEqual(conn.send_bytes(msg), None)
1280 self.assertEqual(conn.recv_bytes(), msg)
1281
1282 if self.TYPE == 'processes':
1283 buffer = array.array('i', [0]*10)
1284 expected = list(arr) + [0] * (10 - len(arr))
1285 self.assertEqual(conn.send_bytes(arr), None)
1286 self.assertEqual(conn.recv_bytes_into(buffer),
1287 len(arr) * buffer.itemsize)
1288 self.assertEqual(list(buffer), expected)
1289
1290 buffer = array.array('i', [0]*10)
1291 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1292 self.assertEqual(conn.send_bytes(arr), None)
1293 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1294 len(arr) * buffer.itemsize)
1295 self.assertEqual(list(buffer), expected)
1296
1297 buffer = bytearray(latin(' ' * 40))
1298 self.assertEqual(conn.send_bytes(longmsg), None)
1299 try:
1300 res = conn.recv_bytes_into(buffer)
1301 except multiprocessing.BufferTooShort as e:
1302 self.assertEqual(e.args, (longmsg,))
1303 else:
1304 self.fail('expected BufferTooShort, got %s' % res)
1305
1306 poll = TimingWrapper(conn.poll)
1307
1308 self.assertEqual(poll(), False)
1309 self.assertTimingAlmostEqual(poll.elapsed, 0)
1310
1311 self.assertEqual(poll(TIMEOUT1), False)
1312 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1313
1314 conn.send(None)
1315
1316 self.assertEqual(poll(TIMEOUT1), True)
1317 self.assertTimingAlmostEqual(poll.elapsed, 0)
1318
1319 self.assertEqual(conn.recv(), None)
1320
1321 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1322 conn.send_bytes(really_big_msg)
1323 self.assertEqual(conn.recv_bytes(), really_big_msg)
1324
1325 conn.send_bytes(SENTINEL) # tell child to quit
1326 child_conn.close()
1327
1328 if self.TYPE == 'processes':
1329 self.assertEqual(conn.readable, True)
1330 self.assertEqual(conn.writable, True)
1331 self.assertRaises(EOFError, conn.recv)
1332 self.assertRaises(EOFError, conn.recv_bytes)
1333
1334 p.join()
1335
1336 def test_duplex_false(self):
1337 reader, writer = self.Pipe(duplex=False)
1338 self.assertEqual(writer.send(1), None)
1339 self.assertEqual(reader.recv(), 1)
1340 if self.TYPE == 'processes':
1341 self.assertEqual(reader.readable, True)
1342 self.assertEqual(reader.writable, False)
1343 self.assertEqual(writer.readable, False)
1344 self.assertEqual(writer.writable, True)
1345 self.assertRaises(IOError, reader.send, 2)
1346 self.assertRaises(IOError, writer.recv)
1347 self.assertRaises(IOError, writer.poll)
1348
1349 def test_spawn_close(self):
1350 # We test that a pipe connection can be closed by parent
1351 # process immediately after child is spawned. On Windows this
1352 # would have sometimes failed on old versions because
1353 # child_conn would be closed before the child got a chance to
1354 # duplicate it.
1355 conn, child_conn = self.Pipe()
1356
1357 p = self.Process(target=self._echo, args=(child_conn,))
1358 p.start()
1359 child_conn.close() # this might complete before child initializes
1360
1361 msg = latin('hello')
1362 conn.send_bytes(msg)
1363 self.assertEqual(conn.recv_bytes(), msg)
1364
1365 conn.send_bytes(SENTINEL)
1366 conn.close()
1367 p.join()
1368
1369 def test_sendbytes(self):
1370 if self.TYPE != 'processes':
1371 return
1372
1373 msg = latin('abcdefghijklmnopqrstuvwxyz')
1374 a, b = self.Pipe()
1375
1376 a.send_bytes(msg)
1377 self.assertEqual(b.recv_bytes(), msg)
1378
1379 a.send_bytes(msg, 5)
1380 self.assertEqual(b.recv_bytes(), msg[5:])
1381
1382 a.send_bytes(msg, 7, 8)
1383 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1384
1385 a.send_bytes(msg, 26)
1386 self.assertEqual(b.recv_bytes(), latin(''))
1387
1388 a.send_bytes(msg, 26, 0)
1389 self.assertEqual(b.recv_bytes(), latin(''))
1390
1391 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1392
1393 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1394
1395 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1396
1397 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1398
1399 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1400
Benjamin Petersone711caf2008-06-11 16:44:04 +00001401class _TestListenerClient(BaseTestCase):
1402
1403 ALLOWED_TYPES = ('processes', 'threads')
1404
1405 def _test(self, address):
1406 conn = self.connection.Client(address)
1407 conn.send('hello')
1408 conn.close()
1409
1410 def test_listener_client(self):
1411 for family in self.connection.families:
1412 l = self.connection.Listener(family=family)
1413 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001414 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001415 p.start()
1416 conn = l.accept()
1417 self.assertEqual(conn.recv(), 'hello')
1418 p.join()
1419 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001420#
1421# Test of sending connection and socket objects between processes
1422#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001423"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001424class _TestPicklingConnections(BaseTestCase):
1425
1426 ALLOWED_TYPES = ('processes',)
1427
1428 def _listener(self, conn, families):
1429 for fam in families:
1430 l = self.connection.Listener(family=fam)
1431 conn.send(l.address)
1432 new_conn = l.accept()
1433 conn.send(new_conn)
1434
1435 if self.TYPE == 'processes':
1436 l = socket.socket()
1437 l.bind(('localhost', 0))
1438 conn.send(l.getsockname())
1439 l.listen(1)
1440 new_conn, addr = l.accept()
1441 conn.send(new_conn)
1442
1443 conn.recv()
1444
1445 def _remote(self, conn):
1446 for (address, msg) in iter(conn.recv, None):
1447 client = self.connection.Client(address)
1448 client.send(msg.upper())
1449 client.close()
1450
1451 if self.TYPE == 'processes':
1452 address, msg = conn.recv()
1453 client = socket.socket()
1454 client.connect(address)
1455 client.sendall(msg.upper())
1456 client.close()
1457
1458 conn.close()
1459
1460 def test_pickling(self):
1461 try:
1462 multiprocessing.allow_connection_pickling()
1463 except ImportError:
1464 return
1465
1466 families = self.connection.families
1467
1468 lconn, lconn0 = self.Pipe()
1469 lp = self.Process(target=self._listener, args=(lconn0, families))
1470 lp.start()
1471 lconn0.close()
1472
1473 rconn, rconn0 = self.Pipe()
1474 rp = self.Process(target=self._remote, args=(rconn0,))
1475 rp.start()
1476 rconn0.close()
1477
1478 for fam in families:
1479 msg = ('This connection uses family %s' % fam).encode('ascii')
1480 address = lconn.recv()
1481 rconn.send((address, msg))
1482 new_conn = lconn.recv()
1483 self.assertEqual(new_conn.recv(), msg.upper())
1484
1485 rconn.send(None)
1486
1487 if self.TYPE == 'processes':
1488 msg = latin('This connection uses a normal socket')
1489 address = lconn.recv()
1490 rconn.send((address, msg))
1491 if hasattr(socket, 'fromfd'):
1492 new_conn = lconn.recv()
1493 self.assertEqual(new_conn.recv(100), msg.upper())
1494 else:
1495 # XXX On Windows with Py2.6 need to backport fromfd()
1496 discard = lconn.recv_bytes()
1497
1498 lconn.send(None)
1499
1500 rconn.close()
1501 lconn.close()
1502
1503 lp.join()
1504 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001505"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001506#
1507#
1508#
1509
1510class _TestHeap(BaseTestCase):
1511
1512 ALLOWED_TYPES = ('processes',)
1513
1514 def test_heap(self):
1515 iterations = 5000
1516 maxblocks = 50
1517 blocks = []
1518
1519 # create and destroy lots of blocks of different sizes
1520 for i in range(iterations):
1521 size = int(random.lognormvariate(0, 1) * 1000)
1522 b = multiprocessing.heap.BufferWrapper(size)
1523 blocks.append(b)
1524 if len(blocks) > maxblocks:
1525 i = random.randrange(maxblocks)
1526 del blocks[i]
1527
1528 # get the heap object
1529 heap = multiprocessing.heap.BufferWrapper._heap
1530
1531 # verify the state of the heap
1532 all = []
1533 occupied = 0
1534 for L in list(heap._len_to_seq.values()):
1535 for arena, start, stop in L:
1536 all.append((heap._arenas.index(arena), start, stop,
1537 stop-start, 'free'))
1538 for arena, start, stop in heap._allocated_blocks:
1539 all.append((heap._arenas.index(arena), start, stop,
1540 stop-start, 'occupied'))
1541 occupied += (stop-start)
1542
1543 all.sort()
1544
1545 for i in range(len(all)-1):
1546 (arena, start, stop) = all[i][:3]
1547 (narena, nstart, nstop) = all[i+1][:3]
1548 self.assertTrue((arena != narena and nstart == 0) or
1549 (stop == nstart))
1550
1551#
1552#
1553#
1554
Benjamin Petersone711caf2008-06-11 16:44:04 +00001555class _Foo(Structure):
1556 _fields_ = [
1557 ('x', c_int),
1558 ('y', c_double)
1559 ]
1560
Brian Curtin918616c2010-10-07 02:12:17 +00001561@unittest.skipUnless(HAS_SHAREDCTYPES,
1562 "requires multiprocessing.sharedctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001563class _TestSharedCTypes(BaseTestCase):
1564
1565 ALLOWED_TYPES = ('processes',)
1566
1567 def _double(self, x, y, foo, arr, string):
1568 x.value *= 2
1569 y.value *= 2
1570 foo.x *= 2
1571 foo.y *= 2
1572 string.value *= 2
1573 for i in range(len(arr)):
1574 arr[i] *= 2
1575
1576 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001577 x = Value('i', 7, lock=lock)
Brian Curtin918616c2010-10-07 02:12:17 +00001578 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001579 foo = Value(_Foo, 3, 2, lock=lock)
Brian Curtin918616c2010-10-07 02:12:17 +00001580 arr = self.Array('d', list(range(10)), lock=lock)
1581 string = self.Array('c', 20, lock=lock)
1582 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001583
1584 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1585 p.start()
1586 p.join()
1587
1588 self.assertEqual(x.value, 14)
1589 self.assertAlmostEqual(y.value, 2.0/3.0)
1590 self.assertEqual(foo.x, 6)
1591 self.assertAlmostEqual(foo.y, 4.0)
1592 for i in range(10):
1593 self.assertAlmostEqual(arr[i], i*2)
1594 self.assertEqual(string.value, latin('hellohello'))
1595
1596 def test_synchronize(self):
1597 self.test_sharedctypes(lock=True)
1598
1599 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001600 foo = _Foo(2, 5.0)
Brian Curtin918616c2010-10-07 02:12:17 +00001601 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001602 foo.x = 0
1603 foo.y = 0
1604 self.assertEqual(bar.x, 2)
1605 self.assertAlmostEqual(bar.y, 5.0)
1606
1607#
1608#
1609#
1610
1611class _TestFinalize(BaseTestCase):
1612
1613 ALLOWED_TYPES = ('processes',)
1614
1615 def _test_finalize(self, conn):
1616 class Foo(object):
1617 pass
1618
1619 a = Foo()
1620 util.Finalize(a, conn.send, args=('a',))
1621 del a # triggers callback for a
1622
1623 b = Foo()
1624 close_b = util.Finalize(b, conn.send, args=('b',))
1625 close_b() # triggers callback for b
1626 close_b() # does nothing because callback has already been called
1627 del b # does nothing because callback has already been called
1628
1629 c = Foo()
1630 util.Finalize(c, conn.send, args=('c',))
1631
1632 d10 = Foo()
1633 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1634
1635 d01 = Foo()
1636 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1637 d02 = Foo()
1638 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1639 d03 = Foo()
1640 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1641
1642 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1643
1644 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1645
1646 # call mutliprocessing's cleanup function then exit process without
1647 # garbage collecting locals
1648 util._exit_function()
1649 conn.close()
1650 os._exit(0)
1651
1652 def test_finalize(self):
1653 conn, child_conn = self.Pipe()
1654
1655 p = self.Process(target=self._test_finalize, args=(child_conn,))
1656 p.start()
1657 p.join()
1658
1659 result = [obj for obj in iter(conn.recv, 'STOP')]
1660 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1661
1662#
1663# Test that from ... import * works for each module
1664#
1665
1666class _TestImportStar(BaseTestCase):
1667
1668 ALLOWED_TYPES = ('processes',)
1669
1670 def test_import(self):
Florent Xicluna9b0e9182010-03-28 11:42:38 +00001671 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00001672 'multiprocessing', 'multiprocessing.connection',
1673 'multiprocessing.heap', 'multiprocessing.managers',
1674 'multiprocessing.pool', 'multiprocessing.process',
Florent Xicluna9b0e9182010-03-28 11:42:38 +00001675 'multiprocessing.reduction',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001676 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xicluna9b0e9182010-03-28 11:42:38 +00001677 ]
1678
1679 if c_int is not None:
1680 # This module requires _ctypes
1681 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001682
1683 for name in modules:
1684 __import__(name)
1685 mod = sys.modules[name]
1686
1687 for attr in getattr(mod, '__all__', ()):
1688 self.assertTrue(
1689 hasattr(mod, attr),
1690 '%r does not have attribute %r' % (mod, attr)
1691 )
1692
1693#
1694# Quick test that logging works -- does not test logging output
1695#
1696
1697class _TestLogging(BaseTestCase):
1698
1699 ALLOWED_TYPES = ('processes',)
1700
1701 def test_enable_logging(self):
1702 logger = multiprocessing.get_logger()
1703 logger.setLevel(util.SUBWARNING)
1704 self.assertTrue(logger is not None)
1705 logger.debug('this will not be printed')
1706 logger.info('nor will this')
1707 logger.setLevel(LOG_LEVEL)
1708
1709 def _test_level(self, conn):
1710 logger = multiprocessing.get_logger()
1711 conn.send(logger.getEffectiveLevel())
1712
1713 def test_level(self):
1714 LEVEL1 = 32
1715 LEVEL2 = 37
1716
1717 logger = multiprocessing.get_logger()
1718 root_logger = logging.getLogger()
1719 root_level = root_logger.level
1720
1721 reader, writer = multiprocessing.Pipe(duplex=False)
1722
1723 logger.setLevel(LEVEL1)
1724 self.Process(target=self._test_level, args=(writer,)).start()
1725 self.assertEqual(LEVEL1, reader.recv())
1726
1727 logger.setLevel(logging.NOTSET)
1728 root_logger.setLevel(LEVEL2)
1729 self.Process(target=self._test_level, args=(writer,)).start()
1730 self.assertEqual(LEVEL2, reader.recv())
1731
1732 root_logger.setLevel(root_level)
1733 logger.setLevel(level=LOG_LEVEL)
1734
1735#
Jesse Noller6214edd2009-01-19 16:23:53 +00001736# Test to verify handle verification, see issue 3321
1737#
1738
1739class TestInvalidHandle(unittest.TestCase):
1740
Florent Xicluna9b0e9182010-03-28 11:42:38 +00001741 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00001742 def test_invalid_handles(self):
Jesse Noller6214edd2009-01-19 16:23:53 +00001743 conn = _multiprocessing.Connection(44977608)
1744 self.assertRaises(IOError, conn.poll)
1745 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna9b0e9182010-03-28 11:42:38 +00001746
Jesse Noller6214edd2009-01-19 16:23:53 +00001747#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001748# Functions used to create test cases from the base ones in this module
1749#
1750
1751def get_attributes(Source, names):
1752 d = {}
1753 for name in names:
1754 obj = getattr(Source, name)
1755 if type(obj) == type(get_attributes):
1756 obj = staticmethod(obj)
1757 d[name] = obj
1758 return d
1759
1760def create_test_cases(Mixin, type):
1761 result = {}
1762 glob = globals()
Florent Xicluna9b0e9182010-03-28 11:42:38 +00001763 Type = type.capitalize()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001764
1765 for name in list(glob.keys()):
1766 if name.startswith('_Test'):
1767 base = glob[name]
1768 if type in base.ALLOWED_TYPES:
1769 newname = 'With' + Type + name[1:]
1770 class Temp(base, unittest.TestCase, Mixin):
1771 pass
1772 result[newname] = Temp
1773 Temp.__name__ = newname
1774 Temp.__module__ = Mixin.__module__
1775 return result
1776
1777#
1778# Create test cases
1779#
1780
1781class ProcessesMixin(object):
1782 TYPE = 'processes'
1783 Process = multiprocessing.Process
1784 locals().update(get_attributes(multiprocessing, (
1785 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1786 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1787 'RawArray', 'current_process', 'active_children', 'Pipe',
1788 'connection', 'JoinableQueue'
1789 )))
1790
1791testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1792globals().update(testcases_processes)
1793
1794
1795class ManagerMixin(object):
1796 TYPE = 'manager'
1797 Process = multiprocessing.Process
1798 manager = object.__new__(multiprocessing.managers.SyncManager)
1799 locals().update(get_attributes(manager, (
1800 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1801 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1802 'Namespace', 'JoinableQueue'
1803 )))
1804
1805testcases_manager = create_test_cases(ManagerMixin, type='manager')
1806globals().update(testcases_manager)
1807
1808
1809class ThreadsMixin(object):
1810 TYPE = 'threads'
1811 Process = multiprocessing.dummy.Process
1812 locals().update(get_attributes(multiprocessing.dummy, (
1813 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1814 'Condition', 'Event', 'Value', 'Array', 'current_process',
1815 'active_children', 'Pipe', 'connection', 'dict', 'list',
1816 'Namespace', 'JoinableQueue'
1817 )))
1818
1819testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1820globals().update(testcases_threads)
1821
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001822class OtherTest(unittest.TestCase):
1823 # TODO: add more tests for deliver/answer challenge.
1824 def test_deliver_challenge_auth_failure(self):
1825 class _FakeConnection(object):
1826 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00001827 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001828 def send_bytes(self, data):
1829 pass
1830 self.assertRaises(multiprocessing.AuthenticationError,
1831 multiprocessing.connection.deliver_challenge,
1832 _FakeConnection(), b'abc')
1833
1834 def test_answer_challenge_auth_failure(self):
1835 class _FakeConnection(object):
1836 def __init__(self):
1837 self.count = 0
1838 def recv_bytes(self, size):
1839 self.count += 1
1840 if self.count == 1:
1841 return multiprocessing.connection.CHALLENGE
1842 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00001843 return b'something bogus'
1844 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001845 def send_bytes(self, data):
1846 pass
1847 self.assertRaises(multiprocessing.AuthenticationError,
1848 multiprocessing.connection.answer_challenge,
1849 _FakeConnection(), b'abc')
1850
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00001851#
1852# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1853#
1854
1855def initializer(ns):
1856 ns.test += 1
1857
1858class TestInitializers(unittest.TestCase):
1859 def setUp(self):
1860 self.mgr = multiprocessing.Manager()
1861 self.ns = self.mgr.Namespace()
1862 self.ns.test = 0
1863
1864 def tearDown(self):
1865 self.mgr.shutdown()
1866
1867 def test_manager_initializer(self):
1868 m = multiprocessing.managers.SyncManager()
1869 self.assertRaises(TypeError, m.start, 1)
1870 m.start(initializer, (self.ns,))
1871 self.assertEqual(self.ns.test, 1)
1872 m.shutdown()
1873
1874 def test_pool_initializer(self):
1875 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
1876 p = multiprocessing.Pool(1, initializer, (self.ns,))
1877 p.close()
1878 p.join()
1879 self.assertEqual(self.ns.test, 1)
1880
R. David Murraya44c6b32009-07-29 15:40:30 +00001881#
1882# Issue 5155, 5313, 5331: Test process in processes
1883# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1884#
1885
1886def _ThisSubProcess(q):
1887 try:
1888 item = q.get(block=False)
1889 except pyqueue.Empty:
1890 pass
1891
1892def _TestProcess(q):
1893 queue = multiprocessing.Queue()
1894 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
1895 subProc.start()
1896 subProc.join()
1897
1898def _afunc(x):
1899 return x*x
1900
1901def pool_in_process():
1902 pool = multiprocessing.Pool(processes=4)
1903 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
1904
1905class _file_like(object):
1906 def __init__(self, delegate):
1907 self._delegate = delegate
1908 self._pid = None
1909
1910 @property
1911 def cache(self):
1912 pid = os.getpid()
1913 # There are no race conditions since fork keeps only the running thread
1914 if pid != self._pid:
1915 self._pid = pid
1916 self._cache = []
1917 return self._cache
1918
1919 def write(self, data):
1920 self.cache.append(data)
1921
1922 def flush(self):
1923 self._delegate.write(''.join(self.cache))
1924 self._cache = []
1925
1926class TestStdinBadfiledescriptor(unittest.TestCase):
1927
1928 def test_queue_in_process(self):
1929 queue = multiprocessing.Queue()
1930 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
1931 proc.start()
1932 proc.join()
1933
1934 def test_pool_in_process(self):
1935 p = multiprocessing.Process(target=pool_in_process)
1936 p.start()
1937 p.join()
1938
1939 def test_flushing(self):
1940 sio = io.StringIO()
1941 flike = _file_like(sio)
1942 flike.write('foo')
1943 proc = multiprocessing.Process(target=lambda: flike.flush())
1944 flike.flush()
1945 assert sio.getvalue() == 'foo'
1946
1947testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
1948 TestStdinBadfiledescriptor]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001949
Benjamin Petersone711caf2008-06-11 16:44:04 +00001950#
1951#
1952#
1953
1954def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00001955 if sys.platform.startswith("linux"):
1956 try:
1957 lock = multiprocessing.RLock()
1958 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00001959 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00001960
Benjamin Petersone711caf2008-06-11 16:44:04 +00001961 if run is None:
1962 from test.support import run_unittest as run
1963
1964 util.get_temp_dir() # creates temp directory for use by all processes
1965
1966 multiprocessing.get_logger().setLevel(LOG_LEVEL)
1967
Benjamin Peterson41181742008-07-02 20:22:54 +00001968 ProcessesMixin.pool = multiprocessing.Pool(4)
1969 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
1970 ManagerMixin.manager.__init__()
1971 ManagerMixin.manager.start()
1972 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001973
1974 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00001975 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
1976 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001977 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
1978 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00001979 )
1980
1981 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
1982 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
1983 run(suite)
1984
Benjamin Peterson41181742008-07-02 20:22:54 +00001985 ThreadsMixin.pool.terminate()
1986 ProcessesMixin.pool.terminate()
1987 ManagerMixin.pool.terminate()
1988 ManagerMixin.manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001989
Benjamin Peterson41181742008-07-02 20:22:54 +00001990 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00001991
1992def main():
1993 test_main(unittest.TextTestRunner(verbosity=2).run)
1994
1995if __name__ == '__main__':
1996 main()