blob: 3f6fc8b896c94f448dc3e760f7d5b7fd35a85b7a [file] [log] [blame]
Georg Brandl86b2fb92008-07-16 03:43:04 +00001#!/usr/bin/env python
2
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import queue as pyqueue
10import time
R. David Murraya44c6b32009-07-29 15:40:30 +000011import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
15import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
Benjamin Petersone5384b02008-10-04 22:00:42 +000027
Benjamin Petersone711caf2008-06-11 16:44:04 +000028import multiprocessing.dummy
29import multiprocessing.connection
30import multiprocessing.managers
31import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000033
34from multiprocessing import util
35
36#
37#
38#
39
def latin(s):
    """Return the string *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000042
Benjamin Petersone711caf2008-06-11 16:44:04 +000043#
44# Constants
45#
46
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.WARNING

DELTA = 0.1

# Setting CHECK_TIMINGS to True makes the tests take a lot longer and can
# sometimes cause non-serious failures, because some calls block a bit
# longer than expected.
CHECK_TIMINGS = False

TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# True unless the C extension flags its semaphore get_value() as broken.
_sem_getvalue_broken = getattr(_multiprocessing,
                               'HAVE_BROKEN_SEM_GETVALUE', False)
HAVE_GETVALUE = not _sem_getvalue_broken

WIN32 = sys.platform == "win32"
64
Benjamin Petersone711caf2008-06-11 16:44:04 +000065#
Florent Xicluna9b0e9182010-03-28 11:42:38 +000066# Some tests require ctypes
67#
68
try:
    from ctypes import Structure, c_int, c_double
except ImportError:
    # Without ctypes, fall back to placeholders so that class definitions
    # and skip decorators elsewhere in the file can still be evaluated.
    Structure = object
    c_int = c_double = None

# Value and copy are provided by multiprocessing.sharedctypes.  The ctypes
# package defines neither name, so importing them from ctypes always raised
# ImportError and left both set to None.
try:
    from multiprocessing.sharedctypes import Value
except ImportError:
    Value = None

try:
    from multiprocessing.sharedctypes import copy as ctypes_copy
except ImportError:
    ctypes_copy = None
84
Florent Xicluna9b0e9182010-03-28 11:42:38 +000085#
Benjamin Petersone711caf2008-06-11 16:44:04 +000086# Creates a wrapper for a function which records the time it takes to finish
87#
88
class TimingWrapper(object):
    """Callable wrapper that records how long each call of *func* took.

    After every call (whether it returned or raised) ``elapsed`` holds the
    wall-clock duration in seconds; before the first call it is ``None``.
    """

    def __init__(self, func):
        self.elapsed = None
        self.func = func

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Recorded in ``finally`` so a raising call is timed as well.
            self.elapsed = time.time() - started
        return result
101
102#
103# Base class for test cases
104#
105
class BaseTestCase(object):
    """Mixin with assertion helpers shared by the test-case classes.

    The helpers call ``assertEqual``/``assertAlmostEqual`` on ``self``, so
    the class must be combined with something providing those methods.
    """

    # Consumed outside this block; presumably the backends a case may run on.
    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare *a* and *b* to one decimal place, only if CHECK_TIMINGS."""
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert ``func(*args) == value`` unless it is not implemented.

        A ``NotImplementedError`` raised by *func* is silently accepted.
        """
        try:
            outcome = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, outcome)
121
122#
123# Return the value of a semaphore
124#
125
def get_value(self):
    """Return the current counter value of the semaphore-like object *self*.

    Tries, in order, a ``get_value()`` method, then the attributes
    ``_Semaphore__value`` and ``_value``.  Raises ``NotImplementedError``
    when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
