#
# Unit tests for the multiprocessing package
#

5import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00006import Queue
7import time
8import sys
9import os
10import gc
11import signal
12import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000013import socket
14import random
15import logging
Antoine Pitroua1a8da82011-08-23 19:54:20 +020016import errno
Antoine Pitrou5084ff72017-03-24 16:03:46 +010017import weakref
Richard Oudkerkfaee75c2012-08-14 11:41:19 +010018import test.script_helper
Mark Dickinsonc4920e82009-11-20 19:30:22 +000019from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000020from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000021_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020022# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000023# message: "No module named _multiprocessing". _multiprocessing is not compiled
24# without thread support.
25import threading
R. David Murray3db8a342009-03-30 23:05:48 +000026
Jesse Noller37040cd2008-09-30 00:15:45 +000027# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000028test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000029
Benjamin Petersondfd79492008-06-13 19:13:39 +000030import multiprocessing.dummy
31import multiprocessing.connection
32import multiprocessing.managers
33import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000034import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000035
Charles-François Natalif8413b22011-09-21 18:44:49 +020036from multiprocessing import util
37
38try:
39 from multiprocessing import reduction
40 HAS_REDUCTION = True
41except ImportError:
42 HAS_REDUCTION = False
Benjamin Petersondfd79492008-06-13 19:13:39 +000043
Brian Curtina06e9b82010-10-07 02:27:41 +000044try:
45 from multiprocessing.sharedctypes import Value, copy
46 HAS_SHAREDCTYPES = True
47except ImportError:
48 HAS_SHAREDCTYPES = False
49
Antoine Pitroua1a8da82011-08-23 19:54:20 +020050try:
51 import msvcrt
52except ImportError:
53 msvcrt = None
54
Benjamin Petersondfd79492008-06-13 19:13:39 +000055#
56#
57#
58
Benjamin Petersone79edf52008-07-13 18:34:58 +000059latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000060
Benjamin Petersondfd79492008-06-13 19:13:39 +000061#
62# Constants
63#
64
# Logging level used by the tests.
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Short delay (seconds) used by tests to let background work settle.
DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# False when the C extension reports a broken sem_getvalue() implementation.
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = (sys.platform == "win32")
82
# Upper bound on file descriptor numbers; fall back to 256 when the limit
# cannot be queried.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # os.sysconf is missing, the name is unknown, or the query failed.
    # (A bare "except:" would also have swallowed KeyboardInterrupt.)
    MAXFD = 256
87
Benjamin Petersondfd79492008-06-13 19:13:39 +000088#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000089# Some tests require ctypes
90#
91
92try:
Nick Coghlan13623662010-04-10 14:24:36 +000093 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000094except ImportError:
95 Structure = object
96 c_int = c_double = None
97
Charles-François Natali6392d7f2011-11-22 18:35:18 +010098
def check_enough_semaphores():
    """Check that the system supports enough semaphores to run the test.

    Returns None when the limit cannot be queried, is reported as -1, or is
    at least the POSIX minimum of 256.

    Raises:
        unittest.SkipTest: if the reported limit is below that minimum.
    """
    # minimum number of semaphores available according to POSIX
    nsems_min = 256
    try:
        nsems = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems == -1 or nsems >= nsems_min:
        return
    raise unittest.SkipTest("The OS doesn't support enough semaphores "
                            "to run the test (required: %d)." % nsems_min)
112
113
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000114#
Benjamin Petersondfd79492008-06-13 19:13:39 +0000115# Creates a wrapper for a function which records the time it takes to finish
116#
117
class TimingWrapper(object):
    """Callable wrapper that records how long the wrapped call took.

    ``elapsed`` is None until the first call; afterwards it holds the
    duration in seconds of the most recent call, measured with time.time().
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        """Call the wrapped function and return its result."""
        t = time.time()
        try:
            return self.func(*args, **kwds)
        finally:
            # Recorded in ``finally`` so the duration is stored even when
            # the wrapped call raises.
            self.elapsed = time.time() - t
130
131#
132# Base class for test cases
133#
134
class BaseTestCase(object):
    """Mixin with helpers shared by the multiprocessing test cases.

    The assertion helpers call ``assertAlmostEqual``/``assertEqual`` on
    ``self``, so this class has to be combined with a class providing them.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare two durations, but only when CHECK_TIMINGS is enabled."""
        if CHECK_TIMINGS:
            self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert func(*args) == value unless it raises NotImplementedError."""
        try:
            res = func(*args)
        except NotImplementedError:
            pass
        else:
            return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
157
Benjamin Petersondfd79492008-06-13 19:13:39 +0000158#
159# Return the value of a semaphore
160#
161
def get_value(self):
    """Return the current value of the semaphore-like object ``self``.

    Tries ``self.get_value()`` first, then the ``_Semaphore__value`` and
    ``_value`` attributes, in that order.

    Raises:
        NotImplementedError: if none of the three is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    # Fall back to the attribute names used by other implementations.
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
173
174#
175# Testcases
176#
177
class _TestProcess(BaseTestCase):
    """Tests for process objects: identity, lifecycle, terminate, exit codes.

    Uses TYPE, Process, Queue, Event, Pipe, current_process and
    active_children from ``self``; these are expected to be provided outside
    this class (not visible here).
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertIsNone(current.exitcode)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Child side of test_process: report what the child observed.
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # state before start()
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertIsNone(p.exitcode)

        p.start()

        # state while running
        self.assertIsNone(p.exitcode)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # values are read in the order _test() puts them
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # state after exit
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        # Sleep long enough that the parent must terminate us.
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertIsNone(p.exitcode)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertIsNone(join())
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # NOTE(review): this import is otherwise unused; presumably kept to
        # check the module is importable here -- confirm before removing.
        from multiprocessing import forking
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        # each child is joined before the next starts, so the ids arrive
        # in depth-first order
        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        # Redirect stderr to a file so the parent can inspect what
        # sys.exit(reason) printed.
        sys.stderr = open(testfn, 'w')
        sys.exit(reason)

    def test_sys_exit(self):
        # See Issue 13854
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test_support.TESTFN
        self.addCleanup(test_support.unlink, testfn)

        for reason, code in (([1, 2, 3], 1), ('ignore this', 1)):
            p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, code)

            with open(testfn, 'r') as f:
                self.assertEqual(f.read().rstrip(), str(reason))

        for reason in (True, False, 8):
            p = self.Process(target=sys.exit, args=(reason,))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, reason)
