blob: 163c42f1d10a17ced98084cbe4d0601db3e704df [file] [log] [blame]
Benjamin Petersondfd79492008-06-13 19:13:39 +00001#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00006import Queue
7import time
8import sys
9import os
10import gc
11import signal
12import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000013import socket
14import random
15import logging
Antoine Pitroua1a8da82011-08-23 19:54:20 +020016import errno
Antoine Pitrou5084ff72017-03-24 16:03:46 +010017import weakref
Richard Oudkerkfaee75c2012-08-14 11:41:19 +010018import test.script_helper
Mark Dickinsonc4920e82009-11-20 19:30:22 +000019from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000020from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000021_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020022# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000023# message: "No module named _multiprocessing". _multiprocessing is not compiled
24# without thread support.
25import threading
R. David Murray3db8a342009-03-30 23:05:48 +000026
Jesse Noller37040cd2008-09-30 00:15:45 +000027# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000028test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000029
Benjamin Petersondfd79492008-06-13 19:13:39 +000030import multiprocessing.dummy
31import multiprocessing.connection
32import multiprocessing.managers
33import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000034import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000035
Charles-François Natalif8413b22011-09-21 18:44:49 +020036from multiprocessing import util
37
38try:
39 from multiprocessing import reduction
40 HAS_REDUCTION = True
41except ImportError:
42 HAS_REDUCTION = False
Benjamin Petersondfd79492008-06-13 19:13:39 +000043
Brian Curtina06e9b82010-10-07 02:27:41 +000044try:
45 from multiprocessing.sharedctypes import Value, copy
46 HAS_SHAREDCTYPES = True
47except ImportError:
48 HAS_SHAREDCTYPES = False
49
Antoine Pitroua1a8da82011-08-23 19:54:20 +020050try:
51 import msvcrt
52except ImportError:
53 msvcrt = None
54
Benjamin Petersondfd79492008-06-13 19:13:39 +000055#
56#
57#
58
Benjamin Petersone79edf52008-07-13 18:34:58 +000059latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000060
Benjamin Petersondfd79492008-06-13 19:13:39 +000061#
62# Constants
63#
64
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Short pause (seconds) the tests sleep to let background activity settle.
DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# True unless the extension module reports a broken sem_getvalue().
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = (sys.platform == "win32")

try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except Exception:
    # Best-effort lookup: fall back to a fixed value when os.sysconf is
    # unavailable or rejects the name.  Catching Exception (rather than a
    # bare except) keeps KeyboardInterrupt/SystemExit propagating.
    MAXFD = 256
87
Benjamin Petersondfd79492008-06-13 19:13:39 +000088#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000089# Some tests require ctypes
90#
91
92try:
Nick Coghlan13623662010-04-10 14:24:36 +000093 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000094except ImportError:
95 Structure = object
96 c_int = c_double = None
97
Charles-François Natali6392d7f2011-11-22 18:35:18 +010098
def check_enough_semaphores():
    """Raise unittest.SkipTest when the OS reports too few semaphores.

    Returns None when the limit cannot be queried, when the reported limit
    is -1, or when it is at least the POSIX minimum of 256.
    """
    # minimum number of semaphores available according to POSIX
    required = 256
    try:
        available = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # os.sysconf is missing, or it does not know this setting
        return
    if available != -1 and available < required:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % required)
112
113
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000114#
Benjamin Petersondfd79492008-06-13 19:13:39 +0000115# Creates a wrapper for a function which records the time it takes to finish
116#
117
class TimingWrapper(object):
    """Callable proxy that records how long the most recent call took.

    ``elapsed`` is None until the wrapper has been called once; afterwards
    it holds the wall-clock duration (seconds, via time.time()) of the last
    call, whether that call returned or raised.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Record the duration even when the wrapped call raises.
            self.elapsed = time.time() - started
        return result
130
131#
132# Base class for test cases
133#
134
class BaseTestCase(object):
    """Shared helpers for the multiprocessing test classes.

    The assertion helpers call self.assertEqual / self.assertAlmostEqual,
    which are not defined here and must come from the class this one is
    combined with.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare two durations to one decimal place if CHECK_TIMINGS is on."""
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert func(*args) == value, ignoring NotImplementedError."""
        try:
            res = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
157
Benjamin Petersondfd79492008-06-13 19:13:39 +0000158#
159# Return the value of a semaphore
160#
161
def get_value(self):
    """Return the current counter of the semaphore-like object *self*.

    Tries, in order, the object's own get_value() method, then the
    attributes ``_Semaphore__value`` and ``_value``.  Raises
    NotImplementedError when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
173
174#
175# Testcases
176#
177
class _TestProcess(BaseTestCase):
    """Tests for Process objects: attributes, lifecycle and exit codes.

    TYPE, Process, Queue, Pipe, current_process and active_children are
    read from ``self``/``cls`` and are not defined in this class; they are
    expected to be supplied by the class this one is combined with.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        """Check the attributes of the current process object."""
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertGreater(len(authkey), 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertIsNone(current.exitcode)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child target: report the received arguments and identity on q."""
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        """Run a child and check its state before, during and after."""
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIsInstance(self.active_children(), list)
        self.assertIsNone(p.exitcode)

        p.start()

        self.assertIsNone(p.exitcode)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child echoes everything except the queue itself.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        """Child target: sleep long enough that it has to be terminated."""
        time.sleep(1000)

    def test_terminate(self):
        """terminate() stops a sleeping child and join() then returns."""
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertIsNone(p.exitcode)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertIsNone(join())
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        """cpu_count() returns a positive int or raises NotImplementedError."""
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIsInstance(cpus, int)
        self.assertGreaterEqual(cpus, 1)

    def test_active_children(self):
        """A process is listed by active_children() only while running."""
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Send *id* on wconn, then spawn two children while len(id) < 2."""
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        """Children that start their own children report in depth-first order."""
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        """Child target: point stderr at *testfn*, then sys.exit(reason)."""
        sys.stderr = open(testfn, 'w')
        sys.exit(reason)

    def test_sys_exit(self):
        """Check exit codes and stderr output for sys.exit() in a child."""
        # See Issue 13854
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test_support.TESTFN
        self.addCleanup(test_support.unlink, testfn)

        # Non-integer reasons: exit code 1 and str(reason) on stderr.
        for reason, code in (([1, 2, 3], 1), ('ignore this', 1)):
            p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, code)

            with open(testfn, 'r') as f:
                self.assertEqual(f.read().rstrip(), str(reason))

        # Integer-like reasons are used directly as the exit code.
        for reason in (True, False, 8):
            p = self.Process(target=sys.exit, args=(reason,))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, reason)
357
Benjamin Petersondfd79492008-06-13 19:13:39 +0000358#
359#
360#
361
class _UpperCaser(multiprocessing.Process):
    """Process subclass that upper-cases strings exchanged over a Pipe."""

    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.child_conn, self.parent_conn = multiprocessing.Pipe()

    def run(self):
        """Child side: echo each received value upper-cased until None."""
        # The child never uses the parent's end of the pipe.
        self.parent_conn.close()
        for text in iter(self.child_conn.recv, None):
            self.child_conn.send(text.upper())
        self.child_conn.close()

    def submit(self, s):
        """Parent side: send the str *s* and return the reply."""
        assert type(s) is str
        self.parent_conn.send(s)
        reply = self.parent_conn.recv()
        return reply

    def stop(self):
        """Parent side: send the None sentinel and close both pipe ends."""
        self.parent_conn.send(None)
        self.parent_conn.close()
        self.child_conn.close()
