blob: f901a05245cd4f9db46e634dea8bbc35456dee39 [file] [log] [blame]
Georg Brandl86b2fb92008-07-16 03:43:04 +00001#!/usr/bin/env python
2
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import queue as pyqueue
10import time
R. David Murraya44c6b32009-07-29 15:40:30 +000011import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
15import signal
16import array
17import copy
18import socket
19import random
20import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000021import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000022
Benjamin Petersone5384b02008-10-04 22:00:42 +000023
R. David Murraya21e4ca2009-03-31 23:16:50 +000024# Skip tests if _multiprocessing wasn't built.
25_multiprocessing = test.support.import_module('_multiprocessing')
26# Skip tests if sem_open implementation is broken.
27test.support.import_module('multiprocessing.synchronize')
Benjamin Petersone5384b02008-10-04 22:00:42 +000028
Benjamin Petersone711caf2008-06-11 16:44:04 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000033import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000034
35from multiprocessing import util
36
37#
38#
39#
40
def latin(s):
    """Return the string *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000043
Benjamin Petersone711caf2008-06-11 16:44:04 +000044#
45# Constants
46#
47
# Logging level selected for the tests (swap in the commented line to
# use the standard WARNING level instead).
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.WARNING

# Short pause, in seconds, that tests sleep for to let other
# processes/threads make progress.
DELTA = 0.1

# Making this true makes tests take a lot longer and can sometimes cause
# some non-serious failures because some calls block a bit longer than
# expected.
CHECK_TIMINGS = False

# Longer, distinct timeouts are only needed when timings are verified.
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# False when the C extension flags its semaphore get_value() as broken.
_BROKEN_GETVALUE = getattr(_multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False)
HAVE_GETVALUE = not _BROKEN_GETVALUE

WIN32 = sys.platform == "win32"
65
Benjamin Petersone711caf2008-06-11 16:44:04 +000066#
Florent Xicluna9b0e9182010-03-28 11:42:38 +000067# Some tests require ctypes
68#
69
# ctypes itself exports neither ``Value`` nor ``copy``; asking it for them
# raised ImportError unconditionally, which left ``c_int`` as None and caused
# every test guarded by ``skipIf(c_int is None, ...)`` to be skipped.  Those
# two helpers live in multiprocessing.sharedctypes.
# NOTE(review): as in the original statement, this rebinds the module-level
# name ``copy`` (also imported as a module at the top of the file) -- confirm
# nothing later relies on the ``copy`` module itself.
try:
    from ctypes import Structure, c_int, c_double
    from multiprocessing.sharedctypes import Value, copy
except ImportError:
    # Fallbacks that let class definitions and skip decorators still evaluate.
    Structure = object
    c_int = c_double = None
75
76#
Benjamin Petersone711caf2008-06-11 16:44:04 +000077# Creates a wrapper for a function which records the time it takes to finish
78#
79
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    ``elapsed`` is None until the first call finishes; afterwards it holds
    the duration, in seconds as measured by time.time(), of the latest call.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Runs on both normal return and exception, so the duration is
            # recorded even when the wrapped call raises.
            self.elapsed = time.time() - started
        return result
92
93#
94# Base class for test cases
95#
96
class BaseTestCase(object):
    """Base class for the test cases, providing shared assertion helpers."""

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timings are compared (to one decimal place) only when
        # CHECK_TIMINGS is enabled; otherwise this is a no-op.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Call func(*args); a NotImplementedError is tolerated silently,
        # any other outcome must equal *value*.
        try:
            actual = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, actual)
112
113#
114# Return the value of a semaphore
115#
116
def get_value(self):
    """Return the current value of the semaphore-like object *self*.

    The object's own get_value() method is tried first, then the
    ``_Semaphore__value`` and ``_value`` attributes in that order.

    Raises NotImplementedError when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