357
Benjamin Petersondfd79492008-06-13 19:13:39 +0000358#
359#
360#
361
362class _UpperCaser(multiprocessing.Process):
363
364 def __init__(self):
365 multiprocessing.Process.__init__(self)
366 self.child_conn, self.parent_conn = multiprocessing.Pipe()
367
368 def run(self):
369 self.parent_conn.close()
370 for s in iter(self.child_conn.recv, None):
371 self.child_conn.send(s.upper())
372 self.child_conn.close()
373
374 def submit(self, s):
375 assert type(s) is str
376 self.parent_conn.send(s)
377 return self.parent_conn.recv()
378
379 def stop(self):
380 self.parent_conn.send(None)
381 self.parent_conn.close()
382 self.child_conn.close()
383
class _TestSubclassingProcess(BaseTestCase):
    """Check that a Process subclass overriding run() works end to end."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        uppercaser = _UpperCaser()
        uppercaser.daemon = True
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        uppercaser.stop()
        uppercaser.join()
396
397#
398#
399#
400
def queue_empty(q):
    """Return whether ``q`` is empty.

    Uses ``q.empty()`` when the queue has it, otherwise compares
    ``q.qsize()`` with zero.
    """
    if hasattr(q, 'empty'):
        return q.empty()
    else:
        return q.qsize() == 0
406
def queue_full(q, maxsize):
    """Return whether ``q`` is full.

    Uses ``q.full()`` when the queue has it (``maxsize`` is then ignored),
    otherwise compares ``q.qsize()`` with ``maxsize``.
    """
    if hasattr(q, 'full'):
        return q.full()
    else:
        return q.qsize() == maxsize
412
413
class _TestQueue(BaseTestCase):
    """Tests for queues: put/get blocking and timeouts, fork, qsize, join.

    Uses Queue, JoinableQueue, Event and Process from ``self``; these are
    expected to be provided outside this class (not visible here).
    """


    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_put: drain the six items once allowed to.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # fill the queue using every calling convention of put()
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # non-blocking puts fail immediately, blocking ones after the timeout
        self.assertRaises(Queue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_get: enqueue the items the parent reads back.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # non-blocking gets fail immediately, blocking ones after the timeout
        self.assertRaises(Queue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(Queue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            self.skipTest('qsize method not implemented')
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        # Worker: acknowledge each item until the None sentinel arrives.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            self.skipTest("requires 'queue.task_done()' method")

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in xrange(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in xrange(10):
            queue.put(i)

        queue.join()

        # one sentinel per worker so that every worker loop terminates
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()

    def test_no_import_lock_contention(self):
        with test_support.temp_cwd():
            module_name = 'imported_by_an_imported_module'
            # The generated module uses a queue at import time.
            with open(module_name + '.py', 'w') as f:
                f.write("""if 1:
                    import multiprocessing

                    q = multiprocessing.Queue()
                    q.put('knock knock')
                    q.get(timeout=3)
                    q.close()
                """)

            with test_support.DirsOnSysPath(os.getcwd()):
                try:
                    __import__(module_name)
                except Queue.Empty:
                    self.fail("Probable regression on import lock contention;"
                              " see Issue #22853")
643
Benjamin Petersondfd79492008-06-13 19:13:39 +0000644#
645#
646#
647
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock objects obtained from ``self``."""

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # a second, non-blocking acquire must fail while the lock is held
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # releasing an unlocked lock is an error
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        # acquire three times, release three times
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        # one release too many is an error
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        with self.Lock():
            pass
670
Benjamin Petersondfd79492008-06-13 19:13:39 +0000671
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore obtained from ``self``."""

    def _test_semaphore(self, sem):
        # Shared checks for a semaphore created with an initial value of 2.
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # an unbounded semaphore may be released past its initial value
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # non-blocking acquires return at once, whatever the timeout
        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        # blocking acquires give up after the timeout
        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
724
725
class _TestCondition(BaseTestCase):
    """Tests for Condition objects: notify, notify_all and wait timeouts.

    Uses TYPE, Condition, Semaphore and Process from ``self``; these are
    expected to be provided outside this class (not visible here).
    """

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Worker: signal via ``sleeping`` that it is about to wait, wait on
        # the condition, then signal via ``woken`` that it has resumed.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Keep every worker so that all of them are joined at the end;
        # rebinding a single name would leave the first one unjoined.
        workers = []

        p = self.Process(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()
        workers.append(p)

        t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        t.daemon = True
        t.start()
        workers.append(t)

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # both workers have been notified, so these joins return
        for worker in workers:
            worker.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # every started process/thread, joined once the test has passed
        workers = []

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them all to sleep
        for i in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for i in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them to all sleep
        for i in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        for i in range(10):
            try:
                if get_value(woken) == 6:
                    break
            except NotImplementedError:
                break
            time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # the first batch timed out and the second was notified, so every
        # worker finishes and these joins return
        for worker in workers:
            worker.join()

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
863
864
class _TestEvent(BaseTestCase):
    """Tests for Event objects: is_set, wait with timeouts, set and clear."""

    @classmethod
    def _test_event(cls, event):
        # Child side of test_event: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # NOTE: earlier comments here said the is_set()/wait() checks were
        # removed because of API differences between threading events and
        # the semaphore backed multiprocessing Event; the checks below are
        # active.
        self.assertEqual(event.is_set(), False)

        # while the event is unset, wait() returns False
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # once the event is set, wait() returns True without blocking
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # The child has set the event and is finishing; join it so it is
        # not left running after the test.
        p.join()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000905
906#
907#
908#
909
class _TestValue(BaseTestCase):
    """Tests for the shared ``Value`` / ``RawValue`` objects."""

    ALLOWED_TYPES = ('processes',)

    # Each entry is (typecode, initial value, value written by the child).
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        """Child-side helper: store the third entry of each spec in its value."""
        for shared, spec in zip(values, cls.codes_values):
            shared.value = spec[2]

    def test_value(self, raw=False):
        # Pick the constructor once instead of duplicating the comprehension.
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        # Before the child runs, every object holds its initial value.
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.daemon = True
        proc.start()
        proc.join()

        # The child's writes must be visible in the parent.
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # Default locking: both accessors must be callable.
        default_val = self.Value('i', 5)
        default_lock = default_val.get_lock()
        default_obj = default_val.get_obj()

        # lock=None is accepted and still provides both accessors.
        none_val = self.Value('i', 5, lock=None)
        none_lock = none_val.get_lock()
        none_obj = none_val.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        locked_val = self.Value('i', 5, lock=lock)
        returned_lock = locked_val.get_lock()
        locked_obj = locked_val.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False yields an object without the accessors.
        unlocked_val = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(unlocked_val, 'get_lock'))
        self.assertFalse(hasattr(unlocked_val, 'get_obj'))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue objects have no accessors either.
        raw_val = self.RawValue('i', 5)
        self.assertFalse(hasattr(raw_val, 'get_lock'))
        self.assertFalse(hasattr(raw_val, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000978
class _TestArray(BaseTestCase):
    """Tests for the shared ``Array`` / ``RawArray`` objects."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        """Turn ``seq`` into its running (prefix) sums, in place."""
        for idx in range(1, len(seq)):
            seq[idx] += seq[idx - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        factory = self.RawArray if raw else self.Array
        arr = factory('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Apply the same slice assignment to the shared array and the model.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Compute the expected result locally, then let a child do the same
        # to the shared array.
        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.daemon = True
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(size)
            self.assertEqual(list(arr), range(size))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_accepts_long(self):
        # A ``long`` size must be accepted just like an ``int``.
        arr = self.Array('i', long(10))
        self.assertEqual(len(arr), 10)
        raw_arr = self.RawArray('i', long(10))
        self.assertEqual(len(raw_arr), 10)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default locking: both accessors must be callable.
        arr1 = self.Array('i', range(10))
        lock1 = arr1.get_lock()
        obj1 = arr1.get_obj()

        # lock=None is accepted and still provides both accessors.
        arr2 = self.Array('i', range(10), lock=None)
        lock2 = arr2.get_lock()
        obj2 = arr2.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        arr3 = self.Array('i', range(10), lock=lock)
        lock3 = arr3.get_lock()
        obj3 = arr3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields an object without the accessors.
        arr4 = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # RawArray objects have no accessors either.
        arr5 = self.RawArray('i', range(10))
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001064
1065#
1066#
1067#
1068
class _TestContainers(BaseTestCase):
    """Tests for the manager-backed ``list``, ``dict`` and ``Namespace``."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        a = self.list(range(10))
        self.assertEqual(a[:], range(10))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(range(5))
        self.assertEqual(b[:], range(5))

        self.assertEqual(b[2], 2)
        # Slicing past the end is clipped, as for a plain list.
        self.assertEqual(b[2:10], [2, 3, 4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        # None of the operations on b may have touched a.
        self.assertEqual(a[:], range(10))

        # A shared list built from other shared lists compares by content.
        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A later change to a is visible through the list that contains it.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = range(65, 70)
        for i in indices:
            d[i] = chr(i)
        expected_values = [chr(i) for i in indices]
        expected_items = [(i, chr(i)) for i in indices]
        self.assertEqual(d.copy(), dict(expected_items))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), expected_values)
        self.assertEqual(sorted(d.items()), expected_items)

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The expected string lists neither the deleted attribute nor the
        # underscore-prefixed one.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1124