383
class _TestSubclassingProcess(BaseTestCase):
    """Check that a multiprocessing.Process subclass can be started and used."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.daemon = True
        worker.start()
        for word, shouted in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(word), shouted)
        worker.stop()
        worker.join()
396
397#
398#
399#
400
def queue_empty(q):
    """Return whether *q* is empty, via q.empty() or else q.qsize() == 0."""
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
406
def queue_full(q, maxsize):
    """Return whether *q* is full, via q.full() or else q.qsize() == maxsize."""
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
412
413
class _TestQueue(BaseTestCase):
    """Tests for Queue / JoinableQueue put, get, qsize and task_done.

    Queue, JoinableQueue, Event and Process are read from ``self`` and are
    not defined in this class.
    """

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        """Child target: once released, drain six items, then signal back."""
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        """Fill a bounded queue, check Full is raised, then let a child drain it."""
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        consumer = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        consumer.daemon = True
        consumer.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Exercise every supported call signature of put().
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # (callable, positional args, keyword args, expected blocking time)
        full_cases = (
            (put, (7, False), {}, 0),
            (put, (7, False, None), {}, 0),
            (put_nowait, (7,), {}, 0),
            (put, (7, True, TIMEOUT1), {}, TIMEOUT1),
            (put, (7, False, TIMEOUT2), {}, 0),
            (put, (7, True), {'timeout': TIMEOUT3}, TIMEOUT3),
            )
        for call, args, kwds, expected in full_cases:
            self.assertRaises(Queue.Full, call, *args, **kwds)
            self.assertTimingAlmostEqual(call.elapsed, expected)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        consumer.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        """Child target: once released, put 2..5 on the queue and signal back."""
        child_can_start.wait()
        #queue.put(1)
        for item in (2, 3, 4, 5):
            queue.put(item)
        parent_can_continue.set()

    def test_get(self):
        """Read items produced by a child, then check Empty is raised."""
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        producer = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        producer.daemon = True
        producer.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # (callable, positional args, keyword args, expected blocking time)
        empty_cases = (
            (get, (False,), {}, 0),
            (get, (False, None), {}, 0),
            (get_nowait, (), {}, 0),
            (get, (True, TIMEOUT1), {}, TIMEOUT1),
            (get, (False, TIMEOUT2), {}, 0),
            (get, (), {'timeout': TIMEOUT3}, TIMEOUT3),
            )
        for call, args, kwds, expected in empty_cases:
            self.assertRaises(Queue.Empty, call, *args, **kwds)
            self.assertTimingAlmostEqual(call.elapsed, expected)

        producer.join()

    @classmethod
    def _test_fork(cls, queue):
        """Child target: put the integers 10..19 on the queue."""
        for item in range(10, 20):
            queue.put(item)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for item in range(10):
            queue.put(item)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        child = self.Process(target=self._test_fork, args=(queue,))
        child.daemon = True
        child.start()

        # check that all expected items are in the queue
        for expected in range(20):
            self.assertEqual(queue.get(), expected)
        self.assertRaises(Queue.Empty, queue.get, False)

        child.join()

    def test_qsize(self):
        """qsize() tracks puts and gets; skipped where it is unimplemented."""
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            self.skipTest('qsize method not implemented')
        for item, size_after in ((1, 1), (5, 2)):
            q.put(item)
            self.assertEqual(q.qsize(), size_after)
        for size_after in (1, 0):
            q.get()
            self.assertEqual(q.qsize(), size_after)

    @classmethod
    def _test_task_done(cls, q):
        """Worker target: acknowledge each item until a None sentinel arrives."""
        for _ in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        """join() on a JoinableQueue returns once every item is task_done()."""
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            self.skipTest("requires 'queue.task_done()' method")

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for _ in xrange(4)]

        for worker in workers:
            worker.daemon = True
            worker.start()

        for item in xrange(10):
            queue.put(item)

        queue.join()

        # One None sentinel per worker makes each worker's loop end.
        for _ in workers:
            queue.put(None)

        for worker in workers:
            worker.join()

    def test_no_import_lock_contention(self):
        """Using a Queue at import time of a module must not time out."""
        with test_support.temp_cwd():
            module_name = 'imported_by_an_imported_module'
            with open(module_name + '.py', 'w') as f:
                f.write("""if 1:
                    import multiprocessing

                    q = multiprocessing.Queue()
                    q.put('knock knock')
                    q.get(timeout=3)
                    q.close()
                """)

            with test_support.DirsOnSysPath(os.getcwd()):
                try:
                    __import__(module_name)
                except Queue.Empty:
                    self.fail("Probable regression on import lock contention;"
                              " see Issue #22853")
643
Benjamin Petersondfd79492008-06-13 19:13:39 +0000644#
645#
646#
647
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock acquire/release semantics."""

    def test_lock(self):
        """A Lock can be held once; releasing an unheld Lock raises."""
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # A second, non-blocking acquire must fail while the lock is held.
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        """An RLock can be re-acquired and must be released as many times."""
        lock = self.RLock()
        depth = 3
        for _ in range(depth):
            self.assertEqual(lock.acquire(), True)
        for _ in range(depth):
            self.assertEqual(lock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        """A Lock works as a context manager."""
        with self.Lock():
            pass
670
Benjamin Petersondfd79492008-06-13 19:13:39 +0000671
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore counting and timeouts."""

    def _test_semaphore(self, sem):
        """Drive a semaphore created with value 2 down to 0 and back up."""
        check_value = self.assertReturnsIfImplemented

        check_value(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check_value(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check_value(0, get_value, sem)
        # Exhausted: a non-blocking acquire fails and leaves the count at 0.
        self.assertEqual(sem.acquire(False), False)
        check_value(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        check_value(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        check_value(2, get_value, sem)

    def test_semaphore(self):
        """A plain Semaphore may be released above its initial value."""
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        """acquire() on an empty semaphore fails after the expected wait."""
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected blocking time)
        cases = (
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
            )
        for args, kwds, expected in cases:
            self.assertEqual(acquire(*args, **kwds), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected)
724
725
class _TestCondition(BaseTestCase):
    """Tests for Condition wait / notify / notify_all.

    Condition, Semaphore and Process are read from ``self`` and are not
    defined in this class.
    """

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        """Worker target: signal *sleeping*, wait on *cond*, signal *woken*."""
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        """Check the condition's internal counters show no sleepers."""
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def _start_waiters(self, count, cond, sleeping, woken, *extra):
        """Start *count* Process and *count* Thread workers running f().

        Returns the started workers so that the caller can join them.
        """
        workers = []
        for i in range(count):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken) + extra)
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken) + extra)
            t.daemon = True
            t.start()
            workers.append(t)
        return workers

    def test_notify(self):
        """notify() wakes exactly one waiter per call."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # One Process waiter and one Thread waiter; keep both handles so
        # that both can be joined once they have been woken.
        workers = self._start_waiters(1, cond, sleeping, woken)

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Both waiters have been notified, so joining cannot block forever.
        for worker in workers:
            worker.join()

    def test_notify_all(self):
        """Waiters with a timeout time out; notify_all() wakes the rest."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        workers = self._start_waiters(3, cond, sleeping, woken, TIMEOUT1)

        # wait for them all to sleep
        for i in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for i in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Every worker has released 'woken', so they are all about to exit.
        for worker in workers:
            worker.join()

        # start some more threads/processes
        workers = self._start_waiters(3, cond, sleeping, woken)

        # wait for them to all sleep
        for i in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # All waiters were notified above; reap them before returning.
        for worker in workers:
            worker.join()

    def test_timeout(self):
        """wait(timeout) with no notifier returns None after the timeout."""
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
857
858
class _TestEvent(BaseTestCase):
    """Tests for Event set / clear / wait.

    Event and Process are read from ``self`` and are not defined here.
    """

    @classmethod
    def _test_event(cls, event):
        """Child target: set *event* after sleeping for TIMEOUT2 seconds."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        """wait() reports the flag: False on timeout, True once it is set."""
        event = self.Event()
        wait = TimingWrapper(event.wait)

        self.assertEqual(event.is_set(), False)

        # While the event is clear, wait() times out and returns False
        # rather than None.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once set, wait() returns True immediately, with or without timeout.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # The child exits right after setting the event; reap it so that it
        # does not outlive the test.
        p.join()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000899
900#
901#
902#
903
class _TestValue(BaseTestCase):
    """Tests for Value / RawValue objects shared with a child process.

    Value, RawValue, Lock and Process are read from ``self`` and are not
    defined in this class.
    """

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value assigned by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        """Child target: overwrite each shared value with its third entry."""
        for shared, spec in zip(values, cls.codes_values):
            shared.value = spec[2]

    def test_value(self, raw=False):
        """Values hold their initial data and reflect writes made by a child."""
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        child = self.Process(target=self._test, args=(values,))
        child.daemon = True
        child.start()
        child.join()

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        """get_lock()/get_obj() exist only on values created with a lock."""
        # Default lock.
        default_locked = self.Value('i', 5)
        default_locked.get_lock()
        default_locked.get_obj()

        # lock=None.
        none_locked = self.Value('i', 5, lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # An explicit lock is handed back by get_lock().
        lock = self.Lock()
        explicit_locked = self.Value('i', 5, lock=lock)
        returned_lock = explicit_locked.get_lock()
        explicit_locked.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False yields an object without the accessors.
        unlocked = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        raw = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, accessor))
971
Benjamin Petersondfd79492008-06-13 19:13:39 +0000972
973class _TestArray(BaseTestCase):
974
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000975 ALLOWED_TYPES = ('processes',)
976
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000977 @classmethod
978 def f(cls, seq):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000979 for i in range(1, len(seq)):
980 seq[i] += seq[i-1]
981
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000982 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000983 def test_array(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000984 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
985 if raw:
986 arr = self.RawArray('i', seq)
987 else:
988 arr = self.Array('i', seq)
989
990 self.assertEqual(len(arr), len(seq))
991 self.assertEqual(arr[3], seq[3])
992 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
993
994 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
995
996 self.assertEqual(list(arr[:]), seq)
997
998 self.f(seq)
999
1000 p = self.Process(target=self.f, args=(arr,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001001 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001002 p.start()
1003 p.join()
1004
1005 self.assertEqual(list(arr[:]), seq)
1006
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001007 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsond3cb2f62011-03-26 10:02:37 +00001008 def test_array_from_size(self):
1009 size = 10
1010 # Test for zeroing (see issue #11675).
1011 # The repetition below strengthens the test by increasing the chances
1012 # of previously allocated non-zero memory being used for the new array
1013 # on the 2nd and 3rd loops.
1014 for _ in range(3):
1015 arr = self.Array('i', size)
1016 self.assertEqual(len(arr), size)
1017 self.assertEqual(list(arr), [0] * size)
1018 arr[:] = range(10)
1019 self.assertEqual(list(arr), range(10))
1020 del arr
1021
1022 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001023 def test_rawarray(self):
1024 self.test_array(raw=True)
1025
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001026 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsonf9e9a6f2011-03-25 22:01:06 +00001027 def test_array_accepts_long(self):
1028 arr = self.Array('i', 10L)
1029 self.assertEqual(len(arr), 10)
1030 raw_arr = self.RawArray('i', 10L)
1031 self.assertEqual(len(raw_arr), 10)
1032
1033 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001034 def test_getobj_getlock_obj(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001035 arr1 = self.Array('i', range(10))
1036 lock1 = arr1.get_lock()
1037 obj1 = arr1.get_obj()
1038
1039 arr2 = self.Array('i', range(10), lock=None)
1040 lock2 = arr2.get_lock()
1041 obj2 = arr2.get_obj()
1042
1043 lock = self.Lock()
1044 arr3 = self.Array('i', range(10), lock=lock)
1045 lock3 = arr3.get_lock()
1046 obj3 = arr3.get_obj()
1047 self.assertEqual(lock, lock3)
1048
Jesse Noller6ab22152009-01-18 02:45:38 +00001049 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001050 self.assertFalse(hasattr(arr4, 'get_lock'))
1051 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Noller6ab22152009-01-18 02:45:38 +00001052 self.assertRaises(AttributeError,
1053 self.Array, 'i', range(10), lock='notalock')
1054
1055 arr5 = self.RawArray('i', range(10))
1056 self.assertFalse(hasattr(arr5, 'get_lock'))
1057 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001058
1059#
1060#
1061#
1062
class _TestContainers(BaseTestCase):
    """Tests for the manager-backed list, dict and Namespace proxies."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        a = self.list(range(10))
        self.assertEqual(a[:], range(10))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(range(5))
        self.assertEqual(b[:], range(5))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        # None of the operations on b may have touched a.
        self.assertEqual(a[:], range(10))

        # A shared list built from other shared lists.
        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A change made to a after f was built is visible through f.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = range(65, 70)
        for i in indices:
            d[i] = chr(i)
        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The expected string contains neither the deleted attribute nor
        # the underscore-prefixed one.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1118
