blob: 8a6bb780f540ae0d895e2fe620962a2286829a8d [file] [log] [blame]
Benjamin Petersondfd79492008-06-13 19:13:39 +00001#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00006import Queue
7import time
8import sys
9import os
10import gc
11import signal
12import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000013import socket
14import random
15import logging
Antoine Pitroua1a8da82011-08-23 19:54:20 +020016import errno
Antoine Pitrou5084ff72017-03-24 16:03:46 +010017import weakref
Richard Oudkerkfaee75c2012-08-14 11:41:19 +010018import test.script_helper
Mark Dickinsonc4920e82009-11-20 19:30:22 +000019from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000020from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000021_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020022# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000023# message: "No module named _multiprocessing". _multiprocessing is not compiled
24# without thread support.
25import threading
R. David Murray3db8a342009-03-30 23:05:48 +000026
Jesse Noller37040cd2008-09-30 00:15:45 +000027# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000028test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000029
Benjamin Petersondfd79492008-06-13 19:13:39 +000030import multiprocessing.dummy
31import multiprocessing.connection
32import multiprocessing.managers
33import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000034import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000035
Charles-François Natalif8413b22011-09-21 18:44:49 +020036from multiprocessing import util
37
38try:
39 from multiprocessing import reduction
40 HAS_REDUCTION = True
41except ImportError:
42 HAS_REDUCTION = False
Benjamin Petersondfd79492008-06-13 19:13:39 +000043
Brian Curtina06e9b82010-10-07 02:27:41 +000044try:
45 from multiprocessing.sharedctypes import Value, copy
46 HAS_SHAREDCTYPES = True
47except ImportError:
48 HAS_SHAREDCTYPES = False
49
Antoine Pitroua1a8da82011-08-23 19:54:20 +020050try:
51 import msvcrt
52except ImportError:
53 msvcrt = None
54
Benjamin Petersondfd79492008-06-13 19:13:39 +000055#
56#
57#
58
Benjamin Petersone79edf52008-07-13 18:34:58 +000059latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000060
Benjamin Petersondfd79492008-06-13 19:13:39 +000061#
62# Constants
63#
64
65LOG_LEVEL = util.SUBWARNING
Jesse Noller654ade32010-01-27 03:05:57 +000066#LOG_LEVEL = logging.DEBUG
Benjamin Petersondfd79492008-06-13 19:13:39 +000067
68DELTA = 0.1
69CHECK_TIMINGS = False # making true makes tests take a lot longer
70 # and can sometimes cause some non-serious
71 # failures because some calls block a bit
72 # longer than expected
73if CHECK_TIMINGS:
74 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
75else:
76 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
77
78HAVE_GETVALUE = not getattr(_multiprocessing,
79 'HAVE_BROKEN_SEM_GETVALUE', False)
80
Jesse Noller9a5b2ad2009-01-19 15:12:22 +000081WIN32 = (sys.platform == "win32")
82
Antoine Pitroua1a8da82011-08-23 19:54:20 +020083try:
84 MAXFD = os.sysconf("SC_OPEN_MAX")
85except:
86 MAXFD = 256
87
Benjamin Petersondfd79492008-06-13 19:13:39 +000088#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000089# Some tests require ctypes
90#
91
92try:
Nick Coghlan13623662010-04-10 14:24:36 +000093 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000094except ImportError:
95 Structure = object
96 c_int = c_double = None
97
Charles-François Natali6392d7f2011-11-22 18:35:18 +010098
99def check_enough_semaphores():
100 """Check that the system supports enough semaphores to run the test."""
101 # minimum number of semaphores available according to POSIX
102 nsems_min = 256
103 try:
104 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
105 except (AttributeError, ValueError):
106 # sysconf not available or setting not available
107 return
108 if nsems == -1 or nsems >= nsems_min:
109 return
110 raise unittest.SkipTest("The OS doesn't support enough semaphores "
111 "to run the test (required: %d)." % nsems_min)
112
113
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000114#
Benjamin Petersondfd79492008-06-13 19:13:39 +0000115# Creates a wrapper for a function which records the time it takes to finish
116#
117
118class TimingWrapper(object):
119
120 def __init__(self, func):
121 self.func = func
122 self.elapsed = None
123
124 def __call__(self, *args, **kwds):
125 t = time.time()
126 try:
127 return self.func(*args, **kwds)
128 finally:
129 self.elapsed = time.time() - t
130
131#
132# Base class for test cases
133#
134
135class BaseTestCase(object):
136
137 ALLOWED_TYPES = ('processes', 'manager', 'threads')
138
139 def assertTimingAlmostEqual(self, a, b):
140 if CHECK_TIMINGS:
141 self.assertAlmostEqual(a, b, 1)
142
143 def assertReturnsIfImplemented(self, value, func, *args):
144 try:
145 res = func(*args)
146 except NotImplementedError:
147 pass
148 else:
149 return self.assertEqual(value, res)
150
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000151 # For the sanity of Windows users, rather than crashing or freezing in
152 # multiple ways.
153 def __reduce__(self, *args):
154 raise NotImplementedError("shouldn't try to pickle a test case")
155
156 __reduce_ex__ = __reduce__
157
Benjamin Petersondfd79492008-06-13 19:13:39 +0000158#
159# Return the value of a semaphore
160#
161
162def get_value(self):
163 try:
164 return self.get_value()
165 except AttributeError:
166 try:
167 return self._Semaphore__value
168 except AttributeError:
169 try:
170 return self._value
171 except AttributeError:
172 raise NotImplementedError
173
174#
175# Testcases
176#
177
178class _TestProcess(BaseTestCase):
179
180 ALLOWED_TYPES = ('processes', 'threads')
181
182 def test_current(self):
183 if self.TYPE == 'threads':
Zachary Ware1f702212013-12-10 14:09:20 -0600184 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000185
186 current = self.current_process()
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000187 authkey = current.authkey
Benjamin Petersondfd79492008-06-13 19:13:39 +0000188
189 self.assertTrue(current.is_alive())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000190 self.assertTrue(not current.daemon)
Ezio Melottib0f5adc2010-01-24 16:58:36 +0000191 self.assertIsInstance(authkey, bytes)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000192 self.assertTrue(len(authkey) > 0)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000193 self.assertEqual(current.ident, os.getpid())
194 self.assertEqual(current.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000195
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000196 @classmethod
197 def _test(cls, q, *args, **kwds):
198 current = cls.current_process()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000199 q.put(args)
200 q.put(kwds)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000201 q.put(current.name)
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000202 if cls.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000203 q.put(bytes(current.authkey))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000204 q.put(current.pid)
205
206 def test_process(self):
207 q = self.Queue(1)
208 e = self.Event()
209 args = (q, 1, 2)
210 kwargs = {'hello':23, 'bye':2.54}
211 name = 'SomeProcess'
212 p = self.Process(
213 target=self._test, args=args, kwargs=kwargs, name=name
214 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000215 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000216 current = self.current_process()
217
218 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000219 self.assertEqual(p.authkey, current.authkey)
220 self.assertEqual(p.is_alive(), False)
221 self.assertEqual(p.daemon, True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000222 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000223 self.assertTrue(type(self.active_children()) is list)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000224 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000225
226 p.start()
227
Ezio Melotti2623a372010-11-21 13:34:58 +0000228 self.assertEqual(p.exitcode, None)
229 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000230 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000231
Ezio Melotti2623a372010-11-21 13:34:58 +0000232 self.assertEqual(q.get(), args[1:])
233 self.assertEqual(q.get(), kwargs)
234 self.assertEqual(q.get(), p.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000235 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000236 self.assertEqual(q.get(), current.authkey)
237 self.assertEqual(q.get(), p.pid)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000238
239 p.join()
240
Ezio Melotti2623a372010-11-21 13:34:58 +0000241 self.assertEqual(p.exitcode, 0)
242 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000243 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000244
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000245 @classmethod
246 def _test_terminate(cls):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000247 time.sleep(1000)
248
249 def test_terminate(self):
250 if self.TYPE == 'threads':
Zachary Ware1f702212013-12-10 14:09:20 -0600251 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000252
253 p = self.Process(target=self._test_terminate)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000254 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000255 p.start()
256
257 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000258 self.assertIn(p, self.active_children())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000259 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000260
261 p.terminate()
262
263 join = TimingWrapper(p.join)
264 self.assertEqual(join(), None)
265 self.assertTimingAlmostEqual(join.elapsed, 0.0)
266
267 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000268 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000269
270 p.join()
271
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000272 # XXX sometimes get p.exitcode == 0 on Windows ...
273 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000274
275 def test_cpu_count(self):
276 try:
277 cpus = multiprocessing.cpu_count()
278 except NotImplementedError:
279 cpus = 1
280 self.assertTrue(type(cpus) is int)
281 self.assertTrue(cpus >= 1)
282
283 def test_active_children(self):
284 self.assertEqual(type(self.active_children()), list)
285
286 p = self.Process(target=time.sleep, args=(DELTA,))
Ezio Melottiaa980582010-01-23 23:04:36 +0000287 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000288
Jesus Cea6f6016b2011-09-09 20:26:57 +0200289 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000290 p.start()
Ezio Melottiaa980582010-01-23 23:04:36 +0000291 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000292
293 p.join()
Ezio Melottiaa980582010-01-23 23:04:36 +0000294 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000295
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000296 @classmethod
297 def _test_recursion(cls, wconn, id):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000298 from multiprocessing import forking
299 wconn.send(id)
300 if len(id) < 2:
301 for i in range(2):
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000302 p = cls.Process(
303 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersondfd79492008-06-13 19:13:39 +0000304 )
305 p.start()
306 p.join()
307
308 def test_recursion(self):
309 rconn, wconn = self.Pipe(duplex=False)
310 self._test_recursion(wconn, [])
311
312 time.sleep(DELTA)
313 result = []
314 while rconn.poll():
315 result.append(rconn.recv())
316
317 expected = [
318 [],
319 [0],
320 [0, 0],
321 [0, 1],
322 [1],
323 [1, 0],
324 [1, 1]
325 ]
326 self.assertEqual(result, expected)
327
Richard Oudkerk2182e052012-06-06 19:01:14 +0100328 @classmethod
329 def _test_sys_exit(cls, reason, testfn):
330 sys.stderr = open(testfn, 'w')
331 sys.exit(reason)
332
333 def test_sys_exit(self):
334 # See Issue 13854
335 if self.TYPE == 'threads':
Zachary Ware1f702212013-12-10 14:09:20 -0600336 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerk2182e052012-06-06 19:01:14 +0100337
338 testfn = test_support.TESTFN
339 self.addCleanup(test_support.unlink, testfn)
340
Richard Oudkerk3f8376e2013-11-17 17:24:11 +0000341 for reason, code in (([1, 2, 3], 1), ('ignore this', 1)):
Richard Oudkerk2182e052012-06-06 19:01:14 +0100342 p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
343 p.daemon = True
344 p.start()
345 p.join(5)
346 self.assertEqual(p.exitcode, code)
347
348 with open(testfn, 'r') as f:
349 self.assertEqual(f.read().rstrip(), str(reason))
350
351 for reason in (True, False, 8):
352 p = self.Process(target=sys.exit, args=(reason,))
353 p.daemon = True
354 p.start()
355 p.join(5)
356 self.assertEqual(p.exitcode, reason)
357
Benjamin Petersondfd79492008-06-13 19:13:39 +0000358#
359#
360#
361
362class _UpperCaser(multiprocessing.Process):
363
364 def __init__(self):
365 multiprocessing.Process.__init__(self)
366 self.child_conn, self.parent_conn = multiprocessing.Pipe()
367
368 def run(self):
369 self.parent_conn.close()
370 for s in iter(self.child_conn.recv, None):
371 self.child_conn.send(s.upper())
372 self.child_conn.close()
373
374 def submit(self, s):
375 assert type(s) is str
376 self.parent_conn.send(s)
377 return self.parent_conn.recv()
378
379 def stop(self):
380 self.parent_conn.send(None)
381 self.parent_conn.close()
382 self.child_conn.close()
383
384class _TestSubclassingProcess(BaseTestCase):
385
386 ALLOWED_TYPES = ('processes',)
387
388 def test_subclassing(self):
389 uppercaser = _UpperCaser()
Jesus Cea6f6016b2011-09-09 20:26:57 +0200390 uppercaser.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000391 uppercaser.start()
392 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
393 self.assertEqual(uppercaser.submit('world'), 'WORLD')
394 uppercaser.stop()
395 uppercaser.join()
396
397#
398#
399#
400
401def queue_empty(q):
402 if hasattr(q, 'empty'):
403 return q.empty()
404 else:
405 return q.qsize() == 0
406
407def queue_full(q, maxsize):
408 if hasattr(q, 'full'):
409 return q.full()
410 else:
411 return q.qsize() == maxsize
412
413
414class _TestQueue(BaseTestCase):
415
416
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000417 @classmethod
418 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000419 child_can_start.wait()
420 for i in range(6):
421 queue.get()
422 parent_can_continue.set()
423
424 def test_put(self):
425 MAXSIZE = 6
426 queue = self.Queue(maxsize=MAXSIZE)
427 child_can_start = self.Event()
428 parent_can_continue = self.Event()
429
430 proc = self.Process(
431 target=self._test_put,
432 args=(queue, child_can_start, parent_can_continue)
433 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000434 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000435 proc.start()
436
437 self.assertEqual(queue_empty(queue), True)
438 self.assertEqual(queue_full(queue, MAXSIZE), False)
439
440 queue.put(1)
441 queue.put(2, True)
442 queue.put(3, True, None)
443 queue.put(4, False)
444 queue.put(5, False, None)
445 queue.put_nowait(6)
446
447 # the values may be in buffer but not yet in pipe so sleep a bit
448 time.sleep(DELTA)
449
450 self.assertEqual(queue_empty(queue), False)
451 self.assertEqual(queue_full(queue, MAXSIZE), True)
452
453 put = TimingWrapper(queue.put)
454 put_nowait = TimingWrapper(queue.put_nowait)
455
456 self.assertRaises(Queue.Full, put, 7, False)
457 self.assertTimingAlmostEqual(put.elapsed, 0)
458
459 self.assertRaises(Queue.Full, put, 7, False, None)
460 self.assertTimingAlmostEqual(put.elapsed, 0)
461
462 self.assertRaises(Queue.Full, put_nowait, 7)
463 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
464
465 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
466 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
467
468 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
469 self.assertTimingAlmostEqual(put.elapsed, 0)
470
471 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
472 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
473
474 child_can_start.set()
475 parent_can_continue.wait()
476
477 self.assertEqual(queue_empty(queue), True)
478 self.assertEqual(queue_full(queue, MAXSIZE), False)
479
480 proc.join()
481
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000482 @classmethod
483 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000484 child_can_start.wait()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000485 #queue.put(1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000486 queue.put(2)
487 queue.put(3)
488 queue.put(4)
489 queue.put(5)
490 parent_can_continue.set()
491
492 def test_get(self):
493 queue = self.Queue()
494 child_can_start = self.Event()
495 parent_can_continue = self.Event()
496
497 proc = self.Process(
498 target=self._test_get,
499 args=(queue, child_can_start, parent_can_continue)
500 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000501 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000502 proc.start()
503
504 self.assertEqual(queue_empty(queue), True)
505
506 child_can_start.set()
507 parent_can_continue.wait()
508
509 time.sleep(DELTA)
510 self.assertEqual(queue_empty(queue), False)
511
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000512 # Hangs unexpectedly, remove for now
513 #self.assertEqual(queue.get(), 1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000514 self.assertEqual(queue.get(True, None), 2)
515 self.assertEqual(queue.get(True), 3)
516 self.assertEqual(queue.get(timeout=1), 4)
517 self.assertEqual(queue.get_nowait(), 5)
518
519 self.assertEqual(queue_empty(queue), True)
520
521 get = TimingWrapper(queue.get)
522 get_nowait = TimingWrapper(queue.get_nowait)
523
524 self.assertRaises(Queue.Empty, get, False)
525 self.assertTimingAlmostEqual(get.elapsed, 0)
526
527 self.assertRaises(Queue.Empty, get, False, None)
528 self.assertTimingAlmostEqual(get.elapsed, 0)
529
530 self.assertRaises(Queue.Empty, get_nowait)
531 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
532
533 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
534 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
535
536 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
537 self.assertTimingAlmostEqual(get.elapsed, 0)
538
539 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
540 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
541
542 proc.join()
543
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000544 @classmethod
545 def _test_fork(cls, queue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000546 for i in range(10, 20):
547 queue.put(i)
548 # note that at this point the items may only be buffered, so the
549 # process cannot shutdown until the feeder thread has finished
550 # pushing items onto the pipe.
551
552 def test_fork(self):
553 # Old versions of Queue would fail to create a new feeder
554 # thread for a forked process if the original process had its
555 # own feeder thread. This test checks that this no longer
556 # happens.
557
558 queue = self.Queue()
559
560 # put items on queue so that main process starts a feeder thread
561 for i in range(10):
562 queue.put(i)
563
564 # wait to make sure thread starts before we fork a new process
565 time.sleep(DELTA)
566
567 # fork process
568 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200569 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000570 p.start()
571
572 # check that all expected items are in the queue
573 for i in range(20):
574 self.assertEqual(queue.get(), i)
575 self.assertRaises(Queue.Empty, queue.get, False)
576
577 p.join()
578
579 def test_qsize(self):
580 q = self.Queue()
581 try:
582 self.assertEqual(q.qsize(), 0)
583 except NotImplementedError:
Zachary Ware1f702212013-12-10 14:09:20 -0600584 self.skipTest('qsize method not implemented')
Benjamin Petersondfd79492008-06-13 19:13:39 +0000585 q.put(1)
586 self.assertEqual(q.qsize(), 1)
587 q.put(5)
588 self.assertEqual(q.qsize(), 2)
589 q.get()
590 self.assertEqual(q.qsize(), 1)
591 q.get()
592 self.assertEqual(q.qsize(), 0)
593
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000594 @classmethod
595 def _test_task_done(cls, q):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000596 for obj in iter(q.get, None):
597 time.sleep(DELTA)
598 q.task_done()
599
600 def test_task_done(self):
601 queue = self.JoinableQueue()
602
603 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000604 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000605
606 workers = [self.Process(target=self._test_task_done, args=(queue,))
607 for i in xrange(4)]
608
609 for p in workers:
Jesus Cea6f6016b2011-09-09 20:26:57 +0200610 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000611 p.start()
612
613 for i in xrange(10):
614 queue.put(i)
615
616 queue.join()
617
618 for p in workers:
619 queue.put(None)
620
621 for p in workers:
622 p.join()
623
Serhiy Storchaka233e6982015-03-06 22:17:25 +0200624 def test_no_import_lock_contention(self):
625 with test_support.temp_cwd():
626 module_name = 'imported_by_an_imported_module'
627 with open(module_name + '.py', 'w') as f:
628 f.write("""if 1:
629 import multiprocessing
630
631 q = multiprocessing.Queue()
632 q.put('knock knock')
633 q.get(timeout=3)
634 q.close()
635 """)
636
637 with test_support.DirsOnSysPath(os.getcwd()):
638 try:
639 __import__(module_name)
640 except Queue.Empty:
641 self.fail("Probable regression on import lock contention;"
642 " see Issue #22853")
643
Antoine Pitroubdd96472017-05-25 17:53:04 +0200644 def test_queue_feeder_donot_stop_onexc(self):
645 # bpo-30414: verify feeder handles exceptions correctly
646 if self.TYPE != 'processes':
647 self.skipTest('test not appropriate for {}'.format(self.TYPE))
648
649 class NotSerializable(object):
650 def __reduce__(self):
651 raise AttributeError
652 with test.support.captured_stderr():
653 q = self.Queue()
654 q.put(NotSerializable())
655 q.put(True)
656 self.assertTrue(q.get(timeout=0.1))
657
658
Benjamin Petersondfd79492008-06-13 19:13:39 +0000659#
660#
661#
662
663class _TestLock(BaseTestCase):
664
665 def test_lock(self):
666 lock = self.Lock()
667 self.assertEqual(lock.acquire(), True)
668 self.assertEqual(lock.acquire(False), False)
669 self.assertEqual(lock.release(), None)
670 self.assertRaises((ValueError, threading.ThreadError), lock.release)
671
672 def test_rlock(self):
673 lock = self.RLock()
674 self.assertEqual(lock.acquire(), True)
675 self.assertEqual(lock.acquire(), True)
676 self.assertEqual(lock.acquire(), True)
677 self.assertEqual(lock.release(), None)
678 self.assertEqual(lock.release(), None)
679 self.assertEqual(lock.release(), None)
680 self.assertRaises((AssertionError, RuntimeError), lock.release)
681
Jesse Noller82eb5902009-03-30 23:29:31 +0000682 def test_lock_context(self):
683 with self.Lock():
684 pass
685
Benjamin Petersondfd79492008-06-13 19:13:39 +0000686
687class _TestSemaphore(BaseTestCase):
688
689 def _test_semaphore(self, sem):
690 self.assertReturnsIfImplemented(2, get_value, sem)
691 self.assertEqual(sem.acquire(), True)
692 self.assertReturnsIfImplemented(1, get_value, sem)
693 self.assertEqual(sem.acquire(), True)
694 self.assertReturnsIfImplemented(0, get_value, sem)
695 self.assertEqual(sem.acquire(False), False)
696 self.assertReturnsIfImplemented(0, get_value, sem)
697 self.assertEqual(sem.release(), None)
698 self.assertReturnsIfImplemented(1, get_value, sem)
699 self.assertEqual(sem.release(), None)
700 self.assertReturnsIfImplemented(2, get_value, sem)
701
702 def test_semaphore(self):
703 sem = self.Semaphore(2)
704 self._test_semaphore(sem)
705 self.assertEqual(sem.release(), None)
706 self.assertReturnsIfImplemented(3, get_value, sem)
707 self.assertEqual(sem.release(), None)
708 self.assertReturnsIfImplemented(4, get_value, sem)
709
710 def test_bounded_semaphore(self):
711 sem = self.BoundedSemaphore(2)
712 self._test_semaphore(sem)
713 # Currently fails on OS/X
714 #if HAVE_GETVALUE:
715 # self.assertRaises(ValueError, sem.release)
716 # self.assertReturnsIfImplemented(2, get_value, sem)
717
718 def test_timeout(self):
719 if self.TYPE != 'processes':
Zachary Ware1f702212013-12-10 14:09:20 -0600720 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000721
722 sem = self.Semaphore(0)
723 acquire = TimingWrapper(sem.acquire)
724
725 self.assertEqual(acquire(False), False)
726 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
727
728 self.assertEqual(acquire(False, None), False)
729 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
730
731 self.assertEqual(acquire(False, TIMEOUT1), False)
732 self.assertTimingAlmostEqual(acquire.elapsed, 0)
733
734 self.assertEqual(acquire(True, TIMEOUT2), False)
735 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
736
737 self.assertEqual(acquire(timeout=TIMEOUT3), False)
738 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
739
740
741class _TestCondition(BaseTestCase):
742
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000743 @classmethod
744 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000745 cond.acquire()
746 sleeping.release()
747 cond.wait(timeout)
748 woken.release()
749 cond.release()
750
751 def check_invariant(self, cond):
752 # this is only supposed to succeed when there are no sleepers
753 if self.TYPE == 'processes':
754 try:
755 sleepers = (cond._sleeping_count.get_value() -
756 cond._woken_count.get_value())
757 self.assertEqual(sleepers, 0)
758 self.assertEqual(cond._wait_semaphore.get_value(), 0)
759 except NotImplementedError:
760 pass
761
762 def test_notify(self):
763 cond = self.Condition()
764 sleeping = self.Semaphore(0)
765 woken = self.Semaphore(0)
766
767 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000768 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000769 p.start()
770
771 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000772 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000773 p.start()
774
775 # wait for both children to start sleeping
776 sleeping.acquire()
777 sleeping.acquire()
778
779 # check no process/thread has woken up
780 time.sleep(DELTA)
781 self.assertReturnsIfImplemented(0, get_value, woken)
782
783 # wake up one process/thread
784 cond.acquire()
785 cond.notify()
786 cond.release()
787
788 # check one process/thread has woken up
789 time.sleep(DELTA)
790 self.assertReturnsIfImplemented(1, get_value, woken)
791
792 # wake up another
793 cond.acquire()
794 cond.notify()
795 cond.release()
796
797 # check other has woken up
798 time.sleep(DELTA)
799 self.assertReturnsIfImplemented(2, get_value, woken)
800
801 # check state is not mucked up
802 self.check_invariant(cond)
803 p.join()
804
805 def test_notify_all(self):
806 cond = self.Condition()
807 sleeping = self.Semaphore(0)
808 woken = self.Semaphore(0)
809
810 # start some threads/processes which will timeout
811 for i in range(3):
812 p = self.Process(target=self.f,
813 args=(cond, sleeping, woken, TIMEOUT1))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000814 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000815 p.start()
816
817 t = threading.Thread(target=self.f,
818 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000819 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000820 t.start()
821
822 # wait for them all to sleep
823 for i in xrange(6):
824 sleeping.acquire()
825
826 # check they have all timed out
827 for i in xrange(6):
828 woken.acquire()
829 self.assertReturnsIfImplemented(0, get_value, woken)
830
831 # check state is not mucked up
832 self.check_invariant(cond)
833
834 # start some more threads/processes
835 for i in range(3):
836 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000837 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000838 p.start()
839
840 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000841 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000842 t.start()
843
844 # wait for them to all sleep
845 for i in xrange(6):
846 sleeping.acquire()
847
848 # check no process/thread has woken up
849 time.sleep(DELTA)
850 self.assertReturnsIfImplemented(0, get_value, woken)
851
852 # wake them all up
853 cond.acquire()
854 cond.notify_all()
855 cond.release()
856
857 # check they have all woken
Victor Stinner9d1983b2017-05-15 17:32:14 +0200858 for i in range(10):
859 try:
860 if get_value(woken) == 6:
861 break
862 except NotImplementedError:
863 break
864 time.sleep(DELTA)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000865 self.assertReturnsIfImplemented(6, get_value, woken)
866
867 # check state is not mucked up
868 self.check_invariant(cond)
869
870 def test_timeout(self):
871 cond = self.Condition()
872 wait = TimingWrapper(cond.wait)
873 cond.acquire()
874 res = wait(TIMEOUT1)
875 cond.release()
876 self.assertEqual(res, None)
877 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
878
879
880class _TestEvent(BaseTestCase):
881
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000882 @classmethod
883 def _test_event(cls, event):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000884 time.sleep(TIMEOUT2)
885 event.set()
886
887 def test_event(self):
888 event = self.Event()
889 wait = TimingWrapper(event.wait)
890
Ezio Melottic2077b02011-03-16 12:34:31 +0200891 # Removed temporarily, due to API shear, this does not
Benjamin Petersondfd79492008-06-13 19:13:39 +0000892 # work with threading._Event objects. is_set == isSet
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000893 self.assertEqual(event.is_set(), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000894
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000895 # Removed, threading.Event.wait() will return the value of the __flag
896 # instead of None. API Shear with the semaphore backed mp.Event
897 self.assertEqual(wait(0.0), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000898 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000899 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000900 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
901
902 event.set()
903
904 # See note above on the API differences
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000905 self.assertEqual(event.is_set(), True)
906 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000907 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000908 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000909 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
910 # self.assertEqual(event.is_set(), True)
911
912 event.clear()
913
914 #self.assertEqual(event.is_set(), False)
915
Jesus Cea6f6016b2011-09-09 20:26:57 +0200916 p = self.Process(target=self._test_event, args=(event,))
917 p.daemon = True
918 p.start()
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000919 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000920
921#
922#
923#
924
925class _TestValue(BaseTestCase):
926
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000927 ALLOWED_TYPES = ('processes',)
928
Benjamin Petersondfd79492008-06-13 19:13:39 +0000929 codes_values = [
930 ('i', 4343, 24234),
931 ('d', 3.625, -4.25),
932 ('h', -232, 234),
933 ('c', latin('x'), latin('y'))
934 ]
935
Antoine Pitrou55d935a2010-11-22 16:35:57 +0000936 def setUp(self):
937 if not HAS_SHAREDCTYPES:
938 self.skipTest("requires multiprocessing.sharedctypes")
939
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000940 @classmethod
941 def _test(cls, values):
942 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000943 sv.value = cv[2]
944
945
946 def test_value(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000947 if raw:
948 values = [self.RawValue(code, value)
949 for code, value, _ in self.codes_values]
950 else:
951 values = [self.Value(code, value)
952 for code, value, _ in self.codes_values]
953
954 for sv, cv in zip(values, self.codes_values):
955 self.assertEqual(sv.value, cv[1])
956
957 proc = self.Process(target=self._test, args=(values,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200958 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000959 proc.start()
960 proc.join()
961
962 for sv, cv in zip(values, self.codes_values):
963 self.assertEqual(sv.value, cv[2])
964
965 def test_rawvalue(self):
966 self.test_value(raw=True)
967
968 def test_getobj_getlock(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000969 val1 = self.Value('i', 5)
970 lock1 = val1.get_lock()
971 obj1 = val1.get_obj()
972
973 val2 = self.Value('i', 5, lock=None)
974 lock2 = val2.get_lock()
975 obj2 = val2.get_obj()
976
977 lock = self.Lock()
978 val3 = self.Value('i', 5, lock=lock)
979 lock3 = val3.get_lock()
980 obj3 = val3.get_obj()
981 self.assertEqual(lock, lock3)
982
Jesse Noller6ab22152009-01-18 02:45:38 +0000983 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000984 self.assertFalse(hasattr(arr4, 'get_lock'))
985 self.assertFalse(hasattr(arr4, 'get_obj'))
986
Jesse Noller6ab22152009-01-18 02:45:38 +0000987 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
988
989 arr5 = self.RawValue('i', 5)
990 self.assertFalse(hasattr(arr5, 'get_lock'))
991 self.assertFalse(hasattr(arr5, 'get_obj'))
992
Benjamin Petersondfd79492008-06-13 19:13:39 +0000993
994class _TestArray(BaseTestCase):
995
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000996 ALLOWED_TYPES = ('processes',)
997
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000998 @classmethod
999 def f(cls, seq):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001000 for i in range(1, len(seq)):
1001 seq[i] += seq[i-1]
1002
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001003 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001004 def test_array(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001005 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
1006 if raw:
1007 arr = self.RawArray('i', seq)
1008 else:
1009 arr = self.Array('i', seq)
1010
1011 self.assertEqual(len(arr), len(seq))
1012 self.assertEqual(arr[3], seq[3])
1013 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
1014
1015 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
1016
1017 self.assertEqual(list(arr[:]), seq)
1018
1019 self.f(seq)
1020
1021 p = self.Process(target=self.f, args=(arr,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001022 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001023 p.start()
1024 p.join()
1025
1026 self.assertEqual(list(arr[:]), seq)
1027
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001028 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsond3cb2f62011-03-26 10:02:37 +00001029 def test_array_from_size(self):
1030 size = 10
1031 # Test for zeroing (see issue #11675).
1032 # The repetition below strengthens the test by increasing the chances
1033 # of previously allocated non-zero memory being used for the new array
1034 # on the 2nd and 3rd loops.
1035 for _ in range(3):
1036 arr = self.Array('i', size)
1037 self.assertEqual(len(arr), size)
1038 self.assertEqual(list(arr), [0] * size)
1039 arr[:] = range(10)
1040 self.assertEqual(list(arr), range(10))
1041 del arr
1042
1043 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001044 def test_rawarray(self):
1045 self.test_array(raw=True)
1046
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001047 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsonf9e9a6f2011-03-25 22:01:06 +00001048 def test_array_accepts_long(self):
1049 arr = self.Array('i', 10L)
1050 self.assertEqual(len(arr), 10)
1051 raw_arr = self.RawArray('i', 10L)
1052 self.assertEqual(len(raw_arr), 10)
1053
1054 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001055 def test_getobj_getlock_obj(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001056 arr1 = self.Array('i', range(10))
1057 lock1 = arr1.get_lock()
1058 obj1 = arr1.get_obj()
1059
1060 arr2 = self.Array('i', range(10), lock=None)
1061 lock2 = arr2.get_lock()
1062 obj2 = arr2.get_obj()
1063
1064 lock = self.Lock()
1065 arr3 = self.Array('i', range(10), lock=lock)
1066 lock3 = arr3.get_lock()
1067 obj3 = arr3.get_obj()
1068 self.assertEqual(lock, lock3)
1069
Jesse Noller6ab22152009-01-18 02:45:38 +00001070 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001071 self.assertFalse(hasattr(arr4, 'get_lock'))
1072 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Noller6ab22152009-01-18 02:45:38 +00001073 self.assertRaises(AttributeError,
1074 self.Array, 'i', range(10), lock='notalock')
1075
1076 arr5 = self.RawArray('i', range(10))
1077 self.assertFalse(hasattr(arr5, 'get_lock'))
1078 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001079
1080#
1081#
1082#
1083
1084class _TestContainers(BaseTestCase):
1085
1086 ALLOWED_TYPES = ('manager',)
1087
1088 def test_list(self):
1089 a = self.list(range(10))
1090 self.assertEqual(a[:], range(10))
1091
1092 b = self.list()
1093 self.assertEqual(b[:], [])
1094
1095 b.extend(range(5))
1096 self.assertEqual(b[:], range(5))
1097
1098 self.assertEqual(b[2], 2)
1099 self.assertEqual(b[2:10], [2,3,4])
1100
1101 b *= 2
1102 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1103
1104 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1105
1106 self.assertEqual(a[:], range(10))
1107
1108 d = [a, b]
1109 e = self.list(d)
1110 self.assertEqual(
1111 e[:],
1112 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1113 )
1114
1115 f = self.list([a])
1116 a.append('hello')
1117 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1118
1119 def test_dict(self):
1120 d = self.dict()
1121 indices = range(65, 70)
1122 for i in indices:
1123 d[i] = chr(i)
1124 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1125 self.assertEqual(sorted(d.keys()), indices)
1126 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1127 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1128
1129 def test_namespace(self):
1130 n = self.Namespace()
1131 n.name = 'Bob'
1132 n.job = 'Builder'
1133 n._hidden = 'hidden'
1134 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1135 del n.job
1136 self.assertEqual(str(n), "Namespace(name='Bob')")
1137 self.assertTrue(hasattr(n, 'name'))
1138 self.assertTrue(not hasattr(n, 'job'))
1139
1140#
1141#
1142#
1143
1144def sqr(x, wait=0.0):
1145 time.sleep(wait)
1146 return x*x
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001147
Antoine Pitrou5084ff72017-03-24 16:03:46 +01001148def identity(x):
1149 return x
1150
1151class CountedObject(object):
1152 n_instances = 0
1153
1154 def __new__(cls):
1155 cls.n_instances += 1
1156 return object.__new__(cls)
1157
1158 def __del__(self):
1159 type(self).n_instances -= 1
1160
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001161class SayWhenError(ValueError): pass
1162
1163def exception_throwing_generator(total, when):
1164 for i in range(total):
1165 if i == when:
1166 raise SayWhenError("Somebody said when")
1167 yield i
1168
Benjamin Petersondfd79492008-06-13 19:13:39 +00001169class _TestPool(BaseTestCase):
1170
1171 def test_apply(self):
1172 papply = self.pool.apply
1173 self.assertEqual(papply(sqr, (5,)), sqr(5))
1174 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1175
1176 def test_map(self):
1177 pmap = self.pool.map
1178 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
1179 self.assertEqual(pmap(sqr, range(100), chunksize=20),
1180 map(sqr, range(100)))
1181
Richard Oudkerk21aad972013-10-28 23:02:22 +00001182 def test_map_unplicklable(self):
1183 # Issue #19425 -- failure to pickle should not cause a hang
1184 if self.TYPE == 'threads':
Zachary Ware1f702212013-12-10 14:09:20 -06001185 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerk21aad972013-10-28 23:02:22 +00001186 class A(object):
1187 def __reduce__(self):
1188 raise RuntimeError('cannot pickle')
1189 with self.assertRaises(RuntimeError):
1190 self.pool.map(sqr, [A()]*10)
1191
Jesse Noller7530e472009-07-16 14:23:04 +00001192 def test_map_chunksize(self):
1193 try:
1194 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1195 except multiprocessing.TimeoutError:
1196 self.fail("pool.map_async with chunksize stalled on null list")
1197
Benjamin Petersondfd79492008-06-13 19:13:39 +00001198 def test_async(self):
1199 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1200 get = TimingWrapper(res.get)
1201 self.assertEqual(get(), 49)
1202 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1203
1204 def test_async_timeout(self):
Richard Oudkerk65162a72013-11-17 17:45:16 +00001205 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001206 get = TimingWrapper(res.get)
1207 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1208 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1209
1210 def test_imap(self):
1211 it = self.pool.imap(sqr, range(10))
1212 self.assertEqual(list(it), map(sqr, range(10)))
1213
1214 it = self.pool.imap(sqr, range(10))
1215 for i in range(10):
1216 self.assertEqual(it.next(), i*i)
1217 self.assertRaises(StopIteration, it.next)
1218
1219 it = self.pool.imap(sqr, range(1000), chunksize=100)
1220 for i in range(1000):
1221 self.assertEqual(it.next(), i*i)
1222 self.assertRaises(StopIteration, it.next)
1223
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001224 def test_imap_handle_iterable_exception(self):
1225 if self.TYPE == 'manager':
1226 self.skipTest('test not appropriate for {}'.format(self.TYPE))
1227
1228 it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
1229 for i in range(3):
1230 self.assertEqual(next(it), i*i)
1231 self.assertRaises(SayWhenError, it.next)
1232
1233 # SayWhenError seen at start of problematic chunk's results
1234 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
1235 for i in range(6):
1236 self.assertEqual(next(it), i*i)
1237 self.assertRaises(SayWhenError, it.next)
1238 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
1239 for i in range(4):
1240 self.assertEqual(next(it), i*i)
1241 self.assertRaises(SayWhenError, it.next)
1242
Benjamin Petersondfd79492008-06-13 19:13:39 +00001243 def test_imap_unordered(self):
1244 it = self.pool.imap_unordered(sqr, range(1000))
1245 self.assertEqual(sorted(it), map(sqr, range(1000)))
1246
1247 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1248 self.assertEqual(sorted(it), map(sqr, range(1000)))
1249
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001250 def test_imap_unordered_handle_iterable_exception(self):
1251 if self.TYPE == 'manager':
1252 self.skipTest('test not appropriate for {}'.format(self.TYPE))
1253
1254 it = self.pool.imap_unordered(sqr,
1255 exception_throwing_generator(10, 3),
1256 1)
Serhiy Storchaka89c3b8e2015-04-23 11:35:43 +03001257 expected_values = map(sqr, range(10))
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001258 with self.assertRaises(SayWhenError):
1259 # imap_unordered makes it difficult to anticipate the SayWhenError
1260 for i in range(10):
Serhiy Storchaka89c3b8e2015-04-23 11:35:43 +03001261 value = next(it)
1262 self.assertIn(value, expected_values)
1263 expected_values.remove(value)
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001264
1265 it = self.pool.imap_unordered(sqr,
1266 exception_throwing_generator(20, 7),
1267 2)
Serhiy Storchaka89c3b8e2015-04-23 11:35:43 +03001268 expected_values = map(sqr, range(20))
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001269 with self.assertRaises(SayWhenError):
1270 for i in range(20):
Serhiy Storchaka89c3b8e2015-04-23 11:35:43 +03001271 value = next(it)
1272 self.assertIn(value, expected_values)
1273 expected_values.remove(value)
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001274
Benjamin Petersondfd79492008-06-13 19:13:39 +00001275 def test_make_pool(self):
Victor Stinnerf64a0cf2011-06-20 17:54:33 +02001276 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1277 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1278
Benjamin Petersondfd79492008-06-13 19:13:39 +00001279 p = multiprocessing.Pool(3)
1280 self.assertEqual(3, len(p._pool))
1281 p.close()
1282 p.join()
1283
1284 def test_terminate(self):
Richard Oudkerk6d24a6e2013-11-21 16:35:12 +00001285 p = self.Pool(4)
1286 result = p.map_async(
Benjamin Petersondfd79492008-06-13 19:13:39 +00001287 time.sleep, [0.1 for i in range(10000)], chunksize=1
1288 )
Richard Oudkerk6d24a6e2013-11-21 16:35:12 +00001289 p.terminate()
1290 join = TimingWrapper(p.join)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001291 join()
1292 self.assertTrue(join.elapsed < 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001293
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01001294 def test_empty_iterable(self):
1295 # See Issue 12157
1296 p = self.Pool(1)
1297
1298 self.assertEqual(p.map(sqr, []), [])
1299 self.assertEqual(list(p.imap(sqr, [])), [])
1300 self.assertEqual(list(p.imap_unordered(sqr, [])), [])
1301 self.assertEqual(p.map_async(sqr, []).get(), [])
1302
1303 p.close()
1304 p.join()
1305
Antoine Pitrou5084ff72017-03-24 16:03:46 +01001306 def test_release_task_refs(self):
1307 # Issue #29861: task arguments and results should not be kept
1308 # alive after we are done with them.
1309 objs = list(CountedObject() for i in range(10))
1310 refs = list(weakref.ref(o) for o in objs)
1311 self.pool.map(identity, objs)
1312
1313 del objs
Victor Stinnerfd6094c2017-05-05 09:47:11 +02001314 time.sleep(DELTA) # let threaded cleanup code run
Antoine Pitrou5084ff72017-03-24 16:03:46 +01001315 self.assertEqual(set(wr() for wr in refs), {None})
1316 # With a process pool, copies of the objects are returned, check
1317 # they were released too.
1318 self.assertEqual(CountedObject.n_instances, 0)
1319
1320
Richard Oudkerk0c200c22012-05-02 16:36:26 +01001321def unpickleable_result():
1322 return lambda: 42
1323
1324class _TestPoolWorkerErrors(BaseTestCase):
1325 ALLOWED_TYPES = ('processes', )
1326
1327 def test_unpickleable_result(self):
1328 from multiprocessing.pool import MaybeEncodingError
1329 p = multiprocessing.Pool(2)
1330
1331 # Make sure we don't lose pool processes because of encoding errors.
1332 for iteration in range(20):
1333 res = p.apply_async(unpickleable_result)
1334 self.assertRaises(MaybeEncodingError, res.get)
1335
1336 p.close()
1337 p.join()
1338
Jesse Noller654ade32010-01-27 03:05:57 +00001339class _TestPoolWorkerLifetime(BaseTestCase):
1340
1341 ALLOWED_TYPES = ('processes', )
1342 def test_pool_worker_lifetime(self):
1343 p = multiprocessing.Pool(3, maxtasksperchild=10)
1344 self.assertEqual(3, len(p._pool))
1345 origworkerpids = [w.pid for w in p._pool]
1346 # Run many tasks so each worker gets replaced (hopefully)
1347 results = []
1348 for i in range(100):
1349 results.append(p.apply_async(sqr, (i, )))
1350 # Fetch the results and verify we got the right answers,
1351 # also ensuring all the tasks have completed.
1352 for (j, res) in enumerate(results):
1353 self.assertEqual(res.get(), sqr(j))
1354 # Refill the pool
1355 p._repopulate_pool()
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001356 # Wait until all workers are alive
Antoine Pitrouc2b0d762011-04-06 22:54:14 +02001357 # (countdown * DELTA = 5 seconds max startup process time)
1358 countdown = 50
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001359 while countdown and not all(w.is_alive() for w in p._pool):
1360 countdown -= 1
1361 time.sleep(DELTA)
Jesse Noller654ade32010-01-27 03:05:57 +00001362 finalworkerpids = [w.pid for w in p._pool]
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001363 # All pids should be assigned. See issue #7805.
1364 self.assertNotIn(None, origworkerpids)
1365 self.assertNotIn(None, finalworkerpids)
1366 # Finally, check that the worker pids have changed
Jesse Noller654ade32010-01-27 03:05:57 +00001367 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1368 p.close()
1369 p.join()
1370
Charles-François Natali46f990e2011-10-24 18:43:51 +02001371 def test_pool_worker_lifetime_early_close(self):
1372 # Issue #10332: closing a pool whose workers have limited lifetimes
1373 # before all the tasks completed would make join() hang.
1374 p = multiprocessing.Pool(3, maxtasksperchild=1)
1375 results = []
1376 for i in range(6):
1377 results.append(p.apply_async(sqr, (i, 0.3)))
1378 p.close()
1379 p.join()
1380 # check the results
1381 for (j, res) in enumerate(results):
1382 self.assertEqual(res.get(), sqr(j))
1383
1384
Benjamin Petersondfd79492008-06-13 19:13:39 +00001385#
1386# Test that manager has expected number of shared objects left
1387#
1388
1389class _TestZZZNumberOfObjects(BaseTestCase):
1390 # Because test cases are sorted alphabetically, this one will get
1391 # run after all the other tests for the manager. It tests that
1392 # there have been no "reference leaks" for the manager's shared
1393 # objects. Note the comment in _TestPool.test_terminate().
1394 ALLOWED_TYPES = ('manager',)
1395
1396 def test_number_of_objects(self):
1397 EXPECTED_NUMBER = 1 # the pool object is still alive
1398 multiprocessing.active_children() # discard dead process objs
1399 gc.collect() # do garbage collection
1400 refs = self.manager._number_of_objects()
Jesse Noller7314b382009-01-21 02:08:17 +00001401 debug_info = self.manager._debug_info()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001402 if refs != EXPECTED_NUMBER:
Jesse Noller7fb96402008-07-17 21:01:05 +00001403 print self.manager._debug_info()
Jesse Noller7314b382009-01-21 02:08:17 +00001404 print debug_info
Benjamin Petersondfd79492008-06-13 19:13:39 +00001405
1406 self.assertEqual(refs, EXPECTED_NUMBER)
1407
1408#
1409# Test of creating a customized manager class
1410#
1411
1412from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1413
1414class FooBar(object):
1415 def f(self):
1416 return 'f()'
1417 def g(self):
1418 raise ValueError
1419 def _h(self):
1420 return '_h()'
1421
1422def baz():
1423 for i in xrange(10):
1424 yield i*i
1425
1426class IteratorProxy(BaseProxy):
1427 _exposed_ = ('next', '__next__')
1428 def __iter__(self):
1429 return self
1430 def next(self):
1431 return self._callmethod('next')
1432 def __next__(self):
1433 return self._callmethod('__next__')
1434
1435class MyManager(BaseManager):
1436 pass
1437
1438MyManager.register('Foo', callable=FooBar)
1439MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1440MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1441
1442
1443class _TestMyManager(BaseTestCase):
1444
1445 ALLOWED_TYPES = ('manager',)
1446
1447 def test_mymanager(self):
1448 manager = MyManager()
1449 manager.start()
1450
1451 foo = manager.Foo()
1452 bar = manager.Bar()
1453 baz = manager.baz()
1454
1455 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1456 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1457
1458 self.assertEqual(foo_methods, ['f', 'g'])
1459 self.assertEqual(bar_methods, ['f', '_h'])
1460
1461 self.assertEqual(foo.f(), 'f()')
1462 self.assertRaises(ValueError, foo.g)
1463 self.assertEqual(foo._callmethod('f'), 'f()')
1464 self.assertRaises(RemoteError, foo._callmethod, '_h')
1465
1466 self.assertEqual(bar.f(), 'f()')
1467 self.assertEqual(bar._h(), '_h()')
1468 self.assertEqual(bar._callmethod('f'), 'f()')
1469 self.assertEqual(bar._callmethod('_h'), '_h()')
1470
1471 self.assertEqual(list(baz), [i*i for i in range(10)])
1472
1473 manager.shutdown()
1474
1475#
1476# Test of connecting to a remote server and using xmlrpclib for serialization
1477#
1478
1479_queue = Queue.Queue()
1480def get_queue():
1481 return _queue
1482
1483class QueueManager(BaseManager):
1484 '''manager class used by server process'''
1485QueueManager.register('get_queue', callable=get_queue)
1486
1487class QueueManager2(BaseManager):
1488 '''manager class which specifies the same interface as QueueManager'''
1489QueueManager2.register('get_queue')
1490
1491
1492SERIALIZER = 'xmlrpclib'
1493
1494class _TestRemoteManager(BaseTestCase):
1495
1496 ALLOWED_TYPES = ('manager',)
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02001497 values = ['hello world', None, True, 2.25,
1498 #'hall\xc3\xa5 v\xc3\xa4rlden'] # UTF-8
1499 ]
1500 result = values[:]
1501 if test_support.have_unicode:
1502 #result[-1] = u'hall\xe5 v\xe4rlden'
1503 uvalue = test_support.u(r'\u043f\u0440\u0438\u0432\u0456\u0442 '
1504 r'\u0441\u0432\u0456\u0442')
1505 values.append(uvalue)
1506 result.append(uvalue)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001507
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001508 @classmethod
1509 def _putter(cls, address, authkey):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001510 manager = QueueManager2(
1511 address=address, authkey=authkey, serializer=SERIALIZER
1512 )
1513 manager.connect()
1514 queue = manager.get_queue()
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02001515 # Note that xmlrpclib will deserialize object as a list not a tuple
1516 queue.put(tuple(cls.values))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001517
1518 def test_remote(self):
1519 authkey = os.urandom(32)
1520
1521 manager = QueueManager(
Antoine Pitrou78254dc2013-08-22 00:39:46 +02001522 address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER
Benjamin Petersondfd79492008-06-13 19:13:39 +00001523 )
1524 manager.start()
1525
1526 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001527 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001528 p.start()
1529
1530 manager2 = QueueManager2(
1531 address=manager.address, authkey=authkey, serializer=SERIALIZER
1532 )
1533 manager2.connect()
1534 queue = manager2.get_queue()
1535
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02001536 self.assertEqual(queue.get(), self.result)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001537
1538 # Because we are using xmlrpclib for serialization instead of
1539 # pickle this will cause a serialization error.
1540 self.assertRaises(Exception, queue.put, time.sleep)
1541
1542 # Make queue finalizer run before the server is stopped
1543 del queue
1544 manager.shutdown()
1545
Jesse Noller459a6482009-03-30 15:50:42 +00001546class _TestManagerRestart(BaseTestCase):
1547
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001548 @classmethod
1549 def _putter(cls, address, authkey):
Jesse Noller459a6482009-03-30 15:50:42 +00001550 manager = QueueManager(
1551 address=address, authkey=authkey, serializer=SERIALIZER)
1552 manager.connect()
1553 queue = manager.get_queue()
1554 queue.put('hello world')
1555
1556 def test_rapid_restart(self):
1557 authkey = os.urandom(32)
1558 manager = QueueManager(
Antoine Pitrou78254dc2013-08-22 00:39:46 +02001559 address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin87d86e02010-11-01 05:15:55 +00001560 srvr = manager.get_server()
1561 addr = srvr.address
1562 # Close the connection.Listener socket which gets opened as a part
1563 # of manager.get_server(). It's not needed for the test.
1564 srvr.listener.close()
Jesse Noller459a6482009-03-30 15:50:42 +00001565 manager.start()
1566
1567 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001568 p.daemon = True
Jesse Noller459a6482009-03-30 15:50:42 +00001569 p.start()
1570 queue = manager.get_queue()
1571 self.assertEqual(queue.get(), 'hello world')
Jesse Noller019ce772009-03-30 21:53:29 +00001572 del queue
Jesse Noller459a6482009-03-30 15:50:42 +00001573 manager.shutdown()
1574 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001575 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller459a6482009-03-30 15:50:42 +00001576 manager.start()
Jesse Noller019ce772009-03-30 21:53:29 +00001577 manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001578
Benjamin Petersondfd79492008-06-13 19:13:39 +00001579#
1580#
1581#
1582
1583SENTINEL = latin('')
1584
1585class _TestConnection(BaseTestCase):
1586
1587 ALLOWED_TYPES = ('processes', 'threads')
1588
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001589 @classmethod
1590 def _echo(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001591 for msg in iter(conn.recv_bytes, SENTINEL):
1592 conn.send_bytes(msg)
1593 conn.close()
1594
1595 def test_connection(self):
1596 conn, child_conn = self.Pipe()
1597
1598 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001599 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001600 p.start()
1601
1602 seq = [1, 2.25, None]
1603 msg = latin('hello world')
1604 longmsg = msg * 10
1605 arr = array.array('i', range(4))
1606
1607 if self.TYPE == 'processes':
1608 self.assertEqual(type(conn.fileno()), int)
1609
1610 self.assertEqual(conn.send(seq), None)
1611 self.assertEqual(conn.recv(), seq)
1612
1613 self.assertEqual(conn.send_bytes(msg), None)
1614 self.assertEqual(conn.recv_bytes(), msg)
1615
1616 if self.TYPE == 'processes':
1617 buffer = array.array('i', [0]*10)
1618 expected = list(arr) + [0] * (10 - len(arr))
1619 self.assertEqual(conn.send_bytes(arr), None)
1620 self.assertEqual(conn.recv_bytes_into(buffer),
1621 len(arr) * buffer.itemsize)
1622 self.assertEqual(list(buffer), expected)
1623
1624 buffer = array.array('i', [0]*10)
1625 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1626 self.assertEqual(conn.send_bytes(arr), None)
1627 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1628 len(arr) * buffer.itemsize)
1629 self.assertEqual(list(buffer), expected)
1630
1631 buffer = bytearray(latin(' ' * 40))
1632 self.assertEqual(conn.send_bytes(longmsg), None)
1633 try:
1634 res = conn.recv_bytes_into(buffer)
1635 except multiprocessing.BufferTooShort, e:
1636 self.assertEqual(e.args, (longmsg,))
1637 else:
1638 self.fail('expected BufferTooShort, got %s' % res)
1639
1640 poll = TimingWrapper(conn.poll)
1641
1642 self.assertEqual(poll(), False)
1643 self.assertTimingAlmostEqual(poll.elapsed, 0)
1644
1645 self.assertEqual(poll(TIMEOUT1), False)
1646 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1647
1648 conn.send(None)
Giampaolo Rodola'cef20062012-12-31 17:23:09 +01001649 time.sleep(.1)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001650
1651 self.assertEqual(poll(TIMEOUT1), True)
1652 self.assertTimingAlmostEqual(poll.elapsed, 0)
1653
1654 self.assertEqual(conn.recv(), None)
1655
1656 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1657 conn.send_bytes(really_big_msg)
1658 self.assertEqual(conn.recv_bytes(), really_big_msg)
1659
1660 conn.send_bytes(SENTINEL) # tell child to quit
1661 child_conn.close()
1662
1663 if self.TYPE == 'processes':
1664 self.assertEqual(conn.readable, True)
1665 self.assertEqual(conn.writable, True)
1666 self.assertRaises(EOFError, conn.recv)
1667 self.assertRaises(EOFError, conn.recv_bytes)
1668
1669 p.join()
1670
1671 def test_duplex_false(self):
1672 reader, writer = self.Pipe(duplex=False)
1673 self.assertEqual(writer.send(1), None)
1674 self.assertEqual(reader.recv(), 1)
1675 if self.TYPE == 'processes':
1676 self.assertEqual(reader.readable, True)
1677 self.assertEqual(reader.writable, False)
1678 self.assertEqual(writer.readable, False)
1679 self.assertEqual(writer.writable, True)
1680 self.assertRaises(IOError, reader.send, 2)
1681 self.assertRaises(IOError, writer.recv)
1682 self.assertRaises(IOError, writer.poll)
1683
1684 def test_spawn_close(self):
1685 # We test that a pipe connection can be closed by parent
1686 # process immediately after child is spawned. On Windows this
1687 # would have sometimes failed on old versions because
1688 # child_conn would be closed before the child got a chance to
1689 # duplicate it.
1690 conn, child_conn = self.Pipe()
1691
1692 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001693 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001694 p.start()
1695 child_conn.close() # this might complete before child initializes
1696
1697 msg = latin('hello')
1698 conn.send_bytes(msg)
1699 self.assertEqual(conn.recv_bytes(), msg)
1700
1701 conn.send_bytes(SENTINEL)
1702 conn.close()
1703 p.join()
1704
1705 def test_sendbytes(self):
1706 if self.TYPE != 'processes':
Zachary Ware1f702212013-12-10 14:09:20 -06001707 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001708
1709 msg = latin('abcdefghijklmnopqrstuvwxyz')
1710 a, b = self.Pipe()
1711
1712 a.send_bytes(msg)
1713 self.assertEqual(b.recv_bytes(), msg)
1714
1715 a.send_bytes(msg, 5)
1716 self.assertEqual(b.recv_bytes(), msg[5:])
1717
1718 a.send_bytes(msg, 7, 8)
1719 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1720
1721 a.send_bytes(msg, 26)
1722 self.assertEqual(b.recv_bytes(), latin(''))
1723
1724 a.send_bytes(msg, 26, 0)
1725 self.assertEqual(b.recv_bytes(), latin(''))
1726
1727 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1728
1729 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1730
1731 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1732
1733 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1734
1735 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1736
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001737 @classmethod
1738 def _is_fd_assigned(cls, fd):
1739 try:
1740 os.fstat(fd)
1741 except OSError as e:
1742 if e.errno == errno.EBADF:
1743 return False
1744 raise
1745 else:
1746 return True
1747
1748 @classmethod
1749 def _writefd(cls, conn, data, create_dummy_fds=False):
1750 if create_dummy_fds:
1751 for i in range(0, 256):
1752 if not cls._is_fd_assigned(i):
1753 os.dup2(conn.fileno(), i)
1754 fd = reduction.recv_handle(conn)
1755 if msvcrt:
1756 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1757 os.write(fd, data)
1758 os.close(fd)
1759
Charles-François Natalif8413b22011-09-21 18:44:49 +02001760 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001761 def test_fd_transfer(self):
1762 if self.TYPE != 'processes':
1763 self.skipTest("only makes sense with processes")
1764 conn, child_conn = self.Pipe(duplex=True)
1765
1766 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001767 p.daemon = True
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001768 p.start()
1769 with open(test_support.TESTFN, "wb") as f:
1770 fd = f.fileno()
1771 if msvcrt:
1772 fd = msvcrt.get_osfhandle(fd)
1773 reduction.send_handle(conn, fd, p.pid)
1774 p.join()
1775 with open(test_support.TESTFN, "rb") as f:
1776 self.assertEqual(f.read(), b"foo")
1777
Charles-François Natalif8413b22011-09-21 18:44:49 +02001778 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001779 @unittest.skipIf(sys.platform == "win32",
1780 "test semantics don't make sense on Windows")
1781 @unittest.skipIf(MAXFD <= 256,
1782 "largest assignable fd number is too small")
1783 @unittest.skipUnless(hasattr(os, "dup2"),
1784 "test needs os.dup2()")
1785 def test_large_fd_transfer(self):
1786 # With fd > 256 (issue #11657)
1787 if self.TYPE != 'processes':
1788 self.skipTest("only makes sense with processes")
1789 conn, child_conn = self.Pipe(duplex=True)
1790
1791 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001792 p.daemon = True
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001793 p.start()
1794 with open(test_support.TESTFN, "wb") as f:
1795 fd = f.fileno()
1796 for newfd in range(256, MAXFD):
1797 if not self._is_fd_assigned(newfd):
1798 break
1799 else:
1800 self.fail("could not find an unassigned large file descriptor")
1801 os.dup2(fd, newfd)
1802 try:
1803 reduction.send_handle(conn, newfd, p.pid)
1804 finally:
1805 os.close(newfd)
1806 p.join()
1807 with open(test_support.TESTFN, "rb") as f:
1808 self.assertEqual(f.read(), b"bar")
1809
Jesus Ceac23484b2011-09-21 03:47:39 +02001810 @classmethod
1811 def _send_data_without_fd(self, conn):
1812 os.write(conn.fileno(), b"\0")
1813
Charles-François Natalif8413b22011-09-21 18:44:49 +02001814 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Ceac23484b2011-09-21 03:47:39 +02001815 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
1816 def test_missing_fd_transfer(self):
1817 # Check that exception is raised when received data is not
1818 # accompanied by a file descriptor in ancillary data.
1819 if self.TYPE != 'processes':
1820 self.skipTest("only makes sense with processes")
1821 conn, child_conn = self.Pipe(duplex=True)
1822
1823 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
1824 p.daemon = True
1825 p.start()
1826 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
1827 p.join()
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001828
Benjamin Petersondfd79492008-06-13 19:13:39 +00001829class _TestListenerClient(BaseTestCase):
1830
1831 ALLOWED_TYPES = ('processes', 'threads')
1832
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001833 @classmethod
1834 def _test(cls, address):
1835 conn = cls.connection.Client(address)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001836 conn.send('hello')
1837 conn.close()
1838
1839 def test_listener_client(self):
1840 for family in self.connection.families:
1841 l = self.connection.Listener(family=family)
1842 p = self.Process(target=self._test, args=(l.address,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001843 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001844 p.start()
1845 conn = l.accept()
1846 self.assertEqual(conn.recv(), 'hello')
1847 p.join()
1848 l.close()
Richard Oudkerk9a16fa62012-05-05 20:41:08 +01001849
1850 def test_issue14725(self):
1851 l = self.connection.Listener()
1852 p = self.Process(target=self._test, args=(l.address,))
1853 p.daemon = True
1854 p.start()
1855 time.sleep(1)
1856 # On Windows the client process should by now have connected,
1857 # written data and closed the pipe handle by now. This causes
1858 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
1859 # 14725.
1860 conn = l.accept()
1861 self.assertEqual(conn.recv(), 'hello')
1862 conn.close()
1863 p.join()
1864 l.close()
1865
Benjamin Petersondfd79492008-06-13 19:13:39 +00001866#
1867# Test of sending connection and socket objects between processes
1868#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001869"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001870class _TestPicklingConnections(BaseTestCase):
1871
1872 ALLOWED_TYPES = ('processes',)
1873
1874 def _listener(self, conn, families):
1875 for fam in families:
1876 l = self.connection.Listener(family=fam)
1877 conn.send(l.address)
1878 new_conn = l.accept()
1879 conn.send(new_conn)
1880
1881 if self.TYPE == 'processes':
1882 l = socket.socket()
1883 l.bind(('localhost', 0))
1884 conn.send(l.getsockname())
1885 l.listen(1)
1886 new_conn, addr = l.accept()
1887 conn.send(new_conn)
1888
1889 conn.recv()
1890
1891 def _remote(self, conn):
1892 for (address, msg) in iter(conn.recv, None):
1893 client = self.connection.Client(address)
1894 client.send(msg.upper())
1895 client.close()
1896
1897 if self.TYPE == 'processes':
1898 address, msg = conn.recv()
1899 client = socket.socket()
1900 client.connect(address)
1901 client.sendall(msg.upper())
1902 client.close()
1903
1904 conn.close()
1905
1906 def test_pickling(self):
1907 try:
1908 multiprocessing.allow_connection_pickling()
1909 except ImportError:
1910 return
1911
1912 families = self.connection.families
1913
1914 lconn, lconn0 = self.Pipe()
1915 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001916 lp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001917 lp.start()
1918 lconn0.close()
1919
1920 rconn, rconn0 = self.Pipe()
1921 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001922 rp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001923 rp.start()
1924 rconn0.close()
1925
1926 for fam in families:
1927 msg = ('This connection uses family %s' % fam).encode('ascii')
1928 address = lconn.recv()
1929 rconn.send((address, msg))
1930 new_conn = lconn.recv()
1931 self.assertEqual(new_conn.recv(), msg.upper())
1932
1933 rconn.send(None)
1934
1935 if self.TYPE == 'processes':
1936 msg = latin('This connection uses a normal socket')
1937 address = lconn.recv()
1938 rconn.send((address, msg))
1939 if hasattr(socket, 'fromfd'):
1940 new_conn = lconn.recv()
1941 self.assertEqual(new_conn.recv(100), msg.upper())
1942 else:
1943 # XXX On Windows with Py2.6 need to backport fromfd()
1944 discard = lconn.recv_bytes()
1945
1946 lconn.send(None)
1947
1948 rconn.close()
1949 lconn.close()
1950
1951 lp.join()
1952 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001953"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001954#
1955#
1956#
1957
1958class _TestHeap(BaseTestCase):
1959
1960 ALLOWED_TYPES = ('processes',)
1961
1962 def test_heap(self):
1963 iterations = 5000
1964 maxblocks = 50
1965 blocks = []
1966
1967 # create and destroy lots of blocks of different sizes
1968 for i in xrange(iterations):
1969 size = int(random.lognormvariate(0, 1) * 1000)
1970 b = multiprocessing.heap.BufferWrapper(size)
1971 blocks.append(b)
1972 if len(blocks) > maxblocks:
1973 i = random.randrange(maxblocks)
1974 del blocks[i]
1975
1976 # get the heap object
1977 heap = multiprocessing.heap.BufferWrapper._heap
1978
1979 # verify the state of the heap
1980 all = []
1981 occupied = 0
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001982 heap._lock.acquire()
1983 self.addCleanup(heap._lock.release)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001984 for L in heap._len_to_seq.values():
1985 for arena, start, stop in L:
1986 all.append((heap._arenas.index(arena), start, stop,
1987 stop-start, 'free'))
1988 for arena, start, stop in heap._allocated_blocks:
1989 all.append((heap._arenas.index(arena), start, stop,
1990 stop-start, 'occupied'))
1991 occupied += (stop-start)
1992
1993 all.sort()
1994
1995 for i in range(len(all)-1):
1996 (arena, start, stop) = all[i][:3]
1997 (narena, nstart, nstop) = all[i+1][:3]
1998 self.assertTrue((arena != narena and nstart == 0) or
1999 (stop == nstart))
2000
Charles-François Natali414d0fa2011-07-02 13:56:19 +02002001 def test_free_from_gc(self):
2002 # Check that freeing of blocks by the garbage collector doesn't deadlock
2003 # (issue #12352).
2004 # Make sure the GC is enabled, and set lower collection thresholds to
2005 # make collections more frequent (and increase the probability of
2006 # deadlock).
Charles-François Natali7c20ad32011-07-02 14:08:27 +02002007 if not gc.isenabled():
Charles-François Natali414d0fa2011-07-02 13:56:19 +02002008 gc.enable()
2009 self.addCleanup(gc.disable)
Charles-François Natali7c20ad32011-07-02 14:08:27 +02002010 thresholds = gc.get_threshold()
2011 self.addCleanup(gc.set_threshold, *thresholds)
Charles-François Natali414d0fa2011-07-02 13:56:19 +02002012 gc.set_threshold(10)
2013
2014 # perform numerous block allocations, with cyclic references to make
2015 # sure objects are collected asynchronously by the gc
2016 for i in range(5000):
2017 a = multiprocessing.heap.BufferWrapper(1)
2018 b = multiprocessing.heap.BufferWrapper(1)
2019 # circular references
2020 a.buddy = b
2021 b.buddy = a
2022
Benjamin Petersondfd79492008-06-13 19:13:39 +00002023#
2024#
2025#
2026
Benjamin Petersondfd79492008-06-13 19:13:39 +00002027class _Foo(Structure):
2028 _fields_ = [
2029 ('x', c_int),
2030 ('y', c_double)
2031 ]
2032
2033class _TestSharedCTypes(BaseTestCase):
2034
2035 ALLOWED_TYPES = ('processes',)
2036
Antoine Pitrou55d935a2010-11-22 16:35:57 +00002037 def setUp(self):
2038 if not HAS_SHAREDCTYPES:
2039 self.skipTest("requires multiprocessing.sharedctypes")
2040
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00002041 @classmethod
2042 def _double(cls, x, y, foo, arr, string):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002043 x.value *= 2
2044 y.value *= 2
2045 foo.x *= 2
2046 foo.y *= 2
2047 string.value *= 2
2048 for i in range(len(arr)):
2049 arr[i] *= 2
2050
2051 def test_sharedctypes(self, lock=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002052 x = Value('i', 7, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00002053 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002054 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00002055 arr = self.Array('d', range(10), lock=lock)
2056 string = self.Array('c', 20, lock=lock)
Brian Curtina06e9b82010-10-07 02:27:41 +00002057 string.value = latin('hello')
Benjamin Petersondfd79492008-06-13 19:13:39 +00002058
2059 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea6f6016b2011-09-09 20:26:57 +02002060 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00002061 p.start()
2062 p.join()
2063
2064 self.assertEqual(x.value, 14)
2065 self.assertAlmostEqual(y.value, 2.0/3.0)
2066 self.assertEqual(foo.x, 6)
2067 self.assertAlmostEqual(foo.y, 4.0)
2068 for i in range(10):
2069 self.assertAlmostEqual(arr[i], i*2)
2070 self.assertEqual(string.value, latin('hellohello'))
2071
2072 def test_synchronize(self):
2073 self.test_sharedctypes(lock=True)
2074
2075 def test_copy(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002076 foo = _Foo(2, 5.0)
Brian Curtina06e9b82010-10-07 02:27:41 +00002077 bar = copy(foo)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002078 foo.x = 0
2079 foo.y = 0
2080 self.assertEqual(bar.x, 2)
2081 self.assertAlmostEqual(bar.y, 5.0)
2082
2083#
2084#
2085#
2086
2087class _TestFinalize(BaseTestCase):
2088
2089 ALLOWED_TYPES = ('processes',)
2090
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00002091 @classmethod
2092 def _test_finalize(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002093 class Foo(object):
2094 pass
2095
2096 a = Foo()
2097 util.Finalize(a, conn.send, args=('a',))
2098 del a # triggers callback for a
2099
2100 b = Foo()
2101 close_b = util.Finalize(b, conn.send, args=('b',))
2102 close_b() # triggers callback for b
2103 close_b() # does nothing because callback has already been called
2104 del b # does nothing because callback has already been called
2105
2106 c = Foo()
2107 util.Finalize(c, conn.send, args=('c',))
2108
2109 d10 = Foo()
2110 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
2111
2112 d01 = Foo()
2113 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
2114 d02 = Foo()
2115 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
2116 d03 = Foo()
2117 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
2118
2119 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
2120
2121 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
2122
Ezio Melottic2077b02011-03-16 12:34:31 +02002123 # call multiprocessing's cleanup function then exit process without
Benjamin Petersondfd79492008-06-13 19:13:39 +00002124 # garbage collecting locals
2125 util._exit_function()
2126 conn.close()
2127 os._exit(0)
2128
2129 def test_finalize(self):
2130 conn, child_conn = self.Pipe()
2131
2132 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02002133 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00002134 p.start()
2135 p.join()
2136
2137 result = [obj for obj in iter(conn.recv, 'STOP')]
2138 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2139
2140#
2141# Test that from ... import * works for each module
2142#
2143
2144class _TestImportStar(BaseTestCase):
2145
2146 ALLOWED_TYPES = ('processes',)
2147
2148 def test_import(self):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002149 modules = [
Benjamin Petersondfd79492008-06-13 19:13:39 +00002150 'multiprocessing', 'multiprocessing.connection',
2151 'multiprocessing.heap', 'multiprocessing.managers',
2152 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersondfd79492008-06-13 19:13:39 +00002153 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002154 ]
2155
Charles-François Natalif8413b22011-09-21 18:44:49 +02002156 if HAS_REDUCTION:
2157 modules.append('multiprocessing.reduction')
2158
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002159 if c_int is not None:
2160 # This module requires _ctypes
2161 modules.append('multiprocessing.sharedctypes')
Benjamin Petersondfd79492008-06-13 19:13:39 +00002162
2163 for name in modules:
2164 __import__(name)
2165 mod = sys.modules[name]
2166
2167 for attr in getattr(mod, '__all__', ()):
2168 self.assertTrue(
2169 hasattr(mod, attr),
2170 '%r does not have attribute %r' % (mod, attr)
2171 )
2172
2173#
2174# Quick test that logging works -- does not test logging output
2175#
2176
2177class _TestLogging(BaseTestCase):
2178
2179 ALLOWED_TYPES = ('processes',)
2180
2181 def test_enable_logging(self):
2182 logger = multiprocessing.get_logger()
2183 logger.setLevel(util.SUBWARNING)
2184 self.assertTrue(logger is not None)
2185 logger.debug('this will not be printed')
2186 logger.info('nor will this')
2187 logger.setLevel(LOG_LEVEL)
2188
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00002189 @classmethod
2190 def _test_level(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002191 logger = multiprocessing.get_logger()
2192 conn.send(logger.getEffectiveLevel())
2193
2194 def test_level(self):
2195 LEVEL1 = 32
2196 LEVEL2 = 37
2197
2198 logger = multiprocessing.get_logger()
2199 root_logger = logging.getLogger()
2200 root_level = root_logger.level
2201
2202 reader, writer = multiprocessing.Pipe(duplex=False)
2203
2204 logger.setLevel(LEVEL1)
Jesus Cea6f6016b2011-09-09 20:26:57 +02002205 p = self.Process(target=self._test_level, args=(writer,))
2206 p.daemon = True
2207 p.start()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002208 self.assertEqual(LEVEL1, reader.recv())
2209
2210 logger.setLevel(logging.NOTSET)
2211 root_logger.setLevel(LEVEL2)
Jesus Cea6f6016b2011-09-09 20:26:57 +02002212 p = self.Process(target=self._test_level, args=(writer,))
2213 p.daemon = True
2214 p.start()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002215 self.assertEqual(LEVEL2, reader.recv())
2216
2217 root_logger.setLevel(root_level)
2218 logger.setLevel(level=LOG_LEVEL)
2219
Jesse Noller814d02d2009-11-21 14:38:23 +00002220
Jesse Noller9a03f2f2009-11-24 14:17:29 +00002221# class _TestLoggingProcessName(BaseTestCase):
2222#
2223# def handle(self, record):
2224# assert record.processName == multiprocessing.current_process().name
2225# self.__handled = True
2226#
2227# def test_logging(self):
2228# handler = logging.Handler()
2229# handler.handle = self.handle
2230# self.__handled = False
2231# # Bypass getLogger() and side-effects
2232# logger = logging.getLoggerClass()(
2233# 'multiprocessing.test.TestLoggingProcessName')
2234# logger.addHandler(handler)
2235# logger.propagate = False
2236#
2237# logger.warn('foo')
2238# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00002239
Benjamin Petersondfd79492008-06-13 19:13:39 +00002240#
Richard Oudkerkba482642013-02-26 12:37:07 +00002241# Check that Process.join() retries if os.waitpid() fails with EINTR
2242#
2243
2244class _TestPollEintr(BaseTestCase):
2245
2246 ALLOWED_TYPES = ('processes',)
2247
2248 @classmethod
2249 def _killer(cls, pid):
2250 time.sleep(0.5)
2251 os.kill(pid, signal.SIGUSR1)
2252
2253 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2254 def test_poll_eintr(self):
2255 got_signal = [False]
2256 def record(*args):
2257 got_signal[0] = True
2258 pid = os.getpid()
2259 oldhandler = signal.signal(signal.SIGUSR1, record)
2260 try:
2261 killer = self.Process(target=self._killer, args=(pid,))
2262 killer.start()
2263 p = self.Process(target=time.sleep, args=(1,))
2264 p.start()
2265 p.join()
2266 self.assertTrue(got_signal[0])
2267 self.assertEqual(p.exitcode, 0)
2268 killer.join()
2269 finally:
2270 signal.signal(signal.SIGUSR1, oldhandler)
2271
2272#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002273# Test to verify handle verification, see issue 3321
2274#
2275
2276class TestInvalidHandle(unittest.TestCase):
2277
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002278 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002279 def test_invalid_handles(self):
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002280 conn = _multiprocessing.Connection(44977608)
2281 self.assertRaises(IOError, conn.poll)
2282 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002283
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002284#
Benjamin Petersondfd79492008-06-13 19:13:39 +00002285# Functions used to create test cases from the base ones in this module
2286#
2287
2288def get_attributes(Source, names):
2289 d = {}
2290 for name in names:
2291 obj = getattr(Source, name)
2292 if type(obj) == type(get_attributes):
2293 obj = staticmethod(obj)
2294 d[name] = obj
2295 return d
2296
2297def create_test_cases(Mixin, type):
2298 result = {}
2299 glob = globals()
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002300 Type = type.capitalize()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002301
2302 for name in glob.keys():
2303 if name.startswith('_Test'):
2304 base = glob[name]
2305 if type in base.ALLOWED_TYPES:
2306 newname = 'With' + Type + name[1:]
2307 class Temp(base, unittest.TestCase, Mixin):
2308 pass
2309 result[newname] = Temp
2310 Temp.__name__ = newname
2311 Temp.__module__ = Mixin.__module__
2312 return result
2313
2314#
2315# Create test cases
2316#
2317
2318class ProcessesMixin(object):
2319 TYPE = 'processes'
2320 Process = multiprocessing.Process
2321 locals().update(get_attributes(multiprocessing, (
2322 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2323 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2324 'RawArray', 'current_process', 'active_children', 'Pipe',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002325 'connection', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002326 )))
2327
2328testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2329globals().update(testcases_processes)
2330
2331
2332class ManagerMixin(object):
2333 TYPE = 'manager'
2334 Process = multiprocessing.Process
2335 manager = object.__new__(multiprocessing.managers.SyncManager)
2336 locals().update(get_attributes(manager, (
2337 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2338 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002339 'Namespace', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002340 )))
2341
2342testcases_manager = create_test_cases(ManagerMixin, type='manager')
2343globals().update(testcases_manager)
2344
2345
2346class ThreadsMixin(object):
2347 TYPE = 'threads'
2348 Process = multiprocessing.dummy.Process
2349 locals().update(get_attributes(multiprocessing.dummy, (
2350 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2351 'Condition', 'Event', 'Value', 'Array', 'current_process',
2352 'active_children', 'Pipe', 'connection', 'dict', 'list',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002353 'Namespace', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002354 )))
2355
2356testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2357globals().update(testcases_threads)
2358
Neal Norwitz0c519b32008-08-25 01:50:24 +00002359class OtherTest(unittest.TestCase):
2360 # TODO: add more tests for deliver/answer challenge.
2361 def test_deliver_challenge_auth_failure(self):
2362 class _FakeConnection(object):
2363 def recv_bytes(self, size):
Neal Norwitz2a7767a2008-08-25 03:03:25 +00002364 return b'something bogus'
Neal Norwitz0c519b32008-08-25 01:50:24 +00002365 def send_bytes(self, data):
2366 pass
2367 self.assertRaises(multiprocessing.AuthenticationError,
2368 multiprocessing.connection.deliver_challenge,
2369 _FakeConnection(), b'abc')
2370
2371 def test_answer_challenge_auth_failure(self):
2372 class _FakeConnection(object):
2373 def __init__(self):
2374 self.count = 0
2375 def recv_bytes(self, size):
2376 self.count += 1
2377 if self.count == 1:
2378 return multiprocessing.connection.CHALLENGE
2379 elif self.count == 2:
Neal Norwitz2a7767a2008-08-25 03:03:25 +00002380 return b'something bogus'
2381 return b''
Neal Norwitz0c519b32008-08-25 01:50:24 +00002382 def send_bytes(self, data):
2383 pass
2384 self.assertRaises(multiprocessing.AuthenticationError,
2385 multiprocessing.connection.answer_challenge,
2386 _FakeConnection(), b'abc')
2387
Jesse Noller7152f6d2009-04-02 05:17:26 +00002388#
2389# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2390#
2391
2392def initializer(ns):
2393 ns.test += 1
2394
2395class TestInitializers(unittest.TestCase):
2396 def setUp(self):
2397 self.mgr = multiprocessing.Manager()
2398 self.ns = self.mgr.Namespace()
2399 self.ns.test = 0
2400
2401 def tearDown(self):
2402 self.mgr.shutdown()
2403
2404 def test_manager_initializer(self):
2405 m = multiprocessing.managers.SyncManager()
2406 self.assertRaises(TypeError, m.start, 1)
2407 m.start(initializer, (self.ns,))
2408 self.assertEqual(self.ns.test, 1)
2409 m.shutdown()
2410
2411 def test_pool_initializer(self):
2412 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2413 p = multiprocessing.Pool(1, initializer, (self.ns,))
2414 p.close()
2415 p.join()
2416 self.assertEqual(self.ns.test, 1)
2417
Jesse Noller1b90efb2009-06-30 17:11:52 +00002418#
2419# Issue 5155, 5313, 5331: Test process in processes
2420# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2421#
2422
Richard Oudkerkc5496072013-09-29 17:10:40 +01002423def _this_sub_process(q):
Jesse Noller1b90efb2009-06-30 17:11:52 +00002424 try:
2425 item = q.get(block=False)
2426 except Queue.Empty:
2427 pass
2428
Richard Oudkerkc5496072013-09-29 17:10:40 +01002429def _test_process(q):
2430 queue = multiprocessing.Queue()
2431 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
2432 subProc.daemon = True
2433 subProc.start()
2434 subProc.join()
2435
Jesse Noller1b90efb2009-06-30 17:11:52 +00002436def _afunc(x):
2437 return x*x
2438
2439def pool_in_process():
2440 pool = multiprocessing.Pool(processes=4)
2441 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2442
2443class _file_like(object):
2444 def __init__(self, delegate):
2445 self._delegate = delegate
2446 self._pid = None
2447
2448 @property
2449 def cache(self):
2450 pid = os.getpid()
2451 # There are no race conditions since fork keeps only the running thread
2452 if pid != self._pid:
2453 self._pid = pid
2454 self._cache = []
2455 return self._cache
2456
2457 def write(self, data):
2458 self.cache.append(data)
2459
2460 def flush(self):
2461 self._delegate.write(''.join(self.cache))
2462 self._cache = []
2463
2464class TestStdinBadfiledescriptor(unittest.TestCase):
2465
2466 def test_queue_in_process(self):
2467 queue = multiprocessing.Queue()
Richard Oudkerkc5496072013-09-29 17:10:40 +01002468 proc = multiprocessing.Process(target=_test_process, args=(queue,))
Jesse Noller1b90efb2009-06-30 17:11:52 +00002469 proc.start()
2470 proc.join()
2471
2472 def test_pool_in_process(self):
2473 p = multiprocessing.Process(target=pool_in_process)
2474 p.start()
2475 p.join()
2476
2477 def test_flushing(self):
2478 sio = StringIO()
2479 flike = _file_like(sio)
2480 flike.write('foo')
2481 proc = multiprocessing.Process(target=lambda: flike.flush())
2482 flike.flush()
2483 assert sio.getvalue() == 'foo'
2484
Richard Oudkerke4b99382012-07-27 14:05:46 +01002485#
2486# Test interaction with socket timeouts - see Issue #6056
2487#
2488
2489class TestTimeouts(unittest.TestCase):
2490 @classmethod
2491 def _test_timeout(cls, child, address):
2492 time.sleep(1)
2493 child.send(123)
2494 child.close()
2495 conn = multiprocessing.connection.Client(address)
2496 conn.send(456)
2497 conn.close()
2498
2499 def test_timeout(self):
2500 old_timeout = socket.getdefaulttimeout()
2501 try:
2502 socket.setdefaulttimeout(0.1)
2503 parent, child = multiprocessing.Pipe(duplex=True)
2504 l = multiprocessing.connection.Listener(family='AF_INET')
2505 p = multiprocessing.Process(target=self._test_timeout,
2506 args=(child, l.address))
2507 p.start()
2508 child.close()
2509 self.assertEqual(parent.recv(), 123)
2510 parent.close()
2511 conn = l.accept()
2512 self.assertEqual(conn.recv(), 456)
2513 conn.close()
2514 l.close()
2515 p.join(10)
2516 finally:
2517 socket.setdefaulttimeout(old_timeout)
2518
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002519#
2520# Test what happens with no "if __name__ == '__main__'"
2521#
2522
2523class TestNoForkBomb(unittest.TestCase):
2524 def test_noforkbomb(self):
2525 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
2526 if WIN32:
2527 rc, out, err = test.script_helper.assert_python_failure(name)
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02002528 self.assertEqual(out, '')
2529 self.assertIn('RuntimeError', err)
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002530 else:
2531 rc, out, err = test.script_helper.assert_python_ok(name)
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02002532 self.assertEqual(out.rstrip(), '123')
2533 self.assertEqual(err, '')
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002534
2535#
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002536# Issue 12098: check sys.flags of child matches that for parent
2537#
2538
2539class TestFlags(unittest.TestCase):
2540 @classmethod
2541 def run_in_grandchild(cls, conn):
2542 conn.send(tuple(sys.flags))
2543
2544 @classmethod
2545 def run_in_child(cls):
2546 import json
2547 r, w = multiprocessing.Pipe(duplex=False)
2548 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
2549 p.start()
2550 grandchild_flags = r.recv()
2551 p.join()
2552 r.close()
2553 w.close()
2554 flags = (tuple(sys.flags), grandchild_flags)
2555 print(json.dumps(flags))
2556
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02002557 @test_support.requires_unicode # XXX json needs unicode support
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002558 def test_flags(self):
2559 import json, subprocess
2560 # start child process using unusual flags
2561 prog = ('from test.test_multiprocessing import TestFlags; ' +
2562 'TestFlags.run_in_child()')
2563 data = subprocess.check_output(
Benjamin Peterson625af8e2013-03-20 12:47:57 -05002564 [sys.executable, '-E', '-B', '-O', '-c', prog])
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002565 child_flags, grandchild_flags = json.loads(data.decode('ascii'))
2566 self.assertEqual(child_flags, grandchild_flags)
Richard Oudkerk7bdd93c2013-04-17 19:15:52 +01002567
2568#
2569# Issue #17555: ForkAwareThreadLock
2570#
2571
2572class TestForkAwareThreadLock(unittest.TestCase):
2573 # We recurisvely start processes. Issue #17555 meant that the
2574 # after fork registry would get duplicate entries for the same
2575 # lock. The size of the registry at generation n was ~2**n.
2576
2577 @classmethod
2578 def child(cls, n, conn):
2579 if n > 1:
2580 p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
2581 p.start()
2582 p.join()
2583 else:
2584 conn.send(len(util._afterfork_registry))
2585 conn.close()
2586
2587 def test_lock(self):
2588 r, w = multiprocessing.Pipe(False)
2589 l = util.ForkAwareThreadLock()
2590 old_size = len(util._afterfork_registry)
2591 p = multiprocessing.Process(target=self.child, args=(5, w))
2592 p.start()
2593 new_size = r.recv()
2594 p.join()
2595 self.assertLessEqual(new_size, old_size)
2596
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002597#
Richard Oudkerk41072db2013-07-01 18:45:28 +01002598# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
2599#
2600
2601class TestIgnoreEINTR(unittest.TestCase):
2602
2603 @classmethod
2604 def _test_ignore(cls, conn):
2605 def handler(signum, frame):
2606 pass
2607 signal.signal(signal.SIGUSR1, handler)
2608 conn.send('ready')
2609 x = conn.recv()
2610 conn.send(x)
2611 conn.send_bytes(b'x'*(1024*1024)) # sending 1 MB should block
2612
2613 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2614 def test_ignore(self):
2615 conn, child_conn = multiprocessing.Pipe()
2616 try:
2617 p = multiprocessing.Process(target=self._test_ignore,
2618 args=(child_conn,))
2619 p.daemon = True
2620 p.start()
2621 child_conn.close()
2622 self.assertEqual(conn.recv(), 'ready')
2623 time.sleep(0.1)
2624 os.kill(p.pid, signal.SIGUSR1)
2625 time.sleep(0.1)
2626 conn.send(1234)
2627 self.assertEqual(conn.recv(), 1234)
2628 time.sleep(0.1)
2629 os.kill(p.pid, signal.SIGUSR1)
2630 self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
2631 time.sleep(0.1)
2632 p.join()
2633 finally:
2634 conn.close()
2635
2636 @classmethod
2637 def _test_ignore_listener(cls, conn):
2638 def handler(signum, frame):
2639 pass
2640 signal.signal(signal.SIGUSR1, handler)
2641 l = multiprocessing.connection.Listener()
2642 conn.send(l.address)
2643 a = l.accept()
2644 a.send('welcome')
2645
2646 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2647 def test_ignore_listener(self):
2648 conn, child_conn = multiprocessing.Pipe()
2649 try:
2650 p = multiprocessing.Process(target=self._test_ignore_listener,
2651 args=(child_conn,))
2652 p.daemon = True
2653 p.start()
2654 child_conn.close()
2655 address = conn.recv()
2656 time.sleep(0.1)
2657 os.kill(p.pid, signal.SIGUSR1)
2658 time.sleep(0.1)
2659 client = multiprocessing.connection.Client(address)
2660 self.assertEqual(client.recv(), 'welcome')
2661 p.join()
2662 finally:
2663 conn.close()
2664
2665#
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002666#
2667#
2668
Jesse Noller1b90efb2009-06-30 17:11:52 +00002669testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002670 TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb,
Richard Oudkerk41072db2013-07-01 18:45:28 +01002671 TestFlags, TestForkAwareThreadLock, TestIgnoreEINTR]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002672
Benjamin Petersondfd79492008-06-13 19:13:39 +00002673#
2674#
2675#
2676
2677def test_main(run=None):
Jesse Noller18623822008-06-18 13:29:52 +00002678 if sys.platform.startswith("linux"):
2679 try:
2680 lock = multiprocessing.RLock()
2681 except OSError:
Benjamin Petersonbec087f2009-03-26 21:10:30 +00002682 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Petersoned77f2e2008-06-17 22:40:44 +00002683
Charles-François Natali6392d7f2011-11-22 18:35:18 +01002684 check_enough_semaphores()
2685
Benjamin Petersondfd79492008-06-13 19:13:39 +00002686 if run is None:
2687 from test.test_support import run_unittest as run
2688
2689 util.get_temp_dir() # creates temp directory for use by all processes
2690
2691 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2692
Jesse Noller146b7ab2008-07-02 16:44:09 +00002693 ProcessesMixin.pool = multiprocessing.Pool(4)
2694 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2695 ManagerMixin.manager.__init__()
2696 ManagerMixin.manager.start()
2697 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002698
2699 testcases = (
Jesse Noller146b7ab2008-07-02 16:44:09 +00002700 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2701 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz0c519b32008-08-25 01:50:24 +00002702 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2703 testcases_other
Benjamin Petersondfd79492008-06-13 19:13:39 +00002704 )
2705
2706 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2707 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Nick Coghlan13623662010-04-10 14:24:36 +00002708 # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
2709 # module during these tests is at least platform dependent and possibly
Nick Coghlan14459d52010-04-10 15:01:54 +00002710 # non-deterministic on any given platform. So we don't mind if the listed
Nick Coghlan13623662010-04-10 14:24:36 +00002711 # warnings aren't actually raised.
Florent Xicluna07627882010-03-21 01:14:24 +00002712 with test_support.check_py3k_warnings(
Nick Coghlan13623662010-04-10 14:24:36 +00002713 (".+__(get|set)slice__ has been removed", DeprecationWarning),
2714 (r"sys.exc_clear\(\) not supported", DeprecationWarning),
2715 quiet=True):
Florent Xicluna07627882010-03-21 01:14:24 +00002716 run(suite)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002717
Jesse Noller146b7ab2008-07-02 16:44:09 +00002718 ThreadsMixin.pool.terminate()
2719 ProcessesMixin.pool.terminate()
2720 ManagerMixin.pool.terminate()
2721 ManagerMixin.manager.shutdown()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002722
Jesse Noller146b7ab2008-07-02 16:44:09 +00002723 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002724
2725def main():
2726 test_main(unittest.TextTestRunner(verbosity=2).run)
2727
2728if __name__ == '__main__':
2729 main()