1125#
1126#
1127#
1128
def sqr(x, wait=0.0):
    """Sleep for ``wait`` seconds, then return ``x`` multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001132
def identity(x):
    """Return the argument itself, unchanged."""
    return x
1135
class CountedObject(object):
    """Object whose class keeps a running count of live instances."""

    # Incremented in __new__, decremented in __del__.
    n_instances = 0

    def __new__(cls):
        cls.n_instances += 1
        instance = object.__new__(cls)
        return instance

    def __del__(self):
        klass = type(self)
        klass.n_instances -= 1
1145
class SayWhenError(ValueError):
    """Raised by ``exception_throwing_generator`` at the requested index."""
1147
def exception_throwing_generator(total, when):
    """Yield 0 .. total-1, raising SayWhenError in place of item ``when``."""
    for index in range(total):
        if index != when:
            yield index
            continue
        raise SayWhenError("Somebody said when")
1153
class _TestPool(BaseTestCase):
    """Tests for the pool API.

    Most tests use ``self.pool``; ``test_terminate`` and
    ``test_empty_iterable`` create a pool with ``self.Pool`` and
    ``test_make_pool`` uses ``multiprocessing.Pool`` directly.
    """

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         map(sqr, range(100)))

    def test_map_unplicklable(self):
        # Issue #19425 -- failure to pickle should not cause a hang
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        class A(object):
            def __reduce__(self):
                raise RuntimeError('cannot pickle')
        with self.assertRaises(RuntimeError):
            self.pool.map(sqr, [A()]*10)

    def test_map_chunksize(self):
        # An empty iterable with an explicit chunksize must complete rather
        # than time out.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the timeout passed to get().
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), map(sqr, range(10)))

        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, next, it)

        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, next, it)

    def test_imap_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
        for i in range(3):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, next, it)

        # SayWhenError seen at start of problematic chunk's results
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
        for i in range(6):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, next, it)
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
        for i in range(4):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, next, it)

    def test_imap_unordered(self):
        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), map(sqr, range(1000)))

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), map(sqr, range(1000)))

    def test_imap_unordered_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(10, 3),
                                      1)
        expected_values = map(sqr, range(10))
        with self.assertRaises(SayWhenError):
            # imap_unordered makes it difficult to anticipate the SayWhenError
            for i in range(10):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(20, 7),
                                      2)
        expected_values = map(sqr, range(20))
        with self.assertRaises(SayWhenError):
            for i in range(20):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)

    def test_make_pool(self):
        # Non-positive worker counts are rejected.
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        p = self.Pool(4)
        # Queue far more work than can finish before terminate() is called.
        result = p.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        p.terminate()
        join = TimingWrapper(p.join)
        join()
        self.assertLess(join.elapsed, 0.2)

    def test_empty_iterable(self):
        # See Issue 12157
        p = self.Pool(1)

        self.assertEqual(p.map(sqr, []), [])
        self.assertEqual(list(p.imap(sqr, [])), [])
        self.assertEqual(list(p.imap_unordered(sqr, [])), [])
        self.assertEqual(p.map_async(sqr, []).get(), [])

        p.close()
        p.join()

    def test_release_task_refs(self):
        # Issue #29861: task arguments and results should not be kept
        # alive after we are done with them.
        #
        # Keep the generator expressions below: on Python 2 a list
        # comprehension leaks its loop variable into this scope, which would
        # keep the last object alive after ``del objs``.
        objs = list(CountedObject() for i in range(10))
        refs = list(weakref.ref(o) for o in objs)
        self.pool.map(identity, objs)

        del objs
        time.sleep(DELTA)  # let threaded cleanup code run
        self.assertEqual(set(wr() for wr in refs), {None})
        # With a process pool, copies of the objects are returned, check
        # they were released too.
        self.assertEqual(CountedObject.n_instances, 0)
1304
1305
def unpickleable_result():
    """Return a lambda (which yields 42 when called) as the task result."""
    return lambda: 42
1308
class _TestPoolWorkerErrors(BaseTestCase):
    """Checks how a process pool reports a result that cannot be sent back."""

    ALLOWED_TYPES = ('processes', )

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        pool = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        # Twenty tasks on two workers: every one must still report the error.
        for _ in range(20):
            pending = pool.apply_async(unpickleable_result)
            self.assertRaises(MaybeEncodingError, pending.get)

        pool.close()
        pool.join()
1323
class _TestPoolWorkerLifetime(BaseTestCase):
    """Tests for pools created with ``maxtasksperchild``."""

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        origworkerpids = [worker.pid for worker in pool._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        pending = [pool.apply_async(sqr, (i, )) for i in range(100)]
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for arg, res in enumerate(pending):
            self.assertEqual(res.get(), sqr(arg))
        # Refill the pool
        pool._repopulate_pool()
        # Wait until all workers are alive
        # (50 polls * DELTA = 5 seconds max startup process time)
        for _ in range(50):
            if all(worker.is_alive() for worker in pool._pool):
                break
            time.sleep(DELTA)
        finalworkerpids = [worker.pid for worker in pool._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        pool.close()
        pool.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        pool = multiprocessing.Pool(3, maxtasksperchild=1)
        pending = []
        for i in range(6):
            pending.append(pool.apply_async(sqr, (i, 0.3)))
        pool.close()
        pool.join()
        # check the results
        for arg, res in enumerate(pending):
            self.assertEqual(res.get(), sqr(arg))
1368
1369
Benjamin Petersondfd79492008-06-13 19:13:39 +00001370#
1371# Test that manager has expected number of shared objects left
1372#
1373
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Capture the debug info right after the count so that what gets
        # printed on failure matches the number being asserted.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the captured snapshot once (it used to be fetched and
            # printed a second time).
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1392
1393#
1394# Test of creating a customized manager class
1395#
1396
1397from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1398
class FooBar(object):
    """Plain class registered with ``MyManager`` as 'Foo' and 'Bar'."""

    def f(self):
        """Return the literal string 'f()'."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError

    def _h(self):
        """Return the literal string '_h()'."""
        return '_h()'
1406
def baz():
    """Generate the squares of 0 through 9, in order."""
    for base in range(10):
        yield base * base
1410
class IteratorProxy(BaseProxy):
    """Proxy that forwards iteration to the referent via ``_callmethod``."""

    # Referent methods this proxy is allowed to call.
    _exposed_ = ('next', '__next__')

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def next(self):
        return self._callmethod('next')

    def __next__(self):
        return self._callmethod('__next__')
