blob: 8cc4f5423acd51a37a6b31e8c24178bad2374a29 [file] [log] [blame]
Benjamin Petersondfd79492008-06-13 19:13:39 +00001#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00006import Queue
7import time
8import sys
9import os
10import gc
11import signal
12import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000013import socket
14import random
15import logging
Antoine Pitroua1a8da82011-08-23 19:54:20 +020016import errno
Antoine Pitrou5084ff72017-03-24 16:03:46 +010017import weakref
Richard Oudkerkfaee75c2012-08-14 11:41:19 +010018import test.script_helper
Mark Dickinsonc4920e82009-11-20 19:30:22 +000019from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000020from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000021_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020022# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000023# message: "No module named _multiprocessing". _multiprocessing is not compiled
24# without thread support.
25import threading
R. David Murray3db8a342009-03-30 23:05:48 +000026
Jesse Noller37040cd2008-09-30 00:15:45 +000027# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000028test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000029
Benjamin Petersondfd79492008-06-13 19:13:39 +000030import multiprocessing.dummy
31import multiprocessing.connection
32import multiprocessing.managers
33import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000034import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000035
Charles-François Natalif8413b22011-09-21 18:44:49 +020036from multiprocessing import util
37
38try:
39 from multiprocessing import reduction
40 HAS_REDUCTION = True
41except ImportError:
42 HAS_REDUCTION = False
Benjamin Petersondfd79492008-06-13 19:13:39 +000043
Brian Curtina06e9b82010-10-07 02:27:41 +000044try:
45 from multiprocessing.sharedctypes import Value, copy
46 HAS_SHAREDCTYPES = True
47except ImportError:
48 HAS_SHAREDCTYPES = False
49
Antoine Pitroua1a8da82011-08-23 19:54:20 +020050try:
51 import msvcrt
52except ImportError:
53 msvcrt = None
54
Benjamin Petersondfd79492008-06-13 19:13:39 +000055#
56#
57#
58
Benjamin Petersone79edf52008-07-13 18:34:58 +000059latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000060
Benjamin Petersondfd79492008-06-13 19:13:39 +000061#
62# Constants
63#
64
# Logging level used by the test run.
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Short pause used to let children / feeder threads make progress.
DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# False when the extension module flags sem_getvalue() as broken.
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = (sys.platform == "win32")

# Upper bound on file descriptors; fall back to 256 when it cannot be
# queried.  Only the errors sysconf() can raise are caught (missing
# function, unknown name, OS-level failure) so that KeyboardInterrupt and
# SystemExit are no longer swallowed.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    MAXFD = 256
87
Benjamin Petersondfd79492008-06-13 19:13:39 +000088#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000089# Some tests require ctypes
90#
91
92try:
Nick Coghlan13623662010-04-10 14:24:36 +000093 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000094except ImportError:
95 Structure = object
96 c_int = c_double = None
97
Charles-François Natali6392d7f2011-11-22 18:35:18 +010098
def check_enough_semaphores():
    """Skip the calling test unless the OS offers enough semaphores.

    Returns None when the limit cannot be queried, when sysconf reports
    -1, or when the reported limit is at least the POSIX minimum;
    otherwise raises unittest.SkipTest.
    """
    # minimum number of semaphores available according to POSIX
    required = 256
    try:
        available = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if available != -1 and available < required:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % required)
112
113
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000114#
Benjamin Petersondfd79492008-06-13 19:13:39 +0000115# Creates a wrapper for a function which records the time it takes to finish
116#
117
class TimingWrapper(object):
    """Callable proxy that records how long the wrapped call took.

    After each call, ``elapsed`` holds the wall-clock duration in seconds
    (measured with time.time()), whether the call returned or raised.
    It is None until the first call.
    """

    def __init__(self, func):
        self.func, self.elapsed = func, None

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            outcome = self.func(*args, **kwds)
        finally:
            # recorded on both the success and the exception path
            self.elapsed = time.time() - started
        return outcome
130
131#
132# Base class for test cases
133#
134
class BaseTestCase(object):
    """Mixin holding assertion helpers shared by the test case classes."""

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timing comparisons are only enforced when CHECK_TIMINGS is set.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Compare func(*args) with value, silently passing when the call
        # reports NotImplementedError.
        try:
            res = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