137
138#
139# Testcases
140#
141
class _TestProcess(BaseTestCase):
    """Tests for ``Process`` objects and the process-introspection helpers.

    ``self.TYPE``, ``self.Process``, ``self.Queue``, ``self.Pipe``,
    ``self.current_process`` and ``self.active_children`` are not defined
    here; they are expected to come from a mixin outside this class.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        """Check the attributes of the current process object."""
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertTrue(isinstance(authkey, bytes))
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def _test(self, q, *args, **kwds):
        """Child body for test_process: report what the child saw on *q*."""
        current = self.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if self.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        """Follow a child through creation, start, data exchange and join."""
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child receives ``q`` as its first argument and reports only
        # the remaining positional arguments.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    def _test_terminate(self):
        """Child body for test_terminate: sleep until terminated."""
        time.sleep(1000)

    def test_terminate(self):
        """Terminate a sleeping child and check that it is reaped promptly."""
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        """cpu_count() returns an int >= 1 (or is not implemented)."""
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        """A child is listed by active_children() only while it is running."""
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    def _test_recursion(self, wconn, id):
        """Send *id* on *wconn*, then spawn two children until depth 2."""
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = self.Process(
                    target=self._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                # Joining before starting the sibling keeps the ids ordered.
                p.join()

    def test_recursion(self):
        """Processes started from processes report ids in depth-first order."""
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
287
288#
289#
290#
291
292class _UpperCaser(multiprocessing.Process):
293
294 def __init__(self):
295 multiprocessing.Process.__init__(self)
296 self.child_conn, self.parent_conn = multiprocessing.Pipe()
297
298 def run(self):
299 self.parent_conn.close()
300 for s in iter(self.child_conn.recv, None):
301 self.child_conn.send(s.upper())
302 self.child_conn.close()
303
304 def submit(self, s):
305 assert type(s) is str
306 self.parent_conn.send(s)
307 return self.parent_conn.recv()
308
309 def stop(self):
310 self.parent_conn.send(None)
311 self.parent_conn.close()
312 self.child_conn.close()
313
class _TestSubclassingProcess(BaseTestCase):
    """Check that a user-defined ``Process`` subclass can be run."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        """Round-trip two strings through an ``_UpperCaser`` child."""
        worker = _UpperCaser()
        worker.start()
        for text, expected in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), expected)
        worker.stop()
        worker.join()
325
326#
327#
328#
329
def queue_empty(q):
    """Return whether *q* is empty, via ``empty()`` or else ``qsize()``."""
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
335
def queue_full(q, maxsize):
    """Return whether *q* is full, via ``full()`` or else ``qsize()``.

    *maxsize* is only consulted when *q* has no ``full`` method.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
341
342
class _TestQueue(BaseTestCase):
    """Tests for ``Queue`` and ``JoinableQueue``.

    ``self.Queue``, ``self.JoinableQueue``, ``self.Event`` and
    ``self.Process`` are expected to come from a mixin outside this class.
    """

    def _test_put(self, queue, child_can_start, parent_can_continue):
        """Child body for test_put: drain six items once allowed to start."""
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        """Exercise put()/put_nowait() argument forms on a bounded queue."""
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Every further put must raise Full; only the blocking forms are
        # expected to take as long as their timeout.
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    def _test_get(self, queue, child_can_start, parent_can_continue):
        """Child body for test_get: put 2..5 once allowed to start."""
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        """Exercise get()/get_nowait() argument forms."""
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Every further get must raise Empty; only the blocking forms are
        # expected to take as long as their timeout.
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    def _test_fork(self, queue):
        """Child body for test_fork: put 10..19 on *queue*."""
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        """qsize() tracks puts and gets (skipped where not implemented)."""
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    def _test_task_done(self, q):
        """Worker body: acknowledge every item until a ``None`` arrives."""
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        """queue.join() returns once four workers acknowledged ten items."""
        queue = self.JoinableQueue()

        # The former guard also required sys.version_info < (2, 5), which
        # can never hold in this file, so the skip was unreachable.
        if not hasattr(queue, 'task_done'):
            self.skipTest("requires 'queue.task_done()' method")

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # One sentinel per worker so that each worker loop terminates.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
546
547#
548#
549#
550
class _TestLock(BaseTestCase):
    """Tests for ``Lock`` and ``RLock`` (factories supplied via ``self``)."""

    def test_lock(self):
        """A plain lock cannot be re-acquired or released twice."""
        lock = self.Lock()
        blocking_result = lock.acquire()
        self.assertEqual(blocking_result, True)
        nonblocking_result = lock.acquire(False)
        self.assertEqual(nonblocking_result, False)
        self.assertEqual(lock.release(), None)
        release_errors = (ValueError, threading.ThreadError)
        self.assertRaises(release_errors, lock.release)

    def test_rlock(self):
        """An RLock can be nested, but not released more than acquired."""
        lock = self.RLock()
        depth = 3
        for _ in range(depth):
            self.assertEqual(lock.acquire(), True)
        for _ in range(depth):
            self.assertEqual(lock.release(), None)
        release_errors = (AssertionError, RuntimeError)
        self.assertRaises(release_errors, lock.release)

    def test_lock_context(self):
        """A lock can be used as a context manager."""
        lock = self.Lock()
        with lock:
            pass
573
Benjamin Petersone711caf2008-06-11 16:44:04 +0000574
class _TestSemaphore(BaseTestCase):
    """Tests for ``Semaphore`` and ``BoundedSemaphore``."""

    def _test_semaphore(self, sem):
        """Acquire and release *sem* (initial value 2), checking its value."""
        def check_value(expected):
            self.assertReturnsIfImplemented(expected, get_value, sem)

        check_value(2)
        self.assertEqual(sem.acquire(), True)
        check_value(1)
        self.assertEqual(sem.acquire(), True)
        check_value(0)
        self.assertEqual(sem.acquire(False), False)
        check_value(0)
        self.assertEqual(sem.release(), None)
        check_value(1)
        self.assertEqual(sem.release(), None)
        check_value(2)

    def test_semaphore(self):
        """An unbounded semaphore may be released past its initial value."""
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        """acquire() on an exhausted semaphore fails, honouring timeouts."""
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected elapsed time)
        attempts = (
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
        )
        for args, kwds, expected_elapsed in attempts:
            self.assertEqual(acquire(*args, **kwds), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected_elapsed)
