blob: 8f32cfba8a7dd080b967e489502a563b56c6c8f3 [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import Queue
10import time
11import sys
12import os
13import gc
14import signal
15import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000016import socket
17import random
18import logging
Mark Dickinsonc4920e82009-11-20 19:30:22 +000019from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000020from StringIO import StringIO
Benjamin Petersondfd79492008-06-13 19:13:39 +000021
Jesse Noller37040cd2008-09-30 00:15:45 +000022
R. David Murray3db8a342009-03-30 23:05:48 +000023_multiprocessing = test_support.import_module('_multiprocessing')
24
Jesse Noller37040cd2008-09-30 00:15:45 +000025# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000026test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000027
Benjamin Petersondfd79492008-06-13 19:13:39 +000028import multiprocessing.dummy
29import multiprocessing.connection
30import multiprocessing.managers
31import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000032import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000033
34from multiprocessing import util
35
36#
37#
38#
39
Benjamin Petersone79edf52008-07-13 18:34:58 +000040latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000041
Benjamin Petersondfd79492008-06-13 19:13:39 +000042#
43# Constants
44#
45
46LOG_LEVEL = util.SUBWARNING
Jesse Noller654ade32010-01-27 03:05:57 +000047#LOG_LEVEL = logging.DEBUG
Benjamin Petersondfd79492008-06-13 19:13:39 +000048
# Short pause used by tests that need another thread/process to make progress.
DELTA = 0.1

# Setting CHECK_TIMINGS to True makes the tests take a lot longer and can
# sometimes cause non-serious failures, because some calls block a bit
# longer than expected.
CHECK_TIMINGS = False

# Long timeouts when timings are verified; uniformly short ones otherwise.
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)
58
# False when the _multiprocessing extension exposes a true
# HAVE_BROKEN_SEM_GETVALUE attribute; True otherwise (including when the
# attribute is absent).
# NOTE(review): presumably flags platforms whose semaphore "get value" call
# is unusable -- confirm against the C extension.
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

# True when running on Windows.
WIN32 = (sys.platform == "win32")
63
Benjamin Petersondfd79492008-06-13 19:13:39 +000064#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000065# Some tests require ctypes
66#
67
# ctypes itself is optional; fall back to placeholders so that class
# definitions and skip decorators further down can still be evaluated.
try:
    from ctypes import Structure, c_int, c_double
except ImportError:
    Structure = object
    c_int = c_double = None

# Value() and copy() are provided by multiprocessing.sharedctypes, not by
# ctypes: importing them from ctypes always raises ImportError, which left
# both names permanently None and silently disabled every test gated on them.
try:
    from multiprocessing.sharedctypes import Value
except ImportError:
    Value = None

try:
    from multiprocessing.sharedctypes import copy as ctypes_copy
except ImportError:
    ctypes_copy = None
83
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000084#
Benjamin Petersondfd79492008-06-13 19:13:39 +000085# Creates a wrapper for a function which records the time it takes to finish
86#
87
class TimingWrapper(object):
    """Callable proxy that records how long each call to *func* takes.

    ``elapsed`` is None until the first call.  After every call -- whether
    it returned or raised -- it holds the duration of that call in seconds
    as measured with time.time().
    """

    def __init__(self, func):
        self.elapsed = None
        self.func = func

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Executed on normal return and on exception alike.
            self.elapsed = time.time() - started
        return result
100
101#
102# Base class for test cases
103#
104
class BaseTestCase(object):
    """Mixin providing assertion helpers shared by the test cases below."""

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timings are only compared (to one decimal place) when the
        # module-level CHECK_TIMINGS switch is enabled.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Call func(*args); a NotImplementedError from the call is tolerated,
        # any other outcome must equal *value*.
        try:
            res = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, res)
120
121#
122# Return the value of a semaphore
123#
124
def get_value(self):
    """Return the current counter of a semaphore-like object.

    Tries, in order, a ``get_value()`` method, then the private attributes
    ``_Semaphore__value`` and ``_value``.  Raises NotImplementedError when
    none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
136
137#
138# Testcases
139#
140
class _TestProcess(BaseTestCase):
    """Tests for Process objects: attributes, start/join, terminate,
    active_children() and starting processes from within processes."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # current_process() attributes are only checked for real processes.
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def _test(self, q, *args, **kwds):
        # Child side of test_process: report what the child received and
        # who it thinks it is, in the order the parent reads it back.
        current = self.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if self.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello': 23, 'bye': 2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # State before start().
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # State while running.
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # Values reported by the child; q itself (args[0]) is not echoed.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # State after the child has finished.
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    def _test_terminate(self):
        # Sleep long enough that only terminate() can end the child.
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        # join() after terminate() should return (almost) immediately.
        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    def _test_recursion(self, wconn, id):
        # Send our own id, then start two children one after the other until
        # ids are two elements long.  Each child is joined before the next
        # is started, which fixes the order in which ids are sent.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = self.Process(
                    target=self._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        # Give the pipe a moment, then drain everything that was sent.
        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
286
287#
288#
289#
290
class _UpperCaser(multiprocessing.Process):
    """Process subclass that upper-cases strings sent to it over a pipe.

    The parent talks through ``parent_conn``; the child serves requests on
    ``child_conn`` until it receives None.
    """

    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.child_conn, self.parent_conn = multiprocessing.Pipe()

    def run(self):
        # Runs in the child: drop the parent's end, then answer each request
        # until the None sentinel arrives.
        self.parent_conn.close()
        conn = self.child_conn
        for text in iter(conn.recv, None):
            conn.send(text.upper())
        conn.close()

    def submit(self, s):
        # Send one string to the child and return its reply.
        assert type(s) is str
        conn = self.parent_conn
        conn.send(s)
        return conn.recv()

    def stop(self):
        # Ask the child to finish, then close both ends held by this process.
        self.parent_conn.send(None)
        for conn in (self.parent_conn, self.child_conn):
            conn.close()