128
129#
130# Testcases
131#
132
class _TestProcess(BaseTestCase):
    """Tests for process objects: attributes, lifecycle, terminate, nesting.

    Relies on attributes such as ``TYPE``, ``Process``, ``Queue``, ``Pipe``,
    ``current_process`` and ``active_children`` being supplied on ``self``
    by code outside this class.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertTrue(isinstance(authkey, bytes))
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def _test(self, q, *args, **kwds):
        # Child-side target: report what the child received and who it is.
        current = self.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if self.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # State before the process is started.
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertTrue(p not in self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # State while running.
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())

        # The child echoes its arguments (minus the queue) and identity.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # State after the process has finished.
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

    def _test_terminate(self):
        # Sleep long enough that only terminate() can end the process.
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertTrue(p not in self.active_children())

        p.start()
        self.assertTrue(p in self.active_children())

        p.join()
        self.assertTrue(p not in self.active_children())

    def _test_recursion(self, wconn, id):
        # Send our own id, then start (and wait for) two children, one at a
        # time, until ids are two elements long.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = self.Process(
                    target=self._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        # Each child is joined before its sibling starts, so the ids arrive
        # in depth-first order.
        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
278
279#
280#
281#
282
283class _UpperCaser(multiprocessing.Process):
284
285 def __init__(self):
286 multiprocessing.Process.__init__(self)
287 self.child_conn, self.parent_conn = multiprocessing.Pipe()
288
289 def run(self):
290 self.parent_conn.close()
291 for s in iter(self.child_conn.recv, None):
292 self.child_conn.send(s.upper())
293 self.child_conn.close()
294
295 def submit(self, s):
296 assert type(s) is str
297 self.parent_conn.send(s)
298 return self.parent_conn.recv()
299
300 def stop(self):
301 self.parent_conn.send(None)
302 self.parent_conn.close()
303 self.child_conn.close()
304
class _TestSubclassingProcess(BaseTestCase):
    """Checks that a Process subclass (_UpperCaser) can be started and used."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.start()
        # Each submitted string must come back upper-cased.
        for text, expected in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), expected)
        worker.stop()
        worker.join()
316
317#
318#
319#
320
def queue_empty(q):
    """Return whether *q* is empty.

    Uses q.empty() when the queue has such an attribute, otherwise falls
    back to comparing q.qsize() with zero.
    """
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
326
def queue_full(q, maxsize):
    """Return whether *q* is full.

    Uses q.full() when the queue has such an attribute, otherwise falls
    back to comparing q.qsize() with *maxsize*.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
332
333
class _TestQueue(BaseTestCase):
    """Tests for queue put/get semantics, forking, qsize() and task_done().

    Relies on ``Queue``, ``JoinableQueue``, ``Event`` and ``Process`` being
    supplied on ``self`` by code outside this class.
    """

    def _test_put(self, queue, child_can_start, parent_can_continue):
        # Child side of test_put: drain the six items once allowed to start.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue using every calling convention of put().
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Every further put must fail with Full, blocking only as long as
        # its arguments allow.
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    def _test_get(self, queue, child_can_start, parent_can_continue):
        # Child side of test_get: enqueue the items once allowed to start.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Every further get must fail with Empty, blocking only as long as
        # its arguments allow.
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    def _test_fork(self, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # qsize() is not available for this queue; nothing to check.
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    def _test_task_done(self, q):
        # Worker: acknowledge each item until the None sentinel arrives.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        # (A former ``sys.version_info < (2, 5)`` skip guard was removed:
        # it could never be true in this Python 3 file.)

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.start()

        for i in range(10):
            queue.put(i)

        # Returns only once task_done() has been called for every item.
        queue.join()

        # One sentinel per worker so each of them exits its loop.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
537
538#
539#
540#
541
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock acquire/release behaviour."""

    def test_lock(self):
        lock = self.Lock()
        first = lock.acquire()
        self.assertEqual(first, True)
        # A non-blocking acquire of a held lock must fail.
        second = lock.acquire(False)
        self.assertEqual(second, False)
        self.assertEqual(lock.release(), None)
        # Releasing an unheld lock is an error.
        expected_errors = (ValueError, threading.ThreadError)
        self.assertRaises(expected_errors, lock.release)

    def test_rlock(self):
        lock = self.RLock()
        depth = 3
        # Acquire recursively, then unwind the same number of times.
        for _ in range(depth):
            self.assertEqual(lock.acquire(), True)
        for _ in range(depth):
            self.assertEqual(lock.release(), None)
        # One release too many is an error.
        expected_errors = (AssertionError, RuntimeError)
        self.assertRaises(expected_errors, lock.release)

    def test_lock_context(self):
        lock = self.Lock()
        with lock:
            pass