627
628
class _TestCondition(BaseTestCase):
    """Tests for ``Condition`` using a mix of processes and threads."""

    def f(self, cond, sleeping, woken, timeout=None):
        """Worker body: signal *sleeping*, wait on *cond*, signal *woken*."""
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        """notify() wakes exactly one waiter per call."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Separate names for the two waiters so that both can be joined;
        # reusing one name dropped the handle of the first waiter.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        thread.join()
        proc.join()

    def test_notify_all(self):
        """Waiters time out on their own, and notify_all() wakes them all."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        """wait(timeout) with no notifier returns after about *timeout*."""
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
759
760
class _TestEvent(BaseTestCase):
    """Tests for ``Event`` (factory supplied via ``self.Event``)."""

    def _test_event(self, event):
        """Child body for test_event: set *event* after TIMEOUT2 seconds."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        """Check is_set()/wait() before and after set(), and across a child."""
        event = self.Event()
        wait = TimingWrapper(event.wait)

        self.assertEqual(event.is_set(), False)

        # While the event is unset, wait() is expected to return False once
        # its timeout expires.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once the event is set, wait() is expected to return True at once.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # Keep a handle on the child so it can be joined after it has set
        # the event, instead of leaving it running past the test.
        p = self.Process(target=self._test_event, args=(event,))
        p.start()
        self.assertEqual(wait(), True)
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000798
799#
800#
801#
802
class _TestValue(BaseTestCase):
    """Tests for ``Value`` and ``RawValue`` shared objects."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def _test(self, values):
        """Child body: overwrite each shared value with its third entry."""
        for shared, spec in zip(values, self.codes_values):
            shared.value = spec[2]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_value(self, raw=False):
        """Values written by a child process are visible to the parent."""
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        child = self.Process(target=self._test, args=(values,))
        child.start()
        child.join()

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawvalue(self):
        """Same as test_value, using ``RawValue``."""
        self.test_value(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock(self):
        """get_lock()/get_obj() exist only on lock-wrapped values."""
        # Default lock and lock=None: both accessors must be callable.
        default_locked = self.Value('i', 5)
        default_locked.get_lock()
        default_locked.get_obj()

        none_locked = self.Value('i', 5, lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # An explicitly supplied lock is handed back by get_lock().
        lock = self.Lock()
        custom_locked = self.Value('i', 5, lock=lock)
        returned_lock = custom_locked.get_lock()
        custom_locked.get_obj()
        self.assertEqual(lock, returned_lock)

        unlocked = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        raw = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, accessor))