1419
class MyManager(BaseManager):
    """Customized manager; its shared types are registered just below."""

# 'Foo' is registered without an explicit ``exposed`` tuple.
MyManager.register('Foo', callable=FooBar)
# 'Bar' exposes exactly 'f' and '_h'.
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
# 'baz' hands out the generator through the iterator proxy.
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1426
1427
class _TestMyManager(BaseTestCase):
    """Tests the proxies created through the customized ``MyManager``."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        squares = manager.baz()

        # Record which of the referent's methods each proxy offers.
        candidates = ('f', 'g', '_h')
        foo_methods = [name for name in candidates if hasattr(foo, name)]
        bar_methods = [name for name in candidates if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        # '_h' is not callable through the 'Foo' proxy.
        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        # 'Bar' exposes '_h' explicitly.
        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(squares), [i*i for i in range(10)])

        manager.shutdown()
1459
1460#
1461# Test of connecting to a remote server and using xmlrpclib for serialization
1462#
1463
1464_queue = Queue.Queue()
1465def get_queue():
1466 return _queue
1467
class QueueManager(BaseManager):
    '''manager class used by server process'''

# This side supplies the callable that produces the queue.
QueueManager.register('get_queue', callable=get_queue)
1471
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''

# Registered by name only: no callable is supplied here.
QueueManager2.register('get_queue')
1475
1476
# Serializer name passed as ``serializer=`` to the managers created below.
SERIALIZER = 'xmlrpclib'
1478
class _TestRemoteManager(BaseTestCase):
    """Round-trips values through a manager that uses ``SERIALIZER``."""

    ALLOWED_TYPES = ('manager',)

    # ``values`` is what the child puts on the queue; ``result`` is what the
    # parent expects to get back.
    values = ['hello world', None, True, 2.25]
    result = values[:]
    if test_support.have_unicode:
        uvalue = test_support.u(r'\u043f\u0440\u0438\u0432\u0456\u0442 '
                                r'\u0441\u0432\u0456\u0442')
        values.append(uvalue)
        result.append(uvalue)

    @classmethod
    def _putter(cls, address, authkey):
        """Child-side helper: connect to the manager and enqueue the values."""
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        # Note that xmlrpclib will deserialize object as a list not a tuple
        queue.put(tuple(cls.values))

    def test_remote(self):
        authkey = os.urandom(32)

        # Port 0 is requested; the address actually in use is read back from
        # manager.address below.
        manager = QueueManager(
            address=(test_support.HOST, 0), authkey=authkey,
            serializer=SERIALIZER
            )
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        self.assertEqual(queue.get(), self.result)

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
1530
class _TestManagerRestart(BaseTestCase):
    """Checks that a manager can be started again on the same address."""

    @classmethod
    def _putter(cls, address, authkey):
        """Child-side helper: connect to the manager and enqueue a greeting."""
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=(test_support.HOST, 0), authkey=authkey,
            serializer=SERIALIZER)
        srvr = manager.get_server()
        # Remember the address so the second manager can reuse it.
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # Drop the proxy before the server is stopped.
        del queue
        manager.shutdown()
        # Start a fresh manager on the address that was just released.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001563
Benjamin Petersondfd79492008-06-13 19:13:39 +00001564#
1565#
1566#
1567
# Empty message: _TestConnection._echo stops when it receives this.
SENTINEL = latin('')
1569
1570class _TestConnection(BaseTestCase):
1571
1572 ALLOWED_TYPES = ('processes', 'threads')
1573
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001574 @classmethod
1575 def _echo(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001576 for msg in iter(conn.recv_bytes, SENTINEL):
1577 conn.send_bytes(msg)
1578 conn.close()
1579
1580 def test_connection(self):
1581 conn, child_conn = self.Pipe()
1582
1583 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001584 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001585 p.start()
1586
1587 seq = [1, 2.25, None]
1588 msg = latin('hello world')
1589 longmsg = msg * 10
1590 arr = array.array('i', range(4))
1591
1592 if self.TYPE == 'processes':
1593 self.assertEqual(type(conn.fileno()), int)
1594
1595 self.assertEqual(conn.send(seq), None)
1596 self.assertEqual(conn.recv(), seq)
1597
1598 self.assertEqual(conn.send_bytes(msg), None)
1599 self.assertEqual(conn.recv_bytes(), msg)
1600
1601 if self.TYPE == 'processes':
1602 buffer = array.array('i', [0]*10)
1603 expected = list(arr) + [0] * (10 - len(arr))
1604 self.assertEqual(conn.send_bytes(arr), None)
1605 self.assertEqual(conn.recv_bytes_into(buffer),
1606 len(arr) * buffer.itemsize)
1607 self.assertEqual(list(buffer), expected)
1608
1609 buffer = array.array('i', [0]*10)
1610 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1611 self.assertEqual(conn.send_bytes(arr), None)
1612 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1613 len(arr) * buffer.itemsize)
1614 self.assertEqual(list(buffer), expected)
1615
1616 buffer = bytearray(latin(' ' * 40))
1617 self.assertEqual(conn.send_bytes(longmsg), None)
1618 try:
1619 res = conn.recv_bytes_into(buffer)
1620 except multiprocessing.BufferTooShort, e:
1621 self.assertEqual(e.args, (longmsg,))
1622 else:
1623 self.fail('expected BufferTooShort, got %s' % res)
1624
1625 poll = TimingWrapper(conn.poll)
1626
1627 self.assertEqual(poll(), False)
1628 self.assertTimingAlmostEqual(poll.elapsed, 0)
1629
1630 self.assertEqual(poll(TIMEOUT1), False)
1631 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1632
1633 conn.send(None)
Giampaolo Rodola'cef20062012-12-31 17:23:09 +01001634 time.sleep(.1)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001635
1636 self.assertEqual(poll(TIMEOUT1), True)
1637 self.assertTimingAlmostEqual(poll.elapsed, 0)
1638
1639 self.assertEqual(conn.recv(), None)
1640
1641 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1642 conn.send_bytes(really_big_msg)
1643 self.assertEqual(conn.recv_bytes(), really_big_msg)
1644
1645 conn.send_bytes(SENTINEL) # tell child to quit
1646 child_conn.close()
1647
1648 if self.TYPE == 'processes':
1649 self.assertEqual(conn.readable, True)
1650 self.assertEqual(conn.writable, True)
1651 self.assertRaises(EOFError, conn.recv)
1652 self.assertRaises(EOFError, conn.recv_bytes)
1653
1654 p.join()
1655
1656 def test_duplex_false(self):
1657 reader, writer = self.Pipe(duplex=False)
1658 self.assertEqual(writer.send(1), None)
1659 self.assertEqual(reader.recv(), 1)
1660 if self.TYPE == 'processes':
1661 self.assertEqual(reader.readable, True)
1662 self.assertEqual(reader.writable, False)
1663 self.assertEqual(writer.readable, False)
1664 self.assertEqual(writer.writable, True)
1665 self.assertRaises(IOError, reader.send, 2)
1666 self.assertRaises(IOError, writer.recv)
1667 self.assertRaises(IOError, writer.poll)
1668
1669 def test_spawn_close(self):
1670 # We test that a pipe connection can be closed by parent
1671 # process immediately after child is spawned. On Windows this
1672 # would have sometimes failed on old versions because
1673 # child_conn would be closed before the child got a chance to
1674 # duplicate it.
1675 conn, child_conn = self.Pipe()
1676
1677 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001678 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001679 p.start()
1680 child_conn.close() # this might complete before child initializes
1681
1682 msg = latin('hello')
1683 conn.send_bytes(msg)
1684 self.assertEqual(conn.recv_bytes(), msg)
1685
1686 conn.send_bytes(SENTINEL)
1687 conn.close()
1688 p.join()
1689
1690 def test_sendbytes(self):
1691 if self.TYPE != 'processes':
Zachary Ware1f702212013-12-10 14:09:20 -06001692 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001693
1694 msg = latin('abcdefghijklmnopqrstuvwxyz')
1695 a, b = self.Pipe()
1696
1697 a.send_bytes(msg)
1698 self.assertEqual(b.recv_bytes(), msg)
1699
1700 a.send_bytes(msg, 5)
1701 self.assertEqual(b.recv_bytes(), msg[5:])
1702
1703 a.send_bytes(msg, 7, 8)
1704 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1705
1706 a.send_bytes(msg, 26)
1707 self.assertEqual(b.recv_bytes(), latin(''))
1708
1709 a.send_bytes(msg, 26, 0)
1710 self.assertEqual(b.recv_bytes(), latin(''))
1711
1712 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1713
1714 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1715
1716 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1717
1718 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1719
1720 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1721
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001722 @classmethod
1723 def _is_fd_assigned(cls, fd):
1724 try:
1725 os.fstat(fd)
1726 except OSError as e:
1727 if e.errno == errno.EBADF:
1728 return False
1729 raise
1730 else:
1731 return True
1732
1733 @classmethod
1734 def _writefd(cls, conn, data, create_dummy_fds=False):
1735 if create_dummy_fds:
1736 for i in range(0, 256):
1737 if not cls._is_fd_assigned(i):
1738 os.dup2(conn.fileno(), i)
1739 fd = reduction.recv_handle(conn)
1740 if msvcrt:
1741 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1742 os.write(fd, data)
1743 os.close(fd)
1744
Charles-François Natalif8413b22011-09-21 18:44:49 +02001745 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001746 def test_fd_transfer(self):
1747 if self.TYPE != 'processes':
1748 self.skipTest("only makes sense with processes")
1749 conn, child_conn = self.Pipe(duplex=True)
1750
1751 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001752 p.daemon = True
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001753 p.start()
1754 with open(test_support.TESTFN, "wb") as f:
1755 fd = f.fileno()
1756 if msvcrt:
1757 fd = msvcrt.get_osfhandle(fd)
1758 reduction.send_handle(conn, fd, p.pid)
1759 p.join()
1760 with open(test_support.TESTFN, "rb") as f:
1761 self.assertEqual(f.read(), b"foo")
1762
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
@unittest.skipIf(sys.platform == "win32",
                 "test semantics don't make sense on Windows")
@unittest.skipIf(MAXFD <= 256,
                 "largest assignable fd number is too small")
@unittest.skipUnless(hasattr(os, "dup2"),
                     "test needs os.dup2()")
def test_large_fd_transfer(self):
    """Transfer a descriptor numbered 256 or above to a child process."""
    # With fd > 256 (issue #11657)
    if self.TYPE != 'processes':
        self.skipTest("only makes sense with processes")
    # Remove the scratch file once the test is over, whether it passed
    # or failed, so it cannot leak into later tests.
    self.addCleanup(test_support.unlink, test_support.TESTFN)
    conn, child_conn = self.Pipe(duplex=True)

    p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
    p.daemon = True
    p.start()
    with open(test_support.TESTFN, "wb") as f:
        fd = f.fileno()
        # Pick the first unassigned descriptor number in [256, MAXFD).
        for newfd in range(256, MAXFD):
            if not self._is_fd_assigned(newfd):
                break
        else:
            self.fail("could not find an unassigned large file descriptor")
        os.dup2(fd, newfd)
        try:
            reduction.send_handle(conn, newfd, p.pid)
        finally:
            # Always release the duplicated descriptor.
            os.close(newfd)
    p.join()
    with open(test_support.TESTFN, "rb") as f:
        self.assertEqual(f.read(), b"bar")
1794
@classmethod
def _send_data_without_fd(cls, conn):
    """Write a single NUL byte straight to the descriptor behind *conn*.

    The byte is written with os.write(), so no file descriptor is
    attached to it as ancillary data.
    """
    os.write(conn.fileno(), b"\0")
1798
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
@unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
def test_missing_fd_transfer(self):
    """recv_handle() must raise when the data carries no descriptor."""
    # The child sends plain data with no file descriptor in the
    # ancillary data; receiving a handle from it has to fail.
    if self.TYPE != 'processes':
        self.skipTest("only makes sense with processes")
    parent_end, child_end = self.Pipe(duplex=True)

    sender = self.Process(target=self._send_data_without_fd,
                          args=(child_end,))
    sender.daemon = True
    sender.start()
    with self.assertRaises(RuntimeError):
        reduction.recv_handle(parent_end)
    sender.join()
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001813
class _TestListenerClient(BaseTestCase):
    """Round-trip a message between a Listener and a Client."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        """Connect to *address*, send 'hello' and close the connection."""
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        """Accept one client per connection family and read its message."""
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(l.address,))
            p.daemon = True
            p.start()
            conn = l.accept()
            self.assertEqual(conn.recv(), 'hello')
            # Close the accepted connection so one is not leaked per family.
            conn.close()
            p.join()
            l.close()

    def test_issue14725(self):
        """accept() must still work after the client has already gone."""
        l = self.connection.Listener()
        p = self.Process(target=self._test, args=(l.address,))
        p.daemon = True
        p.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        conn = l.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        p.join()
        l.close()
1850
Benjamin Petersondfd79492008-06-13 19:13:39 +00001851#
1852# Test of sending connection and socket objects between processes
1853#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001854"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001855class _TestPicklingConnections(BaseTestCase):
1856
1857 ALLOWED_TYPES = ('processes',)
1858
1859 def _listener(self, conn, families):
1860 for fam in families:
1861 l = self.connection.Listener(family=fam)
1862 conn.send(l.address)
1863 new_conn = l.accept()
1864 conn.send(new_conn)
1865
1866 if self.TYPE == 'processes':
1867 l = socket.socket()
1868 l.bind(('localhost', 0))
1869 conn.send(l.getsockname())
1870 l.listen(1)
1871 new_conn, addr = l.accept()
1872 conn.send(new_conn)
1873
1874 conn.recv()
1875
1876 def _remote(self, conn):
1877 for (address, msg) in iter(conn.recv, None):
1878 client = self.connection.Client(address)
1879 client.send(msg.upper())
1880 client.close()
1881
1882 if self.TYPE == 'processes':
1883 address, msg = conn.recv()
1884 client = socket.socket()
1885 client.connect(address)
1886 client.sendall(msg.upper())
1887 client.close()
1888
1889 conn.close()
1890
1891 def test_pickling(self):
1892 try:
1893 multiprocessing.allow_connection_pickling()
1894 except ImportError:
1895 return
1896
1897 families = self.connection.families
1898
1899 lconn, lconn0 = self.Pipe()
1900 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001901 lp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001902 lp.start()
1903 lconn0.close()
1904
1905 rconn, rconn0 = self.Pipe()
1906 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001907 rp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001908 rp.start()
1909 rconn0.close()
1910
1911 for fam in families:
1912 msg = ('This connection uses family %s' % fam).encode('ascii')
1913 address = lconn.recv()
1914 rconn.send((address, msg))
1915 new_conn = lconn.recv()
1916 self.assertEqual(new_conn.recv(), msg.upper())
1917
1918 rconn.send(None)
1919
1920 if self.TYPE == 'processes':
1921 msg = latin('This connection uses a normal socket')
1922 address = lconn.recv()
1923 rconn.send((address, msg))
1924 if hasattr(socket, 'fromfd'):
1925 new_conn = lconn.recv()
1926 self.assertEqual(new_conn.recv(100), msg.upper())
1927 else:
1928 # XXX On Windows with Py2.6 need to backport fromfd()
1929 discard = lconn.recv_bytes()
1930
1931 lconn.send(None)
1932
1933 rconn.close()
1934 lconn.close()
1935
1936 lp.join()
1937 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001938"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001939#
1940#
1941#
1942
class _TestHeap(BaseTestCase):
    """Stress tests for the allocator in multiprocessing.heap."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        """Churn through many blocks, then check the heap has no gaps."""
        iterations = 5000
        maxblocks = 50
        live_blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in xrange(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            live_blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(live_blocks) > maxblocks:
                del live_blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # inspect the heap's bookkeeping while holding its lock
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)

        # one (arena index, start, stop, length, state) entry per segment
        segments = []
        for free_list in heap._len_to_seq.values():
            for arena, start, stop in free_list:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # each segment must either touch the next one or be followed by
        # the first segment of a different arena
        for current, following in zip(segments, segments[1:]):
            arena, start, stop = current[:3]
            narena, nstart, nstop = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        """Blocks freed by the garbage collector must not deadlock."""
        # See issue #12352.  Make sure the GC is enabled, and lower the
        # collection thresholds so collections happen more often (which
        # increases the probability of hitting the deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        saved_thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *saved_thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to
        # make sure objects are collected asynchronously by the gc
        for _ in range(5000):
            first = multiprocessing.heap.BufferWrapper(1)
            second = multiprocessing.heap.BufferWrapper(1)
            # circular references
            first.buddy = second
            second.buddy = first
2007
Benjamin Petersondfd79492008-06-13 19:13:39 +00002008#
2009#
2010#
2011
class _Foo(Structure):
    """Structure with an int field ``x`` and a double field ``y``."""
    _fields_ = [('x', c_int), ('y', c_double)]
2017
class _TestSharedCTypes(BaseTestCase):
    """Shared ctypes objects must be modifiable from a child process."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        """Double every shared object in place (target of the child)."""
        for scalar in (x, y):
            scalar.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        """Let a child double the shared values and check the results."""
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        shared = (x, y, foo, arr, string)
        worker = self.Process(target=self._double, args=shared)
        worker.daemon = True
        worker.start()
        worker.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        """Same check as test_sharedctypes, with lock=True."""
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        """copy() must return an object independent of the original."""
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
2067
2068#
2069#
2070#
2071
class _TestFinalize(BaseTestCase):
    """Check when and in which order util.Finalize callbacks run."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        """Register finalizers that each send their tag over *conn*."""
        class Foo(object):
            pass

        def watch(obj, tag, **options):
            # Register a finalizer for obj that sends tag when triggered.
            return util.Finalize(obj, conn.send, args=(tag,), **options)

        a = Foo()
        watch(a, 'a')
        del a           # triggers callback for a

        b = Foo()
        close_b = watch(b, 'b')
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # no exitpriority is given for this one
        c = Foo()
        watch(c, 'c')

        d10 = Foo()
        watch(d10, 'd10', exitpriority=1)

        d01 = Foo()
        watch(d01, 'd01', exitpriority=0)
        d02 = Foo()
        watch(d02, 'd02', exitpriority=0)
        d03 = Foo()
        watch(d03, 'd03', exitpriority=0)

        watch(None, 'e', exitpriority=-10)

        watch(None, 'STOP', exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        """Collect the tags sent by the child and compare their order."""
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.daemon = True
        child.start()
        child.join()

        # read tags until the 'STOP' sentinel arrives
        received = list(iter(conn.recv, 'STOP'))
        self.assertEqual(received,
                         ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2124
2125#
2126# Test that from ... import * works for each module
2127#
2128
class _TestImportStar(BaseTestCase):
    """Every name listed in a module's __all__ must exist in the module."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.synchronize', 'multiprocessing.util'
            ]

        # modules that are only checked when they are available
        optional = (
            (HAS_REDUCTION, 'multiprocessing.reduction'),
            # This module requires _ctypes
            (c_int is not None, 'multiprocessing.sharedctypes'),
        )
        modules.extend(name for wanted, name in optional if wanted)

        for name in modules:
            __import__(name)
            mod = sys.modules[name]
            exported = getattr(mod, '__all__', ())

            for attr in exported:
                self.assertTrue(
                    hasattr(mod, attr),
                    '%r does not have attribute %r' % (mod, attr)
                    )