157
Benjamin Petersondfd79492008-06-13 19:13:39 +0000158#
159# Return the value of a semaphore
160#
161
def get_value(self):
    """Return the current value of a semaphore-like object.

    Tries, in order, the ``get_value()`` method, the ``_Semaphore__value``
    attribute and the ``_value`` attribute; raises NotImplementedError if
    none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    try:
        return self._Semaphore__value
    except AttributeError:
        pass
    try:
        return self._value
    except AttributeError:
        raise NotImplementedError
173
174#
175# Testcases
176#
177
class _TestProcess(BaseTestCase):
    # Tests of Process objects: attributes, start/join, terminate,
    # active_children, nested process creation and sys.exit handling.
    # self.Process, self.Queue, self.Pipe, self.TYPE etc. are expected to
    # be supplied by whatever class this one is combined with.

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertGreater(len(authkey), 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Child side of test_process: report what the child was given.
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # state before start()
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIsInstance(self.active_children(), list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # state while running
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # values are read back in the order _test() queued them
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # state after join()
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        # Sleep long enough that the parent must terminate us.
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIsInstance(cpus, int)
        self.assertGreaterEqual(cpus, 1)

    def test_active_children(self):
        self.assertIsInstance(self.active_children(), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # Send our id, then (up to depth 2) spawn two children in turn,
        # each extending the id by its index.
        # NOTE(review): this import is not referenced below; kept as-is in
        # case it is there for its import side effects -- confirm.
        from multiprocessing import forking
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        # each child is joined before its sibling starts, so the ids
        # arrive in depth-first order
        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        # Redirect stderr to a file so the parent can inspect what
        # sys.exit(reason) printed.
        sys.stderr = open(testfn, 'w')
        sys.exit(reason)

    def test_sys_exit(self):
        # See Issue 13854
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test_support.TESTFN
        self.addCleanup(test_support.unlink, testfn)

        # non-integer reasons: exit code 1 and the reason written to stderr
        for reason, code in (([1, 2, 3], 1), ('ignore this', 1)):
            p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, code)

            with open(testfn, 'r') as f:
                self.assertEqual(f.read().rstrip(), str(reason))

        # integer-like reasons become the exit code itself
        for reason in (True, False, 8):
            p = self.Process(target=sys.exit, args=(reason,))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, reason)
357
Benjamin Petersondfd79492008-06-13 19:13:39 +0000358#
359#
360#
361
362class _UpperCaser(multiprocessing.Process):
363
364 def __init__(self):
365 multiprocessing.Process.__init__(self)
366 self.child_conn, self.parent_conn = multiprocessing.Pipe()
367
368 def run(self):
369 self.parent_conn.close()
370 for s in iter(self.child_conn.recv, None):
371 self.child_conn.send(s.upper())
372 self.child_conn.close()
373
374 def submit(self, s):
375 assert type(s) is str
376 self.parent_conn.send(s)
377 return self.parent_conn.recv()
378
379 def stop(self):
380 self.parent_conn.send(None)
381 self.parent_conn.close()
382 self.child_conn.close()
383
class _TestSubclassingProcess(BaseTestCase):
    # Checks that a Process subclass overriding run() works end to end.

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        uppercaser = _UpperCaser()
        uppercaser.daemon = True
        uppercaser.start()
        for text, shouted in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(uppercaser.submit(text), shouted)
        uppercaser.stop()
        uppercaser.join()
396
397#
398#
399#
400
def queue_empty(q):
    """Return whether q is empty, via q.empty() or else q.qsize() == 0."""
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
406
def queue_full(q, maxsize):
    """Return whether q is full, via q.full() or else q.qsize() == maxsize."""
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
412
413
class _TestQueue(BaseTestCase):
    # Tests for put/get (blocking, non-blocking and timed), use across a
    # fork, qsize, JoinableQueue.task_done and import-lock contention.

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_put: once released, drain the six items.
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        child = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        child.daemon = True
        child.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # fill the queue using every calling convention of put()
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # (callable, positional args, keyword args, expected elapsed time)
        full_cases = (
            (put, (7, False), {}, 0),
            (put, (7, False, None), {}, 0),
            (put_nowait, (7,), {}, 0),
            (put, (7, True, TIMEOUT1), {}, TIMEOUT1),
            (put, (7, False, TIMEOUT2), {}, 0),
            (put, (7, True), {'timeout': TIMEOUT3}, TIMEOUT3),
            )
        for call, args, kwds, expected in full_cases:
            self.assertRaises(Queue.Full, call, *args, **kwds)
            self.assertTimingAlmostEqual(call.elapsed, expected)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        child.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_get: once released, queue 2..5 for the parent.
        child_can_start.wait()
        #queue.put(1)
        for item in (2, 3, 4, 5):
            queue.put(item)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        child = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        child.daemon = True
        child.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # (callable, positional args, keyword args, expected elapsed time)
        empty_cases = (
            (get, (False,), {}, 0),
            (get, (False, None), {}, 0),
            (get_nowait, (), {}, 0),
            (get, (True, TIMEOUT1), {}, TIMEOUT1),
            (get, (False, TIMEOUT2), {}, 0),
            (get, (), {'timeout': TIMEOUT3}, TIMEOUT3),
            )
        for call, args, kwds, expected in empty_cases:
            self.assertRaises(Queue.Empty, call, *args, **kwds)
            self.assertTimingAlmostEqual(call.elapsed, expected)

        child.join()

    @classmethod
    def _test_fork(cls, queue):
        for value in range(10, 20):
            queue.put(value)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for value in range(10):
            queue.put(value)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        child = self.Process(target=self._test_fork, args=(queue,))
        child.daemon = True
        child.start()

        # check that all expected items are in the queue
        for expected in range(20):
            self.assertEqual(queue.get(), expected)
        self.assertRaises(Queue.Empty, queue.get, False)

        child.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            self.skipTest('qsize method not implemented')
        # size tracks each put() and get()
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        # Worker: acknowledge each item after a short delay; stop on None.
        for _ in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            self.skipTest("requires 'queue.task_done()' method")

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for _ in xrange(4)]

        for worker in workers:
            worker.daemon = True
            worker.start()

        for item in xrange(10):
            queue.put(item)

        queue.join()

        # one None sentinel per worker
        for _ in workers:
            queue.put(None)

        for worker in workers:
            worker.join()

    def test_no_import_lock_contention(self):
        with test_support.temp_cwd():
            module_name = 'imported_by_an_imported_module'
            with open(module_name + '.py', 'w') as f:
                f.write("""if 1:
                    import multiprocessing

                    q = multiprocessing.Queue()
                    q.put('knock knock')
                    q.get(timeout=3)
                    q.close()
                """)

            with test_support.DirsOnSysPath(os.getcwd()):
                try:
                    __import__(module_name)
                except Queue.Empty:
                    self.fail("Probable regression on import lock contention;"
                              " see Issue #22853")
643
Benjamin Petersondfd79492008-06-13 19:13:39 +0000644#
645#
646#
647
class _TestLock(BaseTestCase):
    # Basic acquire/release behaviour of Lock and RLock.

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # a second, non-blocking acquire must fail while the lock is held
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # releasing an unheld lock is an error
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        depth = 3
        for _ in range(depth):
            self.assertEqual(lock.acquire(), True)
        for _ in range(depth):
            self.assertEqual(lock.release(), None)
        # one release too many is an error
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        with self.Lock():
            pass
670
Benjamin Petersondfd79492008-06-13 19:13:39 +0000671
class _TestSemaphore(BaseTestCase):
    # Counting behaviour of Semaphore / BoundedSemaphore and timed acquire.

    def _test_semaphore(self, sem):
        # Walk a semaphore that starts at 2 down to 0 and back up to 2,
        # checking its value (where readable) after every step.
        check_value = self.assertReturnsIfImplemented
        check_value(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check_value(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check_value(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        check_value(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        check_value(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        check_value(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # an unbounded semaphore may be released past its initial value
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected elapsed time)
        cases = (
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
            )
        for args, kwds, expected in cases:
            self.assertEqual(acquire(*args, **kwds), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected)
724
725
class _TestCondition(BaseTestCase):
    # Tests for Condition: notify, notify_all and timed wait, using a mix
    # of self.Process workers and plain threads as sleepers.

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Sleeper body: signal that we are about to wait, wait on the
        # condition (optionally with a timeout), then signal that we woke.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Distinct names for the two sleepers so that both can be joined
        # at the end (previously the thread rebound the process's name and
        # the process was never joined).
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # both sleepers have been notified by now, so these return promptly
        thread.join()
        proc.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)
        workers = []    # every process/thread started, joined at the end

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them all to sleep
        for i in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for i in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them to all sleep
        for i in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # the first batch timed out and the second was notified, so every
        # worker is finishing; reap them rather than leaving them behind
        for worker in workers:
            worker.join()

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
857
858
class _TestEvent(BaseTestCase):
    # Tests for Event: is_set(), wait() with and without timeout, and
    # being set from another process/thread.

    @classmethod
    def _test_event(cls, event):
        # Child side of test_event: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A fresh event is not set.
        self.assertEqual(event.is_set(), False)

        # wait() is expected to return the event's flag, so waiting on an
        # unset event returns False once the timeout expires.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once set, wait() returns True without blocking.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # the child has set the event and is about to exit; reap it
        p.join()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000899
900#
901#
902#
903
class _TestValue(BaseTestCase):
    # Tests for shared Value / RawValue objects.

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Child side of test_value: overwrite each shared value with the
        # third entry of its codes_values row.
        for shared, row in zip(values, cls.codes_values):
            shared.value = row[2]

    def test_value(self, raw=False):
        make = self.RawValue if raw else self.Value
        values = [make(code, value)
                  for code, value, _ in self.codes_values]

        # initial values are visible in the parent
        for shared, row in zip(values, self.codes_values):
            self.assertEqual(shared.value, row[1])

        child = self.Process(target=self._test, args=(values,))
        child.daemon = True
        child.start()
        child.join()

        # the child's writes are visible in the parent
        for shared, row in zip(values, self.codes_values):
            self.assertEqual(shared.value, row[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # default lock
        val1 = self.Value('i', 5)
        lock1 = val1.get_lock()
        obj1 = val1.get_obj()

        # lock=None
        val2 = self.Value('i', 5, lock=None)
        lock2 = val2.get_lock()
        obj2 = val2.get_obj()

        # caller-supplied lock is the one handed back
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        obj3 = val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False gives an object without the wrapper accessors
        arr4 = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr4, accessor))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # so does RawValue
        arr5 = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr5, accessor))
971
Benjamin Petersondfd79492008-06-13 19:13:39 +0000972
class _TestArray(BaseTestCase):
    # Checks shared Array/RawArray objects: indexing, slicing, in-place
    # updates made by a child process, zero-initialisation and the
    # get_lock()/get_obj() accessors.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # Turn seq, in place, into the sequence of its running sums.
        for pos in range(1, len(seq)):
            seq[pos] = seq[pos] + seq[pos - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make_array = self.RawArray if raw else self.Array
        arr = make_array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Slice-assign the same replacement into both containers.
        replacement = array.array('i', [1, 2, 3, 4])
        arr[4:8] = replacement
        seq[4:8] = replacement

        self.assertEqual(list(arr[:]), seq)

        # Apply f() locally to the reference list ...
        self.f(seq)

        # ... and to the shared array from inside a child process.
        p = self.Process(target=self.f, args=(arr,))
        p.daemon = True
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # Repeating the allocation raises the odds that the 2nd and 3rd
        # arrays reuse memory that previously held non-zero data.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(10)
            self.assertEqual(list(arr), range(10))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_accepts_long(self):
        # A Python 2 long must be accepted as the array size.
        size = long(10)
        arr = self.Array('i', size)
        self.assertEqual(len(arr), 10)
        raw_arr = self.RawArray('i', size)
        self.assertEqual(len(raw_arr), 10)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # No lock argument: both accessors must be callable.
        default_arr = self.Array('i', range(10))
        default_arr.get_lock()
        default_arr.get_obj()

        # lock=None: both accessors must be callable as well.
        none_arr = self.Array('i', range(10), lock=None)
        none_arr.get_lock()
        none_arr.get_obj()

        # An explicit lock is handed back by get_lock().
        lock = self.Lock()
        locked_arr = self.Array('i', range(10), lock=lock)
        returned_lock = locked_arr.get_lock()
        locked_arr.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False: neither accessor is present.
        unlocked_arr = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(unlocked_arr, 'get_lock'))
        self.assertFalse(hasattr(unlocked_arr, 'get_obj'))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # RawArray objects have neither accessor.
        raw_arr = self.RawArray('i', range(10))
        self.assertFalse(hasattr(raw_arr, 'get_lock'))
        self.assertFalse(hasattr(raw_arr, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001058
1059#
1060#
1061#
1062
class _TestContainers(BaseTestCase):
    # Checks the list, dict and Namespace objects handed out by a manager.

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        digits = self.list(range(10))
        self.assertEqual(digits[:], range(10))

        grown = self.list()
        self.assertEqual(grown[:], [])

        grown.extend(range(5))
        self.assertEqual(grown[:], range(5))

        self.assertEqual(grown[2], 2)
        self.assertEqual(grown[2:10], [2,3,4])

        grown *= 2
        self.assertEqual(grown[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(grown + [5, 6],
                         [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        # The first list is unaffected by what was done to the second.
        self.assertEqual(digits[:], range(10))

        # A shared list built from the two shared lists.
        nested = self.list([digits, grown])
        self.assertEqual(
            nested[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # An append to the inner list made afterwards shows up through the
        # outer one.
        wrapper = self.list([digits])
        digits.append('hello')
        self.assertEqual(wrapper[:],
                         [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        shared = self.dict()
        indices = range(65, 70)
        for code in indices:
            shared[code] = chr(code)
        letters = [chr(code) for code in indices]
        self.assertEqual(shared.copy(),
                         dict((code, chr(code)) for code in indices))
        self.assertEqual(sorted(shared.keys()), indices)
        self.assertEqual(sorted(shared.values()), letters)
        self.assertEqual(sorted(shared.items()), zip(indices, letters))

    def test_namespace(self):
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        # The underscore-prefixed attribute does not appear in str().
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertFalse(hasattr(ns, 'job'))
1118
1119#
1120#
1121#
1122
def sqr(x, wait=0.0):
    """Sleep for `wait` seconds, then return `x` multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001126