312
class _TestSubclassingProcess(BaseTestCase):
    """Checks that a user-defined Process subclass can be started and used."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        uppercaser = _UpperCaser()
        # Daemonic, like the other helper processes in this file: if an
        # assertion below fails, stop() is never reached and a non-daemonic
        # child blocked in recv() would keep the interpreter from exiting.
        uppercaser.daemon = True
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        uppercaser.stop()
        uppercaser.join()
324
325#
326#
327#
328
def queue_empty(q):
    """Return whether *q* is empty, via q.empty() if present, else qsize()."""
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
334
def queue_full(q, maxsize):
    """Return whether *q* is full, via q.full() if present, else by
    comparing q.qsize() with *maxsize*."""
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
340
341
class _TestQueue(BaseTestCase):
    """Tests for Queue/JoinableQueue: put/get blocking and timeout
    behaviour, use across a fork, qsize() and task_done()/join()."""


    def _test_put(self, queue, child_can_start, parent_can_continue):
        # Child side of test_put: wait for the go-ahead, drain six items,
        # then tell the parent it may continue.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue to capacity using every calling convention of put().
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Non-blocking puts on a full queue must fail at once ...
        self.assertRaises(Queue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        # ... blocking puts must fail only after the timeout has expired ...
        self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        # ... and a timeout is expected to be ignored when block is False.
        self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        # Let the child drain the queue and wait until it has done so.
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    def _test_get(self, queue, child_can_start, parent_can_continue):
        # Child side of test_get: wait for the go-ahead, put four items,
        # then tell the parent it may continue.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        # Pause before checking that the child's items are visible.
        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        # Retrieve the items using every calling convention of get().
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Non-blocking gets on an empty queue must fail at once ...
        self.assertRaises(Queue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        # ... blocking gets must fail only after the timeout has expired ...
        self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        # ... and a timeout is expected to be ignored when block is False.
        self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    def _test_fork(self, queue):
        # Child side of test_fork: append the items 10..19.
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(Queue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        # Return quietly if qsize() raises NotImplementedError.
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    def _test_task_done(self, q):
        # Worker for test_task_done: acknowledge each item after a short
        # delay; a None item ends the loop.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            self.skipTest("requires 'queue.task_done()' method")

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in xrange(4)]

        for p in workers:
            p.start()

        for i in xrange(10):
            queue.put(i)

        # Wait until every queued item has been acknowledged.
        queue.join()

        # One None sentinel per worker so that each of them terminates.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
545
546#
547#
548#
549
class _TestLock(BaseTestCase):
    """Basic acquire/release semantics of Lock and RLock."""

    def test_lock(self):
        plain = self.Lock()
        self.assertEqual(plain.acquire(), True)
        # A second, non-blocking acquire must fail.
        self.assertEqual(plain.acquire(False), False)
        self.assertEqual(plain.release(), None)
        # Releasing an unlocked lock is an error.
        self.assertRaises((ValueError, threading.ThreadError), plain.release)

    def test_rlock(self):
        reentrant = self.RLock()
        # Three nested acquires, balanced by three releases.
        for _ in range(3):
            self.assertEqual(reentrant.acquire(), True)
        for _ in range(3):
            self.assertEqual(reentrant.release(), None)
        # One release too many is an error.
        self.assertRaises((AssertionError, RuntimeError), reentrant.release)

    def test_lock_context(self):
        # A lock must be usable as a context manager.
        with self.Lock():
            pass
572
Benjamin Petersondfd79492008-06-13 19:13:39 +0000573
class _TestSemaphore(BaseTestCase):
    """Counter behaviour of Semaphore / BoundedSemaphore and acquire()
    timeouts."""

    def _test_semaphore(self, sem):
        # Expects a semaphore whose counter is 2: take it down to 0, check
        # that a non-blocking acquire fails, then release back up to 2.
        expect_value = self.assertReturnsIfImplemented

        expect_value(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        expect_value(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        expect_value(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        expect_value(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        expect_value(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        expect_value(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # An unbounded semaphore may be released past its initial value.
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        timed_acquire = TimingWrapper(sem.acquire)

        # Non-blocking attempts return immediately, with or without timeout.
        self.assertEqual(timed_acquire(False), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, 0.0)

        self.assertEqual(timed_acquire(False, None), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, 0.0)

        self.assertEqual(timed_acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, 0)

        # Blocking attempts give up once the timeout has expired.
        self.assertEqual(timed_acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, TIMEOUT2)

        self.assertEqual(timed_acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, TIMEOUT3)
626
627
class _TestCondition(BaseTestCase):
    """Tests for Condition: notify(), notify_all() and wait() timeouts,
    with both processes and threads waiting on the same condition."""

    def f(self, cond, sleeping, woken, timeout=None):
        # Waiter body: announce on `sleeping` that we are about to wait,
        # wait on the condition, then announce on `woken` that wait()
        # returned (whether by notification or by timeout).
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        # Inspects private counters of the condition, so it is only done for
        # the 'processes' flavour; NotImplementedError from get_value() is
        # tolerated.
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        p = self.Process(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()

        # Note: `p` is rebound here, so the join() at the end of this test
        # only waits for the thread.
        p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        p.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        # (three of each, six waiters in total)
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for i in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        # (again three of each, this time waiting without a timeout)
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        # wait() with nobody notifying is expected to return None once the
        # timeout has elapsed.
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
758
759
class _TestEvent(BaseTestCase):
    """Tests for Event: is_set(), wait() return values and timing, and
    being set from another process/thread."""

    def _test_event(self, event):
        # Child side of test_event: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A new event starts out cleared.
        self.assertEqual(event.is_set(), False)

        # wait() reports the state of the flag: on a cleared event it
        # returns False, immediately for a zero timeout and after the
        # timeout otherwise.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once set, wait() returns True without blocking, with or without
        # a timeout.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # Have a child set the event.  The child is daemonic so that a
        # failing assertion cannot leave it behind, and it is joined so the
        # test does not leak it on success either.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        p.join()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000797
798#
799#
800#
801
class _TestValue(BaseTestCase):
    """Tests for shared Value / RawValue objects."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def _test(self, values):
        # Child side: overwrite every shared value with its final value.
        for shared, spec in zip(values, self.codes_values):
            shared.value = spec[2]


    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_value(self, raw=False):
        if raw:
            values = [self.RawValue(typecode, initial)
                      for typecode, initial, _ in self.codes_values]
        else:
            values = [self.Value(typecode, initial)
                      for typecode, initial, _ in self.codes_values]

        # Initial values are visible in the parent.
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        child = self.Process(target=self._test, args=(values,))
        child.start()
        child.join()

        # Changes made by the child are visible in the parent.
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawvalue(self):
        self.test_value(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock(self):
        # Default lock argument: both accessors must work.
        val1 = self.Value('i', 5)
        val1.get_lock()
        val1.get_obj()

        # lock=None: both accessors must work as well.
        val2 = self.Value('i', 5, lock=None)
        val2.get_lock()
        val2.get_obj()

        # An explicit lock must be handed back by get_lock().
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields an object without the accessors.
        val4 = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(val4, 'get_lock'))
        self.assertFalse(hasattr(val4, 'get_obj'))

        # Something that is not a lock must be rejected.
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # Raw values have no accessors either.
        val5 = self.RawValue('i', 5)
        self.assertFalse(hasattr(val5, 'get_lock'))
        self.assertFalse(hasattr(val5, 'get_obj'))