2157
2158#
2159# Quick test that logging works -- does not test logging output
2160#
2161
class _TestLogging(BaseTestCase):
    """Quick checks of multiprocessing's logger (output is not tested)."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        """The logger exists and accepts messages below its level."""
        logger = multiprocessing.get_logger()
        self.assertTrue(logger is not None)
        logger.setLevel(util.SUBWARNING)
        try:
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Restore the module-wide level even if logging raised.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        """Send the child's effective multiprocessing log level over conn."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        """A child must see the logger level that the parent configured."""
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            # Level set directly on the multiprocessing logger.
            logger.setLevel(LEVEL1)
            p = self.Process(target=self._test_level, args=(writer,))
            p.daemon = True
            p.start()
            self.assertEqual(LEVEL1, reader.recv())
            p.join()

            # With NOTSET on the multiprocessing logger the effective
            # level reported by the child must be the root logger's.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            p = self.Process(target=self._test_level, args=(writer,))
            p.daemon = True
            p.start()
            self.assertEqual(LEVEL2, reader.recv())
            p.join()
        finally:
            # Put both loggers back and release the pipe even when an
            # assertion failed, so later tests are not affected.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
2204
Jesse Noller814d02d2009-11-21 14:38:23 +00002205
Jesse Noller9a03f2f2009-11-24 14:17:29 +00002206# class _TestLoggingProcessName(BaseTestCase):
2207#
2208# def handle(self, record):
2209# assert record.processName == multiprocessing.current_process().name
2210# self.__handled = True
2211#
2212# def test_logging(self):
2213# handler = logging.Handler()
2214# handler.handle = self.handle
2215# self.__handled = False
2216# # Bypass getLogger() and side-effects
2217# logger = logging.getLoggerClass()(
2218# 'multiprocessing.test.TestLoggingProcessName')
2219# logger.addHandler(handler)
2220# logger.propagate = False
2221#
2222# logger.warn('foo')
2223# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00002224
Benjamin Petersondfd79492008-06-13 19:13:39 +00002225#
Richard Oudkerkba482642013-02-26 12:37:07 +00002226# Check that Process.join() retries if os.waitpid() fails with EINTR
2227#
2228
class _TestPollEintr(BaseTestCase):
    """Process.join() must cope with a signal arriving while it waits."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _killer(cls, pid):
        """Wait half a second, then send SIGUSR1 to process *pid*."""
        time.sleep(0.5)
        os.kill(pid, signal.SIGUSR1)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_poll_eintr(self):
        # one-element list so that the handler can flip the flag
        signalled = [False]

        def on_sigusr1(*args):
            signalled[0] = True

        own_pid = os.getpid()
        previous_handler = signal.signal(signal.SIGUSR1, on_sigusr1)
        try:
            killer = self.Process(target=self._killer, args=(own_pid,))
            killer.start()
            sleeper = self.Process(target=time.sleep, args=(1,))
            sleeper.start()
            # the signal from the killer arrives during this join
            sleeper.join()
            self.assertTrue(signalled[0])
            self.assertEqual(sleeper.exitcode, 0)
            killer.join()
        finally:
            signal.signal(signal.SIGUSR1, previous_handler)
2256
2257#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002258# Test to verify handle verification, see issue 3321
2259#
2260
class TestInvalidHandle(unittest.TestCase):
    """Connection objects must reject invalid handles (issue 3321)."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        bogus_handle = 44977608
        conn = _multiprocessing.Connection(bogus_handle)
        # polling a connection built on the bogus handle must fail
        with self.assertRaises(IOError):
            conn.poll()
        # a negative handle must be refused at construction time
        with self.assertRaises(IOError):
            _multiprocessing.Connection(-1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002268
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002269#
Benjamin Petersondfd79492008-06-13 19:13:39 +00002270# Functions used to create test cases from the base ones in this module
2271#
2272
def get_attributes(Source, names):
    """Return a dict mapping each name in *names* to ``Source.<name>``.

    Plain Python functions are wrapped in staticmethod() so that they are
    not turned into methods when the dict is merged into a class
    namespace.  Every other object is stored unchanged.
    """
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        obj = getattr(Source, name)
        if isinstance(obj, function_type):
            obj = staticmethod(obj)
        attributes[name] = obj
    return attributes
2281
def create_test_cases(Mixin, type):
    """Build the concrete test classes for one flavour of the tests.

    Every module global whose name starts with ``_Test`` and whose
    ALLOWED_TYPES contains *type* is combined with unittest.TestCase and
    *Mixin* into a new class.  The new class is named
    ``'With' + type.capitalize() + <global name without its first
    character>`` and takes its ``__module__`` from *Mixin*.

    Returns a dict mapping the new class names to the new classes.
    """
    flavour = type.capitalize()
    module_globals = globals()
    generated = {}

    for name in module_globals.keys():
        if not name.startswith('_Test'):
            continue
        base = module_globals[name]
        if type not in base.ALLOWED_TYPES:
            continue
        newname = 'With' + flavour + name[1:]

        class Temp(base, unittest.TestCase, Mixin):
            pass

        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        generated[newname] = Temp
    return generated
2298
2299#
2300# Create test cases
2301#
2302
# Mixin for the 'processes' flavour of the tests.  Besides TYPE and
# Process, the class namespace is filled with the attributes of the
# multiprocessing package named below, as returned by get_attributes().
class ProcessesMixin(object):
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Inject the listed multiprocessing attributes into the class body.
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue', 'Pool'
        )))