1119#
1120#
1121#
1122
def sqr(x, wait=0.0):
    """Sleep for `wait` seconds, then return `x` multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001126
def identity(x):
    """Return the argument itself, unchanged."""
    same = x
    return same
1129
class CountedObject(object):
    """Object whose class keeps a count of currently existing instances."""

    # Incremented in __new__ and decremented in __del__.
    n_instances = 0

    def __new__(cls):
        cls.n_instances = cls.n_instances + 1
        instance = object.__new__(cls)
        return instance

    def __del__(self):
        klass = type(self)
        klass.n_instances = klass.n_instances - 1
1139
class SayWhenError(ValueError):
    """Raised by exception_throwing_generator() at the requested index."""


def exception_throwing_generator(total, when):
    """Yield 0 .. total-1, raising SayWhenError in place of index `when`."""
    for index in range(total):
        if index != when:
            yield index
            continue
        raise SayWhenError("Somebody said when")
1147
class _TestPool(BaseTestCase):
    """Tests for the Pool API: apply/map/imap variants, terminate, cleanup.

    The tests use ``self.pool`` and ``self.Pool``, neither of which is
    defined in this class.
    """

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         map(sqr, range(100)))

    def test_map_unplicklable(self):
        # Issue #19425 -- failure to pickle should not cause a hang
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        class A(object):
            def __reduce__(self):
                raise RuntimeError('cannot pickle')
        with self.assertRaises(RuntimeError):
            self.pool.map(sqr, [A()]*10)

    def test_map_chunksize(self):
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        # The task sleeps TIMEOUT1 seconds, so get() should block that long.
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the timeout passed to get().
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), map(sqr, range(10)))

        # The builtin next() is used throughout, matching the other imap
        # tests in this class.
        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, next, it)

        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, next, it)

    def test_imap_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
        for i in range(3):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, next, it)

        # SayWhenError seen at start of problematic chunk's results
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
        for i in range(6):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, next, it)
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
        for i in range(4):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, next, it)

    def test_imap_unordered(self):
        # Results may arrive in any order, so compare them sorted.
        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), map(sqr, range(1000)))

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), map(sqr, range(1000)))

    def test_imap_unordered_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(10, 3),
                                      1)
        expected_values = map(sqr, range(10))
        with self.assertRaises(SayWhenError):
            # imap_unordered makes it difficult to anticipate the SayWhenError
            for i in range(10):
                value = next(it)
                self.assertIn(value, expected_values)
                # Remove each value once seen so duplicates are detected.
                expected_values.remove(value)

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(20, 7),
                                      2)
        expected_values = map(sqr, range(20))
        with self.assertRaises(SayWhenError):
            for i in range(20):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)

    def test_make_pool(self):
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        # Queue far more work than can finish quickly, then terminate.
        p = self.Pool(4)
        result = p.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        p.terminate()
        join = TimingWrapper(p.join)
        join()
        # assertLess reports the measured time when the check fails.
        self.assertLess(join.elapsed, 0.2)

    def test_empty_iterable(self):
        # See Issue 12157
        p = self.Pool(1)

        self.assertEqual(p.map(sqr, []), [])
        self.assertEqual(list(p.imap(sqr, [])), [])
        self.assertEqual(list(p.imap_unordered(sqr, [])), [])
        self.assertEqual(p.map_async(sqr, []).get(), [])

        p.close()
        p.join()

    def test_release_task_refs(self):
        # Issue #29861: task arguments and results should not be kept
        # alive after we are done with them.
        objs = list(CountedObject() for i in range(10))
        refs = list(weakref.ref(o) for o in objs)
        self.pool.map(identity, objs)

        del objs
        self.assertEqual(set(wr() for wr in refs), {None})
        # With a process pool, copies of the objects are returned, check
        # they were released too.
        self.assertEqual(CountedObject.n_instances, 0)
1297
1298
def unpickleable_result():
    """Return a lambda that evaluates to 42 when called."""
    answer = lambda: 42
    return answer
1301
class _TestPoolWorkerErrors(BaseTestCase):
    """Pool behaviour when a task's result cannot be sent back."""

    ALLOWED_TYPES = ('processes', )

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        pool = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        remaining = 20
        while remaining > 0:
            pending = pool.apply_async(unpickleable_result)
            self.assertRaises(MaybeEncodingError, pending.get)
            remaining -= 1

        pool.close()
        pool.join()