866
Benjamin Petersondfd79492008-06-13 19:13:39 +0000867
class _TestArray(BaseTestCase):
    """Tests for shared Array / RawArray objects."""

    ALLOWED_TYPES = ('processes',)

    def f(self, seq):
        # Replace seq, in place, by its running totals.
        for idx in range(1, len(seq)):
            seq[idx] += seq[idx - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        if raw:
            arr = self.RawArray('i', seq)
        else:
            arr = self.Array('i', seq)

        # Length, indexing and slicing mirror the plain list.
        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Slice assignment from an array.array, applied to both sequences.
        replacement = array.array('i', [1, 2, 3, 4])
        arr[4:8] = replacement
        seq[4:8] = replacement

        self.assertEqual(list(arr[:]), seq)

        # Transform the list locally and the shared array in a child; the
        # results must agree.
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default lock argument: both accessors must work.
        arr1 = self.Array('i', range(10))
        arr1.get_lock()
        arr1.get_obj()

        # lock=None: both accessors must work as well.
        arr2 = self.Array('i', range(10), lock=None)
        arr2.get_lock()
        arr2.get_obj()

        # An explicit lock must be handed back by get_lock().
        lock = self.Lock()
        arr3 = self.Array('i', range(10), lock=lock)
        lock3 = arr3.get_lock()
        arr3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields an object without the accessors.
        arr4 = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))

        # Something that is not a lock must be rejected.
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # Raw arrays have no accessors either.
        arr5 = self.RawArray('i', range(10))
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000929
930#
931#
932#
933
class _TestContainers(BaseTestCase):
    """Tests for the manager-provided list, dict and Namespace objects."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        digits = self.list(range(10))
        self.assertEqual(digits[:], range(10))

        other = self.list()
        self.assertEqual(other[:], [])

        other.extend(range(5))
        self.assertEqual(other[:], range(5))

        self.assertEqual(other[2], 2)
        self.assertEqual(other[2:10], [2, 3, 4])

        other *= 2
        self.assertEqual(other[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(other + [5, 6],
                         [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        # The first list is unaffected by what happened to the second.
        self.assertEqual(digits[:], range(10))

        # A shared list built from two shared lists.
        pair = [digits, other]
        nested = self.list(pair)
        self.assertEqual(
            nested[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # An append made after wrapping shows up through the outer list.
        wrapper = self.list([digits])
        digits.append('hello')
        self.assertEqual(wrapper[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        shared = self.dict()
        indices = range(65, 70)
        for code in indices:
            shared[code] = chr(code)
        self.assertEqual(shared.copy(), dict((i, chr(i)) for i in indices))
        self.assertEqual(sorted(shared.keys()), indices)
        self.assertEqual(sorted(shared.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(shared.items()),
                         [(i, chr(i)) for i in indices])

    def test_namespace(self):
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        # Only 'name' is expected in the string form at this point.
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertFalse(hasattr(ns, 'job'))
989
990#
991#
992#
993
def sqr(x, wait=0.0):
    """Return ``x * x`` after sleeping for ``wait`` seconds.

    The optional delay lets the pool tests simulate a slow task.
    """
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    """Tests for the worker-pool API.

    The pool under test is read from ``self.pool``.  It is never created in
    this class, so it has to be supplied from outside (presumably by the
    test driver -- confirm against ``test_main``).
    """

    def test_apply(self):
        """``apply`` forwards positional and keyword arguments."""
        run = self.pool.apply
        self.assertEqual(run(sqr, (5,)), sqr(5))
        self.assertEqual(run(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        """``map`` matches a sequential map, with or without a chunksize."""
        run = self.pool.map
        self.assertEqual(run(sqr, range(10)),
                         [sqr(n) for n in range(10)])
        self.assertEqual(run(sqr, range(100), chunksize=20),
                         [sqr(n) for n in range(100)])

    def test_map_chunksize(self):
        """``map_async`` over an empty list with a chunksize must not hang."""
        pending = self.pool.map_async(sqr, [], chunksize=1)
        try:
            pending.get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        """``apply_async(...).get()`` blocks until the slow task is done."""
        pending = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        timed_get = TimingWrapper(pending.get)
        self.assertEqual(timed_get(), 49)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        """``get(timeout=...)`` raises TimeoutError for a slower task."""
        pending = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        timed_get = TimingWrapper(pending.get)
        self.assertRaises(multiprocessing.TimeoutError,
                          timed_get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT2)

    def test_imap(self):
        """``imap`` yields results in order and then raises StopIteration."""
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), [sqr(n) for n in range(10)])

        it = self.pool.imap(sqr, range(10))
        for n in range(10):
            self.assertEqual(it.next(), n * n)
        self.assertRaises(StopIteration, it.next)

        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for n in range(1000):
            self.assertEqual(it.next(), n * n)
        self.assertRaises(StopIteration, it.next)

    def test_imap_unordered(self):
        """``imap_unordered`` yields every result, in any order."""
        expected = [sqr(n) for n in range(1000)]

        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), expected)

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), expected)

    def test_make_pool(self):
        """``Pool(3)`` starts exactly three workers."""
        pool = multiprocessing.Pool(3)
        self.assertEqual(3, len(pool._pool))
        pool.close()
        pool.join()

    def test_terminate(self):
        """``terminate`` abandons queued work so that ``join`` is quick."""
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        delays = [0.1] * 10000
        result = self.pool.map_async(time.sleep, delays, chunksize=1)
        self.pool.terminate()
        timed_join = TimingWrapper(self.pool.join)
        timed_join()
        self.assertTrue(timed_join.elapsed < 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001072
class _TestPoolWorkerLifetime(BaseTestCase):
    """Check that ``maxtasksperchild`` makes the pool replace its workers."""

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        """Workers are recycled once they have run ``maxtasksperchild`` tasks.

        The pool is closed and joined in a ``finally`` clause so that a
        failing assertion does not leave worker processes behind.
        """
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        try:
            self.assertEqual(3, len(p._pool))
            origworkerpids = [w.pid for w in p._pool]
            # Run many tasks so each worker gets replaced (hopefully)
            results = [p.apply_async(sqr, (i, )) for i in range(100)]
            # Fetch the results and verify we got the right answers,
            # also ensuring all the tasks have completed.
            for j, res in enumerate(results):
                self.assertEqual(res.get(), sqr(j))
            # Refill the pool
            p._repopulate_pool()
            # Wait until all workers are alive (at most 5 * DELTA seconds)
            countdown = 5
            while countdown and not all(w.is_alive() for w in p._pool):
                countdown -= 1
                time.sleep(DELTA)
            finalworkerpids = [w.pid for w in p._pool]
            # All pids should be assigned.  See issue #7805.
            self.assertNotIn(None, origworkerpids)
            self.assertNotIn(None, finalworkerpids)
            # Finally, check that the worker pids have changed
            self.assertNotEqual(sorted(origworkerpids),
                                sorted(finalworkerpids))
        finally:
            p.close()
            p.join()
1103
Benjamin Petersondfd79492008-06-13 19:13:39 +00001104#
1105# Test that manager has expected number of shared objects left
1106#
1107
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        """The manager must be left holding exactly one shared object."""
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Snapshot the debug info right after the count so that both
        # describe the same state of the manager.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the snapshot once; querying the manager again here would
            # only duplicate the output.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1126
1127#
1128# Test of creating a customized manager class
1129#
1130
1131from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1132
class FooBar(object):
    """Plain class registered with ``MyManager`` to test method exposure."""

    def f(self):
        """Return the marker string ``'f()'``."""
        return 'f()'

    def g(self):
        """Always raise ``ValueError``."""
        raise ValueError()

    def _h(self):
        """Underscore-prefixed method; return the marker string ``'_h()'``."""
        return '_h()'
1140
def baz():
    """Generate the squares of 0 through 9, in order."""
    for n in range(10):
        yield n * n
1144
class IteratorProxy(BaseProxy):
    """Proxy that forwards the iterator protocol to its referent.

    Both the Python 2 (``next``) and Python 3 (``__next__``) spellings are
    exposed and forwarded under their own names.
    """

    _exposed_ = ('next', '__next__')

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def next(self):
        """Forward a ``next`` call to the referent."""
        method_name = 'next'
        return self._callmethod(method_name)

    def __next__(self):
        """Forward a ``__next__`` call to the referent."""
        method_name = '__next__'
        return self._callmethod(method_name)
1153
class MyManager(BaseManager):
    """Custom manager; its shared types are registered just below."""


# 'Foo' is registered without an explicit ``exposed`` tuple.
MyManager.register('Foo', callable=FooBar)
# 'Bar' exposes exactly 'f' and '_h' (so 'g' is not exposed).
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
# 'baz' hands out the generator through the iterator proxy.
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1160
1161
class _TestMyManager(BaseTestCase):
    """End-to-end test of the ``MyManager`` registrations."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        """Check which methods each proxy exposes and that calls round-trip."""
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        squares = manager.baz()

        candidates = ('f', 'g', '_h')
        foo_methods = [name for name in candidates if hasattr(foo, name)]
        bar_methods = [name for name in candidates if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        # '_h' is not exposed on Foo, so calling it by name is refused.
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(squares), [n * n for n in range(10)])

        manager.shutdown()
1193
1194#
1195# Test of connecting to a remote server and using xmlrpclib for serialization
1196#
1197
1198_queue = Queue.Queue()
1199def get_queue():
1200 return _queue
1201
1202class QueueManager(BaseManager):
1203 '''manager class used by server process'''
1204QueueManager.register('get_queue', callable=get_queue)
1205
1206class QueueManager2(BaseManager):
1207 '''manager class which specifies the same interface as QueueManager'''
1208QueueManager2.register('get_queue')
1209
1210
1211SERIALIZER = 'xmlrpclib'
1212
class _TestRemoteManager(BaseTestCase):
    """Connect to a running manager by address using ``SERIALIZER``."""

    ALLOWED_TYPES = ('manager',)

    def _putter(self, address, authkey):
        """Child-process body: connect to the server and enqueue one tuple."""
        client = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        remote_queue = client.get_queue()
        remote_queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        """An item put by one client is received by another."""
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        putter = self.Process(target=self._putter,
                              args=(manager.address, authkey))
        putter.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        received = queue.get()
        self.assertEqual(received, ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
1252
class _TestManagerRestart(BaseTestCase):
    """A manager can be restarted on the same port right after shutdown."""

    def _putter(self, address, authkey):
        """Child-process body: connect to the manager and enqueue a string."""
        client = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        client.connect()
        remote_queue = client.get_queue()
        remote_queue.put('hello world')

    def test_rapid_restart(self):
        """Start, use and shut down a manager, then reuse its address."""
        authkey = os.urandom(32)
        port = test_support.find_unused_port()
        address = ('localhost', port)

        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.start()

        putter = self.Process(target=self._putter,
                              args=(manager.address, authkey))
        putter.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # Drop the proxy before the server goes away.
        del queue
        manager.shutdown()

        # Second manager bound to the very same address.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001279
Benjamin Petersondfd79492008-06-13 19:13:39 +00001280#
1281#
1282#
1283
# Empty message that tells the echo child to stop.
SENTINEL = latin('')


class _TestConnection(BaseTestCase):
    """Tests for the connection objects returned by ``self.Pipe()``."""

    ALLOWED_TYPES = ('processes', 'threads')

    def _echo(self, conn):
        """Echo every byte message back until ``SENTINEL`` arrives."""
        for payload in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(payload)
        conn.close()

    def test_connection(self):
        """Round-trip objects and byte strings through an echoing child."""
        conn, child_conn = self.Pipe()

        echo = self.Process(target=self._echo, args=(child_conn,))
        echo.daemon = True
        echo.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', range(4))
        real_processes = (self.TYPE == 'processes')

        if real_processes:
            self.assertEqual(type(conn.fileno()), int)

        self.assertIsNone(conn.send(seq))
        self.assertEqual(conn.recv(), seq)

        self.assertIsNone(conn.send_bytes(msg))
        self.assertEqual(conn.recv_bytes(), msg)

        if real_processes:
            # Receive into the start of a zeroed buffer.
            buf = array.array('i', [0] * 10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertIsNone(conn.send_bytes(arr))
            self.assertEqual(conn.recv_bytes_into(buf),
                             len(arr) * buf.itemsize)
            self.assertEqual(list(buf), expected)

            # Receive at an offset of three items.
            buf = array.array('i', [0] * 10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertIsNone(conn.send_bytes(arr))
            self.assertEqual(conn.recv_bytes_into(buf, 3 * buf.itemsize),
                             len(arr) * buf.itemsize)
            self.assertEqual(list(buf), expected)

            # A 40-byte buffer cannot hold ``longmsg``.
            buf = bytearray(latin(' ' * 40))
            self.assertIsNone(conn.send_bytes(longmsg))
            try:
                res = conn.recv_bytes_into(buf)
            except multiprocessing.BufferTooShort as exc:
                # The exception carries the complete message.
                self.assertEqual(exc.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if real_processes:
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        echo.join()

    def test_duplex_false(self):
        """A one-way pipe has a read-only end and a write-only end."""
        reader, writer = self.Pipe(duplex=False)
        self.assertIsNone(writer.send(1))
        self.assertEqual(reader.recv(), 1)
        if self.TYPE != 'processes':
            return
        self.assertEqual(reader.readable, True)
        self.assertEqual(reader.writable, False)
        self.assertEqual(writer.readable, False)
        self.assertEqual(writer.writable, True)
        # Using an end in the wrong direction raises IOError.
        self.assertRaises(IOError, reader.send, 2)
        self.assertRaises(IOError, writer.recv)
        self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        echo = self.Process(target=self._echo, args=(child_conn,))
        echo.start()
        child_conn.close()    # this might complete before child initializes

        greeting = latin('hello')
        conn.send_bytes(greeting)
        self.assertEqual(conn.recv_bytes(), greeting)

        conn.send_bytes(SENTINEL)
        conn.close()
        echo.join()

    def test_sendbytes(self):
        """``send_bytes(buf, offset, size)`` slicing and argument checks."""
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the length sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Every (offset[, size]) below must be rejected with ValueError.
        rejected = ((27,), (22, 5), (26, 1), (-1,), (4, -1))
        for bad_args in rejected:
            self.assertRaises(ValueError, a.send_bytes, msg, *bad_args)
1434
class _TestListenerClient(BaseTestCase):
    """Listener/Client round trip for each family in ``connection.families``."""

    ALLOWED_TYPES = ('processes', 'threads')

    def _test(self, address):
        """Child body: connect to ``address``, send one greeting, close."""
        client = self.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        """The listener accepts the child's connection and gets its greeting."""
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test,
                                 args=(listener.address,))
            child.daemon = True
            child.start()
            conn = listener.accept()
            self.assertEqual(conn.recv(), 'hello')
            child.join()
            listener.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001454#