# Build the 'processes' test classes and publish them as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2315
2316
# Mixin for the 'manager' flavour of the tests.  The listed attributes are
# looked up on a SyncManager instance and copied into the class namespace.
class ManagerMixin(object):
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Created with object.__new__, i.e. without running __init__ here;
    # test_main() calls __init__() and start() on it before the tests run.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Inject the listed manager attributes into the class body.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
       'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue', 'Pool'
        )))

# Build the 'manager' test classes and publish them as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
2329
2330
# Mixin for the 'threads' flavour of the tests.  The listed attributes are
# taken from multiprocessing.dummy and copied into the class namespace.
class ThreadsMixin(object):
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Inject the listed multiprocessing.dummy attributes into the class body.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue', 'Pool'
        )))

# Build the 'threads' test classes and publish them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2343
class OtherTest(unittest.TestCase):
    """Authentication failures in the connection handshake helpers."""

    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        class _FakeConnection(object):
            # Answers every read with a reply that is not a valid digest.
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        class _FakeConnection(object):
            # First read yields the challenge marker, second a bogus
            # reply, every later read an empty byte string.
            def __init__(self):
                self._replies = iter((multiprocessing.connection.CHALLENGE,
                                      b'something bogus'))

            def recv_bytes(self, size):
                return next(self._replies, b'')

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
2372
Jesse Noller7152f6d2009-04-02 05:17:26 +00002373#
2374# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2375#
2376
def initializer(ns):
    """Add one to the ``test`` attribute of the namespace *ns*."""
    ns.test = ns.test + 1