1316
class _TestPoolWorkerLifetime(BaseTestCase):
    """Tests for pools created with ``maxtasksperchild``."""

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        pids_before = [worker.pid for worker in pool._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        pending = [pool.apply_async(sqr, (n, )) for n in range(100)]
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for n, task in enumerate(pending):
            self.assertEqual(task.get(), sqr(n))
        # Refill the pool
        pool._repopulate_pool()
        # Wait until all workers are alive
        # (50 polls * DELTA = 5 seconds max startup process time)
        for _ in range(50):
            if all(worker.is_alive() for worker in pool._pool):
                break
            time.sleep(DELTA)
        pids_after = [worker.pid for worker in pool._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, pids_before)
        self.assertNotIn(None, pids_after)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(pids_before), sorted(pids_after))
        pool.close()
        pool.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        pool = multiprocessing.Pool(3, maxtasksperchild=1)
        pending = [pool.apply_async(sqr, (n, 0.3)) for n in range(6)]
        pool.close()
        pool.join()
        # check the results
        for n, task in enumerate(pending):
            self.assertEqual(task.get(), sqr(n))
1361
1362
Benjamin Petersondfd79492008-06-13 19:13:39 +00001363#
1364# Test that manager has expected number of shared objects left
1365#
1366
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the manager's debug info once, to help diagnose the
            # failure reported by the assertion below.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1385
1386#
1387# Test of creating a customized manager class
1388#
1389
1390from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1391
class FooBar(object):
    """Small class with a public, a failing and an underscore method."""

    def f(self):
        name = 'f()'
        return name

    def g(self):
        # Always fails.
        raise ValueError()

    def _h(self):
        name = '_h()'
        return name
1399
def baz():
    """Generate the squares of 0 through 9, in order."""
    n = 0
    while n < 10:
        yield n * n
        n += 1
1403
class IteratorProxy(BaseProxy):
    """Proxy that forwards both spellings of the iterator protocol."""

    _exposed_ = ('next', '__next__')

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def next(self):
        value = self._callmethod('next')
        return value

    def __next__(self):
        value = self._callmethod('__next__')
        return value
1412
class MyManager(BaseManager):
    """BaseManager subclass that carries the registrations made below."""