1455# Test of sending connection and socket objects between processes
1456#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001457"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001458class _TestPicklingConnections(BaseTestCase):
1459
1460 ALLOWED_TYPES = ('processes',)
1461
1462 def _listener(self, conn, families):
1463 for fam in families:
1464 l = self.connection.Listener(family=fam)
1465 conn.send(l.address)
1466 new_conn = l.accept()
1467 conn.send(new_conn)
1468
1469 if self.TYPE == 'processes':
1470 l = socket.socket()
1471 l.bind(('localhost', 0))
1472 conn.send(l.getsockname())
1473 l.listen(1)
1474 new_conn, addr = l.accept()
1475 conn.send(new_conn)
1476
1477 conn.recv()
1478
1479 def _remote(self, conn):
1480 for (address, msg) in iter(conn.recv, None):
1481 client = self.connection.Client(address)
1482 client.send(msg.upper())
1483 client.close()
1484
1485 if self.TYPE == 'processes':
1486 address, msg = conn.recv()
1487 client = socket.socket()
1488 client.connect(address)
1489 client.sendall(msg.upper())
1490 client.close()
1491
1492 conn.close()
1493
1494 def test_pickling(self):
1495 try:
1496 multiprocessing.allow_connection_pickling()
1497 except ImportError:
1498 return
1499
1500 families = self.connection.families
1501
1502 lconn, lconn0 = self.Pipe()
1503 lp = self.Process(target=self._listener, args=(lconn0, families))
1504 lp.start()
1505 lconn0.close()
1506
1507 rconn, rconn0 = self.Pipe()
1508 rp = self.Process(target=self._remote, args=(rconn0,))
1509 rp.start()
1510 rconn0.close()
1511
1512 for fam in families:
1513 msg = ('This connection uses family %s' % fam).encode('ascii')
1514 address = lconn.recv()
1515 rconn.send((address, msg))
1516 new_conn = lconn.recv()
1517 self.assertEqual(new_conn.recv(), msg.upper())
1518
1519 rconn.send(None)
1520
1521 if self.TYPE == 'processes':
1522 msg = latin('This connection uses a normal socket')
1523 address = lconn.recv()
1524 rconn.send((address, msg))
1525 if hasattr(socket, 'fromfd'):
1526 new_conn = lconn.recv()
1527 self.assertEqual(new_conn.recv(100), msg.upper())
1528 else:
1529 # XXX On Windows with Py2.6 need to backport fromfd()
1530 discard = lconn.recv_bytes()
1531
1532 lconn.send(None)
1533
1534 rconn.close()
1535 lconn.close()
1536
1537 lp.join()
1538 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001539"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001540#
1541#
1542#
1543
class _TestHeap(BaseTestCase):
    """Consistency check of the ``multiprocessing.heap`` block allocator."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        """After heavy churn, free and occupied blocks must tile each arena.

        Many randomly sized blocks are allocated and released; then the
        allocator's private bookkeeping is read back and every pair of
        neighbouring blocks is checked for contiguity.
        """
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in xrange(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            b = multiprocessing.heap.BufferWrapper(size)
            blocks.append(b)
            if len(blocks) > maxblocks:
                # Drop a random live block so that it can be freed.
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap: collect every free and every
        # occupied block as (arena index, start, stop, length, state)
        layout = []
        for free_list in heap._len_to_seq.values():
            for arena, start, stop in free_list:
                layout.append((heap._arenas.index(arena), start, stop,
                               stop - start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            layout.append((heap._arenas.index(arena), start, stop,
                           stop - start, 'occupied'))

        layout.sort()

        # Each block must either end where its successor starts, or be
        # followed by the first block (offset 0) of a different arena.
        for current, following in zip(layout[:-1], layout[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, nstop) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1584
1585#
1586#
1587#
1588
class _Foo(Structure):
    """Two-field structure (``x``: c_int, ``y``: c_double) used below."""

    _fields_ = [('x', c_int), ('y', c_double)]
1594
class _TestSharedCTypes(BaseTestCase):
    """Shared ctypes values and arrays mutated by a child process."""

    ALLOWED_TYPES = ('processes',)

    def _double(self, x, y, foo, arr, string):
        """Child body: double every value it is given, in place."""
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    @unittest.skipIf(Value is None, "requires ctypes.Value")
    def test_sharedctypes(self, lock=False):
        """Changes made by the child must be visible in the parent."""
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = 'hello'

        doubler = self.Process(target=self._double,
                               args=(x, y, foo, arr, string))
        doubler.start()
        doubler.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index * 2)
        self.assertEqual(string.value, latin('hellohello'))

    @unittest.skipIf(Value is None, "requires ctypes.Value")
    def test_synchronize(self):
        """Same scenario as ``test_sharedctypes`` but with ``lock=True``."""
        self.test_sharedctypes(lock=True)

    @unittest.skipIf(ctypes_copy is None, "requires ctypes.copy")
    def test_copy(self):
        """A copy keeps its values when the original is modified."""
        original = _Foo(2, 5.0)
        duplicate = ctypes_copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1641
1642#
1643#
1644#
1645
class _TestFinalize(BaseTestCase):
    """Order in which ``util.Finalize`` callbacks run."""

    ALLOWED_TYPES = ('processes',)

    def _test_finalize(self, conn):
        """Child body: register finalizers that report their tag on ``conn``."""
        class Foo(object):
            pass

        report = conn.send

        a = Foo()
        util.Finalize(a, report, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, report, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # No exitpriority given for this one.
        c = Foo()
        util.Finalize(c, report, args=('c',))

        d10 = Foo()
        util.Finalize(d10, report, args=('d10',), exitpriority=1)

        d01 = Foo()
        util.Finalize(d01, report, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, report, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, report, args=('d03',), exitpriority=0)

        util.Finalize(None, report, args=('e',), exitpriority=-10)

        util.Finalize(None, report, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        """Read the child's report up to 'STOP' and check its order."""
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.start()
        child.join()

        result = list(iter(conn.recv, 'STOP'))
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1696
1697#
1698# Test that from ... import * works for each module
1699#
1700
class _TestImportStar(BaseTestCase):
    """Every name listed in a module's ``__all__`` must really exist."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        """Import each multiprocessing module and check its ``__all__``."""
        modules = [
            'multiprocessing',
            'multiprocessing.connection',
            'multiprocessing.heap',
            'multiprocessing.managers',
            'multiprocessing.pool',
            'multiprocessing.process',
            'multiprocessing.reduction',
            'multiprocessing.synchronize',
            'multiprocessing.util',
            ]

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for module_name in modules:
            __import__(module_name)
            mod = sys.modules[module_name]

            # A module without ``__all__`` has nothing to check.
            for attr in getattr(mod, '__all__', ()):
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
1727
1728#
1729# Quick test that logging works -- does not test logging output
1730#
1731
class _TestLogging(BaseTestCase):
    """Quick checks of the multiprocessing logger (output is not examined)."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        """The logger exists and accepts messages below its level."""
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertIsNotNone(logger)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    def _test_level(self, conn):
        """Child body: report the logger's effective level on ``conn``."""
        child_logger = multiprocessing.get_logger()
        conn.send(child_logger.getEffectiveLevel())

    def test_level(self):
        """A child process sees the effective level set in the parent."""
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        saved_root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        # Level set directly on the multiprocessing logger.
        logger.setLevel(LEVEL1)
        self.Process(target=self._test_level, args=(writer,)).start()
        self.assertEqual(LEVEL1, reader.recv())

        # With NOTSET on the logger the root logger's level applies.
        logger.setLevel(logging.NOTSET)
        root_logger.setLevel(LEVEL2)
        self.Process(target=self._test_level, args=(writer,)).start()
        self.assertEqual(LEVEL2, reader.recv())

        # Put both loggers back.
        root_logger.setLevel(saved_root_level)
        logger.setLevel(level=LOG_LEVEL)