2379
class TestInitializers(unittest.TestCase):
    """Initializer support in Manager.start() and Pool() (issue 5585)."""

    def setUp(self):
        # A managed namespace lets the initializer, which runs in another
        # process, report back through the shared ``test`` counter.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # a non-callable initializer must be rejected
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Shut the second manager down even when the assertion fails,
            # so its server process is not leaked.
            m.shutdown()

    def test_pool_initializer(self):
        # a non-callable initializer must be rejected
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
2402
Jesse Noller1b90efb2009-06-30 17:11:52 +00002403#
2404# Issue 5155, 5313, 5331: Test process in processes
2405# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2406#
2407
def _this_sub_process(q):
    """Do one non-blocking get() on *q*, ignoring an empty queue."""
    try:
        q.get(block=False)
    except Queue.Empty:
        pass
2413
def _test_process(q):
    """Run _this_sub_process in a daemonic child and wait for it.

    The child is given a freshly created queue; the *q* argument itself
    is not used.
    """
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_this_sub_process,
                                    args=(inner_queue,))
    child.daemon = True
    child.start()
    child.join()
2420
def _afunc(x):
    """Return *x* multiplied by itself."""
    squared = x * x
    return squared
2423
def pool_in_process():
    """Map _afunc over a few integers on a pool created in this process."""
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Shut the workers down explicitly instead of leaving them to be
        # reaped when the process exits.
        pool.close()
        pool.join()
2427
class _file_like(object):
    """Buffering writer that keeps a separate cache for each process id.

    write() only appends to the cache; flush() joins the cached pieces,
    writes them to the wrapped *delegate* and starts a new cache.
    """

    def __init__(self, delegate):
        self._delegate = delegate
        self._pid = None

    @property
    def cache(self):
        """The list of pending pieces belonging to the current process."""
        current_pid = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid != current_pid:
            # first access in this process: start with an empty cache
            self._pid, self._cache = current_pid, []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