# 'Foo' uses the default set of exposed methods, 'Bar' names its exposed
# methods explicitly, and 'baz' is served through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register(
    'Bar',
    callable=FooBar,
    exposed=('f', '_h'),
)
MyManager.register(
    'baz',
    callable=baz,
    proxytype=IteratorProxy,
)
1419
1420
class _TestMyManager(BaseTestCase):
    """Checks the proxies created through the customized MyManager."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        # Shut the manager down even when an assertion below fails, so a
        # failing test does not leave its server process running.
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            baz = manager.baz()

            foo_methods = [name for name in ('f', 'g', '_h')
                           if hasattr(foo, name)]
            bar_methods = [name for name in ('f', 'g', '_h')
                           if hasattr(bar, name)]

            # 'Foo' was registered without an explicit `exposed` tuple,
            # 'Bar' with exposed=('f', '_h').
            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            # '_h' is not exposed on Foo, so calling it by name must fail.
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(baz), [i*i for i in range(10)])
        finally:
            manager.shutdown()
1452
1453#
1454# Test of connecting to a remote server and using xmlrpclib for serialization
1455#
1456
# Single queue object handed out by get_queue().
_queue = Queue.Queue()


def get_queue():
    """Return the module-level queue object."""
    shared = _queue
    return shared


class QueueManager(BaseManager):
    '''manager class used by server process'''


QueueManager.register('get_queue', callable=get_queue)


class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''


# Registered without a callable.
QueueManager2.register('get_queue')


# Serializer name passed to the managers by the tests that use them.
SERIALIZER = 'xmlrpclib'
1471
class _TestRemoteManager(BaseTestCase):
    """Round-trip values through a manager that serializes with xmlrpclib."""

    ALLOWED_TYPES = ('manager',)

    # `values` is what the child process puts on the queue; `result` is
    # what the parent expects to read back.
    values = ['hello world', None, True, 2.25]
    result = values[:]
    if test_support.have_unicode:
        uvalue = test_support.u(r'\u043f\u0440\u0438\u0432\u0456\u0442 '
                                r'\u0441\u0432\u0456\u0442')
        values.append(uvalue)
        result.append(uvalue)

    @classmethod
    def _putter(cls, address, authkey):
        # Process target: connect to the manager at `address` and put the
        # class's values on its queue.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        # Note that xmlrpclib will deserialize object as a list not a tuple
        queue.put(tuple(cls.values))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=(test_support.HOST, 0), authkey=authkey,
            serializer=SERIALIZER
            )
        manager.start()
        # Stop the server even when an assertion below fails.
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.daemon = True
            p.start()

            manager2 = QueueManager2(
                address=manager.address, authkey=authkey,
                serializer=SERIALIZER
                )
            manager2.connect()
            queue = manager2.get_queue()

            self.assertEqual(queue.get(), self.result)

            # Because we are using xmlrpclib for serialization instead of
            # pickle this will cause a serialization error.
            self.assertRaises(Exception, queue.put, time.sleep)

            # Make queue finalizer run before the server is stopped
            del queue
        finally:
            manager.shutdown()
1523
class _TestManagerRestart(BaseTestCase):
    """Start a manager on an address that was used a moment earlier."""

    @classmethod
    def _putter(cls, address, authkey):
        # Process target: connect to the manager and put one message on
        # its queue.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=(test_support.HOST, 0), authkey=authkey,
            serializer=SERIALIZER)
        srvr = manager.get_server()
        # Remember the server's address so it can be reused below.
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()
        # Stop the first manager even when the assertion below fails.
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.daemon = True
            p.start()
            queue = manager.get_queue()
            self.assertEqual(queue.get(), 'hello world')
            del queue
        finally:
            manager.shutdown()
        # Start a second manager on the address recorded above.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001556
Benjamin Petersondfd79492008-06-13 19:13:39 +00001557#
1558#
1559#
1560
1561SENTINEL = latin('')
1562
1563class _TestConnection(BaseTestCase):
1564
1565 ALLOWED_TYPES = ('processes', 'threads')
1566
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001567 @classmethod
1568 def _echo(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001569 for msg in iter(conn.recv_bytes, SENTINEL):
1570 conn.send_bytes(msg)
1571 conn.close()
1572
1573 def test_connection(self):
1574 conn, child_conn = self.Pipe()
1575
1576 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001577 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001578 p.start()
1579
1580 seq = [1, 2.25, None]
1581 msg = latin('hello world')
1582 longmsg = msg * 10
1583 arr = array.array('i', range(4))
1584
1585 if self.TYPE == 'processes':
1586 self.assertEqual(type(conn.fileno()), int)
1587
1588 self.assertEqual(conn.send(seq), None)
1589 self.assertEqual(conn.recv(), seq)
1590
1591 self.assertEqual(conn.send_bytes(msg), None)
1592 self.assertEqual(conn.recv_bytes(), msg)
1593
1594 if self.TYPE == 'processes':
1595 buffer = array.array('i', [0]*10)
1596 expected = list(arr) + [0] * (10 - len(arr))
1597 self.assertEqual(conn.send_bytes(arr), None)
1598 self.assertEqual(conn.recv_bytes_into(buffer),
1599 len(arr) * buffer.itemsize)
1600 self.assertEqual(list(buffer), expected)
1601
1602 buffer = array.array('i', [0]*10)
1603 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1604 self.assertEqual(conn.send_bytes(arr), None)
1605 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1606 len(arr) * buffer.itemsize)
1607 self.assertEqual(list(buffer), expected)
1608
1609 buffer = bytearray(latin(' ' * 40))
1610 self.assertEqual(conn.send_bytes(longmsg), None)
1611 try:
1612 res = conn.recv_bytes_into(buffer)
1613 except multiprocessing.BufferTooShort, e:
1614 self.assertEqual(e.args, (longmsg,))
1615 else:
1616 self.fail('expected BufferTooShort, got %s' % res)
1617
1618 poll = TimingWrapper(conn.poll)
1619
1620 self.assertEqual(poll(), False)
1621 self.assertTimingAlmostEqual(poll.elapsed, 0)
1622
1623 self.assertEqual(poll(TIMEOUT1), False)
1624 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1625
1626 conn.send(None)
Giampaolo Rodola'cef20062012-12-31 17:23:09 +01001627 time.sleep(.1)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001628
1629 self.assertEqual(poll(TIMEOUT1), True)
1630 self.assertTimingAlmostEqual(poll.elapsed, 0)
1631
1632 self.assertEqual(conn.recv(), None)
1633
1634 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1635 conn.send_bytes(really_big_msg)
1636 self.assertEqual(conn.recv_bytes(), really_big_msg)
1637
1638 conn.send_bytes(SENTINEL) # tell child to quit
1639 child_conn.close()
1640
1641 if self.TYPE == 'processes':
1642 self.assertEqual(conn.readable, True)
1643 self.assertEqual(conn.writable, True)
1644 self.assertRaises(EOFError, conn.recv)
1645 self.assertRaises(EOFError, conn.recv_bytes)
1646
1647 p.join()
1648
1649 def test_duplex_false(self):
1650 reader, writer = self.Pipe(duplex=False)
1651 self.assertEqual(writer.send(1), None)
1652 self.assertEqual(reader.recv(), 1)
1653 if self.TYPE == 'processes':
1654 self.assertEqual(reader.readable, True)
1655 self.assertEqual(reader.writable, False)
1656 self.assertEqual(writer.readable, False)
1657 self.assertEqual(writer.writable, True)
1658 self.assertRaises(IOError, reader.send, 2)
1659 self.assertRaises(IOError, writer.recv)
1660 self.assertRaises(IOError, writer.poll)
1661
1662 def test_spawn_close(self):
1663 # We test that a pipe connection can be closed by parent
1664 # process immediately after child is spawned. On Windows this
1665 # would have sometimes failed on old versions because
1666 # child_conn would be closed before the child got a chance to
1667 # duplicate it.
1668 conn, child_conn = self.Pipe()
1669
1670 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001671 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001672 p.start()
1673 child_conn.close() # this might complete before child initializes
1674
1675 msg = latin('hello')
1676 conn.send_bytes(msg)
1677 self.assertEqual(conn.recv_bytes(), msg)
1678
1679 conn.send_bytes(SENTINEL)
1680 conn.close()
1681 p.join()
1682
1683 def test_sendbytes(self):
1684 if self.TYPE != 'processes':
Zachary Ware1f702212013-12-10 14:09:20 -06001685 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001686
1687 msg = latin('abcdefghijklmnopqrstuvwxyz')
1688 a, b = self.Pipe()
1689
1690 a.send_bytes(msg)
1691 self.assertEqual(b.recv_bytes(), msg)
1692
1693 a.send_bytes(msg, 5)
1694 self.assertEqual(b.recv_bytes(), msg[5:])
1695
1696 a.send_bytes(msg, 7, 8)
1697 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1698
1699 a.send_bytes(msg, 26)
1700 self.assertEqual(b.recv_bytes(), latin(''))
1701
1702 a.send_bytes(msg, 26, 0)
1703 self.assertEqual(b.recv_bytes(), latin(''))
1704
1705 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1706
1707 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1708
1709 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1710
1711 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1712
1713 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1714
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001715 @classmethod
1716 def _is_fd_assigned(cls, fd):
1717 try:
1718 os.fstat(fd)
1719 except OSError as e:
1720 if e.errno == errno.EBADF:
1721 return False
1722 raise
1723 else:
1724 return True
1725
1726 @classmethod
1727 def _writefd(cls, conn, data, create_dummy_fds=False):
1728 if create_dummy_fds:
1729 for i in range(0, 256):
1730 if not cls._is_fd_assigned(i):
1731 os.dup2(conn.fileno(), i)
1732 fd = reduction.recv_handle(conn)
1733 if msvcrt:
1734 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1735 os.write(fd, data)
1736 os.close(fd)
1737
Charles-François Natalif8413b22011-09-21 18:44:49 +02001738 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001739 def test_fd_transfer(self):
1740 if self.TYPE != 'processes':
1741 self.skipTest("only makes sense with processes")
1742 conn, child_conn = self.Pipe(duplex=True)
1743
1744 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001745 p.daemon = True
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001746 p.start()
1747 with open(test_support.TESTFN, "wb") as f:
1748 fd = f.fileno()
1749 if msvcrt:
1750 fd = msvcrt.get_osfhandle(fd)
1751 reduction.send_handle(conn, fd, p.pid)
1752 p.join()
1753 with open(test_support.TESTFN, "rb") as f:
1754 self.assertEqual(f.read(), b"foo")
1755
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
@unittest.skipIf(sys.platform == "win32",
                 "test semantics don't make sense on Windows")
@unittest.skipIf(MAXFD <= 256,
                 "largest assignable fd number is too small")
@unittest.skipUnless(hasattr(os, "dup2"),
                     "test needs os.dup2()")
def test_large_fd_transfer(self):
    # With fd > 256 (issue #11657)
    if self.TYPE != 'processes':
        self.skipTest("only makes sense with processes")
    # Remove the scratch file afterwards, whether the test passes or not.
    self.addCleanup(test_support.unlink, test_support.TESTFN)
    conn, child_conn = self.Pipe(duplex=True)

    p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
    p.daemon = True
    p.start()
    with open(test_support.TESTFN, "wb") as f:
        fd = f.fileno()
        # Pick the first descriptor number >= 256 that is still free.
        for newfd in range(256, MAXFD):
            if not self._is_fd_assigned(newfd):
                break
        else:
            self.fail("could not find an unassigned large file descriptor")
        os.dup2(fd, newfd)
        try:
            reduction.send_handle(conn, newfd, p.pid)
        finally:
            # The duplicate is only needed for the duration of the send.
            os.close(newfd)
    p.join()
    with open(test_support.TESTFN, "rb") as f:
        self.assertEqual(f.read(), b"bar")
1787
@classmethod
def _send_data_without_fd(cls, conn):
    """Write a single NUL byte straight to the connection's descriptor.

    The byte is written with ``os.write`` on ``conn.fileno()``, i.e. as
    plain data with nothing else attached to it.
    """
    os.write(conn.fileno(), b"\0")
1791
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
@unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
def test_missing_fd_transfer(self):
    # recv_handle() must raise RuntimeError when the data it receives is
    # not accompanied by a file descriptor in ancillary data.
    if self.TYPE != 'processes':
        self.skipTest("only makes sense with processes")
    conn, child_conn = self.Pipe(duplex=True)

    sender = self.Process(target=self._send_data_without_fd,
                          args=(child_conn,))
    sender.daemon = True
    sender.start()
    with self.assertRaises(RuntimeError):
        reduction.recv_handle(conn)
    sender.join()
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001806
class _TestListenerClient(BaseTestCase):
    """Round-trip a message from a client process to a Listener."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        """Connect to ``address``, send ``'hello'`` and disconnect."""
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        # One listener/client round trip per supported address family.
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(listener.address,))
            p.daemon = True
            p.start()
            conn = listener.accept()
            self.assertEqual(conn.recv(), 'hello')
            # Close the accepted connection, as test_issue14725 does.
            conn.close()
            p.join()
            listener.close()

    def test_issue14725(self):
        listener = self.connection.Listener()
        p = self.Process(target=self._test, args=(listener.address,))
        p.daemon = True
        p.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        conn = listener.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        p.join()
        listener.close()
1843
Benjamin Petersondfd79492008-06-13 19:13:39 +00001844#
1845# Test of sending connection and socket objects between processes
1846#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001847"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001848class _TestPicklingConnections(BaseTestCase):
1849
1850 ALLOWED_TYPES = ('processes',)
1851
1852 def _listener(self, conn, families):
1853 for fam in families:
1854 l = self.connection.Listener(family=fam)
1855 conn.send(l.address)
1856 new_conn = l.accept()
1857 conn.send(new_conn)
1858
1859 if self.TYPE == 'processes':
1860 l = socket.socket()
1861 l.bind(('localhost', 0))
1862 conn.send(l.getsockname())
1863 l.listen(1)
1864 new_conn, addr = l.accept()
1865 conn.send(new_conn)
1866
1867 conn.recv()
1868
1869 def _remote(self, conn):
1870 for (address, msg) in iter(conn.recv, None):
1871 client = self.connection.Client(address)
1872 client.send(msg.upper())
1873 client.close()
1874
1875 if self.TYPE == 'processes':
1876 address, msg = conn.recv()
1877 client = socket.socket()
1878 client.connect(address)
1879 client.sendall(msg.upper())
1880 client.close()
1881
1882 conn.close()
1883
1884 def test_pickling(self):
1885 try:
1886 multiprocessing.allow_connection_pickling()
1887 except ImportError:
1888 return
1889
1890 families = self.connection.families
1891
1892 lconn, lconn0 = self.Pipe()
1893 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001894 lp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001895 lp.start()
1896 lconn0.close()
1897
1898 rconn, rconn0 = self.Pipe()
1899 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001900 rp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001901 rp.start()
1902 rconn0.close()
1903
1904 for fam in families:
1905 msg = ('This connection uses family %s' % fam).encode('ascii')
1906 address = lconn.recv()
1907 rconn.send((address, msg))
1908 new_conn = lconn.recv()
1909 self.assertEqual(new_conn.recv(), msg.upper())
1910
1911 rconn.send(None)
1912
1913 if self.TYPE == 'processes':
1914 msg = latin('This connection uses a normal socket')
1915 address = lconn.recv()
1916 rconn.send((address, msg))
1917 if hasattr(socket, 'fromfd'):
1918 new_conn = lconn.recv()
1919 self.assertEqual(new_conn.recv(100), msg.upper())
1920 else:
1921 # XXX On Windows with Py2.6 need to backport fromfd()
1922 discard = lconn.recv_bytes()
1923
1924 lconn.send(None)
1925
1926 rconn.close()
1927 lconn.close()
1928
1929 lp.join()
1930 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001931"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001932#
1933#
1934#
1935
class _TestHeap(BaseTestCase):
    """Consistency checks for the shared-memory heap allocator."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # Allocate many blocks of random sizes; once more than `maxblocks`
        # are held, drop a randomly chosen one on every round.
        for _ in xrange(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                del blocks[random.randrange(maxblocks)]

        # The heap shared by all BufferWrapper objects.
        heap = multiprocessing.heap.BufferWrapper._heap

        # Collect every free and occupied segment while holding the heap
        # lock; the lock is released again by a cleanup handler.
        segments = []
        occupied = 0
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for free_list in heap._len_to_seq.values():
            for arena, start, stop in free_list:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop - start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop - start, 'occupied'))
            occupied += (stop - start)

        segments.sort()

        # Neighbouring segments must either touch, or the second one must
        # be the first segment of a different arena.
        for current, following in zip(segments, segments[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, nstop) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Freeing blocks from within the garbage collector must not
        # deadlock (issue #12352).  Enable the GC if necessary and lower
        # the collection threshold so that collections happen often,
        # which raises the probability of hitting the deadlock.
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        saved_thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *saved_thresholds)
        gc.set_threshold(10)

        # Allocate many pairs of blocks that reference each other, so
        # that they can only be reclaimed by the cyclic collector.
        for _ in range(5000):
            first = multiprocessing.heap.BufferWrapper(1)
            second = multiprocessing.heap.BufferWrapper(1)
            # circular references
            first.buddy = second
            second.buddy = first