def identity(x):
    """Return the argument itself, unchanged."""
    same = x
    return same
1129
class CountedObject(object):
    """Object whose class keeps count of the instances currently alive."""

    # Incremented in __new__ and decremented in __del__.
    n_instances = 0

    def __new__(cls):
        cls.n_instances = cls.n_instances + 1
        return super(CountedObject, cls).__new__(cls)

    def __del__(self):
        klass = type(self)
        klass.n_instances = klass.n_instances - 1
1139
class SayWhenError(ValueError):
    """Raised by exception_throwing_generator at the requested index."""
1141
def exception_throwing_generator(total, when):
    """Yield 0, 1, ..., total - 1, raising SayWhenError on reaching `when`.

    If `when` is not one of the generated indices the generator simply
    runs to completion.
    """
    for current in range(total):
        if current != when:
            yield current
            continue
        raise SayWhenError("Somebody said when")
1147
class _TestPool(BaseTestCase):
    # Tests for the pool API (apply, map, imap, imap_unordered and their
    # async variants).  self.pool and self.Pool are not defined in this
    # class; they are expected to be provided by the surrounding test
    # framework.

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         map(sqr, range(100)))

    def test_map_unplicklable(self):
        # Issue #19425 -- failure to pickle should not cause a hang
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        class A(object):
            def __reduce__(self):
                raise RuntimeError('cannot pickle')
        with self.assertRaises(RuntimeError):
            self.pool.map(sqr, [A()]*10)

    def test_map_chunksize(self):
        # An empty iterable combined with an explicit chunksize must still
        # produce a result before the timeout expires.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        # sqr() sleeps for TIMEOUT1, so get() should block about that long.
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the timeout given to get(), so get()
        # must give up after roughly TIMEOUT2.
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), map(sqr, range(10)))

        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, next, it)

        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, next, it)

    def test_imap_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
        for i in range(3):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, next, it)

        # SayWhenError seen at start of problematic chunk's results
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
        for i in range(6):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, next, it)
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
        for i in range(4):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, next, it)

    def test_imap_unordered(self):
        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), map(sqr, range(1000)))

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), map(sqr, range(1000)))

    def test_imap_unordered_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(10, 3),
                                      1)
        expected_values = map(sqr, range(10))
        with self.assertRaises(SayWhenError):
            # imap_unordered makes it difficult to anticipate the SayWhenError
            for i in range(10):
                value = next(it)
                self.assertIn(value, expected_values)
                # Each expected value may be produced at most once.
                expected_values.remove(value)

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(20, 7),
                                      2)
        expected_values = map(sqr, range(20))
        with self.assertRaises(SayWhenError):
            for i in range(20):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)

    def test_make_pool(self):
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            # Shut the workers down even when the assertion fails.
            p.close()
            p.join()

    def test_terminate(self):
        # Queue far more work than can finish quickly, then check that
        # terminate() lets join() return almost immediately.
        p = self.Pool(4)
        result = p.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        p.terminate()
        join = TimingWrapper(p.join)
        join()
        self.assertLess(join.elapsed, 0.2)

    def test_empty_iterable(self):
        # See Issue 12157
        p = self.Pool(1)
        try:
            self.assertEqual(p.map(sqr, []), [])
            self.assertEqual(list(p.imap(sqr, [])), [])
            self.assertEqual(list(p.imap_unordered(sqr, [])), [])
            self.assertEqual(p.map_async(sqr, []).get(), [])
        finally:
            # Shut the worker down even when an assertion fails.
            p.close()
            p.join()

    def test_release_task_refs(self):
        # Issue #29861: task arguments and results should not be kept
        # alive after we are done with them.
        #
        # Generator expressions are used on purpose: a Python 2 list
        # comprehension would leave its loop variable bound in this frame
        # and thereby keep the last object alive.
        objs = list(CountedObject() for i in range(10))
        refs = list(weakref.ref(o) for o in objs)
        self.pool.map(identity, objs)

        del objs
        time.sleep(DELTA)  # let threaded cleanup code run
        self.assertEqual(set(wr() for wr in refs), {None})
        # With a process pool, copies of the objects are returned, check
        # they were released too.
        self.assertEqual(CountedObject.n_instances, 0)
1298
1299
def unpickleable_result():
    """Return a lambda, i.e. an object that cannot be pickled."""
    answer = lambda: 42
    return answer
1302
class _TestPoolWorkerErrors(BaseTestCase):
    # Error handling when a worker cannot send its result back.
    ALLOWED_TYPES = ('processes', )

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        pool = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors:
        # with only two workers, twenty failing tasks could not all complete
        # if each failure cost a worker.
        for _ in range(20):
            async_result = pool.apply_async(unpickleable_result)
            self.assertRaises(MaybeEncodingError, async_result.get)

        pool.close()
        pool.join()