1769
Jesse Noller814d02d2009-11-21 14:38:23 +00001770
Jesse Noller9a03f2f2009-11-24 14:17:29 +00001771# class _TestLoggingProcessName(BaseTestCase):
1772#
1773# def handle(self, record):
1774# assert record.processName == multiprocessing.current_process().name
1775# self.__handled = True
1776#
1777# def test_logging(self):
1778# handler = logging.Handler()
1779# handler.handle = self.handle
1780# self.__handled = False
1781# # Bypass getLogger() and side-effects
1782# logger = logging.getLoggerClass()(
1783# 'multiprocessing.test.TestLoggingProcessName')
1784# logger.addHandler(handler)
1785# logger.propagate = False
1786#
1787# logger.warn('foo')
1788# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00001789
Benjamin Petersondfd79492008-06-13 19:13:39 +00001790#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001791# Test to verify handle verification, see issue 3321
1792#
1793
class TestInvalidHandle(unittest.TestCase):
    """Connections built from bogus handles must fail with IOError."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        """Polling an arbitrary handle fails; a handle of -1 is rejected."""
        make_connection = _multiprocessing.Connection
        bogus = make_connection(44977608)
        self.assertRaises(IOError, bogus.poll)
        self.assertRaises(IOError, make_connection, -1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001801
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001802#
Benjamin Petersondfd79492008-06-13 19:13:39 +00001803# Functions used to create test cases from the base ones in this module
1804#
1805
def get_attributes(Source, names):
    """Return ``{name: getattr(Source, name)}`` for every name in ``names``.

    Plain Python functions are wrapped in ``staticmethod`` so that they do
    not turn into bound methods when the mapping is installed in a class
    namespace.  All other objects are stored unchanged.
    """
    function_type = type(get_attributes)
    collected = {}
    for name in names:
        value = getattr(Source, name)
        if isinstance(value, function_type):
            value = staticmethod(value)
        collected[name] = value
    return collected
1814
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes from the ``_Test*`` module globals.

    For every module-level name starting with ``_Test`` whose
    ``ALLOWED_TYPES`` contains ``type``, a class deriving from that base,
    ``unittest.TestCase`` and ``Mixin`` is created.  It is named
    ``'With' + type.capitalize() + name[1:]`` and reports ``Mixin``'s module
    as its own.

    Returns a dict mapping the new class names to the new classes.
    """
    prefix = 'With' + type.capitalize()
    cases = {}

    # Work on a snapshot of the module namespace.
    for name, base in list(globals().items()):
        if not name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue
        newname = prefix + name[1:]

        class Temp(base, unittest.TestCase, Mixin):
            pass

        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        cases[newname] = Temp
    return cases
