blob: ac02d5ab250e4097e81cf47e5b2f98f83231474c [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00008import Queue
9import time
10import sys
11import os
12import gc
13import signal
14import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000015import socket
16import random
17import logging
Mark Dickinsonc4920e82009-11-20 19:30:22 +000018from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000019from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000020_multiprocessing = test_support.import_module('_multiprocessing')
Victor Stinner613b4cf2010-04-27 21:56:26 +000021# import threading after _multiprocessing to raise a more revelant error
22# message: "No module named _multiprocessing". _multiprocessing is not compiled
23# without thread support.
24import threading
R. David Murray3db8a342009-03-30 23:05:48 +000025
Jesse Noller37040cd2008-09-30 00:15:45 +000026# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000027test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000028
Benjamin Petersondfd79492008-06-13 19:13:39 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000033import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000034
35from multiprocessing import util
36
Brian Curtina06e9b82010-10-07 02:27:41 +000037try:
38 from multiprocessing.sharedctypes import Value, copy
39 HAS_SHAREDCTYPES = True
40except ImportError:
41 HAS_SHAREDCTYPES = False
42
Benjamin Petersondfd79492008-06-13 19:13:39 +000043#
44#
45#
46
Benjamin Petersone79edf52008-07-13 18:34:58 +000047latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000048
Benjamin Petersondfd79492008-06-13 19:13:39 +000049#
50# Constants
51#
52
53LOG_LEVEL = util.SUBWARNING
Jesse Noller654ade32010-01-27 03:05:57 +000054#LOG_LEVEL = logging.DEBUG
Benjamin Petersondfd79492008-06-13 19:13:39 +000055
56DELTA = 0.1
57CHECK_TIMINGS = False # making true makes tests take a lot longer
58 # and can sometimes cause some non-serious
59 # failures because some calls block a bit
60 # longer than expected
61if CHECK_TIMINGS:
62 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
63else:
64 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
65
66HAVE_GETVALUE = not getattr(_multiprocessing,
67 'HAVE_BROKEN_SEM_GETVALUE', False)
68
Jesse Noller9a5b2ad2009-01-19 15:12:22 +000069WIN32 = (sys.platform == "win32")
70
Benjamin Petersondfd79492008-06-13 19:13:39 +000071#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000072# Some tests require ctypes
73#
74
75try:
Nick Coghlan13623662010-04-10 14:24:36 +000076 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000077except ImportError:
78 Structure = object
79 c_int = c_double = None
80
81#
Benjamin Petersondfd79492008-06-13 19:13:39 +000082# Creates a wrapper for a function which records the time it takes to finish
83#
84
85class TimingWrapper(object):
86
87 def __init__(self, func):
88 self.func = func
89 self.elapsed = None
90
91 def __call__(self, *args, **kwds):
92 t = time.time()
93 try:
94 return self.func(*args, **kwds)
95 finally:
96 self.elapsed = time.time() - t
97
98#
99# Base class for test cases
100#
101
102class BaseTestCase(object):
103
104 ALLOWED_TYPES = ('processes', 'manager', 'threads')
105
106 def assertTimingAlmostEqual(self, a, b):
107 if CHECK_TIMINGS:
108 self.assertAlmostEqual(a, b, 1)
109
110 def assertReturnsIfImplemented(self, value, func, *args):
111 try:
112 res = func(*args)
113 except NotImplementedError:
114 pass
115 else:
116 return self.assertEqual(value, res)
117
118#
119# Return the value of a semaphore
120#
121
122def get_value(self):
123 try:
124 return self.get_value()
125 except AttributeError:
126 try:
127 return self._Semaphore__value
128 except AttributeError:
129 try:
130 return self._value
131 except AttributeError:
132 raise NotImplementedError
133
134#
135# Testcases
136#
137
138class _TestProcess(BaseTestCase):
139
140 ALLOWED_TYPES = ('processes', 'threads')
141
142 def test_current(self):
143 if self.TYPE == 'threads':
144 return
145
146 current = self.current_process()
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000147 authkey = current.authkey
Benjamin Petersondfd79492008-06-13 19:13:39 +0000148
149 self.assertTrue(current.is_alive())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000150 self.assertTrue(not current.daemon)
Ezio Melottib0f5adc2010-01-24 16:58:36 +0000151 self.assertIsInstance(authkey, bytes)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000152 self.assertTrue(len(authkey) > 0)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000153 self.assertEqual(current.ident, os.getpid())
154 self.assertEqual(current.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000155
156 def _test(self, q, *args, **kwds):
157 current = self.current_process()
158 q.put(args)
159 q.put(kwds)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000160 q.put(current.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000161 if self.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000162 q.put(bytes(current.authkey))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000163 q.put(current.pid)
164
165 def test_process(self):
166 q = self.Queue(1)
167 e = self.Event()
168 args = (q, 1, 2)
169 kwargs = {'hello':23, 'bye':2.54}
170 name = 'SomeProcess'
171 p = self.Process(
172 target=self._test, args=args, kwargs=kwargs, name=name
173 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000174 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000175 current = self.current_process()
176
177 if self.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000178 self.assertEquals(p.authkey, current.authkey)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000179 self.assertEquals(p.is_alive(), False)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000180 self.assertEquals(p.daemon, True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000181 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000182 self.assertTrue(type(self.active_children()) is list)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000183 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000184
185 p.start()
186
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000187 self.assertEquals(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000188 self.assertEquals(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000189 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000190
191 self.assertEquals(q.get(), args[1:])
192 self.assertEquals(q.get(), kwargs)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000193 self.assertEquals(q.get(), p.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000194 if self.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000195 self.assertEquals(q.get(), current.authkey)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000196 self.assertEquals(q.get(), p.pid)
197
198 p.join()
199
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000200 self.assertEquals(p.exitcode, 0)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000201 self.assertEquals(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000202 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000203
204 def _test_terminate(self):
205 time.sleep(1000)
206
207 def test_terminate(self):
208 if self.TYPE == 'threads':
209 return
210
211 p = self.Process(target=self._test_terminate)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000212 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000213 p.start()
214
215 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000216 self.assertIn(p, self.active_children())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000217 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000218
219 p.terminate()
220
221 join = TimingWrapper(p.join)
222 self.assertEqual(join(), None)
223 self.assertTimingAlmostEqual(join.elapsed, 0.0)
224
225 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000226 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000227
228 p.join()
229
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000230 # XXX sometimes get p.exitcode == 0 on Windows ...
231 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000232
233 def test_cpu_count(self):
234 try:
235 cpus = multiprocessing.cpu_count()
236 except NotImplementedError:
237 cpus = 1
238 self.assertTrue(type(cpus) is int)
239 self.assertTrue(cpus >= 1)
240
241 def test_active_children(self):
242 self.assertEqual(type(self.active_children()), list)
243
244 p = self.Process(target=time.sleep, args=(DELTA,))
Ezio Melottiaa980582010-01-23 23:04:36 +0000245 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000246
247 p.start()
Ezio Melottiaa980582010-01-23 23:04:36 +0000248 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000249
250 p.join()
Ezio Melottiaa980582010-01-23 23:04:36 +0000251 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000252
253 def _test_recursion(self, wconn, id):
254 from multiprocessing import forking
255 wconn.send(id)
256 if len(id) < 2:
257 for i in range(2):
258 p = self.Process(
259 target=self._test_recursion, args=(wconn, id+[i])
260 )
261 p.start()
262 p.join()
263
264 def test_recursion(self):
265 rconn, wconn = self.Pipe(duplex=False)
266 self._test_recursion(wconn, [])
267
268 time.sleep(DELTA)
269 result = []
270 while rconn.poll():
271 result.append(rconn.recv())
272
273 expected = [
274 [],
275 [0],
276 [0, 0],
277 [0, 1],
278 [1],
279 [1, 0],
280 [1, 1]
281 ]
282 self.assertEqual(result, expected)
283
284#
285#
286#
287
288class _UpperCaser(multiprocessing.Process):
289
290 def __init__(self):
291 multiprocessing.Process.__init__(self)
292 self.child_conn, self.parent_conn = multiprocessing.Pipe()
293
294 def run(self):
295 self.parent_conn.close()
296 for s in iter(self.child_conn.recv, None):
297 self.child_conn.send(s.upper())
298 self.child_conn.close()
299
300 def submit(self, s):
301 assert type(s) is str
302 self.parent_conn.send(s)
303 return self.parent_conn.recv()
304
305 def stop(self):
306 self.parent_conn.send(None)
307 self.parent_conn.close()
308 self.child_conn.close()
309
310class _TestSubclassingProcess(BaseTestCase):
311
312 ALLOWED_TYPES = ('processes',)
313
314 def test_subclassing(self):
315 uppercaser = _UpperCaser()
316 uppercaser.start()
317 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
318 self.assertEqual(uppercaser.submit('world'), 'WORLD')
319 uppercaser.stop()
320 uppercaser.join()
321
322#
323#
324#
325
326def queue_empty(q):
327 if hasattr(q, 'empty'):
328 return q.empty()
329 else:
330 return q.qsize() == 0
331
332def queue_full(q, maxsize):
333 if hasattr(q, 'full'):
334 return q.full()
335 else:
336 return q.qsize() == maxsize
337
338
339class _TestQueue(BaseTestCase):
340
341
342 def _test_put(self, queue, child_can_start, parent_can_continue):
343 child_can_start.wait()
344 for i in range(6):
345 queue.get()
346 parent_can_continue.set()
347
348 def test_put(self):
349 MAXSIZE = 6
350 queue = self.Queue(maxsize=MAXSIZE)
351 child_can_start = self.Event()
352 parent_can_continue = self.Event()
353
354 proc = self.Process(
355 target=self._test_put,
356 args=(queue, child_can_start, parent_can_continue)
357 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000358 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000359 proc.start()
360
361 self.assertEqual(queue_empty(queue), True)
362 self.assertEqual(queue_full(queue, MAXSIZE), False)
363
364 queue.put(1)
365 queue.put(2, True)
366 queue.put(3, True, None)
367 queue.put(4, False)
368 queue.put(5, False, None)
369 queue.put_nowait(6)
370
371 # the values may be in buffer but not yet in pipe so sleep a bit
372 time.sleep(DELTA)
373
374 self.assertEqual(queue_empty(queue), False)
375 self.assertEqual(queue_full(queue, MAXSIZE), True)
376
377 put = TimingWrapper(queue.put)
378 put_nowait = TimingWrapper(queue.put_nowait)
379
380 self.assertRaises(Queue.Full, put, 7, False)
381 self.assertTimingAlmostEqual(put.elapsed, 0)
382
383 self.assertRaises(Queue.Full, put, 7, False, None)
384 self.assertTimingAlmostEqual(put.elapsed, 0)
385
386 self.assertRaises(Queue.Full, put_nowait, 7)
387 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
388
389 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
390 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
391
392 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
393 self.assertTimingAlmostEqual(put.elapsed, 0)
394
395 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
396 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
397
398 child_can_start.set()
399 parent_can_continue.wait()
400
401 self.assertEqual(queue_empty(queue), True)
402 self.assertEqual(queue_full(queue, MAXSIZE), False)
403
404 proc.join()
405
406 def _test_get(self, queue, child_can_start, parent_can_continue):
407 child_can_start.wait()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000408 #queue.put(1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000409 queue.put(2)
410 queue.put(3)
411 queue.put(4)
412 queue.put(5)
413 parent_can_continue.set()
414
415 def test_get(self):
416 queue = self.Queue()
417 child_can_start = self.Event()
418 parent_can_continue = self.Event()
419
420 proc = self.Process(
421 target=self._test_get,
422 args=(queue, child_can_start, parent_can_continue)
423 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000424 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000425 proc.start()
426
427 self.assertEqual(queue_empty(queue), True)
428
429 child_can_start.set()
430 parent_can_continue.wait()
431
432 time.sleep(DELTA)
433 self.assertEqual(queue_empty(queue), False)
434
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000435 # Hangs unexpectedly, remove for now
436 #self.assertEqual(queue.get(), 1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000437 self.assertEqual(queue.get(True, None), 2)
438 self.assertEqual(queue.get(True), 3)
439 self.assertEqual(queue.get(timeout=1), 4)
440 self.assertEqual(queue.get_nowait(), 5)
441
442 self.assertEqual(queue_empty(queue), True)
443
444 get = TimingWrapper(queue.get)
445 get_nowait = TimingWrapper(queue.get_nowait)
446
447 self.assertRaises(Queue.Empty, get, False)
448 self.assertTimingAlmostEqual(get.elapsed, 0)
449
450 self.assertRaises(Queue.Empty, get, False, None)
451 self.assertTimingAlmostEqual(get.elapsed, 0)
452
453 self.assertRaises(Queue.Empty, get_nowait)
454 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
455
456 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
457 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
458
459 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
460 self.assertTimingAlmostEqual(get.elapsed, 0)
461
462 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
463 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
464
465 proc.join()
466
467 def _test_fork(self, queue):
468 for i in range(10, 20):
469 queue.put(i)
470 # note that at this point the items may only be buffered, so the
471 # process cannot shutdown until the feeder thread has finished
472 # pushing items onto the pipe.
473
474 def test_fork(self):
475 # Old versions of Queue would fail to create a new feeder
476 # thread for a forked process if the original process had its
477 # own feeder thread. This test checks that this no longer
478 # happens.
479
480 queue = self.Queue()
481
482 # put items on queue so that main process starts a feeder thread
483 for i in range(10):
484 queue.put(i)
485
486 # wait to make sure thread starts before we fork a new process
487 time.sleep(DELTA)
488
489 # fork process
490 p = self.Process(target=self._test_fork, args=(queue,))
491 p.start()
492
493 # check that all expected items are in the queue
494 for i in range(20):
495 self.assertEqual(queue.get(), i)
496 self.assertRaises(Queue.Empty, queue.get, False)
497
498 p.join()
499
500 def test_qsize(self):
501 q = self.Queue()
502 try:
503 self.assertEqual(q.qsize(), 0)
504 except NotImplementedError:
505 return
506 q.put(1)
507 self.assertEqual(q.qsize(), 1)
508 q.put(5)
509 self.assertEqual(q.qsize(), 2)
510 q.get()
511 self.assertEqual(q.qsize(), 1)
512 q.get()
513 self.assertEqual(q.qsize(), 0)
514
515 def _test_task_done(self, q):
516 for obj in iter(q.get, None):
517 time.sleep(DELTA)
518 q.task_done()
519
520 def test_task_done(self):
521 queue = self.JoinableQueue()
522
523 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000524 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000525
526 workers = [self.Process(target=self._test_task_done, args=(queue,))
527 for i in xrange(4)]
528
529 for p in workers:
530 p.start()
531
532 for i in xrange(10):
533 queue.put(i)
534
535 queue.join()
536
537 for p in workers:
538 queue.put(None)
539
540 for p in workers:
541 p.join()
542
543#
544#
545#
546
547class _TestLock(BaseTestCase):
548
549 def test_lock(self):
550 lock = self.Lock()
551 self.assertEqual(lock.acquire(), True)
552 self.assertEqual(lock.acquire(False), False)
553 self.assertEqual(lock.release(), None)
554 self.assertRaises((ValueError, threading.ThreadError), lock.release)
555
556 def test_rlock(self):
557 lock = self.RLock()
558 self.assertEqual(lock.acquire(), True)
559 self.assertEqual(lock.acquire(), True)
560 self.assertEqual(lock.acquire(), True)
561 self.assertEqual(lock.release(), None)
562 self.assertEqual(lock.release(), None)
563 self.assertEqual(lock.release(), None)
564 self.assertRaises((AssertionError, RuntimeError), lock.release)
565
Jesse Noller82eb5902009-03-30 23:29:31 +0000566 def test_lock_context(self):
567 with self.Lock():
568 pass
569
Benjamin Petersondfd79492008-06-13 19:13:39 +0000570
571class _TestSemaphore(BaseTestCase):
572
573 def _test_semaphore(self, sem):
574 self.assertReturnsIfImplemented(2, get_value, sem)
575 self.assertEqual(sem.acquire(), True)
576 self.assertReturnsIfImplemented(1, get_value, sem)
577 self.assertEqual(sem.acquire(), True)
578 self.assertReturnsIfImplemented(0, get_value, sem)
579 self.assertEqual(sem.acquire(False), False)
580 self.assertReturnsIfImplemented(0, get_value, sem)
581 self.assertEqual(sem.release(), None)
582 self.assertReturnsIfImplemented(1, get_value, sem)
583 self.assertEqual(sem.release(), None)
584 self.assertReturnsIfImplemented(2, get_value, sem)
585
586 def test_semaphore(self):
587 sem = self.Semaphore(2)
588 self._test_semaphore(sem)
589 self.assertEqual(sem.release(), None)
590 self.assertReturnsIfImplemented(3, get_value, sem)
591 self.assertEqual(sem.release(), None)
592 self.assertReturnsIfImplemented(4, get_value, sem)
593
594 def test_bounded_semaphore(self):
595 sem = self.BoundedSemaphore(2)
596 self._test_semaphore(sem)
597 # Currently fails on OS/X
598 #if HAVE_GETVALUE:
599 # self.assertRaises(ValueError, sem.release)
600 # self.assertReturnsIfImplemented(2, get_value, sem)
601
602 def test_timeout(self):
603 if self.TYPE != 'processes':
604 return
605
606 sem = self.Semaphore(0)
607 acquire = TimingWrapper(sem.acquire)
608
609 self.assertEqual(acquire(False), False)
610 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
611
612 self.assertEqual(acquire(False, None), False)
613 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
614
615 self.assertEqual(acquire(False, TIMEOUT1), False)
616 self.assertTimingAlmostEqual(acquire.elapsed, 0)
617
618 self.assertEqual(acquire(True, TIMEOUT2), False)
619 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
620
621 self.assertEqual(acquire(timeout=TIMEOUT3), False)
622 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
623
624
625class _TestCondition(BaseTestCase):
626
627 def f(self, cond, sleeping, woken, timeout=None):
628 cond.acquire()
629 sleeping.release()
630 cond.wait(timeout)
631 woken.release()
632 cond.release()
633
634 def check_invariant(self, cond):
635 # this is only supposed to succeed when there are no sleepers
636 if self.TYPE == 'processes':
637 try:
638 sleepers = (cond._sleeping_count.get_value() -
639 cond._woken_count.get_value())
640 self.assertEqual(sleepers, 0)
641 self.assertEqual(cond._wait_semaphore.get_value(), 0)
642 except NotImplementedError:
643 pass
644
645 def test_notify(self):
646 cond = self.Condition()
647 sleeping = self.Semaphore(0)
648 woken = self.Semaphore(0)
649
650 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000651 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000652 p.start()
653
654 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000655 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000656 p.start()
657
658 # wait for both children to start sleeping
659 sleeping.acquire()
660 sleeping.acquire()
661
662 # check no process/thread has woken up
663 time.sleep(DELTA)
664 self.assertReturnsIfImplemented(0, get_value, woken)
665
666 # wake up one process/thread
667 cond.acquire()
668 cond.notify()
669 cond.release()
670
671 # check one process/thread has woken up
672 time.sleep(DELTA)
673 self.assertReturnsIfImplemented(1, get_value, woken)
674
675 # wake up another
676 cond.acquire()
677 cond.notify()
678 cond.release()
679
680 # check other has woken up
681 time.sleep(DELTA)
682 self.assertReturnsIfImplemented(2, get_value, woken)
683
684 # check state is not mucked up
685 self.check_invariant(cond)
686 p.join()
687
688 def test_notify_all(self):
689 cond = self.Condition()
690 sleeping = self.Semaphore(0)
691 woken = self.Semaphore(0)
692
693 # start some threads/processes which will timeout
694 for i in range(3):
695 p = self.Process(target=self.f,
696 args=(cond, sleeping, woken, TIMEOUT1))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000697 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000698 p.start()
699
700 t = threading.Thread(target=self.f,
701 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000702 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000703 t.start()
704
705 # wait for them all to sleep
706 for i in xrange(6):
707 sleeping.acquire()
708
709 # check they have all timed out
710 for i in xrange(6):
711 woken.acquire()
712 self.assertReturnsIfImplemented(0, get_value, woken)
713
714 # check state is not mucked up
715 self.check_invariant(cond)
716
717 # start some more threads/processes
718 for i in range(3):
719 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000720 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000721 p.start()
722
723 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000724 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000725 t.start()
726
727 # wait for them to all sleep
728 for i in xrange(6):
729 sleeping.acquire()
730
731 # check no process/thread has woken up
732 time.sleep(DELTA)
733 self.assertReturnsIfImplemented(0, get_value, woken)
734
735 # wake them all up
736 cond.acquire()
737 cond.notify_all()
738 cond.release()
739
740 # check they have all woken
741 time.sleep(DELTA)
742 self.assertReturnsIfImplemented(6, get_value, woken)
743
744 # check state is not mucked up
745 self.check_invariant(cond)
746
747 def test_timeout(self):
748 cond = self.Condition()
749 wait = TimingWrapper(cond.wait)
750 cond.acquire()
751 res = wait(TIMEOUT1)
752 cond.release()
753 self.assertEqual(res, None)
754 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
755
756
757class _TestEvent(BaseTestCase):
758
759 def _test_event(self, event):
760 time.sleep(TIMEOUT2)
761 event.set()
762
763 def test_event(self):
764 event = self.Event()
765 wait = TimingWrapper(event.wait)
766
767 # Removed temporaily, due to API shear, this does not
768 # work with threading._Event objects. is_set == isSet
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000769 self.assertEqual(event.is_set(), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000770
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000771 # Removed, threading.Event.wait() will return the value of the __flag
772 # instead of None. API Shear with the semaphore backed mp.Event
773 self.assertEqual(wait(0.0), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000774 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000775 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000776 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
777
778 event.set()
779
780 # See note above on the API differences
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000781 self.assertEqual(event.is_set(), True)
782 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000783 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000784 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000785 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
786 # self.assertEqual(event.is_set(), True)
787
788 event.clear()
789
790 #self.assertEqual(event.is_set(), False)
791
792 self.Process(target=self._test_event, args=(event,)).start()
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000793 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000794
795#
796#
797#
798
Brian Curtina06e9b82010-10-07 02:27:41 +0000799@unittest.skipUnless(HAS_SHAREDCTYPES,
800 "requires multiprocessing.sharedctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000801class _TestValue(BaseTestCase):
802
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000803 ALLOWED_TYPES = ('processes',)
804
Benjamin Petersondfd79492008-06-13 19:13:39 +0000805 codes_values = [
806 ('i', 4343, 24234),
807 ('d', 3.625, -4.25),
808 ('h', -232, 234),
809 ('c', latin('x'), latin('y'))
810 ]
811
812 def _test(self, values):
813 for sv, cv in zip(values, self.codes_values):
814 sv.value = cv[2]
815
816
817 def test_value(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000818 if raw:
819 values = [self.RawValue(code, value)
820 for code, value, _ in self.codes_values]
821 else:
822 values = [self.Value(code, value)
823 for code, value, _ in self.codes_values]
824
825 for sv, cv in zip(values, self.codes_values):
826 self.assertEqual(sv.value, cv[1])
827
828 proc = self.Process(target=self._test, args=(values,))
829 proc.start()
830 proc.join()
831
832 for sv, cv in zip(values, self.codes_values):
833 self.assertEqual(sv.value, cv[2])
834
835 def test_rawvalue(self):
836 self.test_value(raw=True)
837
838 def test_getobj_getlock(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000839 val1 = self.Value('i', 5)
840 lock1 = val1.get_lock()
841 obj1 = val1.get_obj()
842
843 val2 = self.Value('i', 5, lock=None)
844 lock2 = val2.get_lock()
845 obj2 = val2.get_obj()
846
847 lock = self.Lock()
848 val3 = self.Value('i', 5, lock=lock)
849 lock3 = val3.get_lock()
850 obj3 = val3.get_obj()
851 self.assertEqual(lock, lock3)
852
Jesse Noller6ab22152009-01-18 02:45:38 +0000853 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000854 self.assertFalse(hasattr(arr4, 'get_lock'))
855 self.assertFalse(hasattr(arr4, 'get_obj'))
856
Jesse Noller6ab22152009-01-18 02:45:38 +0000857 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
858
859 arr5 = self.RawValue('i', 5)
860 self.assertFalse(hasattr(arr5, 'get_lock'))
861 self.assertFalse(hasattr(arr5, 'get_obj'))
862
Benjamin Petersondfd79492008-06-13 19:13:39 +0000863
864class _TestArray(BaseTestCase):
865
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000866 ALLOWED_TYPES = ('processes',)
867
Benjamin Petersondfd79492008-06-13 19:13:39 +0000868 def f(self, seq):
869 for i in range(1, len(seq)):
870 seq[i] += seq[i-1]
871
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000872 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000873 def test_array(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000874 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
875 if raw:
876 arr = self.RawArray('i', seq)
877 else:
878 arr = self.Array('i', seq)
879
880 self.assertEqual(len(arr), len(seq))
881 self.assertEqual(arr[3], seq[3])
882 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
883
884 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
885
886 self.assertEqual(list(arr[:]), seq)
887
888 self.f(seq)
889
890 p = self.Process(target=self.f, args=(arr,))
891 p.start()
892 p.join()
893
894 self.assertEqual(list(arr[:]), seq)
895
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000896 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000897 def test_rawarray(self):
898 self.test_array(raw=True)
899
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000900 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000901 def test_getobj_getlock_obj(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000902 arr1 = self.Array('i', range(10))
903 lock1 = arr1.get_lock()
904 obj1 = arr1.get_obj()
905
906 arr2 = self.Array('i', range(10), lock=None)
907 lock2 = arr2.get_lock()
908 obj2 = arr2.get_obj()
909
910 lock = self.Lock()
911 arr3 = self.Array('i', range(10), lock=lock)
912 lock3 = arr3.get_lock()
913 obj3 = arr3.get_obj()
914 self.assertEqual(lock, lock3)
915
Jesse Noller6ab22152009-01-18 02:45:38 +0000916 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000917 self.assertFalse(hasattr(arr4, 'get_lock'))
918 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Noller6ab22152009-01-18 02:45:38 +0000919 self.assertRaises(AttributeError,
920 self.Array, 'i', range(10), lock='notalock')
921
922 arr5 = self.RawArray('i', range(10))
923 self.assertFalse(hasattr(arr5, 'get_lock'))
924 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000925
926#
927#
928#
929
930class _TestContainers(BaseTestCase):
931
932 ALLOWED_TYPES = ('manager',)
933
934 def test_list(self):
935 a = self.list(range(10))
936 self.assertEqual(a[:], range(10))
937
938 b = self.list()
939 self.assertEqual(b[:], [])
940
941 b.extend(range(5))
942 self.assertEqual(b[:], range(5))
943
944 self.assertEqual(b[2], 2)
945 self.assertEqual(b[2:10], [2,3,4])
946
947 b *= 2
948 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
949
950 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
951
952 self.assertEqual(a[:], range(10))
953
954 d = [a, b]
955 e = self.list(d)
956 self.assertEqual(
957 e[:],
958 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
959 )
960
961 f = self.list([a])
962 a.append('hello')
963 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
964
965 def test_dict(self):
966 d = self.dict()
967 indices = range(65, 70)
968 for i in indices:
969 d[i] = chr(i)
970 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
971 self.assertEqual(sorted(d.keys()), indices)
972 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
973 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
974
975 def test_namespace(self):
976 n = self.Namespace()
977 n.name = 'Bob'
978 n.job = 'Builder'
979 n._hidden = 'hidden'
980 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
981 del n.job
982 self.assertEqual(str(n), "Namespace(name='Bob')")
983 self.assertTrue(hasattr(n, 'name'))
984 self.assertTrue(not hasattr(n, 'job'))
985
986#
987#
988#
989
990def sqr(x, wait=0.0):
991 time.sleep(wait)
992 return x*x
Benjamin Petersondfd79492008-06-13 19:13:39 +0000993class _TestPool(BaseTestCase):
994
995 def test_apply(self):
996 papply = self.pool.apply
997 self.assertEqual(papply(sqr, (5,)), sqr(5))
998 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
999
1000 def test_map(self):
1001 pmap = self.pool.map
1002 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
1003 self.assertEqual(pmap(sqr, range(100), chunksize=20),
1004 map(sqr, range(100)))
1005
Jesse Noller7530e472009-07-16 14:23:04 +00001006 def test_map_chunksize(self):
1007 try:
1008 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1009 except multiprocessing.TimeoutError:
1010 self.fail("pool.map_async with chunksize stalled on null list")
1011
Benjamin Petersondfd79492008-06-13 19:13:39 +00001012 def test_async(self):
1013 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1014 get = TimingWrapper(res.get)
1015 self.assertEqual(get(), 49)
1016 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1017
1018 def test_async_timeout(self):
1019 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1020 get = TimingWrapper(res.get)
1021 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1022 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1023
1024 def test_imap(self):
1025 it = self.pool.imap(sqr, range(10))
1026 self.assertEqual(list(it), map(sqr, range(10)))
1027
1028 it = self.pool.imap(sqr, range(10))
1029 for i in range(10):
1030 self.assertEqual(it.next(), i*i)
1031 self.assertRaises(StopIteration, it.next)
1032
1033 it = self.pool.imap(sqr, range(1000), chunksize=100)
1034 for i in range(1000):
1035 self.assertEqual(it.next(), i*i)
1036 self.assertRaises(StopIteration, it.next)
1037
1038 def test_imap_unordered(self):
1039 it = self.pool.imap_unordered(sqr, range(1000))
1040 self.assertEqual(sorted(it), map(sqr, range(1000)))
1041
1042 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1043 self.assertEqual(sorted(it), map(sqr, range(1000)))
1044
1045 def test_make_pool(self):
1046 p = multiprocessing.Pool(3)
1047 self.assertEqual(3, len(p._pool))
1048 p.close()
1049 p.join()
1050
1051 def test_terminate(self):
1052 if self.TYPE == 'manager':
1053 # On Unix a forked process increfs each shared object to
1054 # which its parent process held a reference. If the
1055 # forked process gets terminated then there is likely to
1056 # be a reference leak. So to prevent
1057 # _TestZZZNumberOfObjects from failing we skip this test
1058 # when using a manager.
1059 return
1060
1061 result = self.pool.map_async(
1062 time.sleep, [0.1 for i in range(10000)], chunksize=1
1063 )
1064 self.pool.terminate()
1065 join = TimingWrapper(self.pool.join)
1066 join()
1067 self.assertTrue(join.elapsed < 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001068
1069class _TestPoolWorkerLifetime(BaseTestCase):
1070
1071 ALLOWED_TYPES = ('processes', )
1072 def test_pool_worker_lifetime(self):
1073 p = multiprocessing.Pool(3, maxtasksperchild=10)
1074 self.assertEqual(3, len(p._pool))
1075 origworkerpids = [w.pid for w in p._pool]
1076 # Run many tasks so each worker gets replaced (hopefully)
1077 results = []
1078 for i in range(100):
1079 results.append(p.apply_async(sqr, (i, )))
1080 # Fetch the results and verify we got the right answers,
1081 # also ensuring all the tasks have completed.
1082 for (j, res) in enumerate(results):
1083 self.assertEqual(res.get(), sqr(j))
1084 # Refill the pool
1085 p._repopulate_pool()
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001086 # Wait until all workers are alive
1087 countdown = 5
1088 while countdown and not all(w.is_alive() for w in p._pool):
1089 countdown -= 1
1090 time.sleep(DELTA)
Jesse Noller654ade32010-01-27 03:05:57 +00001091 finalworkerpids = [w.pid for w in p._pool]
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001092 # All pids should be assigned. See issue #7805.
1093 self.assertNotIn(None, origworkerpids)
1094 self.assertNotIn(None, finalworkerpids)
1095 # Finally, check that the worker pids have changed
Jesse Noller654ade32010-01-27 03:05:57 +00001096 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1097 p.close()
1098 p.join()
1099
Benjamin Petersondfd79492008-06-13 19:13:39 +00001100#
1101# Test that manager has expected number of shared objects left
1102#
1103
1104class _TestZZZNumberOfObjects(BaseTestCase):
1105 # Because test cases are sorted alphabetically, this one will get
1106 # run after all the other tests for the manager. It tests that
1107 # there have been no "reference leaks" for the manager's shared
1108 # objects. Note the comment in _TestPool.test_terminate().
1109 ALLOWED_TYPES = ('manager',)
1110
1111 def test_number_of_objects(self):
1112 EXPECTED_NUMBER = 1 # the pool object is still alive
1113 multiprocessing.active_children() # discard dead process objs
1114 gc.collect() # do garbage collection
1115 refs = self.manager._number_of_objects()
Jesse Noller7314b382009-01-21 02:08:17 +00001116 debug_info = self.manager._debug_info()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001117 if refs != EXPECTED_NUMBER:
Jesse Noller7fb96402008-07-17 21:01:05 +00001118 print self.manager._debug_info()
Jesse Noller7314b382009-01-21 02:08:17 +00001119 print debug_info
Benjamin Petersondfd79492008-06-13 19:13:39 +00001120
1121 self.assertEqual(refs, EXPECTED_NUMBER)
1122
1123#
1124# Test of creating a customized manager class
1125#
1126
1127from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1128
1129class FooBar(object):
1130 def f(self):
1131 return 'f()'
1132 def g(self):
1133 raise ValueError
1134 def _h(self):
1135 return '_h()'
1136
1137def baz():
1138 for i in xrange(10):
1139 yield i*i
1140
1141class IteratorProxy(BaseProxy):
1142 _exposed_ = ('next', '__next__')
1143 def __iter__(self):
1144 return self
1145 def next(self):
1146 return self._callmethod('next')
1147 def __next__(self):
1148 return self._callmethod('__next__')
1149
1150class MyManager(BaseManager):
1151 pass
1152
1153MyManager.register('Foo', callable=FooBar)
1154MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1155MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1156
1157
1158class _TestMyManager(BaseTestCase):
1159
1160 ALLOWED_TYPES = ('manager',)
1161
1162 def test_mymanager(self):
1163 manager = MyManager()
1164 manager.start()
1165
1166 foo = manager.Foo()
1167 bar = manager.Bar()
1168 baz = manager.baz()
1169
1170 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1171 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1172
1173 self.assertEqual(foo_methods, ['f', 'g'])
1174 self.assertEqual(bar_methods, ['f', '_h'])
1175
1176 self.assertEqual(foo.f(), 'f()')
1177 self.assertRaises(ValueError, foo.g)
1178 self.assertEqual(foo._callmethod('f'), 'f()')
1179 self.assertRaises(RemoteError, foo._callmethod, '_h')
1180
1181 self.assertEqual(bar.f(), 'f()')
1182 self.assertEqual(bar._h(), '_h()')
1183 self.assertEqual(bar._callmethod('f'), 'f()')
1184 self.assertEqual(bar._callmethod('_h'), '_h()')
1185
1186 self.assertEqual(list(baz), [i*i for i in range(10)])
1187
1188 manager.shutdown()
1189
1190#
1191# Test of connecting to a remote server and using xmlrpclib for serialization
1192#
1193
1194_queue = Queue.Queue()
1195def get_queue():
1196 return _queue
1197
1198class QueueManager(BaseManager):
1199 '''manager class used by server process'''
1200QueueManager.register('get_queue', callable=get_queue)
1201
1202class QueueManager2(BaseManager):
1203 '''manager class which specifies the same interface as QueueManager'''
1204QueueManager2.register('get_queue')
1205
1206
1207SERIALIZER = 'xmlrpclib'
1208
1209class _TestRemoteManager(BaseTestCase):
1210
1211 ALLOWED_TYPES = ('manager',)
1212
1213 def _putter(self, address, authkey):
1214 manager = QueueManager2(
1215 address=address, authkey=authkey, serializer=SERIALIZER
1216 )
1217 manager.connect()
1218 queue = manager.get_queue()
1219 queue.put(('hello world', None, True, 2.25))
1220
1221 def test_remote(self):
1222 authkey = os.urandom(32)
1223
1224 manager = QueueManager(
1225 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1226 )
1227 manager.start()
1228
1229 p = self.Process(target=self._putter, args=(manager.address, authkey))
1230 p.start()
1231
1232 manager2 = QueueManager2(
1233 address=manager.address, authkey=authkey, serializer=SERIALIZER
1234 )
1235 manager2.connect()
1236 queue = manager2.get_queue()
1237
1238 # Note that xmlrpclib will deserialize object as a list not a tuple
1239 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1240
1241 # Because we are using xmlrpclib for serialization instead of
1242 # pickle this will cause a serialization error.
1243 self.assertRaises(Exception, queue.put, time.sleep)
1244
1245 # Make queue finalizer run before the server is stopped
1246 del queue
1247 manager.shutdown()
1248
Jesse Noller459a6482009-03-30 15:50:42 +00001249class _TestManagerRestart(BaseTestCase):
1250
1251 def _putter(self, address, authkey):
1252 manager = QueueManager(
1253 address=address, authkey=authkey, serializer=SERIALIZER)
1254 manager.connect()
1255 queue = manager.get_queue()
1256 queue.put('hello world')
1257
1258 def test_rapid_restart(self):
1259 authkey = os.urandom(32)
1260 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001261 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin87d86e02010-11-01 05:15:55 +00001262 srvr = manager.get_server()
1263 addr = srvr.address
1264 # Close the connection.Listener socket which gets opened as a part
1265 # of manager.get_server(). It's not needed for the test.
1266 srvr.listener.close()
Jesse Noller459a6482009-03-30 15:50:42 +00001267 manager.start()
1268
1269 p = self.Process(target=self._putter, args=(manager.address, authkey))
1270 p.start()
1271 queue = manager.get_queue()
1272 self.assertEqual(queue.get(), 'hello world')
Jesse Noller019ce772009-03-30 21:53:29 +00001273 del queue
Jesse Noller459a6482009-03-30 15:50:42 +00001274 manager.shutdown()
1275 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001276 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller459a6482009-03-30 15:50:42 +00001277 manager.start()
Jesse Noller019ce772009-03-30 21:53:29 +00001278 manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001279
Benjamin Petersondfd79492008-06-13 19:13:39 +00001280#
1281#
1282#
1283
1284SENTINEL = latin('')
1285
1286class _TestConnection(BaseTestCase):
1287
1288 ALLOWED_TYPES = ('processes', 'threads')
1289
1290 def _echo(self, conn):
1291 for msg in iter(conn.recv_bytes, SENTINEL):
1292 conn.send_bytes(msg)
1293 conn.close()
1294
1295 def test_connection(self):
1296 conn, child_conn = self.Pipe()
1297
1298 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001299 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001300 p.start()
1301
1302 seq = [1, 2.25, None]
1303 msg = latin('hello world')
1304 longmsg = msg * 10
1305 arr = array.array('i', range(4))
1306
1307 if self.TYPE == 'processes':
1308 self.assertEqual(type(conn.fileno()), int)
1309
1310 self.assertEqual(conn.send(seq), None)
1311 self.assertEqual(conn.recv(), seq)
1312
1313 self.assertEqual(conn.send_bytes(msg), None)
1314 self.assertEqual(conn.recv_bytes(), msg)
1315
1316 if self.TYPE == 'processes':
1317 buffer = array.array('i', [0]*10)
1318 expected = list(arr) + [0] * (10 - len(arr))
1319 self.assertEqual(conn.send_bytes(arr), None)
1320 self.assertEqual(conn.recv_bytes_into(buffer),
1321 len(arr) * buffer.itemsize)
1322 self.assertEqual(list(buffer), expected)
1323
1324 buffer = array.array('i', [0]*10)
1325 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1326 self.assertEqual(conn.send_bytes(arr), None)
1327 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1328 len(arr) * buffer.itemsize)
1329 self.assertEqual(list(buffer), expected)
1330
1331 buffer = bytearray(latin(' ' * 40))
1332 self.assertEqual(conn.send_bytes(longmsg), None)
1333 try:
1334 res = conn.recv_bytes_into(buffer)
1335 except multiprocessing.BufferTooShort, e:
1336 self.assertEqual(e.args, (longmsg,))
1337 else:
1338 self.fail('expected BufferTooShort, got %s' % res)
1339
1340 poll = TimingWrapper(conn.poll)
1341
1342 self.assertEqual(poll(), False)
1343 self.assertTimingAlmostEqual(poll.elapsed, 0)
1344
1345 self.assertEqual(poll(TIMEOUT1), False)
1346 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1347
1348 conn.send(None)
1349
1350 self.assertEqual(poll(TIMEOUT1), True)
1351 self.assertTimingAlmostEqual(poll.elapsed, 0)
1352
1353 self.assertEqual(conn.recv(), None)
1354
1355 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1356 conn.send_bytes(really_big_msg)
1357 self.assertEqual(conn.recv_bytes(), really_big_msg)
1358
1359 conn.send_bytes(SENTINEL) # tell child to quit
1360 child_conn.close()
1361
1362 if self.TYPE == 'processes':
1363 self.assertEqual(conn.readable, True)
1364 self.assertEqual(conn.writable, True)
1365 self.assertRaises(EOFError, conn.recv)
1366 self.assertRaises(EOFError, conn.recv_bytes)
1367
1368 p.join()
1369
1370 def test_duplex_false(self):
1371 reader, writer = self.Pipe(duplex=False)
1372 self.assertEqual(writer.send(1), None)
1373 self.assertEqual(reader.recv(), 1)
1374 if self.TYPE == 'processes':
1375 self.assertEqual(reader.readable, True)
1376 self.assertEqual(reader.writable, False)
1377 self.assertEqual(writer.readable, False)
1378 self.assertEqual(writer.writable, True)
1379 self.assertRaises(IOError, reader.send, 2)
1380 self.assertRaises(IOError, writer.recv)
1381 self.assertRaises(IOError, writer.poll)
1382
1383 def test_spawn_close(self):
1384 # We test that a pipe connection can be closed by parent
1385 # process immediately after child is spawned. On Windows this
1386 # would have sometimes failed on old versions because
1387 # child_conn would be closed before the child got a chance to
1388 # duplicate it.
1389 conn, child_conn = self.Pipe()
1390
1391 p = self.Process(target=self._echo, args=(child_conn,))
1392 p.start()
1393 child_conn.close() # this might complete before child initializes
1394
1395 msg = latin('hello')
1396 conn.send_bytes(msg)
1397 self.assertEqual(conn.recv_bytes(), msg)
1398
1399 conn.send_bytes(SENTINEL)
1400 conn.close()
1401 p.join()
1402
1403 def test_sendbytes(self):
1404 if self.TYPE != 'processes':
1405 return
1406
1407 msg = latin('abcdefghijklmnopqrstuvwxyz')
1408 a, b = self.Pipe()
1409
1410 a.send_bytes(msg)
1411 self.assertEqual(b.recv_bytes(), msg)
1412
1413 a.send_bytes(msg, 5)
1414 self.assertEqual(b.recv_bytes(), msg[5:])
1415
1416 a.send_bytes(msg, 7, 8)
1417 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1418
1419 a.send_bytes(msg, 26)
1420 self.assertEqual(b.recv_bytes(), latin(''))
1421
1422 a.send_bytes(msg, 26, 0)
1423 self.assertEqual(b.recv_bytes(), latin(''))
1424
1425 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1426
1427 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1428
1429 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1430
1431 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1432
1433 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1434
Benjamin Petersondfd79492008-06-13 19:13:39 +00001435class _TestListenerClient(BaseTestCase):
1436
1437 ALLOWED_TYPES = ('processes', 'threads')
1438
1439 def _test(self, address):
1440 conn = self.connection.Client(address)
1441 conn.send('hello')
1442 conn.close()
1443
1444 def test_listener_client(self):
1445 for family in self.connection.families:
1446 l = self.connection.Listener(family=family)
1447 p = self.Process(target=self._test, args=(l.address,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001448 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001449 p.start()
1450 conn = l.accept()
1451 self.assertEqual(conn.recv(), 'hello')
1452 p.join()
1453 l.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001454#
1455# Test of sending connection and socket objects between processes
1456#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001457"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001458class _TestPicklingConnections(BaseTestCase):
1459
1460 ALLOWED_TYPES = ('processes',)
1461
1462 def _listener(self, conn, families):
1463 for fam in families:
1464 l = self.connection.Listener(family=fam)
1465 conn.send(l.address)
1466 new_conn = l.accept()
1467 conn.send(new_conn)
1468
1469 if self.TYPE == 'processes':
1470 l = socket.socket()
1471 l.bind(('localhost', 0))
1472 conn.send(l.getsockname())
1473 l.listen(1)
1474 new_conn, addr = l.accept()
1475 conn.send(new_conn)
1476
1477 conn.recv()
1478
1479 def _remote(self, conn):
1480 for (address, msg) in iter(conn.recv, None):
1481 client = self.connection.Client(address)
1482 client.send(msg.upper())
1483 client.close()
1484
1485 if self.TYPE == 'processes':
1486 address, msg = conn.recv()
1487 client = socket.socket()
1488 client.connect(address)
1489 client.sendall(msg.upper())
1490 client.close()
1491
1492 conn.close()
1493
1494 def test_pickling(self):
1495 try:
1496 multiprocessing.allow_connection_pickling()
1497 except ImportError:
1498 return
1499
1500 families = self.connection.families
1501
1502 lconn, lconn0 = self.Pipe()
1503 lp = self.Process(target=self._listener, args=(lconn0, families))
1504 lp.start()
1505 lconn0.close()
1506
1507 rconn, rconn0 = self.Pipe()
1508 rp = self.Process(target=self._remote, args=(rconn0,))
1509 rp.start()
1510 rconn0.close()
1511
1512 for fam in families:
1513 msg = ('This connection uses family %s' % fam).encode('ascii')
1514 address = lconn.recv()
1515 rconn.send((address, msg))
1516 new_conn = lconn.recv()
1517 self.assertEqual(new_conn.recv(), msg.upper())
1518
1519 rconn.send(None)
1520
1521 if self.TYPE == 'processes':
1522 msg = latin('This connection uses a normal socket')
1523 address = lconn.recv()
1524 rconn.send((address, msg))
1525 if hasattr(socket, 'fromfd'):
1526 new_conn = lconn.recv()
1527 self.assertEqual(new_conn.recv(100), msg.upper())
1528 else:
1529 # XXX On Windows with Py2.6 need to backport fromfd()
1530 discard = lconn.recv_bytes()
1531
1532 lconn.send(None)
1533
1534 rconn.close()
1535 lconn.close()
1536
1537 lp.join()
1538 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001539"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001540#
1541#
1542#
1543
1544class _TestHeap(BaseTestCase):
1545
1546 ALLOWED_TYPES = ('processes',)
1547
1548 def test_heap(self):
1549 iterations = 5000
1550 maxblocks = 50
1551 blocks = []
1552
1553 # create and destroy lots of blocks of different sizes
1554 for i in xrange(iterations):
1555 size = int(random.lognormvariate(0, 1) * 1000)
1556 b = multiprocessing.heap.BufferWrapper(size)
1557 blocks.append(b)
1558 if len(blocks) > maxblocks:
1559 i = random.randrange(maxblocks)
1560 del blocks[i]
1561
1562 # get the heap object
1563 heap = multiprocessing.heap.BufferWrapper._heap
1564
1565 # verify the state of the heap
1566 all = []
1567 occupied = 0
1568 for L in heap._len_to_seq.values():
1569 for arena, start, stop in L:
1570 all.append((heap._arenas.index(arena), start, stop,
1571 stop-start, 'free'))
1572 for arena, start, stop in heap._allocated_blocks:
1573 all.append((heap._arenas.index(arena), start, stop,
1574 stop-start, 'occupied'))
1575 occupied += (stop-start)
1576
1577 all.sort()
1578
1579 for i in range(len(all)-1):
1580 (arena, start, stop) = all[i][:3]
1581 (narena, nstart, nstop) = all[i+1][:3]
1582 self.assertTrue((arena != narena and nstart == 0) or
1583 (stop == nstart))
1584
1585#
1586#
1587#
1588
Benjamin Petersondfd79492008-06-13 19:13:39 +00001589class _Foo(Structure):
1590 _fields_ = [
1591 ('x', c_int),
1592 ('y', c_double)
1593 ]
1594
Brian Curtina06e9b82010-10-07 02:27:41 +00001595@unittest.skipUnless(HAS_SHAREDCTYPES,
1596 "requires multiprocessing.sharedctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001597class _TestSharedCTypes(BaseTestCase):
1598
1599 ALLOWED_TYPES = ('processes',)
1600
1601 def _double(self, x, y, foo, arr, string):
1602 x.value *= 2
1603 y.value *= 2
1604 foo.x *= 2
1605 foo.y *= 2
1606 string.value *= 2
1607 for i in range(len(arr)):
1608 arr[i] *= 2
1609
1610 def test_sharedctypes(self, lock=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001611 x = Value('i', 7, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001612 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001613 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001614 arr = self.Array('d', range(10), lock=lock)
1615 string = self.Array('c', 20, lock=lock)
Brian Curtina06e9b82010-10-07 02:27:41 +00001616 string.value = latin('hello')
Benjamin Petersondfd79492008-06-13 19:13:39 +00001617
1618 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1619 p.start()
1620 p.join()
1621
1622 self.assertEqual(x.value, 14)
1623 self.assertAlmostEqual(y.value, 2.0/3.0)
1624 self.assertEqual(foo.x, 6)
1625 self.assertAlmostEqual(foo.y, 4.0)
1626 for i in range(10):
1627 self.assertAlmostEqual(arr[i], i*2)
1628 self.assertEqual(string.value, latin('hellohello'))
1629
1630 def test_synchronize(self):
1631 self.test_sharedctypes(lock=True)
1632
1633 def test_copy(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001634 foo = _Foo(2, 5.0)
Brian Curtina06e9b82010-10-07 02:27:41 +00001635 bar = copy(foo)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001636 foo.x = 0
1637 foo.y = 0
1638 self.assertEqual(bar.x, 2)
1639 self.assertAlmostEqual(bar.y, 5.0)
1640
1641#
1642#
1643#
1644
1645class _TestFinalize(BaseTestCase):
1646
1647 ALLOWED_TYPES = ('processes',)
1648
1649 def _test_finalize(self, conn):
1650 class Foo(object):
1651 pass
1652
1653 a = Foo()
1654 util.Finalize(a, conn.send, args=('a',))
1655 del a # triggers callback for a
1656
1657 b = Foo()
1658 close_b = util.Finalize(b, conn.send, args=('b',))
1659 close_b() # triggers callback for b
1660 close_b() # does nothing because callback has already been called
1661 del b # does nothing because callback has already been called
1662
1663 c = Foo()
1664 util.Finalize(c, conn.send, args=('c',))
1665
1666 d10 = Foo()
1667 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1668
1669 d01 = Foo()
1670 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1671 d02 = Foo()
1672 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1673 d03 = Foo()
1674 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1675
1676 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1677
1678 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1679
1680 # call mutliprocessing's cleanup function then exit process without
1681 # garbage collecting locals
1682 util._exit_function()
1683 conn.close()
1684 os._exit(0)
1685
1686 def test_finalize(self):
1687 conn, child_conn = self.Pipe()
1688
1689 p = self.Process(target=self._test_finalize, args=(child_conn,))
1690 p.start()
1691 p.join()
1692
1693 result = [obj for obj in iter(conn.recv, 'STOP')]
1694 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1695
1696#
1697# Test that from ... import * works for each module
1698#
1699
1700class _TestImportStar(BaseTestCase):
1701
1702 ALLOWED_TYPES = ('processes',)
1703
1704 def test_import(self):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001705 modules = [
Benjamin Petersondfd79492008-06-13 19:13:39 +00001706 'multiprocessing', 'multiprocessing.connection',
1707 'multiprocessing.heap', 'multiprocessing.managers',
1708 'multiprocessing.pool', 'multiprocessing.process',
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001709 'multiprocessing.reduction',
Benjamin Petersondfd79492008-06-13 19:13:39 +00001710 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001711 ]
1712
1713 if c_int is not None:
1714 # This module requires _ctypes
1715 modules.append('multiprocessing.sharedctypes')
Benjamin Petersondfd79492008-06-13 19:13:39 +00001716
1717 for name in modules:
1718 __import__(name)
1719 mod = sys.modules[name]
1720
1721 for attr in getattr(mod, '__all__', ()):
1722 self.assertTrue(
1723 hasattr(mod, attr),
1724 '%r does not have attribute %r' % (mod, attr)
1725 )
1726
1727#
1728# Quick test that logging works -- does not test logging output
1729#
1730
1731class _TestLogging(BaseTestCase):
1732
1733 ALLOWED_TYPES = ('processes',)
1734
1735 def test_enable_logging(self):
1736 logger = multiprocessing.get_logger()
1737 logger.setLevel(util.SUBWARNING)
1738 self.assertTrue(logger is not None)
1739 logger.debug('this will not be printed')
1740 logger.info('nor will this')
1741 logger.setLevel(LOG_LEVEL)
1742
1743 def _test_level(self, conn):
1744 logger = multiprocessing.get_logger()
1745 conn.send(logger.getEffectiveLevel())
1746
1747 def test_level(self):
1748 LEVEL1 = 32
1749 LEVEL2 = 37
1750
1751 logger = multiprocessing.get_logger()
1752 root_logger = logging.getLogger()
1753 root_level = root_logger.level
1754
1755 reader, writer = multiprocessing.Pipe(duplex=False)
1756
1757 logger.setLevel(LEVEL1)
1758 self.Process(target=self._test_level, args=(writer,)).start()
1759 self.assertEqual(LEVEL1, reader.recv())
1760
1761 logger.setLevel(logging.NOTSET)
1762 root_logger.setLevel(LEVEL2)
1763 self.Process(target=self._test_level, args=(writer,)).start()
1764 self.assertEqual(LEVEL2, reader.recv())
1765
1766 root_logger.setLevel(root_level)
1767 logger.setLevel(level=LOG_LEVEL)
1768
Jesse Noller814d02d2009-11-21 14:38:23 +00001769
Jesse Noller9a03f2f2009-11-24 14:17:29 +00001770# class _TestLoggingProcessName(BaseTestCase):
1771#
1772# def handle(self, record):
1773# assert record.processName == multiprocessing.current_process().name
1774# self.__handled = True
1775#
1776# def test_logging(self):
1777# handler = logging.Handler()
1778# handler.handle = self.handle
1779# self.__handled = False
1780# # Bypass getLogger() and side-effects
1781# logger = logging.getLoggerClass()(
1782# 'multiprocessing.test.TestLoggingProcessName')
1783# logger.addHandler(handler)
1784# logger.propagate = False
1785#
1786# logger.warn('foo')
1787# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00001788
Benjamin Petersondfd79492008-06-13 19:13:39 +00001789#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001790# Test to verify handle verification, see issue 3321
1791#
1792
1793class TestInvalidHandle(unittest.TestCase):
1794
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001795 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001796 def test_invalid_handles(self):
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001797 conn = _multiprocessing.Connection(44977608)
1798 self.assertRaises(IOError, conn.poll)
1799 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001800
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001801#
Benjamin Petersondfd79492008-06-13 19:13:39 +00001802# Functions used to create test cases from the base ones in this module
1803#
1804
1805def get_attributes(Source, names):
1806 d = {}
1807 for name in names:
1808 obj = getattr(Source, name)
1809 if type(obj) == type(get_attributes):
1810 obj = staticmethod(obj)
1811 d[name] = obj
1812 return d
1813
1814def create_test_cases(Mixin, type):
1815 result = {}
1816 glob = globals()
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001817 Type = type.capitalize()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001818
1819 for name in glob.keys():
1820 if name.startswith('_Test'):
1821 base = glob[name]
1822 if type in base.ALLOWED_TYPES:
1823 newname = 'With' + Type + name[1:]
1824 class Temp(base, unittest.TestCase, Mixin):
1825 pass
1826 result[newname] = Temp
1827 Temp.__name__ = newname
1828 Temp.__module__ = Mixin.__module__
1829 return result
1830
1831#
1832# Create test cases
1833#
1834
1835class ProcessesMixin(object):
1836 TYPE = 'processes'
1837 Process = multiprocessing.Process
1838 locals().update(get_attributes(multiprocessing, (
1839 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1840 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1841 'RawArray', 'current_process', 'active_children', 'Pipe',
1842 'connection', 'JoinableQueue'
1843 )))
1844
1845testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1846globals().update(testcases_processes)
1847
1848
1849class ManagerMixin(object):
1850 TYPE = 'manager'
1851 Process = multiprocessing.Process
1852 manager = object.__new__(multiprocessing.managers.SyncManager)
1853 locals().update(get_attributes(manager, (
1854 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1855 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1856 'Namespace', 'JoinableQueue'
1857 )))
1858
1859testcases_manager = create_test_cases(ManagerMixin, type='manager')
1860globals().update(testcases_manager)
1861
1862
1863class ThreadsMixin(object):
1864 TYPE = 'threads'
1865 Process = multiprocessing.dummy.Process
1866 locals().update(get_attributes(multiprocessing.dummy, (
1867 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1868 'Condition', 'Event', 'Value', 'Array', 'current_process',
1869 'active_children', 'Pipe', 'connection', 'dict', 'list',
1870 'Namespace', 'JoinableQueue'
1871 )))
1872
1873testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1874globals().update(testcases_threads)
1875
Neal Norwitz0c519b32008-08-25 01:50:24 +00001876class OtherTest(unittest.TestCase):
1877 # TODO: add more tests for deliver/answer challenge.
1878 def test_deliver_challenge_auth_failure(self):
1879 class _FakeConnection(object):
1880 def recv_bytes(self, size):
Neal Norwitz2a7767a2008-08-25 03:03:25 +00001881 return b'something bogus'
Neal Norwitz0c519b32008-08-25 01:50:24 +00001882 def send_bytes(self, data):
1883 pass
1884 self.assertRaises(multiprocessing.AuthenticationError,
1885 multiprocessing.connection.deliver_challenge,
1886 _FakeConnection(), b'abc')
1887
1888 def test_answer_challenge_auth_failure(self):
1889 class _FakeConnection(object):
1890 def __init__(self):
1891 self.count = 0
1892 def recv_bytes(self, size):
1893 self.count += 1
1894 if self.count == 1:
1895 return multiprocessing.connection.CHALLENGE
1896 elif self.count == 2:
Neal Norwitz2a7767a2008-08-25 03:03:25 +00001897 return b'something bogus'
1898 return b''
Neal Norwitz0c519b32008-08-25 01:50:24 +00001899 def send_bytes(self, data):
1900 pass
1901 self.assertRaises(multiprocessing.AuthenticationError,
1902 multiprocessing.connection.answer_challenge,
1903 _FakeConnection(), b'abc')
1904
Jesse Noller7152f6d2009-04-02 05:17:26 +00001905#
1906# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1907#
1908
1909def initializer(ns):
1910 ns.test += 1
1911
1912class TestInitializers(unittest.TestCase):
1913 def setUp(self):
1914 self.mgr = multiprocessing.Manager()
1915 self.ns = self.mgr.Namespace()
1916 self.ns.test = 0
1917
1918 def tearDown(self):
1919 self.mgr.shutdown()
1920
1921 def test_manager_initializer(self):
1922 m = multiprocessing.managers.SyncManager()
1923 self.assertRaises(TypeError, m.start, 1)
1924 m.start(initializer, (self.ns,))
1925 self.assertEqual(self.ns.test, 1)
1926 m.shutdown()
1927
1928 def test_pool_initializer(self):
1929 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
1930 p = multiprocessing.Pool(1, initializer, (self.ns,))
1931 p.close()
1932 p.join()
1933 self.assertEqual(self.ns.test, 1)
1934
Jesse Noller1b90efb2009-06-30 17:11:52 +00001935#
1936# Issue 5155, 5313, 5331: Test process in processes
1937# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1938#
1939
1940def _ThisSubProcess(q):
1941 try:
1942 item = q.get(block=False)
1943 except Queue.Empty:
1944 pass
1945
1946def _TestProcess(q):
1947 queue = multiprocessing.Queue()
1948 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
1949 subProc.start()
1950 subProc.join()
1951
1952def _afunc(x):
1953 return x*x
1954
1955def pool_in_process():
1956 pool = multiprocessing.Pool(processes=4)
1957 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
1958
1959class _file_like(object):
1960 def __init__(self, delegate):
1961 self._delegate = delegate
1962 self._pid = None
1963
1964 @property
1965 def cache(self):
1966 pid = os.getpid()
1967 # There are no race conditions since fork keeps only the running thread
1968 if pid != self._pid:
1969 self._pid = pid
1970 self._cache = []
1971 return self._cache
1972
1973 def write(self, data):
1974 self.cache.append(data)
1975
1976 def flush(self):
1977 self._delegate.write(''.join(self.cache))
1978 self._cache = []
1979
1980class TestStdinBadfiledescriptor(unittest.TestCase):
1981
1982 def test_queue_in_process(self):
1983 queue = multiprocessing.Queue()
1984 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
1985 proc.start()
1986 proc.join()
1987
1988 def test_pool_in_process(self):
1989 p = multiprocessing.Process(target=pool_in_process)
1990 p.start()
1991 p.join()
1992
1993 def test_flushing(self):
1994 sio = StringIO()
1995 flike = _file_like(sio)
1996 flike.write('foo')
1997 proc = multiprocessing.Process(target=lambda: flike.flush())
1998 flike.flush()
1999 assert sio.getvalue() == 'foo'
2000
2001testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
2002 TestStdinBadfiledescriptor]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002003
Benjamin Petersondfd79492008-06-13 19:13:39 +00002004#
2005#
2006#
2007
2008def test_main(run=None):
Jesse Noller18623822008-06-18 13:29:52 +00002009 if sys.platform.startswith("linux"):
2010 try:
2011 lock = multiprocessing.RLock()
2012 except OSError:
Benjamin Petersonbec087f2009-03-26 21:10:30 +00002013 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Petersoned77f2e2008-06-17 22:40:44 +00002014
Benjamin Petersondfd79492008-06-13 19:13:39 +00002015 if run is None:
2016 from test.test_support import run_unittest as run
2017
2018 util.get_temp_dir() # creates temp directory for use by all processes
2019
2020 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2021
Jesse Noller146b7ab2008-07-02 16:44:09 +00002022 ProcessesMixin.pool = multiprocessing.Pool(4)
2023 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2024 ManagerMixin.manager.__init__()
2025 ManagerMixin.manager.start()
2026 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002027
2028 testcases = (
Jesse Noller146b7ab2008-07-02 16:44:09 +00002029 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2030 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz0c519b32008-08-25 01:50:24 +00002031 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2032 testcases_other
Benjamin Petersondfd79492008-06-13 19:13:39 +00002033 )
2034
2035 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2036 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Nick Coghlan13623662010-04-10 14:24:36 +00002037 # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
2038 # module during these tests is at least platform dependent and possibly
Nick Coghlan14459d52010-04-10 15:01:54 +00002039 # non-deterministic on any given platform. So we don't mind if the listed
Nick Coghlan13623662010-04-10 14:24:36 +00002040 # warnings aren't actually raised.
Florent Xicluna07627882010-03-21 01:14:24 +00002041 with test_support.check_py3k_warnings(
Nick Coghlan13623662010-04-10 14:24:36 +00002042 (".+__(get|set)slice__ has been removed", DeprecationWarning),
2043 (r"sys.exc_clear\(\) not supported", DeprecationWarning),
2044 quiet=True):
Florent Xicluna07627882010-03-21 01:14:24 +00002045 run(suite)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002046
Jesse Noller146b7ab2008-07-02 16:44:09 +00002047 ThreadsMixin.pool.terminate()
2048 ProcessesMixin.pool.terminate()
2049 ManagerMixin.pool.terminate()
2050 ManagerMixin.manager.shutdown()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002051
Jesse Noller146b7ab2008-07-02 16:44:09 +00002052 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002053
2054def main():
2055 test_main(unittest.TextTestRunner(verbosity=2).run)
2056
2057if __name__ == '__main__':
2058 main()