1317
class _TestPoolWorkerLifetime(BaseTestCase):
    # Behaviour of pools whose workers are limited by maxtasksperchild.

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        pids_before = [worker.pid for worker in pool._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        pending = [pool.apply_async(sqr, (n, )) for n in range(100)]
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for n, async_result in enumerate(pending):
            self.assertEqual(async_result.get(), sqr(n))
        # Refill the pool
        pool._repopulate_pool()
        # Wait until all workers are alive
        # (50 polls * DELTA = 5 seconds max startup process time)
        for _ in range(50):
            if all(worker.is_alive() for worker in pool._pool):
                break
            time.sleep(DELTA)
        pids_after = [worker.pid for worker in pool._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, pids_before)
        self.assertNotIn(None, pids_after)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(pids_before), sorted(pids_after))
        pool.close()
        pool.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        pool = multiprocessing.Pool(3, maxtasksperchild=1)
        pending = [pool.apply_async(sqr, (n, 0.3)) for n in range(6)]
        pool.close()
        pool.join()
        # check the results
        for n, async_result in enumerate(pending):
            self.assertEqual(async_result.get(), sqr(n))
1362
1363
Benjamin Petersondfd79492008-06-13 19:13:39 +00001364#
1365# Test that manager has expected number of shared objects left
1366#
1367
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Capture the debug info right after the count so that what gets
        # reported on failure is the snapshot taken next to `refs`.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the captured snapshot exactly once.  The parenthesised
            # form prints the same text under the Python 2 print statement.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1386
1387#
1388# Test of creating a customized manager class
1389#
1390
1391from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1392
class FooBar(object):
    """Plain class registered with MyManager to test method exposure."""

    def f(self):
        """Public method; returns a fixed marker string."""
        return 'f()'

    def g(self):
        """Public method that always fails with ValueError."""
        raise ValueError()

    def _h(self):
        """Underscore-prefixed method; returns a fixed marker string."""
        return '_h()'
1400
def baz():
    """Generate the squares of 0 through 9, in order."""
    for base in range(10):
        yield base * base
1404
class IteratorProxy(BaseProxy):
    """Proxy that forwards iteration steps to its referent.

    Both the Python 2 (`next`) and Python 3 (`__next__`) method names are
    exposed, and each one is forwarded under its own name.
    """

    _exposed_ = ('next', '__next__')

    def __iter__(self):
        # The proxy acts as its own iterator.
        return self

    def __next__(self):
        return self._callmethod('__next__')

    def next(self):
        return self._callmethod('next')
1413
class MyManager(BaseManager):
    """Customized manager used by _TestMyManager."""


# 'Foo' uses the default exposure, 'Bar' lists its exposed methods
# explicitly, and 'baz' hands back its generator through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register(
    'Bar',
    callable=FooBar,
    exposed=('f', '_h'),
    )
MyManager.register(
    'baz',
    callable=baz,
    proxytype=IteratorProxy,
    )
1420
1421
class _TestMyManager(BaseTestCase):
    # Checks which methods the proxies created by MyManager expose.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        squares = manager.baz()

        candidates = ('f', 'g', '_h')
        foo_methods = [name for name in candidates if hasattr(foo, name)]
        bar_methods = [name for name in candidates if hasattr(bar, name)]

        # Foo was registered without `exposed`; Bar with exposed=('f', '_h').
        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        # Foo: f works, g propagates its ValueError, _h is not callable
        # remotely.
        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        # Bar: both exposed methods work directly and via _callmethod.
        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(squares), [i*i for i in range(10)])

        manager.shutdown()
1453
1454#
1455# Test of connecting to a remote server and using xmlrpclib for serialization
1456#
1457
# Single module-level queue handed out by get_queue().
_queue = Queue.Queue()


def get_queue():
    """Return the module-level queue object."""
    return _queue
1461
class QueueManager(BaseManager):
    """Manager class used by the server process."""


# Registered with a callable, so this manager can create the queue itself.
QueueManager.register('get_queue', callable=get_queue)
1465
class QueueManager2(BaseManager):
    """Manager class which specifies the same interface as QueueManager."""


# Registered by name only (no callable).
QueueManager2.register('get_queue')
1469
1470
# Serializer name passed to the managers created by the remote-manager tests
# in this file, so they exchange data through xmlrpclib rather than pickle.
SERIALIZER = 'xmlrpclib'
1472
class _TestRemoteManager(BaseTestCase):
    # Connects a second manager class to a running QueueManager server and
    # checks that values put by a child process arrive intact when
    # SERIALIZER is used for the transport.

    ALLOWED_TYPES = ('manager',)

    # `values` is what the child puts on the queue; `result` is what the
    # parent expects to read back.
    values = ['hello world', None, True, 2.25]
    result = values[:]
    if test_support.have_unicode:
        uvalue = test_support.u(r'\u043f\u0440\u0438\u0432\u0456\u0442 '
                                r'\u0441\u0432\u0456\u0442')
        values.append(uvalue)
        result.append(uvalue)

    @classmethod
    def _putter(cls, address, authkey):
        # Child-process body: connect to the server and enqueue the values.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        # Note that xmlrpclib will deserialize object as a list not a tuple
        queue.put(tuple(cls.values))

    def test_remote(self):
        authkey = os.urandom(32)

        # Port 0 is requested here; the child and the second manager below
        # use manager.address rather than this tuple.
        manager = QueueManager(
            address=(test_support.HOST, 0), authkey=authkey,
            serializer=SERIALIZER
            )
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        self.assertEqual(queue.get(), self.result)

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
1524
class _TestManagerRestart(BaseTestCase):
    # Checks that a manager can be started again on the address a previous
    # manager has just released.

    @classmethod
    def _putter(cls, address, authkey):
        # Child-process body: connect to the server and enqueue one string.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=(test_support.HOST, 0), authkey=authkey,
            serializer=SERIALIZER)
        srvr = manager.get_server()
        # Remember the server's address for the restart below.
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server().  It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # Drop the proxy before the manager that serves it is shut down.
        del queue
        manager.shutdown()
        # Start a new manager on the remembered address straight away.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001557