1831
1832#
1833# Create test cases
1834#
1835
class ProcessesMixin(object):
    """Mixin binding the test API names to the ``multiprocessing`` package."""

    TYPE = 'processes'
    Process = multiprocessing.Process
    # Install the listed multiprocessing attributes as class attributes.
    locals().update(get_attributes(multiprocessing, (
        'Queue',
        'Lock',
        'RLock',
        'Semaphore',
        'BoundedSemaphore',
        'Condition',
        'Event',
        'Value',
        'Array',
        'RawValue',
        'RawArray',
        'current_process',
        'active_children',
        'Pipe',
        'connection',
        'JoinableQueue',
        )))


# Generate the 'processes' test cases and publish them in this module.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1848
1849
class ManagerMixin(object):
    """Mixin binding the test API names to a ``SyncManager`` instance."""

    TYPE = 'manager'
    Process = multiprocessing.Process
    # Created with object.__new__, i.e. without running __init__.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Install the listed manager attributes as class attributes.
    locals().update(get_attributes(manager, (
        'Queue',
        'Lock',
        'RLock',
        'Semaphore',
        'BoundedSemaphore',
        'Condition',
        'Event',
        'Value',
        'Array',
        'list',
        'dict',
        'Namespace',
        'JoinableQueue',
        )))