867
Benjamin Petersone711caf2008-06-11 16:44:04 +0000868
class _TestArray(BaseTestCase):
    """Tests for ``Array`` and ``RawArray`` shared objects."""

    ALLOWED_TYPES = ('processes',)

    def f(self, seq):
        """Replace *seq* in place with its running (prefix) sums."""
        for index in range(1, len(seq)):
            seq[index] += seq[index - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        """Indexing, slicing and in-place updates made by a child process."""
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        factory = self.RawArray if raw else self.Array
        arr = factory('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        replacement = array.array('i', [1, 2, 3, 4])
        arr[4:8] = replacement
        seq[4:8] = replacement

        self.assertEqual(list(arr[:]), seq)

        # Apply the same transformation locally and in a child process.
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        """Same as test_array, using ``RawArray``."""
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        """get_lock()/get_obj() exist only on lock-wrapped arrays."""
        # Default lock and lock=None: both accessors must be callable.
        default_locked = self.Array('i', list(range(10)))
        default_locked.get_lock()
        default_locked.get_obj()

        none_locked = self.Array('i', list(range(10)), lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # An explicitly supplied lock is handed back by get_lock().
        lock = self.Lock()
        custom_locked = self.Array('i', list(range(10)), lock=lock)
        returned_lock = custom_locked.get_lock()
        custom_locked.get_obj()
        self.assertEqual(lock, returned_lock)

        unlocked = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        raw = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, accessor))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000930
931#
932#
933#
934
class _TestContainers(BaseTestCase):
    """Tests for the ``list``, ``dict`` and ``Namespace`` containers."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        """Slicing, extend, ``*=``, ``+`` and nesting of shared lists."""
        first_ten = list(range(10))
        first_five = list(range(5))
        doubled = first_five + first_five

        a = self.list(list(first_ten))
        self.assertEqual(a[:], first_ten)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(first_five))
        self.assertEqual(b[:], first_five)

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        self.assertEqual(a[:], first_ten)

        members = [a, b]
        nested = self.list(members)
        self.assertEqual(nested[:], [first_ten, doubled])

        # A list holding ``a`` must reflect a later append to ``a``.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [first_ten + ['hello']])

    def test_dict(self):
        """copy(), keys(), values() and items() of a shared dict."""
        d = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            d[code] = chr(code)
        letters = [chr(code) for code in indices]
        self.assertEqual(d.copy(), {code: chr(code) for code in indices})
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), list(zip(indices, letters)))

    def test_namespace(self):
        """Attribute set/delete on a Namespace; its str() omits ``_hidden``."""
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        visible = (n.name, n.job)
        self.assertEqual(visible, ('Bob', 'Builder'))
        del n.job
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertTrue(not hasattr(n, 'job'))
989 self.assertTrue(not hasattr(n, 'job'))
990
991#
992#
993#
994
def sqr(x, wait=0.0):
    """Sleep for *wait* seconds, then return *x* multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    # Tests of the Pool API.  The pool under test is self.pool, which
    # test_main() attaches to each mixin class before running the suite.

    def test_apply(self):
        call = self.pool.apply
        self.assertEqual(call(sqr, (5,)), sqr(5))
        self.assertEqual(call(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        small = list(range(10))
        large = list(range(100))
        self.assertEqual(self.pool.map(sqr, small), [sqr(v) for v in small])
        self.assertEqual(self.pool.map(sqr, large, chunksize=20),
                         [sqr(v) for v in large])

    def test_map_chunksize(self):
        # An empty iterable combined with an explicit chunksize must still
        # produce a result instead of blocking until the timeout.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        timed_get = TimingWrapper(res.get)
        self.assertEqual(timed_get(), 49)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The worker sleeps longer than the timeout given to get().
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        timed_get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, timed_get,
                          timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), [sqr(v) for v in range(10)])

        it = self.pool.imap(sqr, list(range(10)))
        for value in range(10):
            self.assertEqual(next(it), value * value)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for value in range(1000):
            self.assertEqual(next(it), value * value)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        # Results may arrive in any order, so compare after sorting.
        squares = [sqr(v) for v in range(1000)]

        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), squares)

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), squares)

    def test_make_pool(self):
        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # Queue far more work than can finish, then check that joining a
        # terminated pool returns promptly.
        self.pool.map_async(time.sleep, [0.1] * 10000, chunksize=1)
        self.pool.terminate()
        timed_join = TimingWrapper(self.pool.join)
        timed_join()
        self.assertTrue(timed_join.elapsed < 0.2)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001073#
1074# Test that manager has expected number of shared objects left
1075#
1076
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Fetch the description right after the count so that what gets
        # printed on failure belongs to the count that was checked.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the snapshot once; asking the manager again here would
            # only repeat the output (or show a later state).
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1095
1096#
1097# Test of creating a customized manager class
1098#
1099
1100from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1101
class FooBar(object):
    # Plain class registered with MyManager below: one ordinary method,
    # one method that raises, and one underscore-prefixed method.

    def f(self):
        return 'f()'

    def g(self):
        raise ValueError()

    def _h(self):
        return '_h()'
1109
def baz():
    # Generator yielding the squares of 0 through 9 in increasing order.
    n = 0
    while n < 10:
        yield n * n
        n += 1
1113
class IteratorProxy(BaseProxy):
    # Proxy type that lets the caller iterate over a referent by
    # forwarding each step to the referent's __next__ method.
    _exposed_ = ('__next__',)

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def __next__(self):
        method_name = '__next__'
        return self._callmethod(method_name)
1120
class MyManager(BaseManager):
    '''customized manager class used by _TestMyManager'''

# 'Foo' is registered without an explicit list of exposed methods, 'Bar'
# names them explicitly (including the underscore-prefixed one), and 'baz'
# is a generator function served through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1127
1128
class _TestMyManager(BaseTestCase):
    # Checks which methods the proxies created by MyManager expose.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        candidates = ('f', 'g', '_h')
        foo_methods = [name for name in candidates if hasattr(foo, name)]
        bar_methods = [name for name in candidates if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        # foo: '_h' is neither an attribute of the proxy nor callable
        # through _callmethod().
        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        # bar: both explicitly exposed names work either way.
        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(baz), [n * n for n in range(10)])

        manager.shutdown()
1160
1161#
1162# Test of connecting to a remote server and using xmlrpclib for serialization
1163#
1164
# One module-level queue; every call to get_queue() returns this object.
_queue = pyqueue.Queue()


def get_queue():
    # Callable registered with QueueManager below.
    return _queue
1168
class QueueManager(BaseManager):
    '''manager class used by server process'''