564
Benjamin Petersone711caf2008-06-11 16:44:04 +0000565
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore counting and timeouts."""

    def _test_semaphore(self, sem):
        # Expects a semaphore whose value is 2; leaves it at 2 again.
        check = self.assertReturnsIfImplemented
        check(2, get_value, sem)
        for remaining in (1, 0):
            self.assertEqual(sem.acquire(), True)
            check(remaining, get_value, sem)
        # Exhausted: a non-blocking acquire fails and changes nothing.
        self.assertEqual(sem.acquire(False), False)
        check(0, get_value, sem)
        for available in (1, 2):
            self.assertEqual(sem.release(), None)
            check(available, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # An unbounded semaphore may be released past its initial value.
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected elapsed time); every
        # call must fail because the semaphore's value is zero.
        cases = (
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
        )
        for args, kwds, expected_elapsed in cases:
            self.assertEqual(acquire(*args, **kwds), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected_elapsed)
618
619
class _TestCondition(BaseTestCase):
    """Tests for Condition: notify(), notify_all() and wait() timeouts.

    Relies on ``Condition``, ``Semaphore``, ``Process`` and ``TYPE`` being
    supplied on ``self`` by code outside this class.
    """

    def f(self, cond, sleeping, woken, timeout=None):
        # Sleeper: announce via `sleeping`, wait on the condition, then
        # announce via `woken`.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Separate names for the two sleepers: the original rebound a single
        # variable, so the first sleeper was never joined.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # both sleepers have been notified, so wait for each to finish
        thread.join()
        proc.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
750
751
class _TestEvent(BaseTestCase):
    """Tests for Event: is_set(), wait() with timeouts, set() and clear().

    Relies on ``Event`` and ``Process`` being supplied on ``self`` by code
    outside this class.
    """

    def _test_event(self, event):
        # Child side: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Historical note: this check was once disabled because of an API
        # difference with threading._Event objects (is_set vs. isSet).
        self.assertEqual(event.is_set(), False)

        # Historical note: the wait() return-value checks were once disabled
        # because threading.Event.wait() returned the internal flag while the
        # semaphore-backed multiprocessing Event returned None.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # Keep a reference to the child so it can be joined, and make it a
        # daemon like the helper processes started by the other tests.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000789
790#
791#
792#
793
class _TestValue(BaseTestCase):
    """Tests for shared Value and RawValue objects."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value assigned by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def _test(self, values):
        # Child side: overwrite every shared value with its final value.
        for shared, spec in zip(values, self.codes_values):
            shared.value = spec[2]


    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_value(self, raw=False):
        make = self.RawValue if raw else self.Value
        values = [make(code, value) for code, value, _ in self.codes_values]

        # Freshly created objects hold their initial values.
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.start()
        proc.join()

        # Assignments made by the child must be visible here.
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawvalue(self):
        self.test_value(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock(self):
        # Default locking: both accessors must be callable.
        val1 = self.Value('i', 5)
        val1.get_lock()
        val1.get_obj()

        # lock=None: both accessors must be callable as well.
        val2 = self.Value('i', 5, lock=None)
        val2.get_lock()
        val2.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields an object without the accessors.
        val4 = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(val4, accessor))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue objects have no accessors either.
        val5 = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(val5, accessor))
858
Benjamin Petersone711caf2008-06-11 16:44:04 +0000859
class _TestArray(BaseTestCase):
    """Tests for shared Array and RawArray objects."""

    ALLOWED_TYPES = ('processes',)

    def f(self, seq):
        # Replace seq in place with its running (prefix) sums.
        for pos in range(1, len(seq)):
            seq[pos] += seq[pos-1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make = self.RawArray if raw else self.Array
        arr = make('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Apply the same slice assignment to the shared array and the list.
        replacement = array.array('i', [1, 2, 3, 4])
        arr[4:8] = replacement
        seq[4:8] = replacement

        self.assertEqual(list(arr[:]), seq)

        # Transform the list locally and the shared array in a child
        # process; both must end up identical.
        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default locking: both accessors must be callable.
        arr1 = self.Array('i', list(range(10)))
        arr1.get_lock()
        arr1.get_obj()

        # lock=None: both accessors must be callable as well.
        arr2 = self.Array('i', list(range(10)), lock=None)
        arr2.get_lock()
        arr2.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        arr3 = self.Array('i', list(range(10)), lock=lock)
        lock3 = arr3.get_lock()
        arr3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields an object without the accessors.
        arr4 = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr4, accessor))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # RawArray objects have no accessors either.
        arr5 = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr5, accessor))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000921