# Generate the 'manager' test cases and publish them in this module.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1862
1863
class ThreadsMixin(object):
    """Mixin binding the test API names to ``multiprocessing.dummy``."""

    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Install the listed multiprocessing.dummy attributes as class attributes.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue',
        'Lock',
        'RLock',
        'Semaphore',
        'BoundedSemaphore',
        'Condition',
        'Event',
        'Value',
        'Array',
        'current_process',
        'active_children',
        'Pipe',
        'connection',
        'dict',
        'list',
        'Namespace',
        'JoinableQueue',
        )))


# Generate the 'threads' test cases and publish them in this module.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1876
class OtherTest(unittest.TestCase):
    """Authentication-failure paths of the connection handshake."""

    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        """A bogus reply to the challenge raises AuthenticationError."""
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        """A bogus reply after the challenge raises AuthenticationError."""
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0

            def recv_bytes(self, size):
                # 1st read: the challenge marker; 2nd read: garbage;
                # any later read: empty bytes.
                self.count += 1
                canned = {
                    1: multiprocessing.connection.CHALLENGE,
                    2: b'something bogus',
                    }
                return canned.get(self.count, b'')

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
1905
Jesse Noller7152f6d2009-04-02 05:17:26 +00001906#
1907# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1908#
1909
def initializer(ns):
    """Add one to ``ns.test``.

    Used as the ``initializer`` callback in ``TestInitializers``; the change
    to ``ns.test`` is how the tests detect that it ran.
    """
    ns.test += 1