Benjamin Petersondfd79492008-06-13 19:13:39 +00001558#
1559#
1560#
1561
# Message that tells _TestConnection._echo to stop echoing; the tests send it
# to make the child quit.
SENTINEL = latin('')
1563
1564class _TestConnection(BaseTestCase):
1565
1566 ALLOWED_TYPES = ('processes', 'threads')
1567
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001568 @classmethod
1569 def _echo(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001570 for msg in iter(conn.recv_bytes, SENTINEL):
1571 conn.send_bytes(msg)
1572 conn.close()
1573
1574 def test_connection(self):
1575 conn, child_conn = self.Pipe()
1576
1577 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001578 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001579 p.start()
1580
1581 seq = [1, 2.25, None]
1582 msg = latin('hello world')
1583 longmsg = msg * 10
1584 arr = array.array('i', range(4))
1585
1586 if self.TYPE == 'processes':
1587 self.assertEqual(type(conn.fileno()), int)
1588
1589 self.assertEqual(conn.send(seq), None)
1590 self.assertEqual(conn.recv(), seq)
1591
1592 self.assertEqual(conn.send_bytes(msg), None)
1593 self.assertEqual(conn.recv_bytes(), msg)
1594
1595 if self.TYPE == 'processes':
1596 buffer = array.array('i', [0]*10)
1597 expected = list(arr) + [0] * (10 - len(arr))
1598 self.assertEqual(conn.send_bytes(arr), None)
1599 self.assertEqual(conn.recv_bytes_into(buffer),
1600 len(arr) * buffer.itemsize)
1601 self.assertEqual(list(buffer), expected)
1602
1603 buffer = array.array('i', [0]*10)
1604 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1605 self.assertEqual(conn.send_bytes(arr), None)
1606 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1607 len(arr) * buffer.itemsize)
1608 self.assertEqual(list(buffer), expected)
1609
1610 buffer = bytearray(latin(' ' * 40))
1611 self.assertEqual(conn.send_bytes(longmsg), None)
1612 try:
1613 res = conn.recv_bytes_into(buffer)
1614 except multiprocessing.BufferTooShort, e:
1615 self.assertEqual(e.args, (longmsg,))
1616 else:
1617 self.fail('expected BufferTooShort, got %s' % res)
1618
1619 poll = TimingWrapper(conn.poll)
1620
1621 self.assertEqual(poll(), False)
1622 self.assertTimingAlmostEqual(poll.elapsed, 0)
1623
1624 self.assertEqual(poll(TIMEOUT1), False)
1625 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1626
1627 conn.send(None)
Giampaolo Rodola'cef20062012-12-31 17:23:09 +01001628 time.sleep(.1)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001629
1630 self.assertEqual(poll(TIMEOUT1), True)
1631 self.assertTimingAlmostEqual(poll.elapsed, 0)
1632
1633 self.assertEqual(conn.recv(), None)
1634
1635 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1636 conn.send_bytes(really_big_msg)
1637 self.assertEqual(conn.recv_bytes(), really_big_msg)
1638
1639 conn.send_bytes(SENTINEL) # tell child to quit
1640 child_conn.close()
1641
1642 if self.TYPE == 'processes':
1643 self.assertEqual(conn.readable, True)
1644 self.assertEqual(conn.writable, True)
1645 self.assertRaises(EOFError, conn.recv)
1646 self.assertRaises(EOFError, conn.recv_bytes)
1647
1648 p.join()
1649
1650 def test_duplex_false(self):
1651 reader, writer = self.Pipe(duplex=False)
1652 self.assertEqual(writer.send(1), None)
1653 self.assertEqual(reader.recv(), 1)
1654 if self.TYPE == 'processes':
1655 self.assertEqual(reader.readable, True)
1656 self.assertEqual(reader.writable, False)
1657 self.assertEqual(writer.readable, False)
1658 self.assertEqual(writer.writable, True)
1659 self.assertRaises(IOError, reader.send, 2)
1660 self.assertRaises(IOError, writer.recv)
1661 self.assertRaises(IOError, writer.poll)
1662
1663 def test_spawn_close(self):
1664 # We test that a pipe connection can be closed by parent
1665 # process immediately after child is spawned. On Windows this
1666 # would have sometimes failed on old versions because
1667 # child_conn would be closed before the child got a chance to
1668 # duplicate it.
1669 conn, child_conn = self.Pipe()
1670
1671 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001672 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001673 p.start()
1674 child_conn.close() # this might complete before child initializes
1675
1676 msg = latin('hello')
1677 conn.send_bytes(msg)
1678 self.assertEqual(conn.recv_bytes(), msg)
1679
1680 conn.send_bytes(SENTINEL)
1681 conn.close()
1682 p.join()
1683
1684 def test_sendbytes(self):
1685 if self.TYPE != 'processes':
Zachary Ware1f702212013-12-10 14:09:20 -06001686 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001687
1688 msg = latin('abcdefghijklmnopqrstuvwxyz')
1689 a, b = self.Pipe()
1690
1691 a.send_bytes(msg)
1692 self.assertEqual(b.recv_bytes(), msg)
1693
1694 a.send_bytes(msg, 5)
1695 self.assertEqual(b.recv_bytes(), msg[5:])
1696
1697 a.send_bytes(msg, 7, 8)
1698 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1699
1700 a.send_bytes(msg, 26)
1701 self.assertEqual(b.recv_bytes(), latin(''))
1702
1703 a.send_bytes(msg, 26, 0)
1704 self.assertEqual(b.recv_bytes(), latin(''))
1705
1706 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1707
1708 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1709
1710 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1711
1712 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1713
1714 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1715
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001716 @classmethod
1717 def _is_fd_assigned(cls, fd):
1718 try:
1719 os.fstat(fd)
1720 except OSError as e:
1721 if e.errno == errno.EBADF:
1722 return False
1723 raise
1724 else:
1725 return True
1726
1727 @classmethod
1728 def _writefd(cls, conn, data, create_dummy_fds=False):
1729 if create_dummy_fds:
1730 for i in range(0, 256):
1731 if not cls._is_fd_assigned(i):
1732 os.dup2(conn.fileno(), i)
1733 fd = reduction.recv_handle(conn)
1734 if msvcrt:
1735 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1736 os.write(fd, data)
1737 os.close(fd)
1738
Charles-François Natalif8413b22011-09-21 18:44:49 +02001739 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001740 def test_fd_transfer(self):
1741 if self.TYPE != 'processes':
1742 self.skipTest("only makes sense with processes")
1743 conn, child_conn = self.Pipe(duplex=True)
1744
1745 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001746 p.daemon = True
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001747 p.start()
1748 with open(test_support.TESTFN, "wb") as f:
1749 fd = f.fileno()
1750 if msvcrt:
1751 fd = msvcrt.get_osfhandle(fd)
1752 reduction.send_handle(conn, fd, p.pid)
1753 p.join()
1754 with open(test_support.TESTFN, "rb") as f:
1755 self.assertEqual(f.read(), b"foo")
1756
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
@unittest.skipIf(sys.platform == "win32",
                 "test semantics don't make sense on Windows")
@unittest.skipIf(MAXFD <= 256,
                 "largest assignable fd number is too small")
@unittest.skipUnless(hasattr(os, "dup2"),
                     "test needs os.dup2()")
def test_large_fd_transfer(self):
    # With fd > 256 (issue #11657)
    if self.TYPE != 'processes':
        self.skipTest("only makes sense with processes")
    conn, child_conn = self.Pipe(duplex=True)

    # The extra True asks the child to occupy its free descriptors below
    # 256 before it receives ours (see _writefd's create_dummy_fds).
    p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
    p.daemon = True
    p.start()
    # Remove the scratch file once the test is over, whether it passed or
    # failed, so it is not left behind for whatever uses TESTFN next.
    self.addCleanup(test_support.unlink, test_support.TESTFN)
    with open(test_support.TESTFN, "wb") as f:
        fd = f.fileno()
        # Pick the first unassigned descriptor number in [256, MAXFD).
        for newfd in range(256, MAXFD):
            if not self._is_fd_assigned(newfd):
                break
        else:
            self.fail("could not find an unassigned large file descriptor")
        os.dup2(fd, newfd)
        try:
            reduction.send_handle(conn, newfd, p.pid)
        finally:
            # Always release the duplicate, even if sending failed.
            os.close(newfd)
    p.join()
    with open(test_support.TESTFN, "rb") as f:
        self.assertEqual(f.read(), b"bar")
1788
@classmethod
def _send_data_without_fd(cls, conn):
    """Write a single raw NUL byte to the descriptor behind *conn*.

    The byte is written with a plain ``os.write``, so no file descriptor
    travels along with it.
    """
    os.write(conn.fileno(), b"\0")
1792
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
@unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
def test_missing_fd_transfer(self):
    # recv_handle() has to raise RuntimeError when the data it receives
    # does not come with a file descriptor in its ancillary data.
    if self.TYPE != 'processes':
        self.skipTest("only makes sense with processes")
    parent_end, child_end = self.Pipe(duplex=True)

    sender = self.Process(target=self._send_data_without_fd,
                          args=(child_end,))
    sender.daemon = True
    sender.start()
    with self.assertRaises(RuntimeError):
        reduction.recv_handle(parent_end)
    sender.join()
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001807
class _TestListenerClient(BaseTestCase):
    # A child connects to a Listener as a Client and sends one greeting.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child side: connect, send the greeting, hang up.
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        # Repeat the round trip once per supported address family.
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test,
                                 args=(listener.address,))
            child.daemon = True
            child.start()
            server_side = listener.accept()
            self.assertEqual(server_side.recv(), 'hello')
            child.join()
            listener.close()

    def test_issue14725(self):
        listener = self.connection.Listener()
        child = self.Process(target=self._test, args=(listener.address,))
        child.daemon = True
        child.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        server_side = listener.accept()
        self.assertEqual(server_side.recv(), 'hello')
        server_side.close()
        child.join()
        listener.close()