2000
Benjamin Petersondfd79492008-06-13 19:13:39 +00002001#
2002#
2003#
2004
class _Foo(Structure):
    """ctypes structure with an int field ``x`` and a double field ``y``."""

    _fields_ = [('x', c_int), ('y', c_double)]
2010
class _TestSharedCTypes(BaseTestCase):
    """Shared ctypes objects modified by a child must change in the parent."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        """Double every shared object passed in (runs in the child)."""
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        worker = self.Process(target=self._double,
                              args=(x, y, foo, arr, string))
        worker.daemon = True
        worker.start()
        worker.join()

        # Every value must have been doubled by the child.
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario, with the lock-wrapped variants of the objects.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        # Changing the original must not affect the copy.
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
2060
2061#
2062#
2063#
2064
class _TestFinalize(BaseTestCase):
    """Check when, and in which order, util.Finalize callbacks run."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        """Register finalizers that report their tag over ``conn``.

        Runs in the child.  The statement order and the lifetime of the
        local objects are what is being tested; do not reorder.
        """
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # No exit priority given for this one.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        # 'STOP' is the sentinel the parent reads up to.
        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.daemon = True
        child.start()
        child.join()

        # Read everything the child reported, up to the 'STOP' sentinel.
        received = list(iter(conn.recv, 'STOP'))
        expected = ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e']
        self.assertEqual(received, expected)
2117
2118#
2119# Test that from ... import * works for each module
2120#
2121
class _TestImportStar(BaseTestCase):
    """Every name listed in a module's ``__all__`` must exist in it."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.synchronize', 'multiprocessing.util',
        ]

        # Optional modules are only checked when they are usable here.
        if HAS_REDUCTION:
            modules.append('multiprocessing.reduction')
        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for name in modules:
            __import__(name)
            module = sys.modules[name]
            for attr in getattr(module, '__all__', ()):
                message = '%r does not have attribute %r' % (module, attr)
                self.assertTrue(hasattr(module, attr), message)
2150
2151#
2152# Quick test that logging works -- does not test logging output
2153#
2154
class _TestLogging(BaseTestCase):
    """Quick checks of the multiprocessing logger (output is not tested)."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertTrue(logger is not None)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        """Send the child's effective multiprocessing log level to ``conn``."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            # A level set on the multiprocessing logger is seen by a child.
            logger.setLevel(LEVEL1)
            p = self.Process(target=self._test_level, args=(writer,))
            p.daemon = True
            p.start()
            level = reader.recv()
            p.join()
            self.assertEqual(LEVEL1, level)

            # With NOTSET on the multiprocessing logger, the child reports
            # the level configured on the root logger.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            p = self.Process(target=self._test_level, args=(writer,))
            p.daemon = True
            p.start()
            level = reader.recv()
            p.join()
            self.assertEqual(LEVEL2, level)
        finally:
            # Restore global logging state and release the pipe even when
            # an assertion above fails.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