# The server side supplies the callable that produces the shared queue.
QueueManager.register('get_queue', callable=get_queue)
1172
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''

# Registered by name only: no callable is given on this side.
QueueManager2.register('get_queue')
1176
1177
1178SERIALIZER = 'xmlrpclib'
1179
class _TestRemoteManager(BaseTestCase):
    # Connects to a manager server from a second manager object and from
    # a child process, using SERIALIZER instead of pickle.

    ALLOWED_TYPES = ('manager',)

    def _putter(self, address, authkey):
        # Runs in the child: connect to the server and put one item on
        # the shared queue.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

        # The item has arrived, so the child has finished its work.  Wait
        # for it here, while the server is still running, so that it is
        # reaped and cannot still be talking to the server at shutdown.
        p.join()

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
1219
class _TestManagerRestart(BaseTestCase):
    # Starts a manager, shuts it down and immediately starts another one
    # on the address recorded from the first manager's get_server() call.

    def _putter(self, address, authkey):
        # Runs in the child: connect to the server and put one item on
        # the shared queue.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        # Remember a concrete address for the second manager below.
        addr = manager.get_server().address
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # The item has arrived, so the child is done.  Join it before the
        # server goes away so it is reaped and cannot race with shutdown.
        p.join()
        del queue
        manager.shutdown()

        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001246
Benjamin Petersone711caf2008-06-11 16:44:04 +00001247#
1248#
1249#
1250
1251SENTINEL = latin('')
1252
class _TestConnection(BaseTestCase):
    # Tests of the connection objects returned by Pipe().

    ALLOWED_TYPES = ('processes', 'threads')

    def _echo(self, conn):
        # Send every received message straight back until SENTINEL
        # arrives, then close this end.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        echo = self.Process(target=self._echo, args=(child_conn,))
        echo.daemon = True
        echo.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))
        with_processes = (self.TYPE == 'processes')

        if with_processes:
            self.assertEqual(type(conn.fileno()), int)

        # Objects and raw bytes both come back unchanged.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if with_processes:
            # Receive into the start of a zeroed buffer.
            buffer = array.array('i', [0]*10)
            nbytes = len(arr) * buffer.itemsize
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer), nbytes)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items.
            buffer = array.array('i', [0]*10)
            nbytes = len(arr) * buffer.itemsize
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(
                conn.recv_bytes_into(buffer, 3 * buffer.itemsize), nbytes)
            self.assertEqual(list(buffer), expected)

            # A buffer shorter than the message: the exception is
            # expected to carry the complete message.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing is pending: poll() reports False, with and without a
        # timeout.
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if with_processes:
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        echo.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE != 'processes':
            return
        # Each end works in one direction only.
        self.assertEqual(reader.readable, True)
        self.assertEqual(reader.writable, False)
        self.assertEqual(writer.readable, False)
        self.assertEqual(writer.writable, True)
        self.assertRaises(IOError, reader.send, 2)
        self.assertRaises(IOError, writer.recv)
        self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        echo = self.Process(target=self._echo, args=(child_conn,))
        echo.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        echo.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        # (extra arguments after msg, bytes expected at the other end)
        accepted = (
            ((), msg),
            ((5,), msg[5:]),
            ((7, 8), msg[7:7+8]),
            ((26,), latin('')),
            ((26, 0), latin('')),
            )
        for extra, expected in accepted:
            a.send_bytes(msg, *extra)
            self.assertEqual(b.recv_bytes(), expected)

        # Offsets/sizes outside the 26-byte message, or negative ones,
        # are rejected.
        rejected = ((27,), (22, 5), (26, 1), (-1,), (4, -1))
        for extra in rejected:
            self.assertRaises(ValueError, a.send_bytes, msg, *extra)
1401
class _TestListenerClient(BaseTestCase):
    # For every address family offered by self.connection, checks that a
    # client in another process/thread can reach a Listener.

    ALLOWED_TYPES = ('processes', 'threads')

    def _test(self, address):
        # Runs in the child: connect, send one greeting, disconnect.
        client = self.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test, args=(listener.address,))
            child.daemon = True
            child.start()
            conn = listener.accept()
            self.assertEqual(conn.recv(), 'hello')
            child.join()
            listener.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001421#