1844
#
# Test of sending connection and socket objects between processes.
# NOTE: this test is currently disabled -- the whole class that follows is
# enclosed in a module-level string literal.
#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001848"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001849class _TestPicklingConnections(BaseTestCase):
1850
1851 ALLOWED_TYPES = ('processes',)
1852
1853 def _listener(self, conn, families):
1854 for fam in families:
1855 l = self.connection.Listener(family=fam)
1856 conn.send(l.address)
1857 new_conn = l.accept()
1858 conn.send(new_conn)
1859
1860 if self.TYPE == 'processes':
1861 l = socket.socket()
1862 l.bind(('localhost', 0))
1863 conn.send(l.getsockname())
1864 l.listen(1)
1865 new_conn, addr = l.accept()
1866 conn.send(new_conn)
1867
1868 conn.recv()
1869
1870 def _remote(self, conn):
1871 for (address, msg) in iter(conn.recv, None):
1872 client = self.connection.Client(address)
1873 client.send(msg.upper())
1874 client.close()
1875
1876 if self.TYPE == 'processes':
1877 address, msg = conn.recv()
1878 client = socket.socket()
1879 client.connect(address)
1880 client.sendall(msg.upper())
1881 client.close()
1882
1883 conn.close()
1884
1885 def test_pickling(self):
1886 try:
1887 multiprocessing.allow_connection_pickling()
1888 except ImportError:
1889 return
1890
1891 families = self.connection.families
1892
1893 lconn, lconn0 = self.Pipe()
1894 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001895 lp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001896 lp.start()
1897 lconn0.close()
1898
1899 rconn, rconn0 = self.Pipe()
1900 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001901 rp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001902 rp.start()
1903 rconn0.close()
1904
1905 for fam in families:
1906 msg = ('This connection uses family %s' % fam).encode('ascii')
1907 address = lconn.recv()
1908 rconn.send((address, msg))
1909 new_conn = lconn.recv()
1910 self.assertEqual(new_conn.recv(), msg.upper())
1911
1912 rconn.send(None)
1913
1914 if self.TYPE == 'processes':
1915 msg = latin('This connection uses a normal socket')
1916 address = lconn.recv()
1917 rconn.send((address, msg))
1918 if hasattr(socket, 'fromfd'):
1919 new_conn = lconn.recv()
1920 self.assertEqual(new_conn.recv(100), msg.upper())
1921 else:
1922 # XXX On Windows with Py2.6 need to backport fromfd()
1923 discard = lconn.recv_bytes()
1924
1925 lconn.send(None)
1926
1927 rconn.close()
1928 lconn.close()
1929
1930 lp.join()
1931 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001932"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001933#
1934#
1935#
1936
class _TestHeap(BaseTestCase):
    # Consistency checks for the heap used by BufferWrapper.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        live = []

        # Churn the heap: allocate blocks of random sizes and, whenever
        # more than ``maxblocks`` are alive, drop a randomly chosen one.
        for _ in xrange(iterations):
            nbytes = int(random.lognormvariate(0, 1) * 1000)
            wrapper = multiprocessing.heap.BufferWrapper(nbytes)
            live.append(wrapper)
            if len(live) > maxblocks:
                victim = random.randrange(maxblocks)
                del live[victim]

        # All BufferWrapper instances share this heap object.
        heap = multiprocessing.heap.BufferWrapper._heap

        # Collect every free and every occupied region while holding the
        # heap lock; the cleanup hook releases the lock again.
        regions = []
        occupied = 0
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for free_list in heap._len_to_seq.values():
            regions.extend(
                (heap._arenas.index(arena), start, stop, stop - start, 'free')
                for arena, start, stop in free_list)
        for arena, start, stop in heap._allocated_blocks:
            length = stop - start
            regions.append(
                (heap._arenas.index(arena), start, stop, length, 'occupied'))
            occupied += length

        regions.sort()

        # Neighbouring regions must either touch each other, or the later
        # one must begin a different arena at offset zero.
        for current, following in zip(regions, regions[1:]):
            arena, start, stop = current[:3]
            narena, nstart, nstop = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't
        # deadlock (issue #12352).
        # The GC is switched on if necessary and given a low threshold so
        # that collections happen often, which raises the chance of
        # hitting the deadlock.  Both settings are restored afterwards.
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        saved_thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *saved_thresholds)
        gc.set_threshold(10)

        # Allocate many pairs of blocks that point at each other, so they
        # can only be reclaimed by the cyclic garbage collector.
        for _ in range(5000):
            first = multiprocessing.heap.BufferWrapper(1)
            second = multiprocessing.heap.BufferWrapper(1)
            first.buddy = second
            second.buddy = first
2001
Benjamin Petersondfd79492008-06-13 19:13:39 +00002002#
2003#
2004#
2005
class _Foo(Structure):
    # Two-field structure used by the shared ctypes tests.
    _fields_ = [('x', c_int), ('y', c_double)]