2197
Jesse Noller814d02d2009-11-21 14:38:23 +00002198
Jesse Noller9a03f2f2009-11-24 14:17:29 +00002199# class _TestLoggingProcessName(BaseTestCase):
2200#
2201# def handle(self, record):
2202# assert record.processName == multiprocessing.current_process().name
2203# self.__handled = True
2204#
2205# def test_logging(self):
2206# handler = logging.Handler()
2207# handler.handle = self.handle
2208# self.__handled = False
2209# # Bypass getLogger() and side-effects
2210# logger = logging.getLoggerClass()(
2211# 'multiprocessing.test.TestLoggingProcessName')
2212# logger.addHandler(handler)
2213# logger.propagate = False
2214#
2215# logger.warn('foo')
2216# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00002217
Benjamin Petersondfd79492008-06-13 19:13:39 +00002218#
Richard Oudkerkba482642013-02-26 12:37:07 +00002219# Check that Process.join() retries if os.waitpid() fails with EINTR
2220#
2221
class _TestPollEintr(BaseTestCase):
    """Process.join() must survive a signal arriving while it waits."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _killer(cls, pid):
        """Wait half a second, then send SIGUSR1 to process ``pid``."""
        time.sleep(0.5)
        os.kill(pid, signal.SIGUSR1)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_poll_eintr(self):
        # One-element list so the nested handler can record the signal.
        got_signal = [False]

        def record(*args):
            got_signal[0] = True

        own_pid = os.getpid()
        previous_handler = signal.signal(signal.SIGUSR1, record)
        try:
            killer = self.Process(target=self._killer, args=(own_pid,))
            killer.start()
            # The sleeper outlives the killer's delay, so the signal
            # arrives while join() below is still waiting.
            sleeper = self.Process(target=time.sleep, args=(1,))
            sleeper.start()
            sleeper.join()
            self.assertTrue(got_signal[0])
            self.assertEqual(sleeper.exitcode, 0)
            killer.join()
        finally:
            signal.signal(signal.SIGUSR1, previous_handler)
2249
2250#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002251# Test to verify handle verification, see issue 3321
2252#
2253
class TestInvalidHandle(unittest.TestCase):
    """Connection objects must reject invalid handles (issue 3321)."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # Polling a connection built on an arbitrary handle number fails.
        bogus = _multiprocessing.Connection(44977608)
        self.assertRaises(IOError, bogus.poll)
        # A negative handle is refused at construction time.
        self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002261
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002262#
Benjamin Petersondfd79492008-06-13 19:13:39 +00002263# Functions used to create test cases from the base ones in this module
2264#
2265
def get_attributes(Source, names):
    """Return a dict mapping each of ``names`` to that attribute of ``Source``.

    Plain Python functions are wrapped in ``staticmethod`` so that they
    stay unbound when the resulting dict is merged into a class
    namespace.  Every other kind of object is stored unchanged.

    Raises AttributeError if ``Source`` lacks one of the names.
    """
    function_type = type(get_attributes)
    d = {}
    for name in names:
        obj = getattr(Source, name)
        if isinstance(obj, function_type):
            obj = staticmethod(obj)
        d[name] = obj
    return d
2274
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes from the ``_Test*`` bases in globals.

    For every module-level name starting with ``_Test`` whose class lists
    ``type`` in its ``ALLOWED_TYPES``, a class deriving from that base,
    ``unittest.TestCase`` and ``Mixin`` is created.  It is named
    ``'With' + type.capitalize() + <base name without the underscore>``.

    Returns a dict mapping the new class names to the new classes.
    """
    prefix = 'With' + type.capitalize()
    namespace = globals()
    cases = {}

    for name in list(namespace):
        if not name.startswith('_Test'):
            continue
        base = namespace[name]
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        newname = prefix + name[1:]
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        cases[newname] = Temp
    return cases
2291
2292#
2293# Create test cases
2294#
2295
class ProcessesMixin(object):
    """Mixin that binds the test bases to the real multiprocessing API."""

    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed multiprocessing attributes into the class namespace.
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue', 'Pool',
    )))


# Generate the process-based test cases and publish them at module level.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2308
2309
class ManagerMixin(object):
    """Mixin that binds the test bases to proxies created by a manager."""

    TYPE = 'manager'
    Process = multiprocessing.Process
    # The manager is allocated without running __init__ here.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed manager attributes into the class namespace.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue', 'Pool',
    )))


# Generate the manager-based test cases and publish them at module level.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
2322
2323
class ThreadsMixin(object):
    """Mixin that binds the test bases to the multiprocessing.dummy API."""

    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed multiprocessing.dummy attributes into the class
    # namespace.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue', 'Pool',
    )))


# Generate the thread-based test cases and publish them at module level.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2336
class OtherTest(unittest.TestCase):
    """Authentication failures in the connection challenge handshake."""

    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        class _FakeConnection(object):
            # Always answers with bytes that cannot be a valid digest.
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        class _FakeConnection(object):
            # First read yields the challenge marker, the second a bogus
            # reply, every later one an empty byte string.
            def __init__(self):
                self.count = 0

            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
2365
Jesse Noller7152f6d2009-04-02 05:17:26 +00002366#
2367# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2368#
2369
def initializer(ns):
    """Increment the ``test`` attribute of ``ns`` by one."""
    ns.test += 1
2372
class TestInitializers(unittest.TestCase):
    """Initializer support of Manager.start() and Pool() (issue 5585)."""

    def setUp(self):
        # A manager-hosted namespace lets the initializer, which runs in
        # another process, report back through ``ns.test``.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer is rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Stop the second manager even when the assertion fails.
            m.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer is rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
2395
Jesse Noller1b90efb2009-06-30 17:11:52 +00002396#
2397# Issue 5155, 5313, 5331: Test process in processes
2398# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2399#
2400
def _this_sub_process(q):
    """Try one non-blocking ``get`` on ``q``; an empty queue is ignored."""
    try:
        q.get(block=False)
    except Queue.Empty:
        pass
2406
def _test_process(q):
    """Start a daemon child running ``_this_sub_process`` and wait for it.

    The child gets a freshly created queue; the ``q`` argument itself is
    not used.
    """
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_this_sub_process,
                                    args=(inner_queue,))
    child.daemon = True
    child.start()
    child.join()
2413
def _afunc(x):
    """Return ``x`` multiplied by itself."""
    return x * x
2416
def pool_in_process():
    """Map ``_afunc`` over a few integers on a four-worker pool.

    The pool is closed and joined before returning, so that its worker
    processes are not left behind.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        pool.close()
        pool.join()
2420
class _file_like(object):
    """Write buffer that forwards to a delegate only on ``flush()``.

    The buffer is tied to the process id: when it is accessed from a
    process other than the one that last used it, it starts out empty.
    """

    def __init__(self, delegate):
        self._delegate = delegate
        self._pid = None

    @property
    def cache(self):
        """The list of pending chunks for the current process."""
        current = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid != current:
            self._pid = current
            self._cache = []
        return self._cache

    def write(self, data):
        """Queue ``data``; nothing reaches the delegate yet."""
        self.cache.append(data)

    def flush(self):
        """Write all pending chunks to the delegate, then reset the buffer."""
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
2441
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Processes started from within processes (issues 5155, 5313, 5331)."""

    def test_queue_in_process(self):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_test_process, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this process is created but never started; kept
        # as in the original -- confirm whether it was meant to run.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # A real assertion method is used here, because a bare ``assert``
        # is removed when Python runs with -O.
        self.assertEqual(sio.getvalue(), 'foo')
2462
Richard Oudkerke4b99382012-07-27 14:05:46 +01002463#
2464# Test interaction with socket timeouts - see Issue #6056
2465#
2466
class TestTimeouts(unittest.TestCase):
    """Connections must work with a socket default timeout set (#6056)."""

    @classmethod
    def _test_timeout(cls, child, address):
        """Child side: answer late on the pipe, then via a new client."""
        # Sleep longer than the default timeout the parent installs.
        time.sleep(1)
        child.send(123)
        child.close()
        client = multiprocessing.connection.Client(address)
        client.send(456)
        client.close()

    def test_timeout(self):
        saved_timeout = socket.getdefaulttimeout()
        try:
            socket.setdefaulttimeout(0.1)
            parent, child = multiprocessing.Pipe(duplex=True)
            listener = multiprocessing.connection.Listener(family='AF_INET')
            worker = multiprocessing.Process(target=self._test_timeout,
                                             args=(child, listener.address))
            worker.start()
            child.close()
            self.assertEqual(parent.recv(), 123)
            parent.close()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 456)
            accepted.close()
            listener.close()
            worker.join(10)
        finally:
            socket.setdefaulttimeout(saved_timeout)