1912
class TestInitializers(unittest.TestCase):
    """``initializer`` support of ``Manager.start()`` and ``Pool()``."""

    def setUp(self):
        # A shared namespace whose ``test`` counter the initializer bumps.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        """``start`` rejects a non-callable and runs a real initializer."""
        manager = multiprocessing.managers.SyncManager()
        self.assertRaises(TypeError, manager.start, 1)
        manager.start(initializer, (self.ns,))
        self.assertEqual(self.ns.test, 1)
        manager.shutdown()

    def test_pool_initializer(self):
        """``Pool`` rejects a non-callable and runs a real initializer."""
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        pool = multiprocessing.Pool(1, initializer, (self.ns,))
        pool.close()
        pool.join()
        # A single worker was requested, so the counter was bumped once.
        self.assertEqual(self.ns.test, 1)
1935
Jesse Noller1b90efb2009-06-30 17:11:52 +00001936#
1937# Issue 5155, 5313, 5331: Test process in processes
1938# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1939#
1940
def _ThisSubProcess(q):
    """Poll *q* once without blocking; an empty queue is not an error."""
    try:
        q.get(block=False)
    except Queue.Empty:
        # Nothing was queued; the fetched value is not needed either way.
        return
1946
def _TestProcess(q):
    """Start a nested process running _ThisSubProcess and wait for it.

    The *q* argument is accepted but not used; the nested process receives
    a queue created here.
    """
    child_queue = multiprocessing.Queue()
    child = multiprocessing.Process(
        target=_ThisSubProcess,
        args=(child_queue,),
    )
    child.start()
    child.join()
1952
def _afunc(x):
    """Return *x* multiplied by itself (work function mapped over a pool)."""
    squared = x * x
    return squared
1955
def pool_in_process():
    """Create a 4-worker pool, map _afunc over a short list, and tear it down.

    Used as the target of a child process, so the pool is created inside a
    process that is itself a multiprocessing child.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        # Only successful completion matters; the results are discarded.
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Release the workers explicitly instead of leaving them to be
        # cleaned up when the enclosing process exits.
        pool.close()
        pool.join()
1959
class _file_like(object):
    """Write buffer in front of *delegate* that is kept separately per pid.

    Data passed to write() is accumulated in a list and only handed to the
    delegate's write() on flush().  The list is replaced with an empty one
    whenever the current pid differs from the pid that last used it.
    """

    def __init__(self, delegate):
        self._pid = None
        self._delegate = delegate

    @property
    def cache(self):
        # There are no race conditions since fork keeps only the running thread
        current = os.getpid()
        if self._pid != current:
            self._pid, self._cache = current, []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
1980
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Issue 5155, 5313, 5331: test processes started from processes.

    Verifies os.close(sys.stdin.fileno()) vs. sys.stdin.close() behavior by
    running queue and pool code inside a child process, and checks the
    buffering helper used for flushing.
    """

    def test_queue_in_process(self):
        # The child itself starts a further process that polls a queue.
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # The child creates and uses a Pool of its own.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        flike.flush()
        # assertEqual rather than a bare assert: it still runs under -O and
        # reports both values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
2001
# Standalone test cases; test_main() appends these, in this order, after the
# sorted per-type test case collections.
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002004
Benjamin Petersondfd79492008-06-13 19:13:39 +00002005#
2006#
2007#
2008
def test_main(run=None):
    """Build the full multiprocessing test suite and execute it.

    *run* is a callable that accepts a unittest.TestSuite; when it is None,
    test.test_support.run_unittest is used.  Shared pools and the shared
    manager are installed on the mixin classes before the run and are torn
    down afterwards, including when the run raises.

    Raises unittest.SkipTest on Linux when an RLock cannot be created
    (see issue 3111).
    """
    if sys.platform.startswith("linux"):
        try:
            # Probe only: the lock itself is not needed.
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
        # module during these tests is at least platform dependent and possibly
        # non-deterministic on any given platform. So we don't mind if the listed
        # warnings aren't actually raised.
        with test_support.check_py3k_warnings(
                (".+__(get|set)slice__ has been removed", DeprecationWarning),
                (r"sys.exc_clear\(\) not supported", DeprecationWarning),
                quiet=True):
            run(suite)
    finally:
        # Always release the shared workers and the manager process, even if
        # building or running the suite raised.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002054
def main():
    """Run the whole suite through a verbose (verbosity=2) text runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)
2057
# Run the suite when this file is executed directly as a script.
if __name__ == '__main__':
    main()