blob: 693026227fdbc88c232c32501b014a96891d90c8 [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00008import Queue
9import time
10import sys
11import os
12import gc
13import signal
14import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000015import socket
16import random
17import logging
Mark Dickinsonc4920e82009-11-20 19:30:22 +000018from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000019from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000020_multiprocessing = test_support.import_module('_multiprocessing')
Victor Stinner613b4cf2010-04-27 21:56:26 +000021# import threading after _multiprocessing to raise a more revelant error
22# message: "No module named _multiprocessing". _multiprocessing is not compiled
23# without thread support.
24import threading
R. David Murray3db8a342009-03-30 23:05:48 +000025
Jesse Noller37040cd2008-09-30 00:15:45 +000026# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000027test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000028
Benjamin Petersondfd79492008-06-13 19:13:39 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000033import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000034
35from multiprocessing import util
36
Brian Curtina06e9b82010-10-07 02:27:41 +000037try:
38 from multiprocessing.sharedctypes import Value, copy
39 HAS_SHAREDCTYPES = True
40except ImportError:
41 HAS_SHAREDCTYPES = False
42
Benjamin Petersondfd79492008-06-13 19:13:39 +000043#
44#
45#
46
Benjamin Petersone79edf52008-07-13 18:34:58 +000047latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000048
Benjamin Petersondfd79492008-06-13 19:13:39 +000049#
50# Constants
51#
52
53LOG_LEVEL = util.SUBWARNING
Jesse Noller654ade32010-01-27 03:05:57 +000054#LOG_LEVEL = logging.DEBUG
Benjamin Petersondfd79492008-06-13 19:13:39 +000055
56DELTA = 0.1
57CHECK_TIMINGS = False # making true makes tests take a lot longer
58 # and can sometimes cause some non-serious
59 # failures because some calls block a bit
60 # longer than expected
61if CHECK_TIMINGS:
62 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
63else:
64 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
65
66HAVE_GETVALUE = not getattr(_multiprocessing,
67 'HAVE_BROKEN_SEM_GETVALUE', False)
68
Jesse Noller9a5b2ad2009-01-19 15:12:22 +000069WIN32 = (sys.platform == "win32")
70
Benjamin Petersondfd79492008-06-13 19:13:39 +000071#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000072# Some tests require ctypes
73#
74
75try:
Nick Coghlan13623662010-04-10 14:24:36 +000076 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000077except ImportError:
78 Structure = object
79 c_int = c_double = None
80
81#
Benjamin Petersondfd79492008-06-13 19:13:39 +000082# Creates a wrapper for a function which records the time it takes to finish
83#
84
85class TimingWrapper(object):
86
87 def __init__(self, func):
88 self.func = func
89 self.elapsed = None
90
91 def __call__(self, *args, **kwds):
92 t = time.time()
93 try:
94 return self.func(*args, **kwds)
95 finally:
96 self.elapsed = time.time() - t
97
98#
99# Base class for test cases
100#
101
102class BaseTestCase(object):
103
104 ALLOWED_TYPES = ('processes', 'manager', 'threads')
105
106 def assertTimingAlmostEqual(self, a, b):
107 if CHECK_TIMINGS:
108 self.assertAlmostEqual(a, b, 1)
109
110 def assertReturnsIfImplemented(self, value, func, *args):
111 try:
112 res = func(*args)
113 except NotImplementedError:
114 pass
115 else:
116 return self.assertEqual(value, res)
117
118#
119# Return the value of a semaphore
120#
121
122def get_value(self):
123 try:
124 return self.get_value()
125 except AttributeError:
126 try:
127 return self._Semaphore__value
128 except AttributeError:
129 try:
130 return self._value
131 except AttributeError:
132 raise NotImplementedError
133
134#
135# Testcases
136#
137
138class _TestProcess(BaseTestCase):
139
140 ALLOWED_TYPES = ('processes', 'threads')
141
142 def test_current(self):
143 if self.TYPE == 'threads':
144 return
145
146 current = self.current_process()
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000147 authkey = current.authkey
Benjamin Petersondfd79492008-06-13 19:13:39 +0000148
149 self.assertTrue(current.is_alive())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000150 self.assertTrue(not current.daemon)
Ezio Melottib0f5adc2010-01-24 16:58:36 +0000151 self.assertIsInstance(authkey, bytes)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000152 self.assertTrue(len(authkey) > 0)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000153 self.assertEqual(current.ident, os.getpid())
154 self.assertEqual(current.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000155
156 def _test(self, q, *args, **kwds):
157 current = self.current_process()
158 q.put(args)
159 q.put(kwds)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000160 q.put(current.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000161 if self.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000162 q.put(bytes(current.authkey))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000163 q.put(current.pid)
164
165 def test_process(self):
166 q = self.Queue(1)
167 e = self.Event()
168 args = (q, 1, 2)
169 kwargs = {'hello':23, 'bye':2.54}
170 name = 'SomeProcess'
171 p = self.Process(
172 target=self._test, args=args, kwargs=kwargs, name=name
173 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000174 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000175 current = self.current_process()
176
177 if self.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000178 self.assertEquals(p.authkey, current.authkey)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000179 self.assertEquals(p.is_alive(), False)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000180 self.assertEquals(p.daemon, True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000181 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000182 self.assertTrue(type(self.active_children()) is list)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000183 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000184
185 p.start()
186
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000187 self.assertEquals(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000188 self.assertEquals(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000189 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000190
191 self.assertEquals(q.get(), args[1:])
192 self.assertEquals(q.get(), kwargs)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000193 self.assertEquals(q.get(), p.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000194 if self.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000195 self.assertEquals(q.get(), current.authkey)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000196 self.assertEquals(q.get(), p.pid)
197
198 p.join()
199
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000200 self.assertEquals(p.exitcode, 0)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000201 self.assertEquals(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000202 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000203
204 def _test_terminate(self):
205 time.sleep(1000)
206
207 def test_terminate(self):
208 if self.TYPE == 'threads':
209 return
210
211 p = self.Process(target=self._test_terminate)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000212 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000213 p.start()
214
215 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000216 self.assertIn(p, self.active_children())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000217 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000218
219 p.terminate()
220
221 join = TimingWrapper(p.join)
222 self.assertEqual(join(), None)
223 self.assertTimingAlmostEqual(join.elapsed, 0.0)
224
225 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000226 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000227
228 p.join()
229
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000230 # XXX sometimes get p.exitcode == 0 on Windows ...
231 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000232
233 def test_cpu_count(self):
234 try:
235 cpus = multiprocessing.cpu_count()
236 except NotImplementedError:
237 cpus = 1
238 self.assertTrue(type(cpus) is int)
239 self.assertTrue(cpus >= 1)
240
241 def test_active_children(self):
242 self.assertEqual(type(self.active_children()), list)
243
244 p = self.Process(target=time.sleep, args=(DELTA,))
Ezio Melottiaa980582010-01-23 23:04:36 +0000245 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000246
247 p.start()
Ezio Melottiaa980582010-01-23 23:04:36 +0000248 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000249
250 p.join()
Ezio Melottiaa980582010-01-23 23:04:36 +0000251 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000252
253 def _test_recursion(self, wconn, id):
254 from multiprocessing import forking
255 wconn.send(id)
256 if len(id) < 2:
257 for i in range(2):
258 p = self.Process(
259 target=self._test_recursion, args=(wconn, id+[i])
260 )
261 p.start()
262 p.join()
263
264 def test_recursion(self):
265 rconn, wconn = self.Pipe(duplex=False)
266 self._test_recursion(wconn, [])
267
268 time.sleep(DELTA)
269 result = []
270 while rconn.poll():
271 result.append(rconn.recv())
272
273 expected = [
274 [],
275 [0],
276 [0, 0],
277 [0, 1],
278 [1],
279 [1, 0],
280 [1, 1]
281 ]
282 self.assertEqual(result, expected)
283
284#
285#
286#
287
288class _UpperCaser(multiprocessing.Process):
289
290 def __init__(self):
291 multiprocessing.Process.__init__(self)
292 self.child_conn, self.parent_conn = multiprocessing.Pipe()
293
294 def run(self):
295 self.parent_conn.close()
296 for s in iter(self.child_conn.recv, None):
297 self.child_conn.send(s.upper())
298 self.child_conn.close()
299
300 def submit(self, s):
301 assert type(s) is str
302 self.parent_conn.send(s)
303 return self.parent_conn.recv()
304
305 def stop(self):
306 self.parent_conn.send(None)
307 self.parent_conn.close()
308 self.child_conn.close()
309
310class _TestSubclassingProcess(BaseTestCase):
311
312 ALLOWED_TYPES = ('processes',)
313
314 def test_subclassing(self):
315 uppercaser = _UpperCaser()
316 uppercaser.start()
317 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
318 self.assertEqual(uppercaser.submit('world'), 'WORLD')
319 uppercaser.stop()
320 uppercaser.join()
321
322#
323#
324#
325
326def queue_empty(q):
327 if hasattr(q, 'empty'):
328 return q.empty()
329 else:
330 return q.qsize() == 0
331
332def queue_full(q, maxsize):
333 if hasattr(q, 'full'):
334 return q.full()
335 else:
336 return q.qsize() == maxsize
337
338
339class _TestQueue(BaseTestCase):
340
341
342 def _test_put(self, queue, child_can_start, parent_can_continue):
343 child_can_start.wait()
344 for i in range(6):
345 queue.get()
346 parent_can_continue.set()
347
348 def test_put(self):
349 MAXSIZE = 6
350 queue = self.Queue(maxsize=MAXSIZE)
351 child_can_start = self.Event()
352 parent_can_continue = self.Event()
353
354 proc = self.Process(
355 target=self._test_put,
356 args=(queue, child_can_start, parent_can_continue)
357 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000358 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000359 proc.start()
360
361 self.assertEqual(queue_empty(queue), True)
362 self.assertEqual(queue_full(queue, MAXSIZE), False)
363
364 queue.put(1)
365 queue.put(2, True)
366 queue.put(3, True, None)
367 queue.put(4, False)
368 queue.put(5, False, None)
369 queue.put_nowait(6)
370
371 # the values may be in buffer but not yet in pipe so sleep a bit
372 time.sleep(DELTA)
373
374 self.assertEqual(queue_empty(queue), False)
375 self.assertEqual(queue_full(queue, MAXSIZE), True)
376
377 put = TimingWrapper(queue.put)
378 put_nowait = TimingWrapper(queue.put_nowait)
379
380 self.assertRaises(Queue.Full, put, 7, False)
381 self.assertTimingAlmostEqual(put.elapsed, 0)
382
383 self.assertRaises(Queue.Full, put, 7, False, None)
384 self.assertTimingAlmostEqual(put.elapsed, 0)
385
386 self.assertRaises(Queue.Full, put_nowait, 7)
387 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
388
389 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
390 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
391
392 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
393 self.assertTimingAlmostEqual(put.elapsed, 0)
394
395 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
396 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
397
398 child_can_start.set()
399 parent_can_continue.wait()
400
401 self.assertEqual(queue_empty(queue), True)
402 self.assertEqual(queue_full(queue, MAXSIZE), False)
403
404 proc.join()
405
406 def _test_get(self, queue, child_can_start, parent_can_continue):
407 child_can_start.wait()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000408 #queue.put(1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000409 queue.put(2)
410 queue.put(3)
411 queue.put(4)
412 queue.put(5)
413 parent_can_continue.set()
414
415 def test_get(self):
416 queue = self.Queue()
417 child_can_start = self.Event()
418 parent_can_continue = self.Event()
419
420 proc = self.Process(
421 target=self._test_get,
422 args=(queue, child_can_start, parent_can_continue)
423 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000424 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000425 proc.start()
426
427 self.assertEqual(queue_empty(queue), True)
428
429 child_can_start.set()
430 parent_can_continue.wait()
431
432 time.sleep(DELTA)
433 self.assertEqual(queue_empty(queue), False)
434
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000435 # Hangs unexpectedly, remove for now
436 #self.assertEqual(queue.get(), 1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000437 self.assertEqual(queue.get(True, None), 2)
438 self.assertEqual(queue.get(True), 3)
439 self.assertEqual(queue.get(timeout=1), 4)
440 self.assertEqual(queue.get_nowait(), 5)
441
442 self.assertEqual(queue_empty(queue), True)
443
444 get = TimingWrapper(queue.get)
445 get_nowait = TimingWrapper(queue.get_nowait)
446
447 self.assertRaises(Queue.Empty, get, False)
448 self.assertTimingAlmostEqual(get.elapsed, 0)
449
450 self.assertRaises(Queue.Empty, get, False, None)
451 self.assertTimingAlmostEqual(get.elapsed, 0)
452
453 self.assertRaises(Queue.Empty, get_nowait)
454 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
455
456 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
457 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
458
459 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
460 self.assertTimingAlmostEqual(get.elapsed, 0)
461
462 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
463 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
464
465 proc.join()
466
467 def _test_fork(self, queue):
468 for i in range(10, 20):
469 queue.put(i)
470 # note that at this point the items may only be buffered, so the
471 # process cannot shutdown until the feeder thread has finished
472 # pushing items onto the pipe.
473
474 def test_fork(self):
475 # Old versions of Queue would fail to create a new feeder
476 # thread for a forked process if the original process had its
477 # own feeder thread. This test checks that this no longer
478 # happens.
479
480 queue = self.Queue()
481
482 # put items on queue so that main process starts a feeder thread
483 for i in range(10):
484 queue.put(i)
485
486 # wait to make sure thread starts before we fork a new process
487 time.sleep(DELTA)
488
489 # fork process
490 p = self.Process(target=self._test_fork, args=(queue,))
491 p.start()
492
493 # check that all expected items are in the queue
494 for i in range(20):
495 self.assertEqual(queue.get(), i)
496 self.assertRaises(Queue.Empty, queue.get, False)
497
498 p.join()
499
500 def test_qsize(self):
501 q = self.Queue()
502 try:
503 self.assertEqual(q.qsize(), 0)
504 except NotImplementedError:
505 return
506 q.put(1)
507 self.assertEqual(q.qsize(), 1)
508 q.put(5)
509 self.assertEqual(q.qsize(), 2)
510 q.get()
511 self.assertEqual(q.qsize(), 1)
512 q.get()
513 self.assertEqual(q.qsize(), 0)
514
515 def _test_task_done(self, q):
516 for obj in iter(q.get, None):
517 time.sleep(DELTA)
518 q.task_done()
519
520 def test_task_done(self):
521 queue = self.JoinableQueue()
522
523 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000524 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000525
526 workers = [self.Process(target=self._test_task_done, args=(queue,))
527 for i in xrange(4)]
528
529 for p in workers:
530 p.start()
531
532 for i in xrange(10):
533 queue.put(i)
534
535 queue.join()
536
537 for p in workers:
538 queue.put(None)
539
540 for p in workers:
541 p.join()
542
543#
544#
545#
546
547class _TestLock(BaseTestCase):
548
549 def test_lock(self):
550 lock = self.Lock()
551 self.assertEqual(lock.acquire(), True)
552 self.assertEqual(lock.acquire(False), False)
553 self.assertEqual(lock.release(), None)
554 self.assertRaises((ValueError, threading.ThreadError), lock.release)
555
556 def test_rlock(self):
557 lock = self.RLock()
558 self.assertEqual(lock.acquire(), True)
559 self.assertEqual(lock.acquire(), True)
560 self.assertEqual(lock.acquire(), True)
561 self.assertEqual(lock.release(), None)
562 self.assertEqual(lock.release(), None)
563 self.assertEqual(lock.release(), None)
564 self.assertRaises((AssertionError, RuntimeError), lock.release)
565
Jesse Noller82eb5902009-03-30 23:29:31 +0000566 def test_lock_context(self):
567 with self.Lock():
568 pass
569
Benjamin Petersondfd79492008-06-13 19:13:39 +0000570
571class _TestSemaphore(BaseTestCase):
572
573 def _test_semaphore(self, sem):
574 self.assertReturnsIfImplemented(2, get_value, sem)
575 self.assertEqual(sem.acquire(), True)
576 self.assertReturnsIfImplemented(1, get_value, sem)
577 self.assertEqual(sem.acquire(), True)
578 self.assertReturnsIfImplemented(0, get_value, sem)
579 self.assertEqual(sem.acquire(False), False)
580 self.assertReturnsIfImplemented(0, get_value, sem)
581 self.assertEqual(sem.release(), None)
582 self.assertReturnsIfImplemented(1, get_value, sem)
583 self.assertEqual(sem.release(), None)
584 self.assertReturnsIfImplemented(2, get_value, sem)
585
586 def test_semaphore(self):
587 sem = self.Semaphore(2)
588 self._test_semaphore(sem)
589 self.assertEqual(sem.release(), None)
590 self.assertReturnsIfImplemented(3, get_value, sem)
591 self.assertEqual(sem.release(), None)
592 self.assertReturnsIfImplemented(4, get_value, sem)
593
594 def test_bounded_semaphore(self):
595 sem = self.BoundedSemaphore(2)
596 self._test_semaphore(sem)
597 # Currently fails on OS/X
598 #if HAVE_GETVALUE:
599 # self.assertRaises(ValueError, sem.release)
600 # self.assertReturnsIfImplemented(2, get_value, sem)
601
602 def test_timeout(self):
603 if self.TYPE != 'processes':
604 return
605
606 sem = self.Semaphore(0)
607 acquire = TimingWrapper(sem.acquire)
608
609 self.assertEqual(acquire(False), False)
610 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
611
612 self.assertEqual(acquire(False, None), False)
613 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
614
615 self.assertEqual(acquire(False, TIMEOUT1), False)
616 self.assertTimingAlmostEqual(acquire.elapsed, 0)
617
618 self.assertEqual(acquire(True, TIMEOUT2), False)
619 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
620
621 self.assertEqual(acquire(timeout=TIMEOUT3), False)
622 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
623
624
625class _TestCondition(BaseTestCase):
626
627 def f(self, cond, sleeping, woken, timeout=None):
628 cond.acquire()
629 sleeping.release()
630 cond.wait(timeout)
631 woken.release()
632 cond.release()
633
634 def check_invariant(self, cond):
635 # this is only supposed to succeed when there are no sleepers
636 if self.TYPE == 'processes':
637 try:
638 sleepers = (cond._sleeping_count.get_value() -
639 cond._woken_count.get_value())
640 self.assertEqual(sleepers, 0)
641 self.assertEqual(cond._wait_semaphore.get_value(), 0)
642 except NotImplementedError:
643 pass
644
645 def test_notify(self):
646 cond = self.Condition()
647 sleeping = self.Semaphore(0)
648 woken = self.Semaphore(0)
649
650 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000651 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000652 p.start()
653
654 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000655 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000656 p.start()
657
658 # wait for both children to start sleeping
659 sleeping.acquire()
660 sleeping.acquire()
661
662 # check no process/thread has woken up
663 time.sleep(DELTA)
664 self.assertReturnsIfImplemented(0, get_value, woken)
665
666 # wake up one process/thread
667 cond.acquire()
668 cond.notify()
669 cond.release()
670
671 # check one process/thread has woken up
672 time.sleep(DELTA)
673 self.assertReturnsIfImplemented(1, get_value, woken)
674
675 # wake up another
676 cond.acquire()
677 cond.notify()
678 cond.release()
679
680 # check other has woken up
681 time.sleep(DELTA)
682 self.assertReturnsIfImplemented(2, get_value, woken)
683
684 # check state is not mucked up
685 self.check_invariant(cond)
686 p.join()
687
688 def test_notify_all(self):
689 cond = self.Condition()
690 sleeping = self.Semaphore(0)
691 woken = self.Semaphore(0)
692
693 # start some threads/processes which will timeout
694 for i in range(3):
695 p = self.Process(target=self.f,
696 args=(cond, sleeping, woken, TIMEOUT1))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000697 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000698 p.start()
699
700 t = threading.Thread(target=self.f,
701 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000702 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000703 t.start()
704
705 # wait for them all to sleep
706 for i in xrange(6):
707 sleeping.acquire()
708
709 # check they have all timed out
710 for i in xrange(6):
711 woken.acquire()
712 self.assertReturnsIfImplemented(0, get_value, woken)
713
714 # check state is not mucked up
715 self.check_invariant(cond)
716
717 # start some more threads/processes
718 for i in range(3):
719 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000720 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000721 p.start()
722
723 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000724 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000725 t.start()
726
727 # wait for them to all sleep
728 for i in xrange(6):
729 sleeping.acquire()
730
731 # check no process/thread has woken up
732 time.sleep(DELTA)
733 self.assertReturnsIfImplemented(0, get_value, woken)
734
735 # wake them all up
736 cond.acquire()
737 cond.notify_all()
738 cond.release()
739
740 # check they have all woken
741 time.sleep(DELTA)
742 self.assertReturnsIfImplemented(6, get_value, woken)
743
744 # check state is not mucked up
745 self.check_invariant(cond)
746
747 def test_timeout(self):
748 cond = self.Condition()
749 wait = TimingWrapper(cond.wait)
750 cond.acquire()
751 res = wait(TIMEOUT1)
752 cond.release()
753 self.assertEqual(res, None)
754 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
755
756
757class _TestEvent(BaseTestCase):
758
759 def _test_event(self, event):
760 time.sleep(TIMEOUT2)
761 event.set()
762
763 def test_event(self):
764 event = self.Event()
765 wait = TimingWrapper(event.wait)
766
767 # Removed temporaily, due to API shear, this does not
768 # work with threading._Event objects. is_set == isSet
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000769 self.assertEqual(event.is_set(), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000770
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000771 # Removed, threading.Event.wait() will return the value of the __flag
772 # instead of None. API Shear with the semaphore backed mp.Event
773 self.assertEqual(wait(0.0), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000774 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000775 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000776 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
777
778 event.set()
779
780 # See note above on the API differences
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000781 self.assertEqual(event.is_set(), True)
782 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000783 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000784 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000785 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
786 # self.assertEqual(event.is_set(), True)
787
788 event.clear()
789
790 #self.assertEqual(event.is_set(), False)
791
792 self.Process(target=self._test_event, args=(event,)).start()
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000793 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000794
795#
796#
797#
798
Brian Curtina06e9b82010-10-07 02:27:41 +0000799@unittest.skipUnless(HAS_SHAREDCTYPES,
800 "requires multiprocessing.sharedctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000801class _TestValue(BaseTestCase):
802
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000803 ALLOWED_TYPES = ('processes',)
804
Benjamin Petersondfd79492008-06-13 19:13:39 +0000805 codes_values = [
806 ('i', 4343, 24234),
807 ('d', 3.625, -4.25),
808 ('h', -232, 234),
809 ('c', latin('x'), latin('y'))
810 ]
811
812 def _test(self, values):
813 for sv, cv in zip(values, self.codes_values):
814 sv.value = cv[2]
815
816
817 def test_value(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000818 if raw:
819 values = [self.RawValue(code, value)
820 for code, value, _ in self.codes_values]
821 else:
822 values = [self.Value(code, value)
823 for code, value, _ in self.codes_values]
824
825 for sv, cv in zip(values, self.codes_values):
826 self.assertEqual(sv.value, cv[1])
827
828 proc = self.Process(target=self._test, args=(values,))
829 proc.start()
830 proc.join()
831
832 for sv, cv in zip(values, self.codes_values):
833 self.assertEqual(sv.value, cv[2])
834
835 def test_rawvalue(self):
836 self.test_value(raw=True)
837
838 def test_getobj_getlock(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000839 val1 = self.Value('i', 5)
840 lock1 = val1.get_lock()
841 obj1 = val1.get_obj()
842
843 val2 = self.Value('i', 5, lock=None)
844 lock2 = val2.get_lock()
845 obj2 = val2.get_obj()
846
847 lock = self.Lock()
848 val3 = self.Value('i', 5, lock=lock)
849 lock3 = val3.get_lock()
850 obj3 = val3.get_obj()
851 self.assertEqual(lock, lock3)
852
Jesse Noller6ab22152009-01-18 02:45:38 +0000853 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000854 self.assertFalse(hasattr(arr4, 'get_lock'))
855 self.assertFalse(hasattr(arr4, 'get_obj'))
856
Jesse Noller6ab22152009-01-18 02:45:38 +0000857 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
858
859 arr5 = self.RawValue('i', 5)
860 self.assertFalse(hasattr(arr5, 'get_lock'))
861 self.assertFalse(hasattr(arr5, 'get_obj'))
862
Benjamin Petersondfd79492008-06-13 19:13:39 +0000863
864class _TestArray(BaseTestCase):
865
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000866 ALLOWED_TYPES = ('processes',)
867
Benjamin Petersondfd79492008-06-13 19:13:39 +0000868 def f(self, seq):
869 for i in range(1, len(seq)):
870 seq[i] += seq[i-1]
871
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000872 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000873 def test_array(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000874 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
875 if raw:
876 arr = self.RawArray('i', seq)
877 else:
878 arr = self.Array('i', seq)
879
880 self.assertEqual(len(arr), len(seq))
881 self.assertEqual(arr[3], seq[3])
882 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
883
884 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
885
886 self.assertEqual(list(arr[:]), seq)
887
888 self.f(seq)
889
890 p = self.Process(target=self.f, args=(arr,))
891 p.start()
892 p.join()
893
894 self.assertEqual(list(arr[:]), seq)
895
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000896 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000897 def test_rawarray(self):
898 self.test_array(raw=True)
899
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000900 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000901 def test_getobj_getlock_obj(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000902 arr1 = self.Array('i', range(10))
903 lock1 = arr1.get_lock()
904 obj1 = arr1.get_obj()
905
906 arr2 = self.Array('i', range(10), lock=None)
907 lock2 = arr2.get_lock()
908 obj2 = arr2.get_obj()
909
910 lock = self.Lock()
911 arr3 = self.Array('i', range(10), lock=lock)
912 lock3 = arr3.get_lock()
913 obj3 = arr3.get_obj()
914 self.assertEqual(lock, lock3)
915
Jesse Noller6ab22152009-01-18 02:45:38 +0000916 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000917 self.assertFalse(hasattr(arr4, 'get_lock'))
918 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Noller6ab22152009-01-18 02:45:38 +0000919 self.assertRaises(AttributeError,
920 self.Array, 'i', range(10), lock='notalock')
921
922 arr5 = self.RawArray('i', range(10))
923 self.assertFalse(hasattr(arr5, 'get_lock'))
924 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000925
926#
927#
928#
929
930class _TestContainers(BaseTestCase):
931
932 ALLOWED_TYPES = ('manager',)
933
934 def test_list(self):
935 a = self.list(range(10))
936 self.assertEqual(a[:], range(10))
937
938 b = self.list()
939 self.assertEqual(b[:], [])
940
941 b.extend(range(5))
942 self.assertEqual(b[:], range(5))
943
944 self.assertEqual(b[2], 2)
945 self.assertEqual(b[2:10], [2,3,4])
946
947 b *= 2
948 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
949
950 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
951
952 self.assertEqual(a[:], range(10))
953
954 d = [a, b]
955 e = self.list(d)
956 self.assertEqual(
957 e[:],
958 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
959 )
960
961 f = self.list([a])
962 a.append('hello')
963 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
964
965 def test_dict(self):
966 d = self.dict()
967 indices = range(65, 70)
968 for i in indices:
969 d[i] = chr(i)
970 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
971 self.assertEqual(sorted(d.keys()), indices)
972 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
973 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
974
975 def test_namespace(self):
976 n = self.Namespace()
977 n.name = 'Bob'
978 n.job = 'Builder'
979 n._hidden = 'hidden'
980 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
981 del n.job
982 self.assertEqual(str(n), "Namespace(name='Bob')")
983 self.assertTrue(hasattr(n, 'name'))
984 self.assertTrue(not hasattr(n, 'job'))
985
986#
987#
988#
989
990def sqr(x, wait=0.0):
991 time.sleep(wait)
992 return x*x
Benjamin Petersondfd79492008-06-13 19:13:39 +0000993class _TestPool(BaseTestCase):
994
995 def test_apply(self):
996 papply = self.pool.apply
997 self.assertEqual(papply(sqr, (5,)), sqr(5))
998 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
999
1000 def test_map(self):
1001 pmap = self.pool.map
1002 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
1003 self.assertEqual(pmap(sqr, range(100), chunksize=20),
1004 map(sqr, range(100)))
1005
Jesse Noller7530e472009-07-16 14:23:04 +00001006 def test_map_chunksize(self):
1007 try:
1008 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1009 except multiprocessing.TimeoutError:
1010 self.fail("pool.map_async with chunksize stalled on null list")
1011
Benjamin Petersondfd79492008-06-13 19:13:39 +00001012 def test_async(self):
1013 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1014 get = TimingWrapper(res.get)
1015 self.assertEqual(get(), 49)
1016 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1017
1018 def test_async_timeout(self):
1019 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1020 get = TimingWrapper(res.get)
1021 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1022 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1023
1024 def test_imap(self):
1025 it = self.pool.imap(sqr, range(10))
1026 self.assertEqual(list(it), map(sqr, range(10)))
1027
1028 it = self.pool.imap(sqr, range(10))
1029 for i in range(10):
1030 self.assertEqual(it.next(), i*i)
1031 self.assertRaises(StopIteration, it.next)
1032
1033 it = self.pool.imap(sqr, range(1000), chunksize=100)
1034 for i in range(1000):
1035 self.assertEqual(it.next(), i*i)
1036 self.assertRaises(StopIteration, it.next)
1037
1038 def test_imap_unordered(self):
1039 it = self.pool.imap_unordered(sqr, range(1000))
1040 self.assertEqual(sorted(it), map(sqr, range(1000)))
1041
1042 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1043 self.assertEqual(sorted(it), map(sqr, range(1000)))
1044
1045 def test_make_pool(self):
1046 p = multiprocessing.Pool(3)
1047 self.assertEqual(3, len(p._pool))
1048 p.close()
1049 p.join()
1050
1051 def test_terminate(self):
1052 if self.TYPE == 'manager':
1053 # On Unix a forked process increfs each shared object to
1054 # which its parent process held a reference. If the
1055 # forked process gets terminated then there is likely to
1056 # be a reference leak. So to prevent
1057 # _TestZZZNumberOfObjects from failing we skip this test
1058 # when using a manager.
1059 return
1060
1061 result = self.pool.map_async(
1062 time.sleep, [0.1 for i in range(10000)], chunksize=1
1063 )
1064 self.pool.terminate()
1065 join = TimingWrapper(self.pool.join)
1066 join()
1067 self.assertTrue(join.elapsed < 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001068
1069class _TestPoolWorkerLifetime(BaseTestCase):
1070
1071 ALLOWED_TYPES = ('processes', )
1072 def test_pool_worker_lifetime(self):
1073 p = multiprocessing.Pool(3, maxtasksperchild=10)
1074 self.assertEqual(3, len(p._pool))
1075 origworkerpids = [w.pid for w in p._pool]
1076 # Run many tasks so each worker gets replaced (hopefully)
1077 results = []
1078 for i in range(100):
1079 results.append(p.apply_async(sqr, (i, )))
1080 # Fetch the results and verify we got the right answers,
1081 # also ensuring all the tasks have completed.
1082 for (j, res) in enumerate(results):
1083 self.assertEqual(res.get(), sqr(j))
1084 # Refill the pool
1085 p._repopulate_pool()
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001086 # Wait until all workers are alive
1087 countdown = 5
1088 while countdown and not all(w.is_alive() for w in p._pool):
1089 countdown -= 1
1090 time.sleep(DELTA)
Jesse Noller654ade32010-01-27 03:05:57 +00001091 finalworkerpids = [w.pid for w in p._pool]
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001092 # All pids should be assigned. See issue #7805.
1093 self.assertNotIn(None, origworkerpids)
1094 self.assertNotIn(None, finalworkerpids)
1095 # Finally, check that the worker pids have changed
Jesse Noller654ade32010-01-27 03:05:57 +00001096 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1097 p.close()
1098 p.join()
1099
Benjamin Petersondfd79492008-06-13 19:13:39 +00001100#
1101# Test that manager has expected number of shared objects left
1102#
1103
1104class _TestZZZNumberOfObjects(BaseTestCase):
1105 # Because test cases are sorted alphabetically, this one will get
1106 # run after all the other tests for the manager. It tests that
1107 # there have been no "reference leaks" for the manager's shared
1108 # objects. Note the comment in _TestPool.test_terminate().
1109 ALLOWED_TYPES = ('manager',)
1110
1111 def test_number_of_objects(self):
1112 EXPECTED_NUMBER = 1 # the pool object is still alive
1113 multiprocessing.active_children() # discard dead process objs
1114 gc.collect() # do garbage collection
1115 refs = self.manager._number_of_objects()
Jesse Noller7314b382009-01-21 02:08:17 +00001116 debug_info = self.manager._debug_info()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001117 if refs != EXPECTED_NUMBER:
Jesse Noller7fb96402008-07-17 21:01:05 +00001118 print self.manager._debug_info()
Jesse Noller7314b382009-01-21 02:08:17 +00001119 print debug_info
Benjamin Petersondfd79492008-06-13 19:13:39 +00001120
1121 self.assertEqual(refs, EXPECTED_NUMBER)
1122
1123#
1124# Test of creating a customized manager class
1125#
1126
1127from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1128
1129class FooBar(object):
1130 def f(self):
1131 return 'f()'
1132 def g(self):
1133 raise ValueError
1134 def _h(self):
1135 return '_h()'
1136
1137def baz():
1138 for i in xrange(10):
1139 yield i*i
1140
1141class IteratorProxy(BaseProxy):
1142 _exposed_ = ('next', '__next__')
1143 def __iter__(self):
1144 return self
1145 def next(self):
1146 return self._callmethod('next')
1147 def __next__(self):
1148 return self._callmethod('__next__')
1149
1150class MyManager(BaseManager):
1151 pass
1152
1153MyManager.register('Foo', callable=FooBar)
1154MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1155MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1156
1157
1158class _TestMyManager(BaseTestCase):
1159
1160 ALLOWED_TYPES = ('manager',)
1161
1162 def test_mymanager(self):
1163 manager = MyManager()
1164 manager.start()
1165
1166 foo = manager.Foo()
1167 bar = manager.Bar()
1168 baz = manager.baz()
1169
1170 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1171 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1172
1173 self.assertEqual(foo_methods, ['f', 'g'])
1174 self.assertEqual(bar_methods, ['f', '_h'])
1175
1176 self.assertEqual(foo.f(), 'f()')
1177 self.assertRaises(ValueError, foo.g)
1178 self.assertEqual(foo._callmethod('f'), 'f()')
1179 self.assertRaises(RemoteError, foo._callmethod, '_h')
1180
1181 self.assertEqual(bar.f(), 'f()')
1182 self.assertEqual(bar._h(), '_h()')
1183 self.assertEqual(bar._callmethod('f'), 'f()')
1184 self.assertEqual(bar._callmethod('_h'), '_h()')
1185
1186 self.assertEqual(list(baz), [i*i for i in range(10)])
1187
1188 manager.shutdown()
1189
1190#
1191# Test of connecting to a remote server and using xmlrpclib for serialization
1192#
1193
1194_queue = Queue.Queue()
1195def get_queue():
1196 return _queue
1197
1198class QueueManager(BaseManager):
1199 '''manager class used by server process'''
1200QueueManager.register('get_queue', callable=get_queue)
1201
1202class QueueManager2(BaseManager):
1203 '''manager class which specifies the same interface as QueueManager'''
1204QueueManager2.register('get_queue')
1205
1206
1207SERIALIZER = 'xmlrpclib'
1208
1209class _TestRemoteManager(BaseTestCase):
1210
1211 ALLOWED_TYPES = ('manager',)
1212
1213 def _putter(self, address, authkey):
1214 manager = QueueManager2(
1215 address=address, authkey=authkey, serializer=SERIALIZER
1216 )
1217 manager.connect()
1218 queue = manager.get_queue()
1219 queue.put(('hello world', None, True, 2.25))
1220
1221 def test_remote(self):
1222 authkey = os.urandom(32)
1223
1224 manager = QueueManager(
1225 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1226 )
1227 manager.start()
1228
1229 p = self.Process(target=self._putter, args=(manager.address, authkey))
1230 p.start()
1231
1232 manager2 = QueueManager2(
1233 address=manager.address, authkey=authkey, serializer=SERIALIZER
1234 )
1235 manager2.connect()
1236 queue = manager2.get_queue()
1237
1238 # Note that xmlrpclib will deserialize object as a list not a tuple
1239 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1240
1241 # Because we are using xmlrpclib for serialization instead of
1242 # pickle this will cause a serialization error.
1243 self.assertRaises(Exception, queue.put, time.sleep)
1244
1245 # Make queue finalizer run before the server is stopped
1246 del queue
1247 manager.shutdown()
1248
Jesse Noller459a6482009-03-30 15:50:42 +00001249class _TestManagerRestart(BaseTestCase):
1250
1251 def _putter(self, address, authkey):
1252 manager = QueueManager(
1253 address=address, authkey=authkey, serializer=SERIALIZER)
1254 manager.connect()
1255 queue = manager.get_queue()
1256 queue.put('hello world')
1257
1258 def test_rapid_restart(self):
1259 authkey = os.urandom(32)
1260 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001261 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
1262 addr = manager.get_server().address
Jesse Noller459a6482009-03-30 15:50:42 +00001263 manager.start()
1264
1265 p = self.Process(target=self._putter, args=(manager.address, authkey))
1266 p.start()
1267 queue = manager.get_queue()
1268 self.assertEqual(queue.get(), 'hello world')
Jesse Noller019ce772009-03-30 21:53:29 +00001269 del queue
Jesse Noller459a6482009-03-30 15:50:42 +00001270 manager.shutdown()
1271 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001272 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller459a6482009-03-30 15:50:42 +00001273 manager.start()
Jesse Noller019ce772009-03-30 21:53:29 +00001274 manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001275
Benjamin Petersondfd79492008-06-13 19:13:39 +00001276#
1277#
1278#
1279
1280SENTINEL = latin('')
1281
1282class _TestConnection(BaseTestCase):
1283
1284 ALLOWED_TYPES = ('processes', 'threads')
1285
1286 def _echo(self, conn):
1287 for msg in iter(conn.recv_bytes, SENTINEL):
1288 conn.send_bytes(msg)
1289 conn.close()
1290
1291 def test_connection(self):
1292 conn, child_conn = self.Pipe()
1293
1294 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001295 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001296 p.start()
1297
1298 seq = [1, 2.25, None]
1299 msg = latin('hello world')
1300 longmsg = msg * 10
1301 arr = array.array('i', range(4))
1302
1303 if self.TYPE == 'processes':
1304 self.assertEqual(type(conn.fileno()), int)
1305
1306 self.assertEqual(conn.send(seq), None)
1307 self.assertEqual(conn.recv(), seq)
1308
1309 self.assertEqual(conn.send_bytes(msg), None)
1310 self.assertEqual(conn.recv_bytes(), msg)
1311
1312 if self.TYPE == 'processes':
1313 buffer = array.array('i', [0]*10)
1314 expected = list(arr) + [0] * (10 - len(arr))
1315 self.assertEqual(conn.send_bytes(arr), None)
1316 self.assertEqual(conn.recv_bytes_into(buffer),
1317 len(arr) * buffer.itemsize)
1318 self.assertEqual(list(buffer), expected)
1319
1320 buffer = array.array('i', [0]*10)
1321 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1322 self.assertEqual(conn.send_bytes(arr), None)
1323 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1324 len(arr) * buffer.itemsize)
1325 self.assertEqual(list(buffer), expected)
1326
1327 buffer = bytearray(latin(' ' * 40))
1328 self.assertEqual(conn.send_bytes(longmsg), None)
1329 try:
1330 res = conn.recv_bytes_into(buffer)
1331 except multiprocessing.BufferTooShort, e:
1332 self.assertEqual(e.args, (longmsg,))
1333 else:
1334 self.fail('expected BufferTooShort, got %s' % res)
1335
1336 poll = TimingWrapper(conn.poll)
1337
1338 self.assertEqual(poll(), False)
1339 self.assertTimingAlmostEqual(poll.elapsed, 0)
1340
1341 self.assertEqual(poll(TIMEOUT1), False)
1342 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1343
1344 conn.send(None)
1345
1346 self.assertEqual(poll(TIMEOUT1), True)
1347 self.assertTimingAlmostEqual(poll.elapsed, 0)
1348
1349 self.assertEqual(conn.recv(), None)
1350
1351 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1352 conn.send_bytes(really_big_msg)
1353 self.assertEqual(conn.recv_bytes(), really_big_msg)
1354
1355 conn.send_bytes(SENTINEL) # tell child to quit
1356 child_conn.close()
1357
1358 if self.TYPE == 'processes':
1359 self.assertEqual(conn.readable, True)
1360 self.assertEqual(conn.writable, True)
1361 self.assertRaises(EOFError, conn.recv)
1362 self.assertRaises(EOFError, conn.recv_bytes)
1363
1364 p.join()
1365
1366 def test_duplex_false(self):
1367 reader, writer = self.Pipe(duplex=False)
1368 self.assertEqual(writer.send(1), None)
1369 self.assertEqual(reader.recv(), 1)
1370 if self.TYPE == 'processes':
1371 self.assertEqual(reader.readable, True)
1372 self.assertEqual(reader.writable, False)
1373 self.assertEqual(writer.readable, False)
1374 self.assertEqual(writer.writable, True)
1375 self.assertRaises(IOError, reader.send, 2)
1376 self.assertRaises(IOError, writer.recv)
1377 self.assertRaises(IOError, writer.poll)
1378
1379 def test_spawn_close(self):
1380 # We test that a pipe connection can be closed by parent
1381 # process immediately after child is spawned. On Windows this
1382 # would have sometimes failed on old versions because
1383 # child_conn would be closed before the child got a chance to
1384 # duplicate it.
1385 conn, child_conn = self.Pipe()
1386
1387 p = self.Process(target=self._echo, args=(child_conn,))
1388 p.start()
1389 child_conn.close() # this might complete before child initializes
1390
1391 msg = latin('hello')
1392 conn.send_bytes(msg)
1393 self.assertEqual(conn.recv_bytes(), msg)
1394
1395 conn.send_bytes(SENTINEL)
1396 conn.close()
1397 p.join()
1398
1399 def test_sendbytes(self):
1400 if self.TYPE != 'processes':
1401 return
1402
1403 msg = latin('abcdefghijklmnopqrstuvwxyz')
1404 a, b = self.Pipe()
1405
1406 a.send_bytes(msg)
1407 self.assertEqual(b.recv_bytes(), msg)
1408
1409 a.send_bytes(msg, 5)
1410 self.assertEqual(b.recv_bytes(), msg[5:])
1411
1412 a.send_bytes(msg, 7, 8)
1413 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1414
1415 a.send_bytes(msg, 26)
1416 self.assertEqual(b.recv_bytes(), latin(''))
1417
1418 a.send_bytes(msg, 26, 0)
1419 self.assertEqual(b.recv_bytes(), latin(''))
1420
1421 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1422
1423 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1424
1425 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1426
1427 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1428
1429 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1430
Benjamin Petersondfd79492008-06-13 19:13:39 +00001431class _TestListenerClient(BaseTestCase):
1432
1433 ALLOWED_TYPES = ('processes', 'threads')
1434
1435 def _test(self, address):
1436 conn = self.connection.Client(address)
1437 conn.send('hello')
1438 conn.close()
1439
1440 def test_listener_client(self):
1441 for family in self.connection.families:
1442 l = self.connection.Listener(family=family)
1443 p = self.Process(target=self._test, args=(l.address,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001444 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001445 p.start()
1446 conn = l.accept()
1447 self.assertEqual(conn.recv(), 'hello')
1448 p.join()
1449 l.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001450#
1451# Test of sending connection and socket objects between processes
1452#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001453"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001454class _TestPicklingConnections(BaseTestCase):
1455
1456 ALLOWED_TYPES = ('processes',)
1457
1458 def _listener(self, conn, families):
1459 for fam in families:
1460 l = self.connection.Listener(family=fam)
1461 conn.send(l.address)
1462 new_conn = l.accept()
1463 conn.send(new_conn)
1464
1465 if self.TYPE == 'processes':
1466 l = socket.socket()
1467 l.bind(('localhost', 0))
1468 conn.send(l.getsockname())
1469 l.listen(1)
1470 new_conn, addr = l.accept()
1471 conn.send(new_conn)
1472
1473 conn.recv()
1474
1475 def _remote(self, conn):
1476 for (address, msg) in iter(conn.recv, None):
1477 client = self.connection.Client(address)
1478 client.send(msg.upper())
1479 client.close()
1480
1481 if self.TYPE == 'processes':
1482 address, msg = conn.recv()
1483 client = socket.socket()
1484 client.connect(address)
1485 client.sendall(msg.upper())
1486 client.close()
1487
1488 conn.close()
1489
1490 def test_pickling(self):
1491 try:
1492 multiprocessing.allow_connection_pickling()
1493 except ImportError:
1494 return
1495
1496 families = self.connection.families
1497
1498 lconn, lconn0 = self.Pipe()
1499 lp = self.Process(target=self._listener, args=(lconn0, families))
1500 lp.start()
1501 lconn0.close()
1502
1503 rconn, rconn0 = self.Pipe()
1504 rp = self.Process(target=self._remote, args=(rconn0,))
1505 rp.start()
1506 rconn0.close()
1507
1508 for fam in families:
1509 msg = ('This connection uses family %s' % fam).encode('ascii')
1510 address = lconn.recv()
1511 rconn.send((address, msg))
1512 new_conn = lconn.recv()
1513 self.assertEqual(new_conn.recv(), msg.upper())
1514
1515 rconn.send(None)
1516
1517 if self.TYPE == 'processes':
1518 msg = latin('This connection uses a normal socket')
1519 address = lconn.recv()
1520 rconn.send((address, msg))
1521 if hasattr(socket, 'fromfd'):
1522 new_conn = lconn.recv()
1523 self.assertEqual(new_conn.recv(100), msg.upper())
1524 else:
1525 # XXX On Windows with Py2.6 need to backport fromfd()
1526 discard = lconn.recv_bytes()
1527
1528 lconn.send(None)
1529
1530 rconn.close()
1531 lconn.close()
1532
1533 lp.join()
1534 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001535"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001536#
1537#
1538#
1539
1540class _TestHeap(BaseTestCase):
1541
1542 ALLOWED_TYPES = ('processes',)
1543
1544 def test_heap(self):
1545 iterations = 5000
1546 maxblocks = 50
1547 blocks = []
1548
1549 # create and destroy lots of blocks of different sizes
1550 for i in xrange(iterations):
1551 size = int(random.lognormvariate(0, 1) * 1000)
1552 b = multiprocessing.heap.BufferWrapper(size)
1553 blocks.append(b)
1554 if len(blocks) > maxblocks:
1555 i = random.randrange(maxblocks)
1556 del blocks[i]
1557
1558 # get the heap object
1559 heap = multiprocessing.heap.BufferWrapper._heap
1560
1561 # verify the state of the heap
1562 all = []
1563 occupied = 0
1564 for L in heap._len_to_seq.values():
1565 for arena, start, stop in L:
1566 all.append((heap._arenas.index(arena), start, stop,
1567 stop-start, 'free'))
1568 for arena, start, stop in heap._allocated_blocks:
1569 all.append((heap._arenas.index(arena), start, stop,
1570 stop-start, 'occupied'))
1571 occupied += (stop-start)
1572
1573 all.sort()
1574
1575 for i in range(len(all)-1):
1576 (arena, start, stop) = all[i][:3]
1577 (narena, nstart, nstop) = all[i+1][:3]
1578 self.assertTrue((arena != narena and nstart == 0) or
1579 (stop == nstart))
1580
1581#
1582#
1583#
1584
Benjamin Petersondfd79492008-06-13 19:13:39 +00001585class _Foo(Structure):
1586 _fields_ = [
1587 ('x', c_int),
1588 ('y', c_double)
1589 ]
1590
Brian Curtina06e9b82010-10-07 02:27:41 +00001591@unittest.skipUnless(HAS_SHAREDCTYPES,
1592 "requires multiprocessing.sharedctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001593class _TestSharedCTypes(BaseTestCase):
1594
1595 ALLOWED_TYPES = ('processes',)
1596
1597 def _double(self, x, y, foo, arr, string):
1598 x.value *= 2
1599 y.value *= 2
1600 foo.x *= 2
1601 foo.y *= 2
1602 string.value *= 2
1603 for i in range(len(arr)):
1604 arr[i] *= 2
1605
1606 def test_sharedctypes(self, lock=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001607 x = Value('i', 7, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001608 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001609 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001610 arr = self.Array('d', range(10), lock=lock)
1611 string = self.Array('c', 20, lock=lock)
Brian Curtina06e9b82010-10-07 02:27:41 +00001612 string.value = latin('hello')
Benjamin Petersondfd79492008-06-13 19:13:39 +00001613
1614 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1615 p.start()
1616 p.join()
1617
1618 self.assertEqual(x.value, 14)
1619 self.assertAlmostEqual(y.value, 2.0/3.0)
1620 self.assertEqual(foo.x, 6)
1621 self.assertAlmostEqual(foo.y, 4.0)
1622 for i in range(10):
1623 self.assertAlmostEqual(arr[i], i*2)
1624 self.assertEqual(string.value, latin('hellohello'))
1625
1626 def test_synchronize(self):
1627 self.test_sharedctypes(lock=True)
1628
1629 def test_copy(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001630 foo = _Foo(2, 5.0)
Brian Curtina06e9b82010-10-07 02:27:41 +00001631 bar = copy(foo)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001632 foo.x = 0
1633 foo.y = 0
1634 self.assertEqual(bar.x, 2)
1635 self.assertAlmostEqual(bar.y, 5.0)
1636
1637#
1638#
1639#
1640
1641class _TestFinalize(BaseTestCase):
1642
1643 ALLOWED_TYPES = ('processes',)
1644
1645 def _test_finalize(self, conn):
1646 class Foo(object):
1647 pass
1648
1649 a = Foo()
1650 util.Finalize(a, conn.send, args=('a',))
1651 del a # triggers callback for a
1652
1653 b = Foo()
1654 close_b = util.Finalize(b, conn.send, args=('b',))
1655 close_b() # triggers callback for b
1656 close_b() # does nothing because callback has already been called
1657 del b # does nothing because callback has already been called
1658
1659 c = Foo()
1660 util.Finalize(c, conn.send, args=('c',))
1661
1662 d10 = Foo()
1663 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1664
1665 d01 = Foo()
1666 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1667 d02 = Foo()
1668 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1669 d03 = Foo()
1670 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1671
1672 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1673
1674 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1675
1676 # call mutliprocessing's cleanup function then exit process without
1677 # garbage collecting locals
1678 util._exit_function()
1679 conn.close()
1680 os._exit(0)
1681
1682 def test_finalize(self):
1683 conn, child_conn = self.Pipe()
1684
1685 p = self.Process(target=self._test_finalize, args=(child_conn,))
1686 p.start()
1687 p.join()
1688
1689 result = [obj for obj in iter(conn.recv, 'STOP')]
1690 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1691
1692#
1693# Test that from ... import * works for each module
1694#
1695
1696class _TestImportStar(BaseTestCase):
1697
1698 ALLOWED_TYPES = ('processes',)
1699
1700 def test_import(self):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001701 modules = [
Benjamin Petersondfd79492008-06-13 19:13:39 +00001702 'multiprocessing', 'multiprocessing.connection',
1703 'multiprocessing.heap', 'multiprocessing.managers',
1704 'multiprocessing.pool', 'multiprocessing.process',
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001705 'multiprocessing.reduction',
Benjamin Petersondfd79492008-06-13 19:13:39 +00001706 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001707 ]
1708
1709 if c_int is not None:
1710 # This module requires _ctypes
1711 modules.append('multiprocessing.sharedctypes')
Benjamin Petersondfd79492008-06-13 19:13:39 +00001712
1713 for name in modules:
1714 __import__(name)
1715 mod = sys.modules[name]
1716
1717 for attr in getattr(mod, '__all__', ()):
1718 self.assertTrue(
1719 hasattr(mod, attr),
1720 '%r does not have attribute %r' % (mod, attr)
1721 )
1722
1723#
1724# Quick test that logging works -- does not test logging output
1725#
1726
1727class _TestLogging(BaseTestCase):
1728
1729 ALLOWED_TYPES = ('processes',)
1730
1731 def test_enable_logging(self):
1732 logger = multiprocessing.get_logger()
1733 logger.setLevel(util.SUBWARNING)
1734 self.assertTrue(logger is not None)
1735 logger.debug('this will not be printed')
1736 logger.info('nor will this')
1737 logger.setLevel(LOG_LEVEL)
1738
1739 def _test_level(self, conn):
1740 logger = multiprocessing.get_logger()
1741 conn.send(logger.getEffectiveLevel())
1742
1743 def test_level(self):
1744 LEVEL1 = 32
1745 LEVEL2 = 37
1746
1747 logger = multiprocessing.get_logger()
1748 root_logger = logging.getLogger()
1749 root_level = root_logger.level
1750
1751 reader, writer = multiprocessing.Pipe(duplex=False)
1752
1753 logger.setLevel(LEVEL1)
1754 self.Process(target=self._test_level, args=(writer,)).start()
1755 self.assertEqual(LEVEL1, reader.recv())
1756
1757 logger.setLevel(logging.NOTSET)
1758 root_logger.setLevel(LEVEL2)
1759 self.Process(target=self._test_level, args=(writer,)).start()
1760 self.assertEqual(LEVEL2, reader.recv())
1761
1762 root_logger.setLevel(root_level)
1763 logger.setLevel(level=LOG_LEVEL)
1764
Jesse Noller814d02d2009-11-21 14:38:23 +00001765
Jesse Noller9a03f2f2009-11-24 14:17:29 +00001766# class _TestLoggingProcessName(BaseTestCase):
1767#
1768# def handle(self, record):
1769# assert record.processName == multiprocessing.current_process().name
1770# self.__handled = True
1771#
1772# def test_logging(self):
1773# handler = logging.Handler()
1774# handler.handle = self.handle
1775# self.__handled = False
1776# # Bypass getLogger() and side-effects
1777# logger = logging.getLoggerClass()(
1778# 'multiprocessing.test.TestLoggingProcessName')
1779# logger.addHandler(handler)
1780# logger.propagate = False
1781#
1782# logger.warn('foo')
1783# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00001784
Benjamin Petersondfd79492008-06-13 19:13:39 +00001785#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001786# Test to verify handle verification, see issue 3321
1787#
1788
1789class TestInvalidHandle(unittest.TestCase):
1790
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001791 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001792 def test_invalid_handles(self):
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001793 conn = _multiprocessing.Connection(44977608)
1794 self.assertRaises(IOError, conn.poll)
1795 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001796
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001797#
Benjamin Petersondfd79492008-06-13 19:13:39 +00001798# Functions used to create test cases from the base ones in this module
1799#
1800
1801def get_attributes(Source, names):
1802 d = {}
1803 for name in names:
1804 obj = getattr(Source, name)
1805 if type(obj) == type(get_attributes):
1806 obj = staticmethod(obj)
1807 d[name] = obj
1808 return d
1809
1810def create_test_cases(Mixin, type):
1811 result = {}
1812 glob = globals()
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001813 Type = type.capitalize()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001814
1815 for name in glob.keys():
1816 if name.startswith('_Test'):
1817 base = glob[name]
1818 if type in base.ALLOWED_TYPES:
1819 newname = 'With' + Type + name[1:]
1820 class Temp(base, unittest.TestCase, Mixin):
1821 pass
1822 result[newname] = Temp
1823 Temp.__name__ = newname
1824 Temp.__module__ = Mixin.__module__
1825 return result
1826
1827#
1828# Create test cases
1829#
1830
1831class ProcessesMixin(object):
1832 TYPE = 'processes'
1833 Process = multiprocessing.Process
1834 locals().update(get_attributes(multiprocessing, (
1835 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1836 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1837 'RawArray', 'current_process', 'active_children', 'Pipe',
1838 'connection', 'JoinableQueue'
1839 )))
1840
1841testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1842globals().update(testcases_processes)
1843
1844
1845class ManagerMixin(object):
1846 TYPE = 'manager'
1847 Process = multiprocessing.Process
1848 manager = object.__new__(multiprocessing.managers.SyncManager)
1849 locals().update(get_attributes(manager, (
1850 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1851 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1852 'Namespace', 'JoinableQueue'
1853 )))
1854
1855testcases_manager = create_test_cases(ManagerMixin, type='manager')
1856globals().update(testcases_manager)
1857
1858
1859class ThreadsMixin(object):
1860 TYPE = 'threads'
1861 Process = multiprocessing.dummy.Process
1862 locals().update(get_attributes(multiprocessing.dummy, (
1863 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1864 'Condition', 'Event', 'Value', 'Array', 'current_process',
1865 'active_children', 'Pipe', 'connection', 'dict', 'list',
1866 'Namespace', 'JoinableQueue'
1867 )))
1868
1869testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1870globals().update(testcases_threads)
1871
Neal Norwitz0c519b32008-08-25 01:50:24 +00001872class OtherTest(unittest.TestCase):
1873 # TODO: add more tests for deliver/answer challenge.
1874 def test_deliver_challenge_auth_failure(self):
1875 class _FakeConnection(object):
1876 def recv_bytes(self, size):
Neal Norwitz2a7767a2008-08-25 03:03:25 +00001877 return b'something bogus'
Neal Norwitz0c519b32008-08-25 01:50:24 +00001878 def send_bytes(self, data):
1879 pass
1880 self.assertRaises(multiprocessing.AuthenticationError,
1881 multiprocessing.connection.deliver_challenge,
1882 _FakeConnection(), b'abc')
1883
1884 def test_answer_challenge_auth_failure(self):
1885 class _FakeConnection(object):
1886 def __init__(self):
1887 self.count = 0
1888 def recv_bytes(self, size):
1889 self.count += 1
1890 if self.count == 1:
1891 return multiprocessing.connection.CHALLENGE
1892 elif self.count == 2:
Neal Norwitz2a7767a2008-08-25 03:03:25 +00001893 return b'something bogus'
1894 return b''
Neal Norwitz0c519b32008-08-25 01:50:24 +00001895 def send_bytes(self, data):
1896 pass
1897 self.assertRaises(multiprocessing.AuthenticationError,
1898 multiprocessing.connection.answer_challenge,
1899 _FakeConnection(), b'abc')
1900
Jesse Noller7152f6d2009-04-02 05:17:26 +00001901#
1902# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1903#
1904
1905def initializer(ns):
1906 ns.test += 1
1907
1908class TestInitializers(unittest.TestCase):
1909 def setUp(self):
1910 self.mgr = multiprocessing.Manager()
1911 self.ns = self.mgr.Namespace()
1912 self.ns.test = 0
1913
1914 def tearDown(self):
1915 self.mgr.shutdown()
1916
1917 def test_manager_initializer(self):
1918 m = multiprocessing.managers.SyncManager()
1919 self.assertRaises(TypeError, m.start, 1)
1920 m.start(initializer, (self.ns,))
1921 self.assertEqual(self.ns.test, 1)
1922 m.shutdown()
1923
1924 def test_pool_initializer(self):
1925 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
1926 p = multiprocessing.Pool(1, initializer, (self.ns,))
1927 p.close()
1928 p.join()
1929 self.assertEqual(self.ns.test, 1)
1930
Jesse Noller1b90efb2009-06-30 17:11:52 +00001931#
1932# Issue 5155, 5313, 5331: Test process in processes
1933# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1934#
1935
1936def _ThisSubProcess(q):
1937 try:
1938 item = q.get(block=False)
1939 except Queue.Empty:
1940 pass
1941
1942def _TestProcess(q):
1943 queue = multiprocessing.Queue()
1944 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
1945 subProc.start()
1946 subProc.join()
1947
1948def _afunc(x):
1949 return x*x
1950
1951def pool_in_process():
1952 pool = multiprocessing.Pool(processes=4)
1953 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
1954
1955class _file_like(object):
1956 def __init__(self, delegate):
1957 self._delegate = delegate
1958 self._pid = None
1959
1960 @property
1961 def cache(self):
1962 pid = os.getpid()
1963 # There are no race conditions since fork keeps only the running thread
1964 if pid != self._pid:
1965 self._pid = pid
1966 self._cache = []
1967 return self._cache
1968
1969 def write(self, data):
1970 self.cache.append(data)
1971
1972 def flush(self):
1973 self._delegate.write(''.join(self.cache))
1974 self._cache = []
1975
1976class TestStdinBadfiledescriptor(unittest.TestCase):
1977
1978 def test_queue_in_process(self):
1979 queue = multiprocessing.Queue()
1980 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
1981 proc.start()
1982 proc.join()
1983
1984 def test_pool_in_process(self):
1985 p = multiprocessing.Process(target=pool_in_process)
1986 p.start()
1987 p.join()
1988
1989 def test_flushing(self):
1990 sio = StringIO()
1991 flike = _file_like(sio)
1992 flike.write('foo')
1993 proc = multiprocessing.Process(target=lambda: flike.flush())
1994 flike.flush()
1995 assert sio.getvalue() == 'foo'
1996
1997testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
1998 TestStdinBadfiledescriptor]
Neal Norwitz0c519b32008-08-25 01:50:24 +00001999
Benjamin Petersondfd79492008-06-13 19:13:39 +00002000#
2001#
2002#
2003
2004def test_main(run=None):
Jesse Noller18623822008-06-18 13:29:52 +00002005 if sys.platform.startswith("linux"):
2006 try:
2007 lock = multiprocessing.RLock()
2008 except OSError:
Benjamin Petersonbec087f2009-03-26 21:10:30 +00002009 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Petersoned77f2e2008-06-17 22:40:44 +00002010
Benjamin Petersondfd79492008-06-13 19:13:39 +00002011 if run is None:
2012 from test.test_support import run_unittest as run
2013
2014 util.get_temp_dir() # creates temp directory for use by all processes
2015
2016 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2017
Jesse Noller146b7ab2008-07-02 16:44:09 +00002018 ProcessesMixin.pool = multiprocessing.Pool(4)
2019 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2020 ManagerMixin.manager.__init__()
2021 ManagerMixin.manager.start()
2022 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002023
2024 testcases = (
Jesse Noller146b7ab2008-07-02 16:44:09 +00002025 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2026 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz0c519b32008-08-25 01:50:24 +00002027 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2028 testcases_other
Benjamin Petersondfd79492008-06-13 19:13:39 +00002029 )
2030
2031 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2032 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Nick Coghlan13623662010-04-10 14:24:36 +00002033 # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
2034 # module during these tests is at least platform dependent and possibly
Nick Coghlan14459d52010-04-10 15:01:54 +00002035 # non-deterministic on any given platform. So we don't mind if the listed
Nick Coghlan13623662010-04-10 14:24:36 +00002036 # warnings aren't actually raised.
Florent Xicluna07627882010-03-21 01:14:24 +00002037 with test_support.check_py3k_warnings(
Nick Coghlan13623662010-04-10 14:24:36 +00002038 (".+__(get|set)slice__ has been removed", DeprecationWarning),
2039 (r"sys.exc_clear\(\) not supported", DeprecationWarning),
2040 quiet=True):
Florent Xicluna07627882010-03-21 01:14:24 +00002041 run(suite)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002042
Jesse Noller146b7ab2008-07-02 16:44:09 +00002043 ThreadsMixin.pool.terminate()
2044 ProcessesMixin.pool.terminate()
2045 ManagerMixin.pool.terminate()
2046 ManagerMixin.manager.shutdown()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002047
Jesse Noller146b7ab2008-07-02 16:44:09 +00002048 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002049
2050def main():
2051 test_main(unittest.TextTestRunner(verbosity=2).run)
2052
2053if __name__ == '__main__':
2054 main()