1422# Test of sending connection and socket objects between processes
1423#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001424"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001425class _TestPicklingConnections(BaseTestCase):
1426
1427 ALLOWED_TYPES = ('processes',)
1428
1429 def _listener(self, conn, families):
1430 for fam in families:
1431 l = self.connection.Listener(family=fam)
1432 conn.send(l.address)
1433 new_conn = l.accept()
1434 conn.send(new_conn)
1435
1436 if self.TYPE == 'processes':
1437 l = socket.socket()
1438 l.bind(('localhost', 0))
1439 conn.send(l.getsockname())
1440 l.listen(1)
1441 new_conn, addr = l.accept()
1442 conn.send(new_conn)
1443
1444 conn.recv()
1445
1446 def _remote(self, conn):
1447 for (address, msg) in iter(conn.recv, None):
1448 client = self.connection.Client(address)
1449 client.send(msg.upper())
1450 client.close()
1451
1452 if self.TYPE == 'processes':
1453 address, msg = conn.recv()
1454 client = socket.socket()
1455 client.connect(address)
1456 client.sendall(msg.upper())
1457 client.close()
1458
1459 conn.close()
1460
1461 def test_pickling(self):
1462 try:
1463 multiprocessing.allow_connection_pickling()
1464 except ImportError:
1465 return
1466
1467 families = self.connection.families
1468
1469 lconn, lconn0 = self.Pipe()
1470 lp = self.Process(target=self._listener, args=(lconn0, families))
1471 lp.start()
1472 lconn0.close()
1473
1474 rconn, rconn0 = self.Pipe()
1475 rp = self.Process(target=self._remote, args=(rconn0,))
1476 rp.start()
1477 rconn0.close()
1478
1479 for fam in families:
1480 msg = ('This connection uses family %s' % fam).encode('ascii')
1481 address = lconn.recv()
1482 rconn.send((address, msg))
1483 new_conn = lconn.recv()
1484 self.assertEqual(new_conn.recv(), msg.upper())
1485
1486 rconn.send(None)
1487
1488 if self.TYPE == 'processes':
1489 msg = latin('This connection uses a normal socket')
1490 address = lconn.recv()
1491 rconn.send((address, msg))
1492 if hasattr(socket, 'fromfd'):
1493 new_conn = lconn.recv()
1494 self.assertEqual(new_conn.recv(100), msg.upper())
1495 else:
1496 # XXX On Windows with Py2.6 need to backport fromfd()
1497 discard = lconn.recv_bytes()
1498
1499 lconn.send(None)
1500
1501 rconn.close()
1502 lconn.close()
1503
1504 lp.join()
1505 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001506"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001507#
1508#
1509#
1510
class _TestHeap(BaseTestCase):
    # Consistency check of the bookkeeping in multiprocessing.heap.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # gather every free and every occupied block as
        # (arena index, start, stop, length, state)
        entries = []
        for free_blocks in list(heap._len_to_seq.values()):
            for arena, start, stop in free_blocks:
                entries.append((heap._arenas.index(arena), start, stop,
                                stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            entries.append((heap._arenas.index(arena), start, stop,
                            stop-start, 'occupied'))

        entries.sort()

        # verify the state of the heap: consecutive entries must either
        # touch each other or begin a different arena at offset zero
        for current, following in zip(entries, entries[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, nstop) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1551
1552#
1553#
1554#
1555
class _Foo(Structure):
    # Two-field structure (an int and a double) used by the shared
    # ctypes tests.
    _fields_ = [('x', c_int), ('y', c_double)]
1561
class _TestSharedCTypes(BaseTestCase):
    # Checks that shared ctypes objects modified in a child process show
    # the new values in the parent.

    ALLOWED_TYPES = ('processes',)

    def _double(self, x, y, foo, arr, string):
        # Runs in the child: double every value it is given, in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for i in range(len(arr)):
            arr[i] *= 2

    @unittest.skipIf(Value is None, "requires ctypes.Value")
    def test_sharedctypes(self, lock=False):
        x = Value('i', 7, lock=lock)
        # c_double is the name this module already uses for _Foo's
        # fields, so use it here too instead of going through `ctypes.`.
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = Array('d', list(range(10)), lock=lock)
        string = Array('c', 20, lock=lock)
        # A 'c' array holds bytes, not str; the final assertion below
        # compares against bytes as well.
        string.value = latin('hello')

        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
        p.start()
        p.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    @unittest.skipIf(Value is None, "requires ctypes.Value")
    def test_synchronize(self):
        # Same checks, this time with lock-protected objects.
        self.test_sharedctypes(lock=True)

    @unittest.skipIf(ctypes_copy is None, "requires ctypes.copy")
    def test_copy(self):
        foo = _Foo(2, 5.0)
        bar = ctypes_copy(foo)
        # Changing the original afterwards must not affect the copy.
        foo.x = 0
        foo.y = 0
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)
1608
1609#
1610#
1611#
1612
class _TestFinalize(BaseTestCase):
    # Checks which util.Finalize callbacks run, and in which order.

    ALLOWED_TYPES = ('processes',)

    def _test_finalize(self, conn):
        # Runs in the child.  Every callback sends its tag over conn, so
        # the parent can read back the order in which they fired.
        class Foo(object):
            pass

        def watch(obj, tag, **kwds):
            return util.Finalize(obj, conn.send, args=(tag,), **kwds)

        a = Foo()
        watch(a, 'a')
        del a           # triggers callback for a

        b = Foo()
        close_b = watch(b, 'b')
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # c gets no exitpriority; 'c' is absent from the expected result.
        c = Foo()
        watch(c, 'c')

        d10 = Foo()
        watch(d10, 'd10', exitpriority=1)

        d01 = Foo()
        watch(d01, 'd01', exitpriority=0)
        d02 = Foo()
        watch(d02, 'd02', exitpriority=0)
        d03 = Foo()
        watch(d03, 'd03', exitpriority=0)

        watch(None, 'e', exitpriority=-10)

        watch(None, 'STOP', exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.start()
        child.join()

        # Read tags until the 'STOP' marker sent by the last callback.
        result = list(iter(conn.recv, 'STOP'))
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1663
1664#
1665# Test that from ... import * works for each module
1666#
1667
class _TestImportStar(BaseTestCase):
    # Every name a multiprocessing module lists in __all__ must exist as
    # an attribute of that module.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = [
            'multiprocessing',
            'multiprocessing.connection',
            'multiprocessing.heap',
            'multiprocessing.managers',
            'multiprocessing.pool',
            'multiprocessing.process',
            'multiprocessing.reduction',
            'multiprocessing.synchronize',
            'multiprocessing.util',
            ]

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for name in modules:
            __import__(name)
            mod = sys.modules[name]
            exported = getattr(mod, '__all__', ())

            for attr in exported:
                self.assertTrue(
                    hasattr(mod, attr),
                    '%r does not have attribute %r' % (mod, attr)
                    )
1694
1695#
1696# Quick test that logging works -- does not test logging output
1697#
1698
class _TestLogging(BaseTestCase):
    # Quick checks of multiprocessing's logger; the logging output itself
    # is not examined.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertTrue(logger is not None)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    def _test_level(self, conn):
        # Runs in the child: report the logger's effective level.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        try:
            # A level set on multiprocessing's own logger is what the
            # child reports.
            logger.setLevel(LEVEL1)
            child = self.Process(target=self._test_level, args=(writer,))
            child.start()
            level = reader.recv()
            child.join()        # reap the child before asserting
            self.assertEqual(LEVEL1, level)

            # With that logger reset to NOTSET, the root logger's level
            # is reported instead.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            child = self.Process(target=self._test_level, args=(writer,))
            child.start()
            level = reader.recv()
            child.join()
            self.assertEqual(LEVEL2, level)
        finally:
            # Restore both loggers even when an assertion fails, so that
            # later tests do not run with leftover levels.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
1736
1737#
Jesse Noller6214edd2009-01-19 16:23:53 +00001738# Test to verify handle verification, see issue 3321
1739#
1740
class TestInvalidHandle(unittest.TestCase):
    # Handle verification, see issue 3321.

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # A connection built on an arbitrary handle number must fail when
        # polled; a negative handle must be refused at construction.
        bogus = _multiprocessing.Connection(44977608)
        self.assertRaises(IOError, bogus.poll)
        self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna9b0e9182010-03-28 11:42:38 +00001748
Jesse Noller6214edd2009-01-19 16:23:53 +00001749#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001750# Functions used to create test cases from the base ones in this module
1751#
1752
def get_attributes(Source, names):
    # Return a dict mapping each name in `names` to getattr(Source, name).
    # Plain Python functions are wrapped in staticmethod(); anything else
    # is stored unchanged.
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        value = getattr(Source, name)
        attributes[name] = (
            staticmethod(value) if type(value) == function_type else value
            )
    return attributes
1761
def create_test_cases(Mixin, type):
    # For every module global whose name starts with '_Test' and whose
    # ALLOWED_TYPES contains `type`, build a TestCase subclass that
    # combines it with Mixin.  Returns a dict mapping the generated
    # class names ('With' + capitalized type + name without the leading
    # underscore) to the classes.
    suffix = type.capitalize()
    namespace = globals()
    cases = {}

    for name in list(namespace.keys()):
        if not name.startswith('_Test'):
            continue
        base = namespace[name]
        if type not in base.ALLOWED_TYPES:
            continue

        newname = 'With' + suffix + name[1:]

        class Temp(base, unittest.TestCase, Mixin):
            pass

        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        cases[newname] = Temp

    return cases
1778
1779#
1780# Create test cases
1781#
1782
class ProcessesMixin(object):
    # Mixin for the 'processes' flavour: every listed name is taken from
    # the multiprocessing package itself.
    TYPE = 'processes'
    Process = multiprocessing.Process
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'JoinableQueue',
        'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array', 'RawValue', 'RawArray',
        'current_process', 'active_children',
        'Pipe', 'connection',
        )))