2496
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002497#
2498# Test what happens with no "if __name__ == '__main__'"
2499#
2500
class TestNoForkBomb(unittest.TestCase):
    """Run a script that lacks the ``if __name__ == '__main__'`` guard."""

    def test_noforkbomb(self):
        script = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
        if not WIN32:
            # Elsewhere the script is expected to succeed and print 123.
            rc, out, err = test.script_helper.assert_python_ok(script)
            self.assertEqual(out.rstrip(), '123')
            self.assertEqual(err, '')
        else:
            # On Windows the script is expected to fail with a
            # RuntimeError and no output.
            rc, out, err = test.script_helper.assert_python_failure(script)
            self.assertEqual(out, '')
            self.assertIn('RuntimeError', err)
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002512
2513#
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002514# Issue 12098: check sys.flags of child matches that for parent
2515#
2516
class TestFlags(unittest.TestCase):
    """sys.flags of a child process must match its parent (issue 12098)."""

    @classmethod
    def run_in_grandchild(cls, conn):
        """Send this process's ``sys.flags`` as a tuple over ``conn``."""
        conn.send(tuple(sys.flags))

    @classmethod
    def run_in_child(cls):
        """Print, as JSON, this process's flags and those of a child of it."""
        import json
        reader, writer = multiprocessing.Pipe(duplex=False)
        grandchild = multiprocessing.Process(target=cls.run_in_grandchild,
                                             args=(writer,))
        grandchild.start()
        grandchild_flags = reader.recv()
        grandchild.join()
        reader.close()
        writer.close()
        print(json.dumps((tuple(sys.flags), grandchild_flags)))

    @test_support.requires_unicode  # XXX json needs unicode support
    def test_flags(self):
        import json, subprocess
        # start child process using unusual flags
        prog = ('from test.test_multiprocessing import TestFlags; ' +
                'TestFlags.run_in_child()')
        command = [sys.executable, '-E', '-B', '-O', '-c', prog]
        data = subprocess.check_output(command)
        child_flags, grandchild_flags = json.loads(data.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)
Richard Oudkerk7bdd93c2013-04-17 19:15:52 +01002545
2546#
2547# Issue #17555: ForkAwareThreadLock
2548#
2549
class TestForkAwareThreadLock(unittest.TestCase):
    # We recursively start processes.  Issue #17555 meant that the
    # after fork registry would get duplicate entries for the same
    # lock.  The size of the registry at generation n was ~2**n.

    @classmethod
    def child(cls, n, conn):
        """Spawn ``n - 1`` further generations; the last reports the size.

        The final generation sends the length of the after-fork registry
        over ``conn``.  Every generation closes its end of ``conn``.
        """
        if n <= 1:
            conn.send(len(util._afterfork_registry))
        else:
            descendant = multiprocessing.Process(target=cls.child,
                                                 args=(n-1, conn))
            descendant.start()
            descendant.join()
        conn.close()

    def test_lock(self):
        reader, writer = multiprocessing.Pipe(False)
        # Bound to a local so the lock stays referenced during the test.
        lock = util.ForkAwareThreadLock()
        size_before = len(util._afterfork_registry)
        first = multiprocessing.Process(target=self.child, args=(5, writer))
        first.start()
        size_after = reader.recv()
        first.join()
        self.assertLessEqual(size_after, size_before)
2574
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002575#
Richard Oudkerk41072db2013-07-01 18:45:28 +01002576# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
2577#
2578
class TestIgnoreEINTR(unittest.TestCase):
    """recv(), send(), accept() etc. must ignore EINTR (issue #17097)."""

    @classmethod
    def _test_ignore(cls, conn):
        """Child side: install a no-op SIGUSR1 handler and exchange data."""
        signal.signal(signal.SIGUSR1, lambda signum, frame: None)
        conn.send('ready')
        # Echo back whatever the parent sends.
        conn.send(conn.recv())
        conn.send_bytes(b'x'*(1024*1024))   # sending 1 MB should block

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            child = multiprocessing.Process(target=self._test_ignore,
                                            args=(child_conn,))
            child.daemon = True
            child.start()
            child_conn.close()
            self.assertEqual(conn.recv(), 'ready')
            # Signal the child between the 'ready' message and the send
            # below.
            time.sleep(0.1)
            os.kill(child.pid, signal.SIGUSR1)
            time.sleep(0.1)
            conn.send(1234)
            self.assertEqual(conn.recv(), 1234)
            # Signal the child again before reading the large message.
            time.sleep(0.1)
            os.kill(child.pid, signal.SIGUSR1)
            self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
            time.sleep(0.1)
            child.join()
        finally:
            conn.close()

    @classmethod
    def _test_ignore_listener(cls, conn):
        """Child side: report a listener address, accept once and greet."""
        signal.signal(signal.SIGUSR1, lambda signum, frame: None)
        listener = multiprocessing.connection.Listener()
        conn.send(listener.address)
        accepted = listener.accept()
        accepted.send('welcome')

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore_listener(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            child = multiprocessing.Process(
                target=self._test_ignore_listener, args=(child_conn,))
            child.daemon = True
            child.start()
            child_conn.close()
            address = conn.recv()
            # Signal the child before any client has connected.
            time.sleep(0.1)
            os.kill(child.pid, signal.SIGUSR1)
            time.sleep(0.1)
            client = multiprocessing.connection.Client(address)
            self.assertEqual(client.recv(), 'welcome')
            child.join()
        finally:
            conn.close()
2642
2643#
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002644#
2645#
2646
# Stand-alone test cases that are not generated from the _Test* bases.
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
    TestTimeouts,
    TestNoForkBomb,
    TestFlags,
    TestForkAwareThreadLock,
    TestIgnoreEINTR,
]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002650
Benjamin Petersondfd79492008-06-13 19:13:39 +00002651#
2652#
2653#
2654
def test_main(run=None):
    """Set up the shared pools and manager, run every test case, tear down.

    ``run`` is a callable taking the assembled test suite; when it is
    None, ``test.test_support.run_unittest`` is used.

    Raises unittest.SkipTest on Linux when an RLock cannot be created
    (see issue 3111).
    """
    if sys.platform.startswith("linux"):
        try:
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Resources shared by all generated test cases.  The manager object
    # is initialised here, right before it is started.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
        # module during these tests is at least platform dependent and possibly
        # non-deterministic on any given platform. So we don't mind if the listed
        # warnings aren't actually raised.
        with test_support.check_py3k_warnings(
                (".+__(get|set)slice__ has been removed", DeprecationWarning),
                (r"sys.exc_clear\(\) not supported", DeprecationWarning),
                quiet=True):
            run(suite)
    finally:
        # Tear the shared resources down even when the run raises, so no
        # worker or manager process is left behind.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002702
def main():
    """Run the whole suite through a verbose text test runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

if __name__ == '__main__':
    main()