2011
class _TestSharedCTypes(BaseTestCase):
    # Shared ctypes objects must be modifiable from a child process.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Child side: double every shared object in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for idx in range(len(arr)):
            arr[idx] *= 2

    def test_sharedctypes(self, lock=False):
        # *lock* is forwarded to every shared object that is created.
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        child = self.Process(target=self._double,
                             args=(x, y, foo, arr, string))
        child.daemon = True
        child.start()
        child.join()

        # Everything must now hold twice its initial value.
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for idx in range(10):
            self.assertAlmostEqual(arr[idx], idx*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario, this time with lock=True.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # Changing the original afterwards must not affect the copy.
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
2061
2062#
2063#
2064#
2065
class _TestFinalize(BaseTestCase):
    # Checks when, and in which order, util.Finalize callbacks run.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Child side.  Each callback sends a tag over *conn*, so the
        # parent can observe the order in which the callbacks fired.
        # The statement order below is significant.
        class Token(object):
            pass

        a = Token()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Token()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # No exitpriority given for this one.
        c = Token()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Token()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing exitpriority 0.
        d01 = Token()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Token()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Token()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        # Finalizers that are not attached to any object.
        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.daemon = True
        child.start()
        child.join()

        # Read tags until the 'STOP' sentinel shows up.
        received = list(iter(conn.recv, 'STOP'))
        self.assertEqual(received,
                         ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2118
2119#
2120# Test that from ... import * works for each module
2121#
2122
class _TestImportStar(BaseTestCase):
    # Every name a module lists in __all__ must exist on that module.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = [
            'multiprocessing',
            'multiprocessing.connection',
            'multiprocessing.heap',
            'multiprocessing.managers',
            'multiprocessing.pool',
            'multiprocessing.process',
            'multiprocessing.synchronize',
            'multiprocessing.util',
        ]

        # Optional modules are only checked when they are usable.
        if HAS_REDUCTION:
            modules.append('multiprocessing.reduction')

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for name in modules:
            __import__(name)
            mod = sys.modules[name]

            exported = getattr(mod, '__all__', ())
            for attr in exported:
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
2151
2152#
2153# Quick test that logging works -- does not test logging output
2154#
2155
class _TestLogging(BaseTestCase):
    # Quick checks that multiprocessing's logger can be used; the logging
    # output itself is not inspected.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        try:
            self.assertTrue(logger is not None)
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Put the level back even if something above failed.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Child side: report the effective level of the logger.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            # A level set on the multiprocessing logger itself must be
            # seen by the child.
            logger.setLevel(LEVEL1)
            p = self.Process(target=self._test_level, args=(writer,))
            p.daemon = True
            p.start()
            reported = reader.recv()
            p.join()    # reap the child before asserting
            self.assertEqual(LEVEL1, reported)

            # With the multiprocessing logger at NOTSET the child must
            # report the root logger's level instead.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            p = self.Process(target=self._test_level, args=(writer,))
            p.daemon = True
            p.start()
            reported = reader.recv()
            p.join()
            self.assertEqual(LEVEL2, reported)
        finally:
            # Restore global logging state and release the pipe even when
            # an assertion failed, so later tests are not affected.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
2198
Jesse Noller814d02d2009-11-21 14:38:23 +00002199
Jesse Noller9a03f2f2009-11-24 14:17:29 +00002200# class _TestLoggingProcessName(BaseTestCase):
2201#
2202# def handle(self, record):
2203# assert record.processName == multiprocessing.current_process().name
2204# self.__handled = True
2205#
2206# def test_logging(self):
2207# handler = logging.Handler()
2208# handler.handle = self.handle
2209# self.__handled = False
2210# # Bypass getLogger() and side-effects
2211# logger = logging.getLoggerClass()(
2212# 'multiprocessing.test.TestLoggingProcessName')
2213# logger.addHandler(handler)
2214# logger.propagate = False
2215#
2216# logger.warn('foo')
2217# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00002218
Benjamin Petersondfd79492008-06-13 19:13:39 +00002219#
Richard Oudkerkba482642013-02-26 12:37:07 +00002220# Check that Process.join() retries if os.waitpid() fails with EINTR
2221#
2222
class _TestPollEintr(BaseTestCase):
    # Process.join() must survive a signal arriving while it waits.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _killer(cls, pid):
        # Child side: wait half a second, then send SIGUSR1 to *pid*.
        time.sleep(0.5)
        os.kill(pid, signal.SIGUSR1)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_poll_eintr(self):
        # One-element list so the handler can flip the flag in place.
        got_signal = [False]

        def on_sigusr1(*args):
            got_signal[0] = True

        own_pid = os.getpid()
        previous_handler = signal.signal(signal.SIGUSR1, on_sigusr1)
        try:
            killer = self.Process(target=self._killer, args=(own_pid,))
            killer.start()
            # The sleeper runs for 1s, so the signal sent after 0.5s
            # arrives while we are inside join().
            sleeper = self.Process(target=time.sleep, args=(1,))
            sleeper.start()
            sleeper.join()
            self.assertTrue(got_signal[0])
            self.assertEqual(sleeper.exitcode, 0)
            killer.join()
        finally:
            signal.signal(signal.SIGUSR1, previous_handler)
2250
2251#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002252# Test to verify handle verification, see issue 3321
2253#
2254
class TestInvalidHandle(unittest.TestCase):
    # Handle verification in _multiprocessing.Connection (issue 3321).

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # A connection built around an arbitrary handle number must fail
        # when polled.
        bogus = _multiprocessing.Connection(44977608)
        with self.assertRaises(IOError):
            bogus.poll()
        # A negative handle must be rejected at construction time.
        with self.assertRaises(IOError):
            _multiprocessing.Connection(-1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002262
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002263#
Benjamin Petersondfd79492008-06-13 19:13:39 +00002264# Functions used to create test cases from the base ones in this module
2265#
2266
def get_attributes(Source, names):
    """Return a dict mapping each of *names* to ``getattr(Source, name)``.

    Plain Python functions are wrapped in ``staticmethod`` so that they do
    not get bound to an instance when the dict is merged into a class
    namespace.  Every other kind of object is stored unchanged.
    """
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        obj = getattr(Source, name)
        if isinstance(obj, function_type):
            obj = staticmethod(obj)
        attributes[name] = obj
    return attributes
2275
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes for one flavour of *Mixin*.

    Every module-level name starting with ``_Test`` whose
    ``ALLOWED_TYPES`` contains *type* yields a class deriving from that
    base, ``unittest.TestCase`` and *Mixin*.  The class is named
    ``'With' + type.capitalize() + <base name without the underscore>``
    and the result maps these names to the classes.
    """
    cases = {}
    flavour = type.capitalize()

    for name, base in globals().items():
        if not name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue
        newname = 'With' + flavour + name[1:]

        class Temp(base, unittest.TestCase, Mixin):
            pass

        # Give the generated class the identity of a class defined next to
        # the mixin under its final name.
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        cases[newname] = Temp
    return cases
2292
2293#
2294# Create test cases
2295#
2296
class ProcessesMixin(object):
    # Binds the generic test bases to the real multiprocessing API.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed multiprocessing attributes straight into the class
    # namespace (vars() with no argument is the class body's locals()).
    vars().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore',
        'BoundedSemaphore', 'Condition', 'Event', 'Value',
        'Array', 'RawValue', 'RawArray', 'current_process',
        'active_children', 'Pipe', 'connection', 'JoinableQueue',
        'Pool',
        )))

# Generate the WithProcesses* test cases and publish them at module level.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2309
2310
class ManagerMixin(object):
    # Binds the generic test bases to a SyncManager's API.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Created without running __init__; test_main() calls
    # manager.__init__() and manager.start() before the tests run.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed manager attributes straight into the class
    # namespace (vars() with no argument is the class body's locals()).
    vars().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore',
        'BoundedSemaphore', 'Condition', 'Event', 'Value',
        'Array', 'list', 'dict', 'Namespace',
        'JoinableQueue', 'Pool',
        )))

# Generate the WithManager* test cases and publish them at module level.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
2323
2324
class ThreadsMixin(object):
    # Binds the generic test bases to the multiprocessing.dummy API.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed multiprocessing.dummy attributes straight into the
    # class namespace (vars() with no argument is the class body's
    # locals()).
    vars().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore',
        'BoundedSemaphore', 'Condition', 'Event', 'Value',
        'Array', 'current_process', 'active_children', 'Pipe',
        'connection', 'dict', 'list', 'Namespace',
        'JoinableQueue', 'Pool',
        )))

# Generate the WithThreads* test cases and publish them at module level.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2337
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        # The peer answers the challenge with garbage.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # The peer sends the challenge marker first, garbage on the second
        # read and nothing afterwards.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0

            def recv_bytes(self, size):
                self.count += 1
                replies = {
                    1: multiprocessing.connection.CHALLENGE,
                    2: b'something bogus',
                }
                return replies.get(self.count, b'')

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
2366
Jesse Noller7152f6d2009-04-02 05:17:26 +00002367#
2368# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2369#
2370
def initializer(ns):
    """Increment the ``test`` attribute of *ns* by one."""
    ns.test += 1
2373
class TestInitializers(unittest.TestCase):
    # Manager.start() and Pool() accept an initializer callable; each
    # test checks that it ran by looking at a counter in a shared
    # namespace.

    def setUp(self):
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Shut the extra manager down even when the assertion fails,
            # so its server process is not left running.
            m.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
2396
Jesse Noller1b90efb2009-06-30 17:11:52 +00002397#
2398# Issue 5155, 5313, 5331: Test process in processes
2399# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2400#
2401
def _this_sub_process(q):
    """Try one non-blocking ``get`` on *q*, ignoring an empty queue."""
    try:
        q.get(block=False)
    except Queue.Empty:
        # Nothing was queued; that is fine, only the attempt matters.
        pass
2407
def _test_process(q):
    """Start a daemonic child of this process and wait for it.

    The child runs ``_this_sub_process`` on a freshly created queue; the
    *q* argument itself is not used.
    """
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_this_sub_process,
                                    args=(inner_queue,))
    child.daemon = True
    child.start()
    child.join()
2414
def _afunc(x):
    """Return *x* multiplied by itself."""
    squared = x * x
    return squared
2417
def pool_in_process():
    """Create a 4-worker pool, map ``_afunc`` over a few ints, clean up."""
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Shut the workers down explicitly instead of leaving that to
        # whatever happens when the process exits.
        pool.close()
        pool.join()
2421
class _file_like(object):
    """Buffer writes per process and pass them on to *delegate* on flush."""

    def __init__(self, delegate):
        self._delegate = delegate
        # pid of the process that owns the current buffer; None until the
        # buffer is touched for the first time.
        self._pid = None

    @property
    def cache(self):
        current_pid = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid != current_pid:
            # First use in this process: start with an empty buffer.
            self._pid = current_pid
            self._cache = []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
2442
class TestStdinBadfiledescriptor(unittest.TestCase):
    # Processes started from within child processes (issues 5155, 5313
    # and 5331).

    def test_queue_in_process(self):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_test_process, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # The Process object is only constructed, never started.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # A real unittest assertion: a bare ``assert`` statement is
        # stripped when Python runs with -O and would check nothing.
        self.assertEqual(sio.getvalue(), 'foo')
2463
Richard Oudkerke4b99382012-07-27 14:05:46 +01002464#
2465# Test interaction with socket timeouts - see Issue #6056
2466#
2467
class TestTimeouts(unittest.TestCase):
    # Connections must keep working while a short default socket timeout
    # is in effect (Issue #6056).

    @classmethod
    def _test_timeout(cls, child, address):
        # Child side: wait well past the 0.1s default socket timeout
        # before sending on the pipe, then connect to the listener.
        time.sleep(1)
        child.send(123)
        child.close()
        client = multiprocessing.connection.Client(address)
        client.send(456)
        client.close()

    def test_timeout(self):
        saved_timeout = socket.getdefaulttimeout()
        try:
            socket.setdefaulttimeout(0.1)
            parent, child = multiprocessing.Pipe(duplex=True)
            listener = multiprocessing.connection.Listener(family='AF_INET')
            worker = multiprocessing.Process(
                target=self._test_timeout,
                args=(child, listener.address))
            worker.start()
            child.close()
            self.assertEqual(parent.recv(), 123)
            parent.close()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 456)
            accepted.close()
            listener.close()
            worker.join(10)
        finally:
            socket.setdefaulttimeout(saved_timeout)
2497
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002498#
2499# Test what happens with no "if __name__ == '__main__'"
2500#
2501
class TestNoForkBomb(unittest.TestCase):
    # Runs mp_fork_bomb.py, a script without an
    # "if __name__ == '__main__'" guard.

    def test_noforkbomb(self):
        here = os.path.dirname(__file__)
        name = os.path.join(here, 'mp_fork_bomb.py')
        if not WIN32:
            # Elsewhere the script must succeed and print 123.
            rc, out, err = test.script_helper.assert_python_ok(name)
            self.assertEqual(out.rstrip(), '123')
            self.assertEqual(err, '')
            return
        # On Windows the script must fail with a RuntimeError.
        rc, out, err = test.script_helper.assert_python_failure(name)
        self.assertEqual(out, '')
        self.assertIn('RuntimeError', err)
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002513
2514#
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002515# Issue 12098: check sys.flags of child matches that for parent
2516#
2517
class TestFlags(unittest.TestCase):
    # sys.flags of a child process must match those of its parent
    # (issue 12098).

    @classmethod
    def run_in_grandchild(cls, conn):
        # Report this process's interpreter flags to the parent.
        conn.send(tuple(sys.flags))

    @classmethod
    def run_in_child(cls):
        # Start a grandchild, collect its flags and print them together
        # with our own as one JSON document on stdout.
        import json
        reader, writer = multiprocessing.Pipe(duplex=False)
        grandchild = multiprocessing.Process(target=cls.run_in_grandchild,
                                             args=(writer,))
        grandchild.start()
        grandchild_flags = reader.recv()
        grandchild.join()
        reader.close()
        writer.close()
        flags = (tuple(sys.flags), grandchild_flags)
        print(json.dumps(flags))

    @test_support.requires_unicode  # XXX json needs unicode support
    def test_flags(self):
        import json, subprocess
        # start child process using unusual flags
        prog = ('from test.test_multiprocessing import TestFlags; '
                'TestFlags.run_in_child()')
        command = [sys.executable, '-E', '-B', '-O', '-c', prog]
        output = subprocess.check_output(command)
        child_flags, grandchild_flags = json.loads(output.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)
Richard Oudkerk7bdd93c2013-04-17 19:15:52 +01002546
2547#
2548# Issue #17555: ForkAwareThreadLock
2549#
2550
class TestForkAwareThreadLock(unittest.TestCase):
    # We recursively start processes.  Issue #17555 meant that the
    # after fork registry would get duplicate entries for the same
    # lock.  The size of the registry at generation n was ~2**n.

    @classmethod
    def child(cls, n, conn):
        # Descend one generation per call; the last generation reports
        # the size of its after-fork registry over *conn*.
        if n <= 1:
            conn.send(len(util._afterfork_registry))
        else:
            descendant = multiprocessing.Process(target=cls.child,
                                                 args=(n-1, conn))
            descendant.start()
            descendant.join()
        conn.close()

    def test_lock(self):
        reader, writer = multiprocessing.Pipe(False)
        # The lock only has to exist; it is never acquired here.
        lock = util.ForkAwareThreadLock()
        old_size = len(util._afterfork_registry)
        first = multiprocessing.Process(target=self.child, args=(5, writer))
        first.start()
        new_size = reader.recv()
        first.join()
        # Five generations later the registry must not have grown.
        self.assertLessEqual(new_size, old_size)
2575
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002576#
Richard Oudkerk41072db2013-07-01 18:45:28 +01002577# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
2578#
2579
class TestIgnoreEINTR(unittest.TestCase):
    # Signals delivered to a child that is using a connection or a
    # listener must not disturb the exchange (Issue #17097).

    @classmethod
    def _test_ignore(cls, conn):
        # Child side: install a do-nothing SIGUSR1 handler, then talk to
        # the parent over *conn*.
        def noop_handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, noop_handler)
        conn.send('ready')
        conn.send(conn.recv())      # echo one object back
        conn.send_bytes(b'x'*(1024*1024))   # sending 1 MB should block

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            child = multiprocessing.Process(target=self._test_ignore,
                                            args=(child_conn,))
            child.daemon = True
            child.start()
            child_conn.close()
            self.assertEqual(conn.recv(), 'ready')
            # Signal the child between the 'ready' message and the object
            # we send it; the echo must still come back.
            time.sleep(0.1)
            os.kill(child.pid, signal.SIGUSR1)
            time.sleep(0.1)
            conn.send(1234)
            self.assertEqual(conn.recv(), 1234)
            # Signal it again before reading the 1 MB payload; the data
            # must still arrive complete.
            time.sleep(0.1)
            os.kill(child.pid, signal.SIGUSR1)
            self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
            time.sleep(0.1)
            child.join()
        finally:
            conn.close()

    @classmethod
    def _test_ignore_listener(cls, conn):
        # Child side: install a do-nothing SIGUSR1 handler, publish a
        # listener's address over *conn* and greet the first client.
        def noop_handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, noop_handler)
        listener = multiprocessing.connection.Listener()
        conn.send(listener.address)
        accepted = listener.accept()
        accepted.send('welcome')

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore_listener(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            child = multiprocessing.Process(
                target=self._test_ignore_listener,
                args=(child_conn,))
            child.daemon = True
            child.start()
            child_conn.close()
            address = conn.recv()
            # Signal the child after it published its address and before
            # we connect to it.
            time.sleep(0.1)
            os.kill(child.pid, signal.SIGUSR1)
            time.sleep(0.1)
            client = multiprocessing.connection.Client(address)
            self.assertEqual(client.recv(), 'welcome')
            child.join()
        finally:
            conn.close()
2643
2644#
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002645#
2646#
2647
# Stand-alone test cases that are not generated from a _Test* base class.
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
    TestTimeouts,
    TestNoForkBomb,
    TestFlags,
    TestForkAwareThreadLock,
    TestIgnoreEINTR,
]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002651
Benjamin Petersondfd79492008-06-13 19:13:39 +00002652#
2653#
2654#
2655
def test_main(run=None):
    """Assemble and run the complete multiprocessing test suite.

    *run* is a callable that receives the assembled ``unittest.TestSuite``;
    when it is None, ``test.test_support.run_unittest`` is used.

    Raises ``unittest.SkipTest`` on Linux when an ``RLock`` cannot be
    created (issue 3111).
    """
    if sys.platform.startswith("linux"):
        try:
            # Only a probe: the lock itself is not needed.
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared pools (and the manager they may need) used by the generated
    # test cases through their mixin classes.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
        # module during these tests is at least platform dependent and possibly
        # non-deterministic on any given platform. So we don't mind if the listed
        # warnings aren't actually raised.
        with test_support.check_py3k_warnings(
                (".+__(get|set)slice__ has been removed", DeprecationWarning),
                (r"sys.exc_clear\(\) not supported", DeprecationWarning),
                quiet=True):
            run(suite)
    finally:
        # run() may raise (for instance when tests fail); the worker
        # processes and the manager must be torn down regardless.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002703
def main():
    """Run the suite through a verbose ``unittest.TextTestRunner``."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

if __name__ == '__main__':
    main()