# Generate the WithProcesses* test cases and publish them as globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1795
1796
class ManagerMixin(object):
    # Mixin for the 'manager' flavour: the listed names are looked up on
    # a SyncManager instance.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Only allocated here; test_main() calls __init__() and start() on it.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    locals().update(get_attributes(manager, (
        'Queue', 'JoinableQueue',
        'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array',
        'list', 'dict', 'Namespace',
        )))

# Generate the WithManager* test cases and publish them as globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1809
1810
class ThreadsMixin(object):
    # Mixin for the 'threads' flavour: every listed name is taken from
    # multiprocessing.dummy.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'JoinableQueue',
        'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array',
        'current_process', 'active_children',
        'Pipe', 'connection',
        'dict', 'list', 'Namespace',
        )))

# Generate the WithThreads* test cases and publish them as globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1823
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        # The peer answers the challenge with bytes that cannot be the
        # expected digest.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        fake = _FakeConnection()
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.deliver_challenge,
                          fake, b'abc')

    def test_answer_challenge_auth_failure(self):
        # First read: the challenge prefix.  Second read: a bogus reply.
        # Any later read: empty bytes.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0

            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''

            def send_bytes(self, data):
                pass

        fake = _FakeConnection()
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.answer_challenge,
                          fake, b'abc')