2448
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Queues and pools used inside a child process (issues 5155, 5313, 5331)."""

    def test_queue_in_process(self):
        """A child process can itself start a process that uses a queue."""
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_test_process, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        """A child process can create and use a Pool."""
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        """flush() on a _file_like hands the buffered data to its delegate."""
        sio = StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        flike.flush()
        # Use a unittest assertion rather than a bare assert: it is not
        # stripped under -O and it reports both values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
2469
Richard Oudkerke4b99382012-07-27 14:05:46 +01002470#
2471# Test interaction with socket timeouts - see Issue #6056
2472#
2473
class TestTimeouts(unittest.TestCase):
    """Connections must work with a socket default timeout (issue #6056)."""

    @classmethod
    def _test_timeout(cls, child, address):
        """Send 123 over the pipe after a delay, then 456 to the listener."""
        time.sleep(1)
        child.send(123)
        child.close()
        client = multiprocessing.connection.Client(address)
        client.send(456)
        client.close()

    def test_timeout(self):
        saved_timeout = socket.getdefaulttimeout()
        socket.setdefaulttimeout(0.1)
        try:
            parent, child = multiprocessing.Pipe(duplex=True)
            listener = multiprocessing.connection.Listener(family='AF_INET')
            worker = multiprocessing.Process(
                target=self._test_timeout,
                args=(child, listener.address))
            worker.start()
            child.close()
            # the child sends only after sleeping for a full second,
            # i.e. well after the 0.1s default socket timeout
            self.assertEqual(parent.recv(), 123)
            parent.close()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 456)
            accepted.close()
            listener.close()
            worker.join(10)
        finally:
            socket.setdefaulttimeout(saved_timeout)
2503
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002504#
2505# Test what happens with no "if __name__ == '__main__'"
2506#
2507
class TestNoForkBomb(unittest.TestCase):
    """Run a script that lacks the ``if __name__ == '__main__'`` guard."""

    def test_noforkbomb(self):
        script = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
        if not WIN32:
            # here the script is expected to succeed and print 123
            rc, out, err = test.script_helper.assert_python_ok(script)
            self.assertEqual(out.rstrip(), '123')
            self.assertEqual(err, '')
            return
        # on Windows the script is expected to fail with a RuntimeError
        rc, out, err = test.script_helper.assert_python_failure(script)
        self.assertEqual(out, '')
        self.assertIn('RuntimeError', err)
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002519
2520#
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002521# Issue 12098: check sys.flags of child matches that for parent
2522#
2523
class TestFlags(unittest.TestCase):
    """sys.flags of a child must match those of its parent (issue 12098)."""

    @classmethod
    def run_in_grandchild(cls, conn):
        """Send this process's sys.flags, as a tuple, over *conn*."""
        conn.send(tuple(sys.flags))

    @classmethod
    def run_in_child(cls):
        """Print a JSON pair: own sys.flags and those of a child process."""
        import json
        reader, writer = multiprocessing.Pipe(duplex=False)
        grandchild = multiprocessing.Process(target=cls.run_in_grandchild,
                                             args=(writer,))
        grandchild.start()
        grandchild_flags = reader.recv()
        grandchild.join()
        reader.close()
        writer.close()
        print(json.dumps((tuple(sys.flags), grandchild_flags)))

    @test_support.requires_unicode  # XXX json needs unicode support
    def test_flags(self):
        import json, subprocess
        # start child process using unusual flags
        prog = ('from test.test_multiprocessing import TestFlags; ' +
                'TestFlags.run_in_child()')
        command = [sys.executable, '-E', '-B', '-O', '-c', prog]
        output = subprocess.check_output(command)
        child_flags, grandchild_flags = json.loads(output.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)
Richard Oudkerk7bdd93c2013-04-17 19:15:52 +01002552
2553#
2554# Issue #17555: ForkAwareThreadLock
2555#
2556
class TestForkAwareThreadLock(unittest.TestCase):
    # We recursively start processes.  Issue #17555 meant that the
    # after fork registry would get duplicate entries for the same
    # lock.  The size of the registry at generation n was ~2**n.

    @classmethod
    def child(cls, n, conn):
        """Recurse through *n* generations; the last reports the registry size."""
        if n <= 1:
            conn.send(len(util._afterfork_registry))
        else:
            descendant = multiprocessing.Process(target=cls.child,
                                                 args=(n-1, conn))
            descendant.start()
            descendant.join()
        conn.close()

    def test_lock(self):
        reader, writer = multiprocessing.Pipe(False)
        # create a lock and keep a reference to it for the whole test
        lock = util.ForkAwareThreadLock()
        size_before = len(util._afterfork_registry)
        first = multiprocessing.Process(target=self.child, args=(5, writer))
        first.start()
        size_after = reader.recv()
        first.join()
        self.assertLessEqual(size_after, size_before)
2581
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002582#
Richard Oudkerk41072db2013-07-01 18:45:28 +01002583# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
2584#
2585
class TestIgnoreEINTR(unittest.TestCase):
    """recv(), send() and accept() must ignore EINTR (issue #17097)."""

    @classmethod
    def _test_ignore(cls, conn):
        """Child: install a no-op SIGUSR1 handler, then echo and send 1 MB."""
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        conn.send('ready')
        conn.send(conn.recv())
        conn.send_bytes(b'x'*(1024*1024))   # sending 1 MB should block

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            child = multiprocessing.Process(target=self._test_ignore,
                                            args=(child_conn,))
            child.daemon = True
            child.start()
            child_conn.close()
            self.assertEqual(conn.recv(), 'ready')
            # first signal: sent before the value is passed to the child
            time.sleep(0.1)
            os.kill(child.pid, signal.SIGUSR1)
            time.sleep(0.1)
            conn.send(1234)
            self.assertEqual(conn.recv(), 1234)
            # second signal: sent before the 1 MB payload is read
            time.sleep(0.1)
            os.kill(child.pid, signal.SIGUSR1)
            expected = b'x'*(1024*1024)
            self.assertEqual(conn.recv_bytes(), expected)
            time.sleep(0.1)
            child.join()
        finally:
            conn.close()

    @classmethod
    def _test_ignore_listener(cls, conn):
        """Child: report a listener address, accept once, send 'welcome'."""
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        listener = multiprocessing.connection.Listener()
        conn.send(listener.address)
        accepted = listener.accept()
        accepted.send('welcome')

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore_listener(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            child = multiprocessing.Process(
                target=self._test_ignore_listener, args=(child_conn,))
            child.daemon = True
            child.start()
            child_conn.close()
            address = conn.recv()
            # signal the child before any client connects
            time.sleep(0.1)
            os.kill(child.pid, signal.SIGUSR1)
            time.sleep(0.1)
            client = multiprocessing.connection.Client(address)
            self.assertEqual(client.recv(), 'welcome')
            child.join()
        finally:
            conn.close()
2649
2650#
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002651#
2652#
2653
# Test case classes that test_main() runs after the generated ones.
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
    TestTimeouts,
    TestNoForkBomb,
    TestFlags,
    TestForkAwareThreadLock,
    TestIgnoreEINTR,
]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002657
Benjamin Petersondfd79492008-06-13 19:13:39 +00002658#
2659#
2660#
2661
def test_main(run=None):
    """Set up the shared pools and manager, run every test case, clean up.

    *run* is a callable that receives the assembled unittest.TestSuite.
    When it is None, test.test_support.run_unittest is used.

    Raises unittest.SkipTest on Linux when an RLock cannot be created.
    """
    if sys.platform.startswith("linux"):
        try:
            lock = multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Resources shared by the generated test classes through their mixins.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
        # module during these tests is at least platform dependent and possibly
        # non-deterministic on any given platform. So we don't mind if the listed
        # warnings aren't actually raised.
        with test_support.check_py3k_warnings(
                (".+__(get|set)slice__ has been removed", DeprecationWarning),
                (r"sys.exc_clear\(\) not supported", DeprecationWarning),
                quiet=True):
            run(suite)
    finally:
        # Tear the shared resources down even when run() raises, so that
        # worker and manager processes are not left behind.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002709
def main():
    """Run the whole suite through a verbose text test runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

if __name__ == '__main__':
    main()