922#
923#
924#
925
class _TestContainers(BaseTestCase):
    """Tests for the list, dict and Namespace objects created via ``self``."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        digits = list(range(10))
        a = self.list(list(range(10)))
        self.assertEqual(a[:], digits)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        # `a` must be unaffected by the operations on `b`.
        self.assertEqual(a[:], digits)

        # A list built from other such lists compares equal to plain nested
        # lists when sliced.
        e = self.list([a, b])
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A later change to `a` is visible through the containing list.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            d[code] = chr(code)
        letters = [chr(code) for code in indices]
        self.assertEqual(d.copy(), {code: chr(code) for code in indices})
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), list(zip(indices, letters)))

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # Only 'name' is expected in the string form: 'job' was deleted and
        # '_hidden' does not appear.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
981
982#
983#
984#
985
def sqr(x, wait=0.0):
    """Sleep for *wait* seconds, then return *x* multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    # Exercises the pool API through ``self.pool``: apply, map, imap,
    # imap_unordered, the async variants and terminate().

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))),
                         [sqr(i) for i in range(10)])
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         [sqr(i) for i in range(100)])

    def test_map_chunksize(self):
        # map_async() on an empty list with an explicit chunksize must
        # deliver its result instead of stalling until the timeout.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        # The task sleeps for TIMEOUT1, so get() should block about that long.
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the timeout given to get(), so get()
        # has to raise TimeoutError after roughly TIMEOUT2.
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), [sqr(i) for i in range(10)])

        # Results must arrive in input order and the iterator must then
        # be exhausted.
        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        # Order is not guaranteed, so compare the sorted results.
        expected = [sqr(i) for i in range(1000)]

        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), expected)

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), expected)

    def test_make_pool(self):
        p = multiprocessing.Pool(3)
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            # Always release the workers, even if the size check fails.
            p.close()
            p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            self.skipTest('terminating workers would leak manager references')

        result = self.pool.map_async(
            time.sleep, [0.1] * 10000, chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        # join() after terminate() must return without waiting for the
        # queued sleeps.
        self.assertTrue(join.elapsed < 0.2)
#
# Test that manager has expected number of shared objects left
#
1067
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Take the debug snapshot right next to the count so that what is
        # printed on failure describes the state that was counted.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1086
#
# Test of creating a customized manager class
#
1090
1091from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1092
class FooBar(object):
    """Plain object served through MyManager under the names 'Foo' and 'Bar'."""

    def f(self):
        """Return the marker string ``'f()'``."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError

    def _h(self):
        """Return the marker string ``'_h()'`` (underscore-prefixed name)."""
        return '_h()'
1100
def baz():
    """Generate the squares of 0 through 9 in increasing order."""
    for i in range(10):
        yield i * i
1104
class IteratorProxy(BaseProxy):
    """Proxy that lets a remote iterator be consumed with the iterator protocol.

    Only ``__next__`` is forwarded to the referent; ``__iter__`` is
    answered locally by returning the proxy itself.
    """
    # A second, earlier ``__next__`` that forwarded the Python 2 method
    # name 'next' used to sit here; it was shadowed by the definition
    # below and has been removed together with its 'next' entry.
    _exposed_ = ('__next__',)

    def __iter__(self):
        return self

    def __next__(self):
        return self._callmethod('__next__')
1113
class MyManager(BaseManager):
    """Customized manager whose shared types are registered right below."""

# 'Foo' and 'Bar' are both backed by FooBar; 'Bar' restricts the proxy
# to f() and _h() through ``exposed``.  'baz' serves the baz() generator
# through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1120
1121
class _TestMyManager(BaseTestCase):
    # Checks which methods the proxies created by MyManager expose and
    # that calls and exceptions travel through them.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            baz = manager.baz()

            foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
            bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]

            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            # '_h' is not exposed on 'Foo', so calling it by name must fail.
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(baz), [i*i for i in range(10)])
        finally:
            # Stop the server process even when an assertion above fails.
            manager.shutdown()
1152 manager.shutdown()
1153
#
# Test of connecting to a remote server and using xmlrpclib for serialization
#
1157
# Single module-level queue handed out by get_queue().
_queue = pyqueue.Queue()

def get_queue():
    """Return the module-level queue (the same object on every call)."""
    return _queue
1161
class QueueManager(BaseManager):
    '''manager class used by server process'''

# The server side supplies the callable that produces the shared queue.
QueueManager.register('get_queue', callable=get_queue)
1165
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''

# Registered without a callable: this class only declares the
# 'get_queue' name, it does not provide the object behind it.
QueueManager2.register('get_queue')
1169
1170
# Serializer name passed to every manager in the remote-manager tests.
SERIALIZER = 'xmlrpclib'

class _TestRemoteManager(BaseTestCase):
    # A server manager owns the queue; a child process and the test
    # itself both reach it through separate QueueManager2 connections.

    ALLOWED_TYPES = ('manager',)

    def _putter(self, address, authkey):
        # Runs in the child process: connect to the server at *address*
        # and put one tuple on its queue.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()
        try:
            p = self.Process(target=self._putter, args=(manager.address, authkey))
            p.start()

            manager2 = QueueManager2(
                address=manager.address, authkey=authkey, serializer=SERIALIZER
                )
            manager2.connect()
            queue = manager2.get_queue()

            item = queue.get()
            # The child has delivered its only item; reap it so that no
            # process is left behind by this test.
            p.join()

            # Note that xmlrpclib will deserialize object as a list not a tuple
            self.assertEqual(item, ['hello world', None, True, 2.25])

            # Because we are using xmlrpclib for serialization instead of
            # pickle this will cause a serialization error.
            self.assertRaises(Exception, queue.put, time.sleep)

            # Make queue finalizer run before the server is stopped
            del queue
        finally:
            # Stop the server even when an assertion above fails.
            manager.shutdown()
1212
class _TestManagerRestart(BaseTestCase):
    # Starts a manager on a fixed port, shuts it down and immediately
    # starts a second one on the same address.

    def _putter(self, address, authkey):
        # Runs in the child process: connect to the manager at *address*
        # and put one string on its queue.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        port = test.support.find_unused_port()
        manager = QueueManager(
            address=('localhost', port), authkey=authkey, serializer=SERIALIZER)
        manager.start()
        try:
            p = self.Process(target=self._putter, args=(manager.address, authkey))
            p.start()
            queue = manager.get_queue()
            item = queue.get()
            # The child has delivered its only item; reap it before the
            # server goes away.
            p.join()
            self.assertEqual(item, 'hello world')
            del queue
        finally:
            # Stop the first server even when the check above fails.
            manager.shutdown()

        # Restart straight away on the very same address.
        manager = QueueManager(
            address=('localhost', port), authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001239
Benjamin Petersone711caf2008-06-11 16:44:04 +00001240#
1241#
1242#
1243
# Empty byte string that tells the echo child to stop.
SENTINEL = latin('')

class _TestConnection(BaseTestCase):
    # Tests for the connection objects returned by Pipe().

    ALLOWED_TYPES = ('processes', 'threads')

    def _echo(self, conn):
        # Child side: send every received byte message straight back
        # until the SENTINEL message arrives, then close the connection.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zeroed buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items into the buffer.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A 40-byte buffer cannot hold the 110-byte message; the
            # exception is expected to carry the complete message.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16 MiB
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            # Report the omission instead of passing silently.
            self.skipTest('only run for the processes type')

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # Second argument is the start offset, third the number of bytes.
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Offsets or sizes that fall outside the 26-byte message are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1393 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1394
class _TestListenerClient(BaseTestCase):
    # For every address family offered by ``self.connection``, a child
    # connects to a fresh Listener and sends one message.

    ALLOWED_TYPES = ('processes', 'threads')

    def _test(self, address):
        # Child side: connect to *address*, send 'hello' and close.
        conn = self.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            try:
                p = self.Process(target=self._test, args=(listener.address,))
                p.daemon = True
                p.start()
                conn = listener.accept()
                try:
                    self.assertEqual(conn.recv(), 'hello')
                finally:
                    # Release the accepted connection in every case.
                    conn.close()
                p.join()
            finally:
                listener.close()
#
# Test of sending connection and socket objects between processes
#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001417"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001418class _TestPicklingConnections(BaseTestCase):
1419
1420 ALLOWED_TYPES = ('processes',)
1421
1422 def _listener(self, conn, families):
1423 for fam in families:
1424 l = self.connection.Listener(family=fam)
1425 conn.send(l.address)
1426 new_conn = l.accept()
1427 conn.send(new_conn)
1428
1429 if self.TYPE == 'processes':
1430 l = socket.socket()
1431 l.bind(('localhost', 0))
1432 conn.send(l.getsockname())
1433 l.listen(1)
1434 new_conn, addr = l.accept()
1435 conn.send(new_conn)
1436
1437 conn.recv()
1438
1439 def _remote(self, conn):
1440 for (address, msg) in iter(conn.recv, None):
1441 client = self.connection.Client(address)
1442 client.send(msg.upper())
1443 client.close()
1444
1445 if self.TYPE == 'processes':
1446 address, msg = conn.recv()
1447 client = socket.socket()
1448 client.connect(address)
1449 client.sendall(msg.upper())
1450 client.close()
1451
1452 conn.close()
1453
1454 def test_pickling(self):
1455 try:
1456 multiprocessing.allow_connection_pickling()
1457 except ImportError:
1458 return
1459
1460 families = self.connection.families
1461
1462 lconn, lconn0 = self.Pipe()
1463 lp = self.Process(target=self._listener, args=(lconn0, families))
1464 lp.start()
1465 lconn0.close()
1466
1467 rconn, rconn0 = self.Pipe()
1468 rp = self.Process(target=self._remote, args=(rconn0,))
1469 rp.start()
1470 rconn0.close()
1471
1472 for fam in families:
1473 msg = ('This connection uses family %s' % fam).encode('ascii')
1474 address = lconn.recv()
1475 rconn.send((address, msg))
1476 new_conn = lconn.recv()
1477 self.assertEqual(new_conn.recv(), msg.upper())
1478
1479 rconn.send(None)
1480
1481 if self.TYPE == 'processes':
1482 msg = latin('This connection uses a normal socket')
1483 address = lconn.recv()
1484 rconn.send((address, msg))
1485 if hasattr(socket, 'fromfd'):
1486 new_conn = lconn.recv()
1487 self.assertEqual(new_conn.recv(100), msg.upper())
1488 else:
1489 # XXX On Windows with Py2.6 need to backport fromfd()
1490 discard = lconn.recv_bytes()
1491
1492 lconn.send(None)
1493
1494 rconn.close()
1495 lconn.close()
1496
1497 lp.join()
1498 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001499"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001500#
1501#
1502#
1503
class _TestHeap(BaseTestCase):
    # Consistency check of multiprocessing.heap's block bookkeeping.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            b = multiprocessing.heap.BufferWrapper(size)
            blocks.append(b)
            if len(blocks) > maxblocks:
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap: collect every free and every
        # occupied block as (arena index, start, stop, length, state)
        segments = []
        for free_blocks in list(heap._len_to_seq.values()):
            for arena, start, stop in free_blocks:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Neighbouring blocks must either touch exactly, or the second
        # one must open a new arena at offset 0.
        for current, following in zip(segments, segments[1:]):
            arena, start, stop = current[:3]
            narena, nstart, nstop = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1544
1545#
1546#
1547#
1548
class _Foo(Structure):
    # Two-field structure: an int ``x`` followed by a double ``y``.
    _fields_ = [
        ('x', c_int),
        ('y', c_double),
        ]
1554
class _TestSharedCTypes(BaseTestCase):
    # Shared Value/Array objects are modified by a child process and the
    # parent checks that it sees the new contents.

    ALLOWED_TYPES = ('processes',)

    def _double(self, x, y, foo, arr, string):
        # Runs in the child process: double every shared object in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for i in range(len(arr)):
            arr[i] *= 2

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_sharedctypes(self, lock=False):
        x = Value('i', 7, lock=lock)
        # Use the bare c_double name, as _Foo and the skip guard do.
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = Array('d', list(range(10)), lock=lock)
        string = Array('c', 20, lock=lock)
        # A 'c' array holds bytes; the final assertion compares against
        # latin('hellohello'), so store bytes here as well.
        string.value = latin('hello')

        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
        p.start()
        p.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario, with the shared objects created with lock=True.
        self.test_sharedctypes(lock=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_copy(self):
        foo = _Foo(2, 5.0)
        # NOTE(review): `copy` has to be a callable here (presumably
        # multiprocessing.sharedctypes.copy); the top of the file also
        # does `import copy` -- confirm which binding is in effect.
        bar = copy(foo)
        foo.x = 0
        foo.y = 0
        # The copy must keep the values it was made with.
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)
1600
1601#
1602#
1603#
1604
class _TestFinalize(BaseTestCase):
    # Checks which util.Finalize callbacks run, and in what order.

    ALLOWED_TYPES = ('processes',)

    def _test_finalize(self, conn):
        # Runs in the child process.  Every finalizer sends its tag
        # through *conn* so the parent can inspect what ran.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.start()
        p.join()

        # Read tags until the 'STOP' marker.  'c' was registered without
        # an exitpriority and is not expected; the others are expected in
        # decreasing exitpriority, equal priorities newest first.
        result = list(iter(conn.recv, 'STOP'))
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1655
#
# Test that from ... import * works for each module
#
1659
class _TestImportStar(BaseTestCase):
    # Every name a multiprocessing module lists in ``__all__`` must
    # really exist in that module.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.reduction',
            'multiprocessing.synchronize', 'multiprocessing.util'
            ]

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for name in modules:
            __import__(name)
            mod = sys.modules[name]

            # A module without __all__ has nothing to check.
            for attr in getattr(mod, '__all__', ()):
                self.assertTrue(
                    hasattr(mod, attr),
                    '%r does not have attribute %r' % (mod, attr)
                    )
1686
#
# Quick test that logging works -- does not test logging output
#
1690
class _TestLogging(BaseTestCase):
    # Smoke tests for multiprocessing.get_logger() and for the level a
    # child process sees.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        self.assertTrue(logger is not None)
        logger.setLevel(util.SUBWARNING)
        try:
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Put the module-wide level back for the following tests.
            logger.setLevel(LOG_LEVEL)

    def _test_level(self, conn):
        # Runs in the child process: report the logger's effective level.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            # A level set on the multiprocessing logger itself.
            logger.setLevel(LEVEL1)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            level = reader.recv()
            p.join()        # reap the child before asserting
            self.assertEqual(LEVEL1, level)

            # With NOTSET the effective level comes from the root logger.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            level = reader.recv()
            p.join()
            self.assertEqual(LEVEL2, level)
        finally:
            # Restore both loggers and release the pipe, pass or fail.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
1727 logger.setLevel(level=LOG_LEVEL)
1728
#
# Test to verify handle verification, see issue 3321
#
1732
class TestInvalidHandle(unittest.TestCase):

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # 44977608 is an arbitrary handle number (presumably not an open
        # descriptor): polling it must raise IOError.  A negative handle
        # must already be refused by the constructor.
        conn = _multiprocessing.Connection(44977608)
        self.assertRaises(IOError, conn.poll)
        self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna9b0e9182010-03-28 11:42:38 +00001740
#
# Functions used to create test cases from the base ones in this module
#
1744
def get_attributes(Source, names):
    """Return a dict mapping each name in *names* to that attribute of *Source*.

    Attributes that are plain Python functions are wrapped in
    ``staticmethod`` so that they are not turned into bound methods once
    the dict is copied into a class namespace.  Anything else is stored
    unchanged.
    """
    function_type = type(get_attributes)
    d = {}
    for name in names:
        obj = getattr(Source, name)
        if isinstance(obj, function_type):
            obj = staticmethod(obj)
        d[name] = obj
    return d
1753
def create_test_cases(Mixin, type):
    """Build the concrete test classes for one flavour of the tests.

    Every module-level name starting with ``_Test`` whose
    ``ALLOWED_TYPES`` contains *type* is combined with
    ``unittest.TestCase`` and *Mixin* into a new class named
    ``'With' + type.capitalize() + name[1:]``.

    Returns a dict mapping those new names to the new classes.
    """
    result = {}
    glob = globals()
    Type = type.capitalize()

    # Iterate over a snapshot of the names: callers add the returned
    # classes to the module globals afterwards.
    for name in list(glob.keys()):
        if not name.startswith('_Test'):
            continue
        base = glob[name]
        if type not in base.ALLOWED_TYPES:
            continue
        newname = 'With' + Type + name[1:]
        class Temp(base, unittest.TestCase, Mixin):
            pass
        Temp.__name__ = newname
        # Without this every generated class would report the same
        # qualified name, 'create_test_cases.<locals>.Temp'.
        Temp.__qualname__ = newname
        Temp.__module__ = Mixin.__module__
        result[newname] = Temp
    return result
1770
#
# Create test cases
#
1774
class ProcessesMixin(object):
    # Mixin for the 'processes' flavour: the names listed below are
    # copied from the multiprocessing package into the class namespace,
    # so tests reach them as ``self.<name>``.
    TYPE = 'processes'
    Process = multiprocessing.Process
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

# Build the WithProcesses* classes and publish them as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1787
1788
class ManagerMixin(object):
    # Mixin for the 'manager' flavour: the names listed below are taken
    # from one shared SyncManager instance.  That instance is created
    # with object.__new__ only, i.e. without running __init__ here;
    # test_main() calls __init__() and start() on it later.
    TYPE = 'manager'
    Process = multiprocessing.Process
    manager = object.__new__(multiprocessing.managers.SyncManager)
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))

# Build the WithManager* classes and publish them as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1801
1802
class ThreadsMixin(object):
    # Mixin for the 'threads' flavour: the names listed below are copied
    # from multiprocessing.dummy into the class namespace, so tests
    # reach them as ``self.<name>``.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))

# Build the WithThreads* classes and publish them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1815
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        # The fake peer answers the challenge with a bogus reply, which
        # must be rejected with AuthenticationError.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.deliver_challenge,
                          _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # The fake peer first sends the CHALLENGE constant, then a bogus
        # message, then empty byte strings.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                elif self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.answer_challenge,
                          _FakeConnection(), b'abc')
1844
#
# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
#
1848
def initializer(ns):
    """Add one to ``ns.test``; passed as the start-up hook in TestInitializers."""
    ns.test += 1
1851
class TestInitializers(unittest.TestCase):
    # ``self.ns.test`` starts at 0 and is incremented by initializer(),
    # so a value of 1 shows that the hook ran exactly once.

    def setUp(self):
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer must be refused.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Stop the second manager even when the check fails.
            m.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer must be refused.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
1873 self.assertEqual(self.ns.test, 1)
1874
#
# Issue 5155, 5313, 5331: Test process in processes
# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
#
1879
def _ThisSubProcess(q):
    """Try one non-blocking get() on *q*; an empty queue is not an error."""
    try:
        q.get(block=False)
    except pyqueue.Empty:
        pass
1884 pass
1885
def _TestProcess(q):
    """Start a nested process running _ThisSubProcess and wait for it.

    *q* is accepted but not used; the nested process gets a queue that
    is created here.
    """
    queue = multiprocessing.Queue()
    subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
    subProc.start()
    subProc.join()
1890 subProc.join()
1891
def _afunc(x):
    """Return *x* multiplied by itself."""
    return x * x
1894
def pool_in_process():
    """Create a four-worker pool, run one map() through it and shut it down."""
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Release the workers instead of leaving them to interpreter exit.
        pool.close()
        pool.join()
1897 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
1898
class _file_like(object):
    """Write buffer in front of *delegate* that is kept per process.

    write() only appends to an in-memory list; flush() joins the list,
    writes the result to the delegate and empties the list.
    """

    def __init__(self, delegate):
        self._delegate = delegate
        # Pid for which ``_cache`` is valid; None until first use.
        self._pid = None

    @property
    def cache(self):
        # Start with an empty list whenever the current pid differs from
        # the one recorded, so data buffered in another process is not
        # reused.
        pid = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if pid != self._pid:
            self._pid = pid
            self._cache = []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        self._delegate.write(''.join(self.cache))
        self._cache = []
1918 self._cache = []
1919
class TestStdinBadfiledescriptor(unittest.TestCase):

    def test_queue_in_process(self):
        # A child process that itself starts a process using a Queue.
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # A child process that creates and uses its own Pool.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this process is constructed but never started,
        # so only the flush() call below actually runs -- confirm
        # whether it was meant to be started.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # assertEqual instead of a bare assert, which -O would strip.
        self.assertEqual(sio.getvalue(), 'foo')
1939 assert sio.getvalue() == 'foo'
1940
# Test classes that are not generated by create_test_cases().
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
                   TestStdinBadfiledescriptor]

#
#
#

def test_main(run=None):
    """Build the complete suite and execute it with *run*.

    *run* is a callable taking a unittest.TestSuite; when omitted,
    test.support.run_unittest is used.  Raises unittest.SkipTest on
    Linux when an RLock cannot be created (issue 3111).
    """
    if sys.platform.startswith("linux"):
        try:
            # Only probing whether the platform can create the lock.
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared fixtures: one pool per flavour plus the manager, which only
    # gets initialised and started here.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        # Tear the fixtures down even when the run raises, so no worker
        # or manager process outlives the test run.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00001985
def main():
    """Run the whole suite through a verbose TextTestRunner."""
    test_main(unittest.TextTestRunner(verbosity=2).run)

if __name__ == '__main__':
    main()