1852
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00001853#
1854# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1855#
1856
def initializer(ns):
    # Initializer callback for the tests below: bump the counter stored
    # in the namespace's `test` attribute by one.
    ns.test = ns.test + 1
1859
class TestInitializers(unittest.TestCase):
    # Manager.start() and Pool() must run the initializer they are given
    # (see issue 5585) and reject a non-callable one.

    def setUp(self):
        # Shared namespace in which initializer() counts its calls.
        self.mgr = multiprocessing.Manager()
        namespace = self.mgr.Namespace()
        namespace.test = 0
        self.ns = namespace

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        manager = multiprocessing.managers.SyncManager()
        self.assertRaises(TypeError, manager.start, 1)
        manager.start(initializer, (self.ns,))
        self.assertEqual(self.ns.test, 1)
        manager.shutdown()

    def test_pool_initializer(self):
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        pool = multiprocessing.Pool(1, initializer, (self.ns,))
        pool.close()
        pool.join()
        self.assertEqual(self.ns.test, 1)
1882
R. David Murraya44c6b32009-07-29 15:40:30 +00001883#
1884# Issue 5155, 5313, 5331: Test process in processes
1885# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1886#
1887
def _ThisSubProcess(q):
    # Try a non-blocking get on q; an empty queue is fine and whatever is
    # fetched is discarded.
    try:
        q.get(block=False)
    except pyqueue.Empty:
        pass
1893
def _TestProcess(q):
    # Start a nested process that works on a fresh queue and wait for it
    # to finish.  The argument q is not used.
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_ThisSubProcess,
                                    args=(inner_queue,))
    child.start()
    child.join()
1899
def _afunc(x):
    # Worker used by pool_in_process(): x multiplied by itself.
    product = x * x
    return product
1902
def pool_in_process():
    # Create a pool inside the current process, run one map() through it
    # and then shut the pool down in an orderly way.
    pool = multiprocessing.Pool(processes=4)
    pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    # Without close()/join() the worker processes would be left behind
    # when this function returns.
    pool.close()
    pool.join()
1906
class _file_like(object):
    # Write buffer in front of `delegate`.  Written data is collected in
    # a list that is started afresh whenever the process id differs from
    # the one recorded at the previous access.

    def __init__(self, delegate):
        self._delegate = delegate
        self._pid = None

    @property
    def cache(self):
        current = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid != current:
            self._pid = current
            self._cache = []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        # Hand everything buffered so far to the delegate in one write,
        # then start with an empty buffer.
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
1927
class TestStdinBadfiledescriptor(unittest.TestCase):
    # Issue 5155, 5313, 5331: multiprocessing objects created from within
    # a child process.

    def test_queue_in_process(self):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this process object is created but never started,
        # so only the flush() in the current process is exercised.
        # Confirm whether the child flush was meant to run.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # assertEqual instead of a bare assert: it is not stripped under
        # -O and it reports both values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
1948
1949testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
1950 TestStdinBadfiledescriptor]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001951
Benjamin Petersone711caf2008-06-11 16:44:04 +00001952#
1953#
1954#
1955
def test_main(run=None):
    # Build the complete suite and hand it to `run`, a callable taking a
    # TestSuite.  When `run` is None, test.support.run_unittest is used.
    # Raises unittest.SkipTest on Linux when an RLock cannot be created.
    if sys.platform.startswith("linux"):
        try:
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared pools (and the manager) used by the generated test cases.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc: tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc: tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc: tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        # Tear everything down even when the run raises, so that worker
        # processes and the manager are not left behind.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00001993
def main():
    # Script entry point: run the whole suite through a verbose text
    # runner.
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)
1996
1997if __name__ == '__main__':
1998 main()