blob: 935e224d445ffb38129e720eef0c9292ba196fd2 [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00008import Queue
9import time
10import sys
11import os
12import gc
13import signal
14import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000015import socket
16import random
17import logging
Antoine Pitroua1a8da82011-08-23 19:54:20 +020018import errno
Mark Dickinsonc4920e82009-11-20 19:30:22 +000019from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000020from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000021_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020022# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000023# message: "No module named _multiprocessing". _multiprocessing is not compiled
24# without thread support.
25import threading
R. David Murray3db8a342009-03-30 23:05:48 +000026
Jesse Noller37040cd2008-09-30 00:15:45 +000027# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000028test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000029
Benjamin Petersondfd79492008-06-13 19:13:39 +000030import multiprocessing.dummy
31import multiprocessing.connection
32import multiprocessing.managers
33import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000034import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000035
Antoine Pitroua1a8da82011-08-23 19:54:20 +020036from multiprocessing import util, reduction
Benjamin Petersondfd79492008-06-13 19:13:39 +000037
Brian Curtina06e9b82010-10-07 02:27:41 +000038try:
39 from multiprocessing.sharedctypes import Value, copy
40 HAS_SHAREDCTYPES = True
41except ImportError:
42 HAS_SHAREDCTYPES = False
43
Antoine Pitroua1a8da82011-08-23 19:54:20 +020044try:
45 import msvcrt
46except ImportError:
47 msvcrt = None
48
Benjamin Petersondfd79492008-06-13 19:13:39 +000049#
50#
51#
52
Benjamin Petersone79edf52008-07-13 18:34:58 +000053latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000054
Benjamin Petersondfd79492008-06-13 19:13:39 +000055#
56# Constants
57#
58
59LOG_LEVEL = util.SUBWARNING
Jesse Noller654ade32010-01-27 03:05:57 +000060#LOG_LEVEL = logging.DEBUG
Benjamin Petersondfd79492008-06-13 19:13:39 +000061
62DELTA = 0.1
63CHECK_TIMINGS = False # making true makes tests take a lot longer
64 # and can sometimes cause some non-serious
65 # failures because some calls block a bit
66 # longer than expected
67if CHECK_TIMINGS:
68 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
69else:
70 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
71
72HAVE_GETVALUE = not getattr(_multiprocessing,
73 'HAVE_BROKEN_SEM_GETVALUE', False)
74
Jesse Noller9a5b2ad2009-01-19 15:12:22 +000075WIN32 = (sys.platform == "win32")
76
Antoine Pitroua1a8da82011-08-23 19:54:20 +020077try:
78 MAXFD = os.sysconf("SC_OPEN_MAX")
79except:
80 MAXFD = 256
81
Benjamin Petersondfd79492008-06-13 19:13:39 +000082#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000083# Some tests require ctypes
84#
85
86try:
Nick Coghlan13623662010-04-10 14:24:36 +000087 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000088except ImportError:
89 Structure = object
90 c_int = c_double = None
91
92#
Benjamin Petersondfd79492008-06-13 19:13:39 +000093# Creates a wrapper for a function which records the time it takes to finish
94#
95
96class TimingWrapper(object):
97
98 def __init__(self, func):
99 self.func = func
100 self.elapsed = None
101
102 def __call__(self, *args, **kwds):
103 t = time.time()
104 try:
105 return self.func(*args, **kwds)
106 finally:
107 self.elapsed = time.time() - t
108
109#
110# Base class for test cases
111#
112
113class BaseTestCase(object):
114
115 ALLOWED_TYPES = ('processes', 'manager', 'threads')
116
117 def assertTimingAlmostEqual(self, a, b):
118 if CHECK_TIMINGS:
119 self.assertAlmostEqual(a, b, 1)
120
121 def assertReturnsIfImplemented(self, value, func, *args):
122 try:
123 res = func(*args)
124 except NotImplementedError:
125 pass
126 else:
127 return self.assertEqual(value, res)
128
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000129 # For the sanity of Windows users, rather than crashing or freezing in
130 # multiple ways.
131 def __reduce__(self, *args):
132 raise NotImplementedError("shouldn't try to pickle a test case")
133
134 __reduce_ex__ = __reduce__
135
Benjamin Petersondfd79492008-06-13 19:13:39 +0000136#
137# Return the value of a semaphore
138#
139
140def get_value(self):
141 try:
142 return self.get_value()
143 except AttributeError:
144 try:
145 return self._Semaphore__value
146 except AttributeError:
147 try:
148 return self._value
149 except AttributeError:
150 raise NotImplementedError
151
152#
153# Testcases
154#
155
156class _TestProcess(BaseTestCase):
157
158 ALLOWED_TYPES = ('processes', 'threads')
159
160 def test_current(self):
161 if self.TYPE == 'threads':
162 return
163
164 current = self.current_process()
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000165 authkey = current.authkey
Benjamin Petersondfd79492008-06-13 19:13:39 +0000166
167 self.assertTrue(current.is_alive())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000168 self.assertTrue(not current.daemon)
Ezio Melottib0f5adc2010-01-24 16:58:36 +0000169 self.assertIsInstance(authkey, bytes)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000170 self.assertTrue(len(authkey) > 0)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000171 self.assertEqual(current.ident, os.getpid())
172 self.assertEqual(current.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000173
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000174 @classmethod
175 def _test(cls, q, *args, **kwds):
176 current = cls.current_process()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000177 q.put(args)
178 q.put(kwds)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000179 q.put(current.name)
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000180 if cls.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000181 q.put(bytes(current.authkey))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000182 q.put(current.pid)
183
184 def test_process(self):
185 q = self.Queue(1)
186 e = self.Event()
187 args = (q, 1, 2)
188 kwargs = {'hello':23, 'bye':2.54}
189 name = 'SomeProcess'
190 p = self.Process(
191 target=self._test, args=args, kwargs=kwargs, name=name
192 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000193 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000194 current = self.current_process()
195
196 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000197 self.assertEqual(p.authkey, current.authkey)
198 self.assertEqual(p.is_alive(), False)
199 self.assertEqual(p.daemon, True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000200 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000201 self.assertTrue(type(self.active_children()) is list)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000202 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000203
204 p.start()
205
Ezio Melotti2623a372010-11-21 13:34:58 +0000206 self.assertEqual(p.exitcode, None)
207 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000208 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000209
Ezio Melotti2623a372010-11-21 13:34:58 +0000210 self.assertEqual(q.get(), args[1:])
211 self.assertEqual(q.get(), kwargs)
212 self.assertEqual(q.get(), p.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000213 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000214 self.assertEqual(q.get(), current.authkey)
215 self.assertEqual(q.get(), p.pid)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000216
217 p.join()
218
Ezio Melotti2623a372010-11-21 13:34:58 +0000219 self.assertEqual(p.exitcode, 0)
220 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000221 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000222
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000223 @classmethod
224 def _test_terminate(cls):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000225 time.sleep(1000)
226
227 def test_terminate(self):
228 if self.TYPE == 'threads':
229 return
230
231 p = self.Process(target=self._test_terminate)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000232 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000233 p.start()
234
235 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000236 self.assertIn(p, self.active_children())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000237 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000238
239 p.terminate()
240
241 join = TimingWrapper(p.join)
242 self.assertEqual(join(), None)
243 self.assertTimingAlmostEqual(join.elapsed, 0.0)
244
245 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000246 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000247
248 p.join()
249
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000250 # XXX sometimes get p.exitcode == 0 on Windows ...
251 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000252
253 def test_cpu_count(self):
254 try:
255 cpus = multiprocessing.cpu_count()
256 except NotImplementedError:
257 cpus = 1
258 self.assertTrue(type(cpus) is int)
259 self.assertTrue(cpus >= 1)
260
261 def test_active_children(self):
262 self.assertEqual(type(self.active_children()), list)
263
264 p = self.Process(target=time.sleep, args=(DELTA,))
Ezio Melottiaa980582010-01-23 23:04:36 +0000265 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000266
267 p.start()
Ezio Melottiaa980582010-01-23 23:04:36 +0000268 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000269
270 p.join()
Ezio Melottiaa980582010-01-23 23:04:36 +0000271 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000272
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000273 @classmethod
274 def _test_recursion(cls, wconn, id):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000275 from multiprocessing import forking
276 wconn.send(id)
277 if len(id) < 2:
278 for i in range(2):
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000279 p = cls.Process(
280 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersondfd79492008-06-13 19:13:39 +0000281 )
282 p.start()
283 p.join()
284
285 def test_recursion(self):
286 rconn, wconn = self.Pipe(duplex=False)
287 self._test_recursion(wconn, [])
288
289 time.sleep(DELTA)
290 result = []
291 while rconn.poll():
292 result.append(rconn.recv())
293
294 expected = [
295 [],
296 [0],
297 [0, 0],
298 [0, 1],
299 [1],
300 [1, 0],
301 [1, 1]
302 ]
303 self.assertEqual(result, expected)
304
305#
306#
307#
308
309class _UpperCaser(multiprocessing.Process):
310
311 def __init__(self):
312 multiprocessing.Process.__init__(self)
313 self.child_conn, self.parent_conn = multiprocessing.Pipe()
314
315 def run(self):
316 self.parent_conn.close()
317 for s in iter(self.child_conn.recv, None):
318 self.child_conn.send(s.upper())
319 self.child_conn.close()
320
321 def submit(self, s):
322 assert type(s) is str
323 self.parent_conn.send(s)
324 return self.parent_conn.recv()
325
326 def stop(self):
327 self.parent_conn.send(None)
328 self.parent_conn.close()
329 self.child_conn.close()
330
331class _TestSubclassingProcess(BaseTestCase):
332
333 ALLOWED_TYPES = ('processes',)
334
335 def test_subclassing(self):
336 uppercaser = _UpperCaser()
337 uppercaser.start()
338 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
339 self.assertEqual(uppercaser.submit('world'), 'WORLD')
340 uppercaser.stop()
341 uppercaser.join()
342
343#
344#
345#
346
347def queue_empty(q):
348 if hasattr(q, 'empty'):
349 return q.empty()
350 else:
351 return q.qsize() == 0
352
353def queue_full(q, maxsize):
354 if hasattr(q, 'full'):
355 return q.full()
356 else:
357 return q.qsize() == maxsize
358
359
360class _TestQueue(BaseTestCase):
361
362
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000363 @classmethod
364 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000365 child_can_start.wait()
366 for i in range(6):
367 queue.get()
368 parent_can_continue.set()
369
370 def test_put(self):
371 MAXSIZE = 6
372 queue = self.Queue(maxsize=MAXSIZE)
373 child_can_start = self.Event()
374 parent_can_continue = self.Event()
375
376 proc = self.Process(
377 target=self._test_put,
378 args=(queue, child_can_start, parent_can_continue)
379 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000380 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000381 proc.start()
382
383 self.assertEqual(queue_empty(queue), True)
384 self.assertEqual(queue_full(queue, MAXSIZE), False)
385
386 queue.put(1)
387 queue.put(2, True)
388 queue.put(3, True, None)
389 queue.put(4, False)
390 queue.put(5, False, None)
391 queue.put_nowait(6)
392
393 # the values may be in buffer but not yet in pipe so sleep a bit
394 time.sleep(DELTA)
395
396 self.assertEqual(queue_empty(queue), False)
397 self.assertEqual(queue_full(queue, MAXSIZE), True)
398
399 put = TimingWrapper(queue.put)
400 put_nowait = TimingWrapper(queue.put_nowait)
401
402 self.assertRaises(Queue.Full, put, 7, False)
403 self.assertTimingAlmostEqual(put.elapsed, 0)
404
405 self.assertRaises(Queue.Full, put, 7, False, None)
406 self.assertTimingAlmostEqual(put.elapsed, 0)
407
408 self.assertRaises(Queue.Full, put_nowait, 7)
409 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
410
411 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
412 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
413
414 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
415 self.assertTimingAlmostEqual(put.elapsed, 0)
416
417 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
418 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
419
420 child_can_start.set()
421 parent_can_continue.wait()
422
423 self.assertEqual(queue_empty(queue), True)
424 self.assertEqual(queue_full(queue, MAXSIZE), False)
425
426 proc.join()
427
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000428 @classmethod
429 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000430 child_can_start.wait()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000431 #queue.put(1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000432 queue.put(2)
433 queue.put(3)
434 queue.put(4)
435 queue.put(5)
436 parent_can_continue.set()
437
438 def test_get(self):
439 queue = self.Queue()
440 child_can_start = self.Event()
441 parent_can_continue = self.Event()
442
443 proc = self.Process(
444 target=self._test_get,
445 args=(queue, child_can_start, parent_can_continue)
446 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000447 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000448 proc.start()
449
450 self.assertEqual(queue_empty(queue), True)
451
452 child_can_start.set()
453 parent_can_continue.wait()
454
455 time.sleep(DELTA)
456 self.assertEqual(queue_empty(queue), False)
457
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000458 # Hangs unexpectedly, remove for now
459 #self.assertEqual(queue.get(), 1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000460 self.assertEqual(queue.get(True, None), 2)
461 self.assertEqual(queue.get(True), 3)
462 self.assertEqual(queue.get(timeout=1), 4)
463 self.assertEqual(queue.get_nowait(), 5)
464
465 self.assertEqual(queue_empty(queue), True)
466
467 get = TimingWrapper(queue.get)
468 get_nowait = TimingWrapper(queue.get_nowait)
469
470 self.assertRaises(Queue.Empty, get, False)
471 self.assertTimingAlmostEqual(get.elapsed, 0)
472
473 self.assertRaises(Queue.Empty, get, False, None)
474 self.assertTimingAlmostEqual(get.elapsed, 0)
475
476 self.assertRaises(Queue.Empty, get_nowait)
477 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
478
479 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
480 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
481
482 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
483 self.assertTimingAlmostEqual(get.elapsed, 0)
484
485 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
486 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
487
488 proc.join()
489
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000490 @classmethod
491 def _test_fork(cls, queue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000492 for i in range(10, 20):
493 queue.put(i)
494 # note that at this point the items may only be buffered, so the
495 # process cannot shutdown until the feeder thread has finished
496 # pushing items onto the pipe.
497
498 def test_fork(self):
499 # Old versions of Queue would fail to create a new feeder
500 # thread for a forked process if the original process had its
501 # own feeder thread. This test checks that this no longer
502 # happens.
503
504 queue = self.Queue()
505
506 # put items on queue so that main process starts a feeder thread
507 for i in range(10):
508 queue.put(i)
509
510 # wait to make sure thread starts before we fork a new process
511 time.sleep(DELTA)
512
513 # fork process
514 p = self.Process(target=self._test_fork, args=(queue,))
515 p.start()
516
517 # check that all expected items are in the queue
518 for i in range(20):
519 self.assertEqual(queue.get(), i)
520 self.assertRaises(Queue.Empty, queue.get, False)
521
522 p.join()
523
524 def test_qsize(self):
525 q = self.Queue()
526 try:
527 self.assertEqual(q.qsize(), 0)
528 except NotImplementedError:
529 return
530 q.put(1)
531 self.assertEqual(q.qsize(), 1)
532 q.put(5)
533 self.assertEqual(q.qsize(), 2)
534 q.get()
535 self.assertEqual(q.qsize(), 1)
536 q.get()
537 self.assertEqual(q.qsize(), 0)
538
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000539 @classmethod
540 def _test_task_done(cls, q):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000541 for obj in iter(q.get, None):
542 time.sleep(DELTA)
543 q.task_done()
544
545 def test_task_done(self):
546 queue = self.JoinableQueue()
547
548 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000549 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000550
551 workers = [self.Process(target=self._test_task_done, args=(queue,))
552 for i in xrange(4)]
553
554 for p in workers:
555 p.start()
556
557 for i in xrange(10):
558 queue.put(i)
559
560 queue.join()
561
562 for p in workers:
563 queue.put(None)
564
565 for p in workers:
566 p.join()
567
568#
569#
570#
571
572class _TestLock(BaseTestCase):
573
574 def test_lock(self):
575 lock = self.Lock()
576 self.assertEqual(lock.acquire(), True)
577 self.assertEqual(lock.acquire(False), False)
578 self.assertEqual(lock.release(), None)
579 self.assertRaises((ValueError, threading.ThreadError), lock.release)
580
581 def test_rlock(self):
582 lock = self.RLock()
583 self.assertEqual(lock.acquire(), True)
584 self.assertEqual(lock.acquire(), True)
585 self.assertEqual(lock.acquire(), True)
586 self.assertEqual(lock.release(), None)
587 self.assertEqual(lock.release(), None)
588 self.assertEqual(lock.release(), None)
589 self.assertRaises((AssertionError, RuntimeError), lock.release)
590
Jesse Noller82eb5902009-03-30 23:29:31 +0000591 def test_lock_context(self):
592 with self.Lock():
593 pass
594
Benjamin Petersondfd79492008-06-13 19:13:39 +0000595
596class _TestSemaphore(BaseTestCase):
597
598 def _test_semaphore(self, sem):
599 self.assertReturnsIfImplemented(2, get_value, sem)
600 self.assertEqual(sem.acquire(), True)
601 self.assertReturnsIfImplemented(1, get_value, sem)
602 self.assertEqual(sem.acquire(), True)
603 self.assertReturnsIfImplemented(0, get_value, sem)
604 self.assertEqual(sem.acquire(False), False)
605 self.assertReturnsIfImplemented(0, get_value, sem)
606 self.assertEqual(sem.release(), None)
607 self.assertReturnsIfImplemented(1, get_value, sem)
608 self.assertEqual(sem.release(), None)
609 self.assertReturnsIfImplemented(2, get_value, sem)
610
611 def test_semaphore(self):
612 sem = self.Semaphore(2)
613 self._test_semaphore(sem)
614 self.assertEqual(sem.release(), None)
615 self.assertReturnsIfImplemented(3, get_value, sem)
616 self.assertEqual(sem.release(), None)
617 self.assertReturnsIfImplemented(4, get_value, sem)
618
619 def test_bounded_semaphore(self):
620 sem = self.BoundedSemaphore(2)
621 self._test_semaphore(sem)
622 # Currently fails on OS/X
623 #if HAVE_GETVALUE:
624 # self.assertRaises(ValueError, sem.release)
625 # self.assertReturnsIfImplemented(2, get_value, sem)
626
627 def test_timeout(self):
628 if self.TYPE != 'processes':
629 return
630
631 sem = self.Semaphore(0)
632 acquire = TimingWrapper(sem.acquire)
633
634 self.assertEqual(acquire(False), False)
635 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
636
637 self.assertEqual(acquire(False, None), False)
638 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
639
640 self.assertEqual(acquire(False, TIMEOUT1), False)
641 self.assertTimingAlmostEqual(acquire.elapsed, 0)
642
643 self.assertEqual(acquire(True, TIMEOUT2), False)
644 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
645
646 self.assertEqual(acquire(timeout=TIMEOUT3), False)
647 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
648
649
650class _TestCondition(BaseTestCase):
651
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000652 @classmethod
653 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000654 cond.acquire()
655 sleeping.release()
656 cond.wait(timeout)
657 woken.release()
658 cond.release()
659
660 def check_invariant(self, cond):
661 # this is only supposed to succeed when there are no sleepers
662 if self.TYPE == 'processes':
663 try:
664 sleepers = (cond._sleeping_count.get_value() -
665 cond._woken_count.get_value())
666 self.assertEqual(sleepers, 0)
667 self.assertEqual(cond._wait_semaphore.get_value(), 0)
668 except NotImplementedError:
669 pass
670
671 def test_notify(self):
672 cond = self.Condition()
673 sleeping = self.Semaphore(0)
674 woken = self.Semaphore(0)
675
676 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000677 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000678 p.start()
679
680 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000681 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000682 p.start()
683
684 # wait for both children to start sleeping
685 sleeping.acquire()
686 sleeping.acquire()
687
688 # check no process/thread has woken up
689 time.sleep(DELTA)
690 self.assertReturnsIfImplemented(0, get_value, woken)
691
692 # wake up one process/thread
693 cond.acquire()
694 cond.notify()
695 cond.release()
696
697 # check one process/thread has woken up
698 time.sleep(DELTA)
699 self.assertReturnsIfImplemented(1, get_value, woken)
700
701 # wake up another
702 cond.acquire()
703 cond.notify()
704 cond.release()
705
706 # check other has woken up
707 time.sleep(DELTA)
708 self.assertReturnsIfImplemented(2, get_value, woken)
709
710 # check state is not mucked up
711 self.check_invariant(cond)
712 p.join()
713
714 def test_notify_all(self):
715 cond = self.Condition()
716 sleeping = self.Semaphore(0)
717 woken = self.Semaphore(0)
718
719 # start some threads/processes which will timeout
720 for i in range(3):
721 p = self.Process(target=self.f,
722 args=(cond, sleeping, woken, TIMEOUT1))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000723 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000724 p.start()
725
726 t = threading.Thread(target=self.f,
727 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000728 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000729 t.start()
730
731 # wait for them all to sleep
732 for i in xrange(6):
733 sleeping.acquire()
734
735 # check they have all timed out
736 for i in xrange(6):
737 woken.acquire()
738 self.assertReturnsIfImplemented(0, get_value, woken)
739
740 # check state is not mucked up
741 self.check_invariant(cond)
742
743 # start some more threads/processes
744 for i in range(3):
745 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000746 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000747 p.start()
748
749 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000750 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000751 t.start()
752
753 # wait for them to all sleep
754 for i in xrange(6):
755 sleeping.acquire()
756
757 # check no process/thread has woken up
758 time.sleep(DELTA)
759 self.assertReturnsIfImplemented(0, get_value, woken)
760
761 # wake them all up
762 cond.acquire()
763 cond.notify_all()
764 cond.release()
765
766 # check they have all woken
767 time.sleep(DELTA)
768 self.assertReturnsIfImplemented(6, get_value, woken)
769
770 # check state is not mucked up
771 self.check_invariant(cond)
772
773 def test_timeout(self):
774 cond = self.Condition()
775 wait = TimingWrapper(cond.wait)
776 cond.acquire()
777 res = wait(TIMEOUT1)
778 cond.release()
779 self.assertEqual(res, None)
780 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
781
782
783class _TestEvent(BaseTestCase):
784
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000785 @classmethod
786 def _test_event(cls, event):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000787 time.sleep(TIMEOUT2)
788 event.set()
789
790 def test_event(self):
791 event = self.Event()
792 wait = TimingWrapper(event.wait)
793
Ezio Melottic2077b02011-03-16 12:34:31 +0200794 # Removed temporarily, due to API shear, this does not
Benjamin Petersondfd79492008-06-13 19:13:39 +0000795 # work with threading._Event objects. is_set == isSet
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000796 self.assertEqual(event.is_set(), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000797
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000798 # Removed, threading.Event.wait() will return the value of the __flag
799 # instead of None. API Shear with the semaphore backed mp.Event
800 self.assertEqual(wait(0.0), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000801 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000802 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000803 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
804
805 event.set()
806
807 # See note above on the API differences
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000808 self.assertEqual(event.is_set(), True)
809 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000810 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000811 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000812 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
813 # self.assertEqual(event.is_set(), True)
814
815 event.clear()
816
817 #self.assertEqual(event.is_set(), False)
818
819 self.Process(target=self._test_event, args=(event,)).start()
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000820 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000821
822#
823#
824#
825
826class _TestValue(BaseTestCase):
827
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000828 ALLOWED_TYPES = ('processes',)
829
Benjamin Petersondfd79492008-06-13 19:13:39 +0000830 codes_values = [
831 ('i', 4343, 24234),
832 ('d', 3.625, -4.25),
833 ('h', -232, 234),
834 ('c', latin('x'), latin('y'))
835 ]
836
Antoine Pitrou55d935a2010-11-22 16:35:57 +0000837 def setUp(self):
838 if not HAS_SHAREDCTYPES:
839 self.skipTest("requires multiprocessing.sharedctypes")
840
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000841 @classmethod
842 def _test(cls, values):
843 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000844 sv.value = cv[2]
845
846
847 def test_value(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000848 if raw:
849 values = [self.RawValue(code, value)
850 for code, value, _ in self.codes_values]
851 else:
852 values = [self.Value(code, value)
853 for code, value, _ in self.codes_values]
854
855 for sv, cv in zip(values, self.codes_values):
856 self.assertEqual(sv.value, cv[1])
857
858 proc = self.Process(target=self._test, args=(values,))
859 proc.start()
860 proc.join()
861
862 for sv, cv in zip(values, self.codes_values):
863 self.assertEqual(sv.value, cv[2])
864
865 def test_rawvalue(self):
866 self.test_value(raw=True)
867
868 def test_getobj_getlock(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000869 val1 = self.Value('i', 5)
870 lock1 = val1.get_lock()
871 obj1 = val1.get_obj()
872
873 val2 = self.Value('i', 5, lock=None)
874 lock2 = val2.get_lock()
875 obj2 = val2.get_obj()
876
877 lock = self.Lock()
878 val3 = self.Value('i', 5, lock=lock)
879 lock3 = val3.get_lock()
880 obj3 = val3.get_obj()
881 self.assertEqual(lock, lock3)
882
Jesse Noller6ab22152009-01-18 02:45:38 +0000883 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000884 self.assertFalse(hasattr(arr4, 'get_lock'))
885 self.assertFalse(hasattr(arr4, 'get_obj'))
886
Jesse Noller6ab22152009-01-18 02:45:38 +0000887 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
888
889 arr5 = self.RawValue('i', 5)
890 self.assertFalse(hasattr(arr5, 'get_lock'))
891 self.assertFalse(hasattr(arr5, 'get_obj'))
892
Benjamin Petersondfd79492008-06-13 19:13:39 +0000893
894class _TestArray(BaseTestCase):
895
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000896 ALLOWED_TYPES = ('processes',)
897
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000898 @classmethod
899 def f(cls, seq):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000900 for i in range(1, len(seq)):
901 seq[i] += seq[i-1]
902
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000903 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000904 def test_array(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000905 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
906 if raw:
907 arr = self.RawArray('i', seq)
908 else:
909 arr = self.Array('i', seq)
910
911 self.assertEqual(len(arr), len(seq))
912 self.assertEqual(arr[3], seq[3])
913 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
914
915 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
916
917 self.assertEqual(list(arr[:]), seq)
918
919 self.f(seq)
920
921 p = self.Process(target=self.f, args=(arr,))
922 p.start()
923 p.join()
924
925 self.assertEqual(list(arr[:]), seq)
926
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000927 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsond3cb2f62011-03-26 10:02:37 +0000928 def test_array_from_size(self):
929 size = 10
930 # Test for zeroing (see issue #11675).
931 # The repetition below strengthens the test by increasing the chances
932 # of previously allocated non-zero memory being used for the new array
933 # on the 2nd and 3rd loops.
934 for _ in range(3):
935 arr = self.Array('i', size)
936 self.assertEqual(len(arr), size)
937 self.assertEqual(list(arr), [0] * size)
938 arr[:] = range(10)
939 self.assertEqual(list(arr), range(10))
940 del arr
941
942 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000943 def test_rawarray(self):
944 self.test_array(raw=True)
945
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000946 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsonf9e9a6f2011-03-25 22:01:06 +0000947 def test_array_accepts_long(self):
948 arr = self.Array('i', 10L)
949 self.assertEqual(len(arr), 10)
950 raw_arr = self.RawArray('i', 10L)
951 self.assertEqual(len(raw_arr), 10)
952
953 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000954 def test_getobj_getlock_obj(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000955 arr1 = self.Array('i', range(10))
956 lock1 = arr1.get_lock()
957 obj1 = arr1.get_obj()
958
959 arr2 = self.Array('i', range(10), lock=None)
960 lock2 = arr2.get_lock()
961 obj2 = arr2.get_obj()
962
963 lock = self.Lock()
964 arr3 = self.Array('i', range(10), lock=lock)
965 lock3 = arr3.get_lock()
966 obj3 = arr3.get_obj()
967 self.assertEqual(lock, lock3)
968
Jesse Noller6ab22152009-01-18 02:45:38 +0000969 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000970 self.assertFalse(hasattr(arr4, 'get_lock'))
971 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Noller6ab22152009-01-18 02:45:38 +0000972 self.assertRaises(AttributeError,
973 self.Array, 'i', range(10), lock='notalock')
974
975 arr5 = self.RawArray('i', range(10))
976 self.assertFalse(hasattr(arr5, 'get_lock'))
977 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000978
979#
980#
981#
982
983class _TestContainers(BaseTestCase):
984
985 ALLOWED_TYPES = ('manager',)
986
987 def test_list(self):
988 a = self.list(range(10))
989 self.assertEqual(a[:], range(10))
990
991 b = self.list()
992 self.assertEqual(b[:], [])
993
994 b.extend(range(5))
995 self.assertEqual(b[:], range(5))
996
997 self.assertEqual(b[2], 2)
998 self.assertEqual(b[2:10], [2,3,4])
999
1000 b *= 2
1001 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1002
1003 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1004
1005 self.assertEqual(a[:], range(10))
1006
1007 d = [a, b]
1008 e = self.list(d)
1009 self.assertEqual(
1010 e[:],
1011 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1012 )
1013
1014 f = self.list([a])
1015 a.append('hello')
1016 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1017
1018 def test_dict(self):
1019 d = self.dict()
1020 indices = range(65, 70)
1021 for i in indices:
1022 d[i] = chr(i)
1023 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1024 self.assertEqual(sorted(d.keys()), indices)
1025 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1026 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1027
1028 def test_namespace(self):
1029 n = self.Namespace()
1030 n.name = 'Bob'
1031 n.job = 'Builder'
1032 n._hidden = 'hidden'
1033 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1034 del n.job
1035 self.assertEqual(str(n), "Namespace(name='Bob')")
1036 self.assertTrue(hasattr(n, 'name'))
1037 self.assertTrue(not hasattr(n, 'job'))
1038
1039#
1040#
1041#
1042
1043def sqr(x, wait=0.0):
1044 time.sleep(wait)
1045 return x*x
Benjamin Petersondfd79492008-06-13 19:13:39 +00001046class _TestPool(BaseTestCase):
1047
1048 def test_apply(self):
1049 papply = self.pool.apply
1050 self.assertEqual(papply(sqr, (5,)), sqr(5))
1051 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1052
1053 def test_map(self):
1054 pmap = self.pool.map
1055 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
1056 self.assertEqual(pmap(sqr, range(100), chunksize=20),
1057 map(sqr, range(100)))
1058
Jesse Noller7530e472009-07-16 14:23:04 +00001059 def test_map_chunksize(self):
1060 try:
1061 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1062 except multiprocessing.TimeoutError:
1063 self.fail("pool.map_async with chunksize stalled on null list")
1064
Benjamin Petersondfd79492008-06-13 19:13:39 +00001065 def test_async(self):
1066 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1067 get = TimingWrapper(res.get)
1068 self.assertEqual(get(), 49)
1069 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1070
1071 def test_async_timeout(self):
1072 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1073 get = TimingWrapper(res.get)
1074 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1075 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1076
1077 def test_imap(self):
1078 it = self.pool.imap(sqr, range(10))
1079 self.assertEqual(list(it), map(sqr, range(10)))
1080
1081 it = self.pool.imap(sqr, range(10))
1082 for i in range(10):
1083 self.assertEqual(it.next(), i*i)
1084 self.assertRaises(StopIteration, it.next)
1085
1086 it = self.pool.imap(sqr, range(1000), chunksize=100)
1087 for i in range(1000):
1088 self.assertEqual(it.next(), i*i)
1089 self.assertRaises(StopIteration, it.next)
1090
1091 def test_imap_unordered(self):
1092 it = self.pool.imap_unordered(sqr, range(1000))
1093 self.assertEqual(sorted(it), map(sqr, range(1000)))
1094
1095 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1096 self.assertEqual(sorted(it), map(sqr, range(1000)))
1097
1098 def test_make_pool(self):
Victor Stinnerf64a0cf2011-06-20 17:54:33 +02001099 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1100 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1101
Benjamin Petersondfd79492008-06-13 19:13:39 +00001102 p = multiprocessing.Pool(3)
1103 self.assertEqual(3, len(p._pool))
1104 p.close()
1105 p.join()
1106
1107 def test_terminate(self):
1108 if self.TYPE == 'manager':
1109 # On Unix a forked process increfs each shared object to
1110 # which its parent process held a reference. If the
1111 # forked process gets terminated then there is likely to
1112 # be a reference leak. So to prevent
1113 # _TestZZZNumberOfObjects from failing we skip this test
1114 # when using a manager.
1115 return
1116
1117 result = self.pool.map_async(
1118 time.sleep, [0.1 for i in range(10000)], chunksize=1
1119 )
1120 self.pool.terminate()
1121 join = TimingWrapper(self.pool.join)
1122 join()
1123 self.assertTrue(join.elapsed < 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001124
1125class _TestPoolWorkerLifetime(BaseTestCase):
1126
1127 ALLOWED_TYPES = ('processes', )
1128 def test_pool_worker_lifetime(self):
1129 p = multiprocessing.Pool(3, maxtasksperchild=10)
1130 self.assertEqual(3, len(p._pool))
1131 origworkerpids = [w.pid for w in p._pool]
1132 # Run many tasks so each worker gets replaced (hopefully)
1133 results = []
1134 for i in range(100):
1135 results.append(p.apply_async(sqr, (i, )))
1136 # Fetch the results and verify we got the right answers,
1137 # also ensuring all the tasks have completed.
1138 for (j, res) in enumerate(results):
1139 self.assertEqual(res.get(), sqr(j))
1140 # Refill the pool
1141 p._repopulate_pool()
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001142 # Wait until all workers are alive
Antoine Pitrouc2b0d762011-04-06 22:54:14 +02001143 # (countdown * DELTA = 5 seconds max startup process time)
1144 countdown = 50
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001145 while countdown and not all(w.is_alive() for w in p._pool):
1146 countdown -= 1
1147 time.sleep(DELTA)
Jesse Noller654ade32010-01-27 03:05:57 +00001148 finalworkerpids = [w.pid for w in p._pool]
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001149 # All pids should be assigned. See issue #7805.
1150 self.assertNotIn(None, origworkerpids)
1151 self.assertNotIn(None, finalworkerpids)
1152 # Finally, check that the worker pids have changed
Jesse Noller654ade32010-01-27 03:05:57 +00001153 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1154 p.close()
1155 p.join()
1156
Benjamin Petersondfd79492008-06-13 19:13:39 +00001157#
1158# Test that manager has expected number of shared objects left
1159#
1160
1161class _TestZZZNumberOfObjects(BaseTestCase):
1162 # Because test cases are sorted alphabetically, this one will get
1163 # run after all the other tests for the manager. It tests that
1164 # there have been no "reference leaks" for the manager's shared
1165 # objects. Note the comment in _TestPool.test_terminate().
1166 ALLOWED_TYPES = ('manager',)
1167
1168 def test_number_of_objects(self):
1169 EXPECTED_NUMBER = 1 # the pool object is still alive
1170 multiprocessing.active_children() # discard dead process objs
1171 gc.collect() # do garbage collection
1172 refs = self.manager._number_of_objects()
Jesse Noller7314b382009-01-21 02:08:17 +00001173 debug_info = self.manager._debug_info()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001174 if refs != EXPECTED_NUMBER:
Jesse Noller7fb96402008-07-17 21:01:05 +00001175 print self.manager._debug_info()
Jesse Noller7314b382009-01-21 02:08:17 +00001176 print debug_info
Benjamin Petersondfd79492008-06-13 19:13:39 +00001177
1178 self.assertEqual(refs, EXPECTED_NUMBER)
1179
1180#
1181# Test of creating a customized manager class
1182#
1183
1184from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1185
1186class FooBar(object):
1187 def f(self):
1188 return 'f()'
1189 def g(self):
1190 raise ValueError
1191 def _h(self):
1192 return '_h()'
1193
1194def baz():
1195 for i in xrange(10):
1196 yield i*i
1197
1198class IteratorProxy(BaseProxy):
1199 _exposed_ = ('next', '__next__')
1200 def __iter__(self):
1201 return self
1202 def next(self):
1203 return self._callmethod('next')
1204 def __next__(self):
1205 return self._callmethod('__next__')
1206
1207class MyManager(BaseManager):
1208 pass
1209
1210MyManager.register('Foo', callable=FooBar)
1211MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1212MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1213
1214
1215class _TestMyManager(BaseTestCase):
1216
1217 ALLOWED_TYPES = ('manager',)
1218
1219 def test_mymanager(self):
1220 manager = MyManager()
1221 manager.start()
1222
1223 foo = manager.Foo()
1224 bar = manager.Bar()
1225 baz = manager.baz()
1226
1227 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1228 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1229
1230 self.assertEqual(foo_methods, ['f', 'g'])
1231 self.assertEqual(bar_methods, ['f', '_h'])
1232
1233 self.assertEqual(foo.f(), 'f()')
1234 self.assertRaises(ValueError, foo.g)
1235 self.assertEqual(foo._callmethod('f'), 'f()')
1236 self.assertRaises(RemoteError, foo._callmethod, '_h')
1237
1238 self.assertEqual(bar.f(), 'f()')
1239 self.assertEqual(bar._h(), '_h()')
1240 self.assertEqual(bar._callmethod('f'), 'f()')
1241 self.assertEqual(bar._callmethod('_h'), '_h()')
1242
1243 self.assertEqual(list(baz), [i*i for i in range(10)])
1244
1245 manager.shutdown()
1246
1247#
1248# Test of connecting to a remote server and using xmlrpclib for serialization
1249#
1250
1251_queue = Queue.Queue()
1252def get_queue():
1253 return _queue
1254
1255class QueueManager(BaseManager):
1256 '''manager class used by server process'''
1257QueueManager.register('get_queue', callable=get_queue)
1258
1259class QueueManager2(BaseManager):
1260 '''manager class which specifies the same interface as QueueManager'''
1261QueueManager2.register('get_queue')
1262
1263
1264SERIALIZER = 'xmlrpclib'
1265
1266class _TestRemoteManager(BaseTestCase):
1267
1268 ALLOWED_TYPES = ('manager',)
1269
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001270 @classmethod
1271 def _putter(cls, address, authkey):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001272 manager = QueueManager2(
1273 address=address, authkey=authkey, serializer=SERIALIZER
1274 )
1275 manager.connect()
1276 queue = manager.get_queue()
1277 queue.put(('hello world', None, True, 2.25))
1278
1279 def test_remote(self):
1280 authkey = os.urandom(32)
1281
1282 manager = QueueManager(
1283 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1284 )
1285 manager.start()
1286
1287 p = self.Process(target=self._putter, args=(manager.address, authkey))
1288 p.start()
1289
1290 manager2 = QueueManager2(
1291 address=manager.address, authkey=authkey, serializer=SERIALIZER
1292 )
1293 manager2.connect()
1294 queue = manager2.get_queue()
1295
1296 # Note that xmlrpclib will deserialize object as a list not a tuple
1297 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1298
1299 # Because we are using xmlrpclib for serialization instead of
1300 # pickle this will cause a serialization error.
1301 self.assertRaises(Exception, queue.put, time.sleep)
1302
1303 # Make queue finalizer run before the server is stopped
1304 del queue
1305 manager.shutdown()
1306
Jesse Noller459a6482009-03-30 15:50:42 +00001307class _TestManagerRestart(BaseTestCase):
1308
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001309 @classmethod
1310 def _putter(cls, address, authkey):
Jesse Noller459a6482009-03-30 15:50:42 +00001311 manager = QueueManager(
1312 address=address, authkey=authkey, serializer=SERIALIZER)
1313 manager.connect()
1314 queue = manager.get_queue()
1315 queue.put('hello world')
1316
1317 def test_rapid_restart(self):
1318 authkey = os.urandom(32)
1319 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001320 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin87d86e02010-11-01 05:15:55 +00001321 srvr = manager.get_server()
1322 addr = srvr.address
1323 # Close the connection.Listener socket which gets opened as a part
1324 # of manager.get_server(). It's not needed for the test.
1325 srvr.listener.close()
Jesse Noller459a6482009-03-30 15:50:42 +00001326 manager.start()
1327
1328 p = self.Process(target=self._putter, args=(manager.address, authkey))
1329 p.start()
1330 queue = manager.get_queue()
1331 self.assertEqual(queue.get(), 'hello world')
Jesse Noller019ce772009-03-30 21:53:29 +00001332 del queue
Jesse Noller459a6482009-03-30 15:50:42 +00001333 manager.shutdown()
1334 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001335 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller459a6482009-03-30 15:50:42 +00001336 manager.start()
Jesse Noller019ce772009-03-30 21:53:29 +00001337 manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001338
Benjamin Petersondfd79492008-06-13 19:13:39 +00001339#
1340#
1341#
1342
1343SENTINEL = latin('')
1344
1345class _TestConnection(BaseTestCase):
1346
1347 ALLOWED_TYPES = ('processes', 'threads')
1348
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001349 @classmethod
1350 def _echo(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001351 for msg in iter(conn.recv_bytes, SENTINEL):
1352 conn.send_bytes(msg)
1353 conn.close()
1354
1355 def test_connection(self):
1356 conn, child_conn = self.Pipe()
1357
1358 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001359 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001360 p.start()
1361
1362 seq = [1, 2.25, None]
1363 msg = latin('hello world')
1364 longmsg = msg * 10
1365 arr = array.array('i', range(4))
1366
1367 if self.TYPE == 'processes':
1368 self.assertEqual(type(conn.fileno()), int)
1369
1370 self.assertEqual(conn.send(seq), None)
1371 self.assertEqual(conn.recv(), seq)
1372
1373 self.assertEqual(conn.send_bytes(msg), None)
1374 self.assertEqual(conn.recv_bytes(), msg)
1375
1376 if self.TYPE == 'processes':
1377 buffer = array.array('i', [0]*10)
1378 expected = list(arr) + [0] * (10 - len(arr))
1379 self.assertEqual(conn.send_bytes(arr), None)
1380 self.assertEqual(conn.recv_bytes_into(buffer),
1381 len(arr) * buffer.itemsize)
1382 self.assertEqual(list(buffer), expected)
1383
1384 buffer = array.array('i', [0]*10)
1385 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1386 self.assertEqual(conn.send_bytes(arr), None)
1387 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1388 len(arr) * buffer.itemsize)
1389 self.assertEqual(list(buffer), expected)
1390
1391 buffer = bytearray(latin(' ' * 40))
1392 self.assertEqual(conn.send_bytes(longmsg), None)
1393 try:
1394 res = conn.recv_bytes_into(buffer)
1395 except multiprocessing.BufferTooShort, e:
1396 self.assertEqual(e.args, (longmsg,))
1397 else:
1398 self.fail('expected BufferTooShort, got %s' % res)
1399
1400 poll = TimingWrapper(conn.poll)
1401
1402 self.assertEqual(poll(), False)
1403 self.assertTimingAlmostEqual(poll.elapsed, 0)
1404
1405 self.assertEqual(poll(TIMEOUT1), False)
1406 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1407
1408 conn.send(None)
1409
1410 self.assertEqual(poll(TIMEOUT1), True)
1411 self.assertTimingAlmostEqual(poll.elapsed, 0)
1412
1413 self.assertEqual(conn.recv(), None)
1414
1415 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1416 conn.send_bytes(really_big_msg)
1417 self.assertEqual(conn.recv_bytes(), really_big_msg)
1418
1419 conn.send_bytes(SENTINEL) # tell child to quit
1420 child_conn.close()
1421
1422 if self.TYPE == 'processes':
1423 self.assertEqual(conn.readable, True)
1424 self.assertEqual(conn.writable, True)
1425 self.assertRaises(EOFError, conn.recv)
1426 self.assertRaises(EOFError, conn.recv_bytes)
1427
1428 p.join()
1429
1430 def test_duplex_false(self):
1431 reader, writer = self.Pipe(duplex=False)
1432 self.assertEqual(writer.send(1), None)
1433 self.assertEqual(reader.recv(), 1)
1434 if self.TYPE == 'processes':
1435 self.assertEqual(reader.readable, True)
1436 self.assertEqual(reader.writable, False)
1437 self.assertEqual(writer.readable, False)
1438 self.assertEqual(writer.writable, True)
1439 self.assertRaises(IOError, reader.send, 2)
1440 self.assertRaises(IOError, writer.recv)
1441 self.assertRaises(IOError, writer.poll)
1442
1443 def test_spawn_close(self):
1444 # We test that a pipe connection can be closed by parent
1445 # process immediately after child is spawned. On Windows this
1446 # would have sometimes failed on old versions because
1447 # child_conn would be closed before the child got a chance to
1448 # duplicate it.
1449 conn, child_conn = self.Pipe()
1450
1451 p = self.Process(target=self._echo, args=(child_conn,))
1452 p.start()
1453 child_conn.close() # this might complete before child initializes
1454
1455 msg = latin('hello')
1456 conn.send_bytes(msg)
1457 self.assertEqual(conn.recv_bytes(), msg)
1458
1459 conn.send_bytes(SENTINEL)
1460 conn.close()
1461 p.join()
1462
1463 def test_sendbytes(self):
1464 if self.TYPE != 'processes':
1465 return
1466
1467 msg = latin('abcdefghijklmnopqrstuvwxyz')
1468 a, b = self.Pipe()
1469
1470 a.send_bytes(msg)
1471 self.assertEqual(b.recv_bytes(), msg)
1472
1473 a.send_bytes(msg, 5)
1474 self.assertEqual(b.recv_bytes(), msg[5:])
1475
1476 a.send_bytes(msg, 7, 8)
1477 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1478
1479 a.send_bytes(msg, 26)
1480 self.assertEqual(b.recv_bytes(), latin(''))
1481
1482 a.send_bytes(msg, 26, 0)
1483 self.assertEqual(b.recv_bytes(), latin(''))
1484
1485 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1486
1487 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1488
1489 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1490
1491 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1492
1493 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1494
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001495 @classmethod
1496 def _is_fd_assigned(cls, fd):
1497 try:
1498 os.fstat(fd)
1499 except OSError as e:
1500 if e.errno == errno.EBADF:
1501 return False
1502 raise
1503 else:
1504 return True
1505
1506 @classmethod
1507 def _writefd(cls, conn, data, create_dummy_fds=False):
1508 if create_dummy_fds:
1509 for i in range(0, 256):
1510 if not cls._is_fd_assigned(i):
1511 os.dup2(conn.fileno(), i)
1512 fd = reduction.recv_handle(conn)
1513 if msvcrt:
1514 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1515 os.write(fd, data)
1516 os.close(fd)
1517
1518 def test_fd_transfer(self):
1519 if self.TYPE != 'processes':
1520 self.skipTest("only makes sense with processes")
1521 conn, child_conn = self.Pipe(duplex=True)
1522
1523 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
1524 p.start()
1525 with open(test_support.TESTFN, "wb") as f:
1526 fd = f.fileno()
1527 if msvcrt:
1528 fd = msvcrt.get_osfhandle(fd)
1529 reduction.send_handle(conn, fd, p.pid)
1530 p.join()
1531 with open(test_support.TESTFN, "rb") as f:
1532 self.assertEqual(f.read(), b"foo")
1533
1534 @unittest.skipIf(sys.platform == "win32",
1535 "test semantics don't make sense on Windows")
1536 @unittest.skipIf(MAXFD <= 256,
1537 "largest assignable fd number is too small")
1538 @unittest.skipUnless(hasattr(os, "dup2"),
1539 "test needs os.dup2()")
1540 def test_large_fd_transfer(self):
1541 # With fd > 256 (issue #11657)
1542 if self.TYPE != 'processes':
1543 self.skipTest("only makes sense with processes")
1544 conn, child_conn = self.Pipe(duplex=True)
1545
1546 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
1547 p.start()
1548 with open(test_support.TESTFN, "wb") as f:
1549 fd = f.fileno()
1550 for newfd in range(256, MAXFD):
1551 if not self._is_fd_assigned(newfd):
1552 break
1553 else:
1554 self.fail("could not find an unassigned large file descriptor")
1555 os.dup2(fd, newfd)
1556 try:
1557 reduction.send_handle(conn, newfd, p.pid)
1558 finally:
1559 os.close(newfd)
1560 p.join()
1561 with open(test_support.TESTFN, "rb") as f:
1562 self.assertEqual(f.read(), b"bar")
1563
1564
Benjamin Petersondfd79492008-06-13 19:13:39 +00001565class _TestListenerClient(BaseTestCase):
1566
1567 ALLOWED_TYPES = ('processes', 'threads')
1568
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001569 @classmethod
1570 def _test(cls, address):
1571 conn = cls.connection.Client(address)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001572 conn.send('hello')
1573 conn.close()
1574
1575 def test_listener_client(self):
1576 for family in self.connection.families:
1577 l = self.connection.Listener(family=family)
1578 p = self.Process(target=self._test, args=(l.address,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001579 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001580 p.start()
1581 conn = l.accept()
1582 self.assertEqual(conn.recv(), 'hello')
1583 p.join()
1584 l.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001585#
1586# Test of sending connection and socket objects between processes
1587#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001588"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001589class _TestPicklingConnections(BaseTestCase):
1590
1591 ALLOWED_TYPES = ('processes',)
1592
1593 def _listener(self, conn, families):
1594 for fam in families:
1595 l = self.connection.Listener(family=fam)
1596 conn.send(l.address)
1597 new_conn = l.accept()
1598 conn.send(new_conn)
1599
1600 if self.TYPE == 'processes':
1601 l = socket.socket()
1602 l.bind(('localhost', 0))
1603 conn.send(l.getsockname())
1604 l.listen(1)
1605 new_conn, addr = l.accept()
1606 conn.send(new_conn)
1607
1608 conn.recv()
1609
1610 def _remote(self, conn):
1611 for (address, msg) in iter(conn.recv, None):
1612 client = self.connection.Client(address)
1613 client.send(msg.upper())
1614 client.close()
1615
1616 if self.TYPE == 'processes':
1617 address, msg = conn.recv()
1618 client = socket.socket()
1619 client.connect(address)
1620 client.sendall(msg.upper())
1621 client.close()
1622
1623 conn.close()
1624
1625 def test_pickling(self):
1626 try:
1627 multiprocessing.allow_connection_pickling()
1628 except ImportError:
1629 return
1630
1631 families = self.connection.families
1632
1633 lconn, lconn0 = self.Pipe()
1634 lp = self.Process(target=self._listener, args=(lconn0, families))
1635 lp.start()
1636 lconn0.close()
1637
1638 rconn, rconn0 = self.Pipe()
1639 rp = self.Process(target=self._remote, args=(rconn0,))
1640 rp.start()
1641 rconn0.close()
1642
1643 for fam in families:
1644 msg = ('This connection uses family %s' % fam).encode('ascii')
1645 address = lconn.recv()
1646 rconn.send((address, msg))
1647 new_conn = lconn.recv()
1648 self.assertEqual(new_conn.recv(), msg.upper())
1649
1650 rconn.send(None)
1651
1652 if self.TYPE == 'processes':
1653 msg = latin('This connection uses a normal socket')
1654 address = lconn.recv()
1655 rconn.send((address, msg))
1656 if hasattr(socket, 'fromfd'):
1657 new_conn = lconn.recv()
1658 self.assertEqual(new_conn.recv(100), msg.upper())
1659 else:
1660 # XXX On Windows with Py2.6 need to backport fromfd()
1661 discard = lconn.recv_bytes()
1662
1663 lconn.send(None)
1664
1665 rconn.close()
1666 lconn.close()
1667
1668 lp.join()
1669 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001670"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001671#
1672#
1673#
1674
1675class _TestHeap(BaseTestCase):
1676
1677 ALLOWED_TYPES = ('processes',)
1678
1679 def test_heap(self):
1680 iterations = 5000
1681 maxblocks = 50
1682 blocks = []
1683
1684 # create and destroy lots of blocks of different sizes
1685 for i in xrange(iterations):
1686 size = int(random.lognormvariate(0, 1) * 1000)
1687 b = multiprocessing.heap.BufferWrapper(size)
1688 blocks.append(b)
1689 if len(blocks) > maxblocks:
1690 i = random.randrange(maxblocks)
1691 del blocks[i]
1692
1693 # get the heap object
1694 heap = multiprocessing.heap.BufferWrapper._heap
1695
1696 # verify the state of the heap
1697 all = []
1698 occupied = 0
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001699 heap._lock.acquire()
1700 self.addCleanup(heap._lock.release)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001701 for L in heap._len_to_seq.values():
1702 for arena, start, stop in L:
1703 all.append((heap._arenas.index(arena), start, stop,
1704 stop-start, 'free'))
1705 for arena, start, stop in heap._allocated_blocks:
1706 all.append((heap._arenas.index(arena), start, stop,
1707 stop-start, 'occupied'))
1708 occupied += (stop-start)
1709
1710 all.sort()
1711
1712 for i in range(len(all)-1):
1713 (arena, start, stop) = all[i][:3]
1714 (narena, nstart, nstop) = all[i+1][:3]
1715 self.assertTrue((arena != narena and nstart == 0) or
1716 (stop == nstart))
1717
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001718 def test_free_from_gc(self):
1719 # Check that freeing of blocks by the garbage collector doesn't deadlock
1720 # (issue #12352).
1721 # Make sure the GC is enabled, and set lower collection thresholds to
1722 # make collections more frequent (and increase the probability of
1723 # deadlock).
Charles-François Natali7c20ad32011-07-02 14:08:27 +02001724 if not gc.isenabled():
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001725 gc.enable()
1726 self.addCleanup(gc.disable)
Charles-François Natali7c20ad32011-07-02 14:08:27 +02001727 thresholds = gc.get_threshold()
1728 self.addCleanup(gc.set_threshold, *thresholds)
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001729 gc.set_threshold(10)
1730
1731 # perform numerous block allocations, with cyclic references to make
1732 # sure objects are collected asynchronously by the gc
1733 for i in range(5000):
1734 a = multiprocessing.heap.BufferWrapper(1)
1735 b = multiprocessing.heap.BufferWrapper(1)
1736 # circular references
1737 a.buddy = b
1738 b.buddy = a
1739
Benjamin Petersondfd79492008-06-13 19:13:39 +00001740#
1741#
1742#
1743
Benjamin Petersondfd79492008-06-13 19:13:39 +00001744class _Foo(Structure):
1745 _fields_ = [
1746 ('x', c_int),
1747 ('y', c_double)
1748 ]
1749
1750class _TestSharedCTypes(BaseTestCase):
1751
1752 ALLOWED_TYPES = ('processes',)
1753
Antoine Pitrou55d935a2010-11-22 16:35:57 +00001754 def setUp(self):
1755 if not HAS_SHAREDCTYPES:
1756 self.skipTest("requires multiprocessing.sharedctypes")
1757
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001758 @classmethod
1759 def _double(cls, x, y, foo, arr, string):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001760 x.value *= 2
1761 y.value *= 2
1762 foo.x *= 2
1763 foo.y *= 2
1764 string.value *= 2
1765 for i in range(len(arr)):
1766 arr[i] *= 2
1767
1768 def test_sharedctypes(self, lock=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001769 x = Value('i', 7, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001770 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001771 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001772 arr = self.Array('d', range(10), lock=lock)
1773 string = self.Array('c', 20, lock=lock)
Brian Curtina06e9b82010-10-07 02:27:41 +00001774 string.value = latin('hello')
Benjamin Petersondfd79492008-06-13 19:13:39 +00001775
1776 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1777 p.start()
1778 p.join()
1779
1780 self.assertEqual(x.value, 14)
1781 self.assertAlmostEqual(y.value, 2.0/3.0)
1782 self.assertEqual(foo.x, 6)
1783 self.assertAlmostEqual(foo.y, 4.0)
1784 for i in range(10):
1785 self.assertAlmostEqual(arr[i], i*2)
1786 self.assertEqual(string.value, latin('hellohello'))
1787
1788 def test_synchronize(self):
1789 self.test_sharedctypes(lock=True)
1790
1791 def test_copy(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001792 foo = _Foo(2, 5.0)
Brian Curtina06e9b82010-10-07 02:27:41 +00001793 bar = copy(foo)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001794 foo.x = 0
1795 foo.y = 0
1796 self.assertEqual(bar.x, 2)
1797 self.assertAlmostEqual(bar.y, 5.0)
1798
1799#
1800#
1801#
1802
1803class _TestFinalize(BaseTestCase):
1804
1805 ALLOWED_TYPES = ('processes',)
1806
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001807 @classmethod
1808 def _test_finalize(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001809 class Foo(object):
1810 pass
1811
1812 a = Foo()
1813 util.Finalize(a, conn.send, args=('a',))
1814 del a # triggers callback for a
1815
1816 b = Foo()
1817 close_b = util.Finalize(b, conn.send, args=('b',))
1818 close_b() # triggers callback for b
1819 close_b() # does nothing because callback has already been called
1820 del b # does nothing because callback has already been called
1821
1822 c = Foo()
1823 util.Finalize(c, conn.send, args=('c',))
1824
1825 d10 = Foo()
1826 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1827
1828 d01 = Foo()
1829 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1830 d02 = Foo()
1831 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1832 d03 = Foo()
1833 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1834
1835 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1836
1837 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1838
Ezio Melottic2077b02011-03-16 12:34:31 +02001839 # call multiprocessing's cleanup function then exit process without
Benjamin Petersondfd79492008-06-13 19:13:39 +00001840 # garbage collecting locals
1841 util._exit_function()
1842 conn.close()
1843 os._exit(0)
1844
1845 def test_finalize(self):
1846 conn, child_conn = self.Pipe()
1847
1848 p = self.Process(target=self._test_finalize, args=(child_conn,))
1849 p.start()
1850 p.join()
1851
1852 result = [obj for obj in iter(conn.recv, 'STOP')]
1853 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1854
1855#
1856# Test that from ... import * works for each module
1857#
1858
1859class _TestImportStar(BaseTestCase):
1860
1861 ALLOWED_TYPES = ('processes',)
1862
1863 def test_import(self):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001864 modules = [
Benjamin Petersondfd79492008-06-13 19:13:39 +00001865 'multiprocessing', 'multiprocessing.connection',
1866 'multiprocessing.heap', 'multiprocessing.managers',
1867 'multiprocessing.pool', 'multiprocessing.process',
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001868 'multiprocessing.reduction',
Benjamin Petersondfd79492008-06-13 19:13:39 +00001869 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001870 ]
1871
1872 if c_int is not None:
1873 # This module requires _ctypes
1874 modules.append('multiprocessing.sharedctypes')
Benjamin Petersondfd79492008-06-13 19:13:39 +00001875
1876 for name in modules:
1877 __import__(name)
1878 mod = sys.modules[name]
1879
1880 for attr in getattr(mod, '__all__', ()):
1881 self.assertTrue(
1882 hasattr(mod, attr),
1883 '%r does not have attribute %r' % (mod, attr)
1884 )
1885
1886#
1887# Quick test that logging works -- does not test logging output
1888#
1889
1890class _TestLogging(BaseTestCase):
1891
1892 ALLOWED_TYPES = ('processes',)
1893
1894 def test_enable_logging(self):
1895 logger = multiprocessing.get_logger()
1896 logger.setLevel(util.SUBWARNING)
1897 self.assertTrue(logger is not None)
1898 logger.debug('this will not be printed')
1899 logger.info('nor will this')
1900 logger.setLevel(LOG_LEVEL)
1901
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001902 @classmethod
1903 def _test_level(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001904 logger = multiprocessing.get_logger()
1905 conn.send(logger.getEffectiveLevel())
1906
1907 def test_level(self):
1908 LEVEL1 = 32
1909 LEVEL2 = 37
1910
1911 logger = multiprocessing.get_logger()
1912 root_logger = logging.getLogger()
1913 root_level = root_logger.level
1914
1915 reader, writer = multiprocessing.Pipe(duplex=False)
1916
1917 logger.setLevel(LEVEL1)
1918 self.Process(target=self._test_level, args=(writer,)).start()
1919 self.assertEqual(LEVEL1, reader.recv())
1920
1921 logger.setLevel(logging.NOTSET)
1922 root_logger.setLevel(LEVEL2)
1923 self.Process(target=self._test_level, args=(writer,)).start()
1924 self.assertEqual(LEVEL2, reader.recv())
1925
1926 root_logger.setLevel(root_level)
1927 logger.setLevel(level=LOG_LEVEL)
1928
Jesse Noller814d02d2009-11-21 14:38:23 +00001929
Jesse Noller9a03f2f2009-11-24 14:17:29 +00001930# class _TestLoggingProcessName(BaseTestCase):
1931#
1932# def handle(self, record):
1933# assert record.processName == multiprocessing.current_process().name
1934# self.__handled = True
1935#
1936# def test_logging(self):
1937# handler = logging.Handler()
1938# handler.handle = self.handle
1939# self.__handled = False
1940# # Bypass getLogger() and side-effects
1941# logger = logging.getLoggerClass()(
1942# 'multiprocessing.test.TestLoggingProcessName')
1943# logger.addHandler(handler)
1944# logger.propagate = False
1945#
1946# logger.warn('foo')
1947# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00001948
Benjamin Petersondfd79492008-06-13 19:13:39 +00001949#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001950# Test to verify handle verification, see issue 3321
1951#
1952
1953class TestInvalidHandle(unittest.TestCase):
1954
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001955 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001956 def test_invalid_handles(self):
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001957 conn = _multiprocessing.Connection(44977608)
1958 self.assertRaises(IOError, conn.poll)
1959 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001960
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001961#
Benjamin Petersondfd79492008-06-13 19:13:39 +00001962# Functions used to create test cases from the base ones in this module
1963#
1964
1965def get_attributes(Source, names):
1966 d = {}
1967 for name in names:
1968 obj = getattr(Source, name)
1969 if type(obj) == type(get_attributes):
1970 obj = staticmethod(obj)
1971 d[name] = obj
1972 return d
1973
1974def create_test_cases(Mixin, type):
1975 result = {}
1976 glob = globals()
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001977 Type = type.capitalize()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001978
1979 for name in glob.keys():
1980 if name.startswith('_Test'):
1981 base = glob[name]
1982 if type in base.ALLOWED_TYPES:
1983 newname = 'With' + Type + name[1:]
1984 class Temp(base, unittest.TestCase, Mixin):
1985 pass
1986 result[newname] = Temp
1987 Temp.__name__ = newname
1988 Temp.__module__ = Mixin.__module__
1989 return result
1990
1991#
1992# Create test cases
1993#
1994
1995class ProcessesMixin(object):
1996 TYPE = 'processes'
1997 Process = multiprocessing.Process
1998 locals().update(get_attributes(multiprocessing, (
1999 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2000 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2001 'RawArray', 'current_process', 'active_children', 'Pipe',
2002 'connection', 'JoinableQueue'
2003 )))
2004
2005testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2006globals().update(testcases_processes)
2007
2008
2009class ManagerMixin(object):
2010 TYPE = 'manager'
2011 Process = multiprocessing.Process
2012 manager = object.__new__(multiprocessing.managers.SyncManager)
2013 locals().update(get_attributes(manager, (
2014 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2015 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
2016 'Namespace', 'JoinableQueue'
2017 )))
2018
2019testcases_manager = create_test_cases(ManagerMixin, type='manager')
2020globals().update(testcases_manager)
2021
2022
2023class ThreadsMixin(object):
2024 TYPE = 'threads'
2025 Process = multiprocessing.dummy.Process
2026 locals().update(get_attributes(multiprocessing.dummy, (
2027 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2028 'Condition', 'Event', 'Value', 'Array', 'current_process',
2029 'active_children', 'Pipe', 'connection', 'dict', 'list',
2030 'Namespace', 'JoinableQueue'
2031 )))
2032
2033testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2034globals().update(testcases_threads)
2035
Neal Norwitz0c519b32008-08-25 01:50:24 +00002036class OtherTest(unittest.TestCase):
2037 # TODO: add more tests for deliver/answer challenge.
2038 def test_deliver_challenge_auth_failure(self):
2039 class _FakeConnection(object):
2040 def recv_bytes(self, size):
Neal Norwitz2a7767a2008-08-25 03:03:25 +00002041 return b'something bogus'
Neal Norwitz0c519b32008-08-25 01:50:24 +00002042 def send_bytes(self, data):
2043 pass
2044 self.assertRaises(multiprocessing.AuthenticationError,
2045 multiprocessing.connection.deliver_challenge,
2046 _FakeConnection(), b'abc')
2047
2048 def test_answer_challenge_auth_failure(self):
2049 class _FakeConnection(object):
2050 def __init__(self):
2051 self.count = 0
2052 def recv_bytes(self, size):
2053 self.count += 1
2054 if self.count == 1:
2055 return multiprocessing.connection.CHALLENGE
2056 elif self.count == 2:
Neal Norwitz2a7767a2008-08-25 03:03:25 +00002057 return b'something bogus'
2058 return b''
Neal Norwitz0c519b32008-08-25 01:50:24 +00002059 def send_bytes(self, data):
2060 pass
2061 self.assertRaises(multiprocessing.AuthenticationError,
2062 multiprocessing.connection.answer_challenge,
2063 _FakeConnection(), b'abc')
2064
Jesse Noller7152f6d2009-04-02 05:17:26 +00002065#
2066# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2067#
2068
2069def initializer(ns):
2070 ns.test += 1
2071
2072class TestInitializers(unittest.TestCase):
2073 def setUp(self):
2074 self.mgr = multiprocessing.Manager()
2075 self.ns = self.mgr.Namespace()
2076 self.ns.test = 0
2077
2078 def tearDown(self):
2079 self.mgr.shutdown()
2080
2081 def test_manager_initializer(self):
2082 m = multiprocessing.managers.SyncManager()
2083 self.assertRaises(TypeError, m.start, 1)
2084 m.start(initializer, (self.ns,))
2085 self.assertEqual(self.ns.test, 1)
2086 m.shutdown()
2087
2088 def test_pool_initializer(self):
2089 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2090 p = multiprocessing.Pool(1, initializer, (self.ns,))
2091 p.close()
2092 p.join()
2093 self.assertEqual(self.ns.test, 1)
2094
Jesse Noller1b90efb2009-06-30 17:11:52 +00002095#
2096# Issue 5155, 5313, 5331: Test process in processes
2097# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2098#
2099
2100def _ThisSubProcess(q):
2101 try:
2102 item = q.get(block=False)
2103 except Queue.Empty:
2104 pass
2105
2106def _TestProcess(q):
2107 queue = multiprocessing.Queue()
2108 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
2109 subProc.start()
2110 subProc.join()
2111
2112def _afunc(x):
2113 return x*x
2114
2115def pool_in_process():
2116 pool = multiprocessing.Pool(processes=4)
2117 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2118
2119class _file_like(object):
2120 def __init__(self, delegate):
2121 self._delegate = delegate
2122 self._pid = None
2123
2124 @property
2125 def cache(self):
2126 pid = os.getpid()
2127 # There are no race conditions since fork keeps only the running thread
2128 if pid != self._pid:
2129 self._pid = pid
2130 self._cache = []
2131 return self._cache
2132
2133 def write(self, data):
2134 self.cache.append(data)
2135
2136 def flush(self):
2137 self._delegate.write(''.join(self.cache))
2138 self._cache = []
2139
2140class TestStdinBadfiledescriptor(unittest.TestCase):
2141
2142 def test_queue_in_process(self):
2143 queue = multiprocessing.Queue()
2144 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
2145 proc.start()
2146 proc.join()
2147
2148 def test_pool_in_process(self):
2149 p = multiprocessing.Process(target=pool_in_process)
2150 p.start()
2151 p.join()
2152
2153 def test_flushing(self):
2154 sio = StringIO()
2155 flike = _file_like(sio)
2156 flike.write('foo')
2157 proc = multiprocessing.Process(target=lambda: flike.flush())
2158 flike.flush()
2159 assert sio.getvalue() == 'foo'
2160
2161testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
2162 TestStdinBadfiledescriptor]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002163
Benjamin Petersondfd79492008-06-13 19:13:39 +00002164#
2165#
2166#
2167
2168def test_main(run=None):
Jesse Noller18623822008-06-18 13:29:52 +00002169 if sys.platform.startswith("linux"):
2170 try:
2171 lock = multiprocessing.RLock()
2172 except OSError:
Benjamin Petersonbec087f2009-03-26 21:10:30 +00002173 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Petersoned77f2e2008-06-17 22:40:44 +00002174
Benjamin Petersondfd79492008-06-13 19:13:39 +00002175 if run is None:
2176 from test.test_support import run_unittest as run
2177
2178 util.get_temp_dir() # creates temp directory for use by all processes
2179
2180 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2181
Jesse Noller146b7ab2008-07-02 16:44:09 +00002182 ProcessesMixin.pool = multiprocessing.Pool(4)
2183 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2184 ManagerMixin.manager.__init__()
2185 ManagerMixin.manager.start()
2186 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002187
2188 testcases = (
Jesse Noller146b7ab2008-07-02 16:44:09 +00002189 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2190 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz0c519b32008-08-25 01:50:24 +00002191 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2192 testcases_other
Benjamin Petersondfd79492008-06-13 19:13:39 +00002193 )
2194
2195 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2196 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Nick Coghlan13623662010-04-10 14:24:36 +00002197 # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
2198 # module during these tests is at least platform dependent and possibly
Nick Coghlan14459d52010-04-10 15:01:54 +00002199 # non-deterministic on any given platform. So we don't mind if the listed
Nick Coghlan13623662010-04-10 14:24:36 +00002200 # warnings aren't actually raised.
Florent Xicluna07627882010-03-21 01:14:24 +00002201 with test_support.check_py3k_warnings(
Nick Coghlan13623662010-04-10 14:24:36 +00002202 (".+__(get|set)slice__ has been removed", DeprecationWarning),
2203 (r"sys.exc_clear\(\) not supported", DeprecationWarning),
2204 quiet=True):
Florent Xicluna07627882010-03-21 01:14:24 +00002205 run(suite)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002206
Jesse Noller146b7ab2008-07-02 16:44:09 +00002207 ThreadsMixin.pool.terminate()
2208 ProcessesMixin.pool.terminate()
2209 ManagerMixin.pool.terminate()
2210 ManagerMixin.manager.shutdown()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002211
Jesse Noller146b7ab2008-07-02 16:44:09 +00002212 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002213
2214def main():
2215 test_main(unittest.TextTestRunner(verbosity=2).run)
2216
2217if __name__ == '__main__':
2218 main()