blob: 681529ff7d42641c2dd28f53edc795e4d7bacdcf [file] [log] [blame]
#
# Unit tests for the multiprocessing package
#
4
5import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00006import Queue
7import time
8import sys
9import os
10import gc
11import signal
12import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000013import socket
14import random
15import logging
Antoine Pitroua1a8da82011-08-23 19:54:20 +020016import errno
Richard Oudkerkfaee75c2012-08-14 11:41:19 +010017import test.script_helper
Mark Dickinsonc4920e82009-11-20 19:30:22 +000018from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000019from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000020_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020021# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000022# message: "No module named _multiprocessing". _multiprocessing is not compiled
23# without thread support.
24import threading
R. David Murray3db8a342009-03-30 23:05:48 +000025
Jesse Noller37040cd2008-09-30 00:15:45 +000026# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000027test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000028
Benjamin Petersondfd79492008-06-13 19:13:39 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000033import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000034
Charles-François Natalif8413b22011-09-21 18:44:49 +020035from multiprocessing import util
36
37try:
38 from multiprocessing import reduction
39 HAS_REDUCTION = True
40except ImportError:
41 HAS_REDUCTION = False
Benjamin Petersondfd79492008-06-13 19:13:39 +000042
Brian Curtina06e9b82010-10-07 02:27:41 +000043try:
44 from multiprocessing.sharedctypes import Value, copy
45 HAS_SHAREDCTYPES = True
46except ImportError:
47 HAS_SHAREDCTYPES = False
48
Antoine Pitroua1a8da82011-08-23 19:54:20 +020049try:
50 import msvcrt
51except ImportError:
52 msvcrt = None
53
Benjamin Petersondfd79492008-06-13 19:13:39 +000054#
55#
56#
57
# Alias for the built-in ``str``; it is used to build the one-character
# values of the 'c' typecode entry in _TestValue.codes_values.
latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000059
Benjamin Petersondfd79492008-06-13 19:13:39 +000060#
61# Constants
62#
63
# Log level constant for this test module; swap in the commented
# alternative below to get debug-level output instead.
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Short pause (seconds) passed to time.sleep() by the tests.
DELTA = 0.1

# Setting this to True makes the tests take a lot longer and can
# sometimes cause non-serious failures, because some calls block a bit
# longer than expected.
CHECK_TIMINGS = False

if not CHECK_TIMINGS:
    TIMEOUT1 = TIMEOUT2 = TIMEOUT3 = 0.1
else:
    TIMEOUT1 = 0.82
    TIMEOUT2 = 0.35
    TIMEOUT3 = 1.4

# False when _multiprocessing flags its semaphore get_value() as broken.
_BROKEN_GETVALUE = getattr(_multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False)
HAVE_GETVALUE = not _BROKEN_GETVALUE

WIN32 = sys.platform == "win32"
81
# Value of the "SC_OPEN_MAX" system setting.  Fall back to 256 when
# os.sysconf does not exist on this platform (AttributeError), does not
# know the name (ValueError) or fails to query it (OSError).  Only these
# errors are caught so that KeyboardInterrupt/SystemExit still propagate.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    MAXFD = 256
86
Benjamin Petersondfd79492008-06-13 19:13:39 +000087#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000088# Some tests require ctypes
89#
90
91try:
Nick Coghlan13623662010-04-10 14:24:36 +000092 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000093except ImportError:
94 Structure = object
95 c_int = c_double = None
96
Charles-François Natali6392d7f2011-11-22 18:35:18 +010097
def check_enough_semaphores():
    """Raise unittest.SkipTest when the OS reports too few semaphores.

    Returns None without raising when os.sysconf is unavailable, when the
    "SC_SEM_NSEMS_MAX" setting is unknown, or when the reported value is
    -1 or at least 256.
    """
    # minimum number of semaphores available according to POSIX
    required = 256
    try:
        available = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf itself, or this particular setting, is not available
        return
    if available != -1 and available < required:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % required)
111
112
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000113#
Benjamin Petersondfd79492008-06-13 19:13:39 +0000114# Creates a wrapper for a function which records the time it takes to finish
115#
116
class TimingWrapper(object):
    """Callable proxy that records how long each call to ``func`` takes.

    ``elapsed`` is None until the first call.  Afterwards it holds the
    time.time() difference measured around the most recent call, and it
    is updated even when ``func`` raises.
    """

    def __init__(self, func):
        self.func, self.elapsed = func, None

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # runs on both the normal and the exceptional path
            self.elapsed = time.time() - started
        return result
129
130#
131# Base class for test cases
132#
133
class BaseTestCase(object):
    """Shared helpers for the test case classes in this module.

    The helpers call ``self.assertEqual`` / ``self.assertAlmostEqual``,
    which this class does not define itself -- presumably they come from
    unittest.TestCase when the concrete test classes are assembled
    (TODO confirm against the code that builds them).
    """

    # Default flavours; subclasses override this with a narrower tuple.
    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare two durations to one decimal place if CHECK_TIMINGS is on."""
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert ``func(*args) == value`` unless it raises NotImplementedError."""
        try:
            outcome = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, outcome)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
156
Benjamin Petersondfd79492008-06-13 19:13:39 +0000157#
158# Return the value of a semaphore
159#
160
def get_value(self):
    """Return the current value of the semaphore-like object ``self``.

    Tries ``self.get_value()`` first, then the ``_Semaphore__value`` and
    ``_value`` attributes in that order; an AttributeError moves on to the
    next option.  Raises NotImplementedError when none of them works.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
172
173#
174# Testcases
175#
176
class _TestProcess(BaseTestCase):
    """Tests for process objects: attributes, life cycle and exit codes.

    The tests use ``self.Process``, ``self.current_process``,
    ``self.active_children``, ``self.Queue``, ``self.Event``, ``self.Pipe``
    and ``self.TYPE``, none of which are defined in this class.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        """Check the attributes of the current process object."""
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child target: report args, kwargs, name (and authkey/pid) via ``q``."""
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        # authkey and pid are only reported for non-thread flavours
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        """Check a process before start(), while running and after join()."""
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # state before the process has been started
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # state while the child is running
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # values come back in the order _test() puts them; args[0] is the
        # queue itself, so only args[1:] reach the target as *args
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # state after the child has finished
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        """Child target: sleep for 1000 seconds so the parent can terminate it."""
        time.sleep(1000)

    def test_terminate(self):
        """Terminate a sleeping child and check that join() then returns."""
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        # a second join() on the finished process
        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        """cpu_count() yields an int >= 1 (1 is assumed if not implemented)."""
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        """A process is listed by active_children() only between start and join."""
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Send ``id`` on ``wconn``, then start two children while len(id) < 2.

        Each child is joined before the next one is started.
        """
        # NOTE(review): 'forking' is imported but not used in this function;
        # presumably the import itself is the point -- confirm.
        from multiprocessing import forking
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        """Processes started from within processes report in depth-first order."""
        rconn, wconn = self.Pipe(duplex=False)
        # the top level of the recursion runs directly in this process
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        """Child target: redirect sys.stderr to ``testfn`` and sys.exit(reason)."""
        sys.stderr = open(testfn, 'w')
        sys.exit(reason)

    def test_sys_exit(self):
        """Check the exit code (and stderr output) produced by sys.exit()."""
        # See Issue 13854
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test_support.TESTFN
        self.addCleanup(test_support.unlink, testfn)

        # non-integer reasons: exit code 1, str(reason) written to stderr
        for reason, code in (([1, 2, 3], 1), ('ignore this', 1)):
            p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, code)

            with open(testfn, 'r') as f:
                self.assertEqual(f.read().rstrip(), str(reason))

        # for these reasons the exit code must compare equal to the reason
        for reason in (True, False, 8):
            p = self.Process(target=sys.exit, args=(reason,))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, reason)
356
Benjamin Petersondfd79492008-06-13 19:13:39 +0000357#
358#
359#
360
class _UpperCaser(multiprocessing.Process):
    """Process that upper-cases the strings sent to it over a pipe.

    ``submit()`` and ``stop()`` use ``parent_conn``; ``run()`` serves
    requests on ``child_conn`` until it receives None.
    """

    def __init__(self):
        super(_UpperCaser, self).__init__()
        ends = multiprocessing.Pipe()
        self.child_conn, self.parent_conn = ends

    def run(self):
        # run() only talks over child_conn, so drop the other end first
        self.parent_conn.close()
        conn = self.child_conn
        for text in iter(conn.recv, None):
            conn.send(text.upper())
        conn.close()

    def submit(self, s):
        """Send ``s`` to the worker and return the reply it sends back."""
        assert type(s) is str
        conn = self.parent_conn
        conn.send(s)
        return conn.recv()

    def stop(self):
        """Send the None sentinel to the worker, then close both pipe ends."""
        self.parent_conn.send(None)
        for conn in (self.parent_conn, self.child_conn):
            conn.close()
382
class _TestSubclassingProcess(BaseTestCase):
    """Check that a multiprocessing.Process subclass (_UpperCaser) works."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.daemon = True
        worker.start()
        for text, expected in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), expected)
        worker.stop()
        worker.join()
395
396#
397#
398#
399
def queue_empty(q):
    """Return q.empty() if ``q`` has that method, else whether qsize() is 0."""
    return q.empty() if hasattr(q, 'empty') else (q.qsize() == 0)
405
def queue_full(q, maxsize):
    """Return q.full() if ``q`` has that method, else qsize() == maxsize."""
    return q.full() if hasattr(q, 'full') else (q.qsize() == maxsize)
411
412
class _TestQueue(BaseTestCase):
    """Tests for queue objects: put/get, fork, qsize and task_done/join.

    The tests use ``self.Queue``, ``self.JoinableQueue``, ``self.Event``
    and ``self.Process``, none of which are defined in this class.
    """

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        """Child target: once allowed to start, get six items, then signal."""
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        """Fill a bounded queue and check every put() variant raises Full."""
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # six puts, one per calling convention, fill the queue
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # non-blocking puts are expected to fail without waiting
        self.assertRaises(Queue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        # blocking puts are expected to fail after the timeout;
        # with block=False the timeout is expected to play no part
        self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        # let the child take the six items, then check the queue is empty
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        """Child target: once allowed to start, put 2..5, then signal."""
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        """Read the child's items, then check every get() variant raises Empty."""
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # non-blocking gets are expected to fail without waiting
        self.assertRaises(Queue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        # blocking gets are expected to fail after the timeout;
        # with block=False the timeout is expected to play no part
        self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        """Child target: put the integers 10..19 on ``queue``."""
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        """Items put by the parent and then by a child arrive in order 0..19."""
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(Queue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        """qsize() follows put()/get(); skipped if qsize() is not implemented."""
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            self.skipTest('qsize method not implemented')
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        """Worker target: call task_done() per item until None is received."""
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        """queue.join() returns once four workers have handled ten items."""
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            self.skipTest("requires 'queue.task_done()' method")

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in xrange(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in xrange(10):
            queue.put(i)

        queue.join()

        # one None sentinel per worker ends its iter(q.get, None) loop
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()

    def test_no_import_lock_contention(self):
        """Importing a module that uses a Queue must not raise Queue.Empty."""
        with test_support.temp_cwd():
            module_name = 'imported_by_an_imported_module'
            # the module puts one item on a Queue and gets it back with a
            # 3 second timeout, all at import time
            with open(module_name + '.py', 'w') as f:
                f.write("""if 1:
                    import multiprocessing

                    q = multiprocessing.Queue()
                    q.put('knock knock')
                    q.get(timeout=3)
                    q.close()
                """)

            with test_support.DirsOnSysPath(os.getcwd()):
                try:
                    __import__(module_name)
                except Queue.Empty:
                    self.fail("Probable regression on import lock contention;"
                              " see Issue #22853")
642
Benjamin Petersondfd79492008-06-13 19:13:39 +0000643#
644#
645#
646
class _TestLock(BaseTestCase):
    """Tests for the objects returned by ``self.Lock`` and ``self.RLock``."""

    def test_lock(self):
        mutex = self.Lock()
        self.assertEqual(mutex.acquire(), True)
        # a second, non-blocking acquire must fail while the lock is held
        self.assertEqual(mutex.acquire(False), False)
        self.assertEqual(mutex.release(), None)
        # releasing a lock that is not held is an error
        self.assertRaises((ValueError, threading.ThreadError), mutex.release)

    def test_rlock(self):
        rlock = self.RLock()
        depth = 3
        for _ in range(depth):
            self.assertEqual(rlock.acquire(), True)
        for _ in range(depth):
            self.assertEqual(rlock.release(), None)
        # one release too many
        self.assertRaises((AssertionError, RuntimeError), rlock.release)

    def test_lock_context(self):
        # a lock must be usable as a context manager
        with self.Lock():
            pass
669
Benjamin Petersondfd79492008-06-13 19:13:39 +0000670
class _TestSemaphore(BaseTestCase):
    """Tests for ``self.Semaphore`` and ``self.BoundedSemaphore`` objects."""

    def _test_semaphore(self, sem):
        """Take ``sem`` (initial value 2) down to 0 and back up to 2.

        The counter is checked through get_value() after every step,
        whenever reading it is implemented.
        """
        check = self.assertReturnsIfImplemented
        check(2, get_value, sem)
        for remaining in (1, 0):
            self.assertEqual(sem.acquire(), True)
            check(remaining, get_value, sem)
        # nothing left: a non-blocking acquire fails and changes nothing
        self.assertEqual(sem.acquire(False), False)
        check(0, get_value, sem)
        for restored in (1, 2):
            self.assertEqual(sem.release(), None)
            check(restored, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # an unbounded semaphore may be released past its initial value
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected duration); every call
        # must return False because the semaphore starts at 0
        cases = (
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
        )
        for args, kwds, expected_delay in cases:
            self.assertEqual(acquire(*args, **kwds), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected_delay)
723
724
class _TestCondition(BaseTestCase):
    """Tests for condition variables created through ``self.Condition``.

    Each test mixes workers started with ``self.Process`` and with
    ``threading.Thread``.  ``sleeping`` and ``woken`` are semaphores the
    workers release so the test can count how far they have got.
    """

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        """Worker: wait on ``cond``, releasing ``sleeping`` before and
        ``woken`` after the wait."""
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        """Check the internal counters of ``cond`` are back to zero.

        Only done when self.TYPE == 'processes'; silently skipped when
        get_value() raises NotImplementedError.
        """
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        """notify() wakes exactly one waiter per call."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Both workers have been notified by now, so reap both of them
        # (not just whichever handle was assigned last).
        proc.join()
        thread.join()

    def test_notify_all(self):
        """Waiters with a timeout all time out; notify_all() wakes the rest."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for i in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        """wait(timeout) with no notifier returns None after the timeout."""
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
856
857
class _TestEvent(BaseTestCase):
    """Tests for event objects created through ``self.Event``."""

    @classmethod
    def _test_event(cls, event):
        """Child target: set ``event`` after sleeping for TIMEOUT2 seconds."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        """Check is_set()/wait() around set() and clear(), then across a child."""
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A new event starts out unset.
        self.assertEqual(event.is_set(), False)

        # wait() is expected to return the state of the flag (not None):
        # False here, because nothing sets the event before the timeout.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once set, wait() returns True immediately, with or without timeout.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # The child has set the event, which is the last thing its target
        # does; join it so it is not left behind when the test returns.
        p.join()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000898
899#
900#
901#
902
class _TestValue(BaseTestCase):
    """Tests for the shared objects made by ``self.Value`` / ``self.RawValue``."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value assigned by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        """Child target: store the third entry of each row in its shared value."""
        for shared, row in zip(values, cls.codes_values):
            shared.value = row[2]

    def test_value(self, raw=False):
        """Shared values start at their initial value and show a child's writes."""
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, row in zip(values, self.codes_values):
            self.assertEqual(shared.value, row[1])

        child = self.Process(target=self._test, args=(values,))
        child.daemon = True
        child.start()
        child.join()

        for shared, row in zip(values, self.codes_values):
            self.assertEqual(shared.value, row[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        """Check get_lock()/get_obj() for the different ``lock`` arguments."""
        # default lock, and an explicit lock=None: both accessors must work
        default_locked = self.Value('i', 5)
        default_locked.get_lock()
        default_locked.get_obj()

        none_locked = self.Value('i', 5, lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # a caller-supplied lock is the one handed back by get_lock()
        lock = self.Lock()
        custom_locked = self.Value('i', 5, lock=lock)
        lock3 = custom_locked.get_lock()
        custom_locked.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False and RawValue give objects without either accessor
        unlocked = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        raw = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, accessor))
970
Benjamin Petersondfd79492008-06-13 19:13:39 +0000971
class _TestArray(BaseTestCase):
    # Tests for the shared ctypes arrays created by self.Array() and
    # self.RawArray().

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        """Replace *seq*, in place, by its running (prefix) sums."""
        for prev in range(len(seq) - 1):
            seq[prev + 1] += seq[prev]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        arr = self.RawArray('i', seq) if raw else self.Array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Apply the same slice assignment to the shared array and the list.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Transform the list locally and the shared array in a child; both
        # must end up with the same contents.
        self.f(seq)

        worker = self.Process(target=self.f, args=(arr,))
        worker.daemon = True
        worker.start()
        worker.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # Repeating the allocation makes it more likely that the 2nd and
        # 3rd arrays reuse memory that previously held non-zero data.
        numbers = list(range(10))
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = numbers
            self.assertEqual(list(arr), numbers)
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_accepts_long(self):
        # long(10) is the Python 2 long integer 10L.
        big_ten = long(10)
        arr = self.Array('i', big_ten)
        self.assertEqual(len(arr), 10)
        raw_arr = self.RawArray('i', big_ten)
        self.assertEqual(len(raw_arr), 10)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default arguments: both accessors must be callable.
        with_default_lock = self.Array('i', range(10))
        with_default_lock.get_lock()
        with_default_lock.get_obj()

        # lock=None: both accessors must still be callable.
        with_none_lock = self.Array('i', range(10), lock=None)
        with_none_lock.get_lock()
        with_none_lock.get_obj()

        # Explicit lock: get_lock() must compare equal to the one supplied.
        lock = self.Lock()
        with_own_lock = self.Array('i', range(10), lock=lock)
        returned_lock = with_own_lock.get_lock()
        with_own_lock.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False: the returned object has neither accessor.
        unlocked = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # RawArray objects have neither accessor.
        raw_array = self.RawArray('i', range(10))
        self.assertFalse(hasattr(raw_array, 'get_lock'))
        self.assertFalse(hasattr(raw_array, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001057
1058#
1059#
1060#
1061
class _TestContainers(BaseTestCase):
    # Tests for the list, dict and Namespace objects handed out by a
    # manager (self.list, self.dict, self.Namespace).

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        ten = list(range(10))
        five = list(range(5))

        a = self.list(ten)
        self.assertEqual(a[:], ten)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(five)
        self.assertEqual(b[:], five)

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2, 3, 4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        # None of the operations on b may have touched a.
        self.assertEqual(a[:], ten)

        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # f is built before a is appended to, yet must show the new item.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The underscore-prefixed attribute is expected to be absent from
        # the string form.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1117
1118#
1119#
1120#
1121
def sqr(x, wait=0.0):
    """Return ``x*x`` after sleeping for *wait* seconds.

    The optional delay lets the pool tests simulate a slow task.
    """
    time.sleep(wait)
    squared = x * x
    return squared
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001125
class SayWhenError(ValueError):
    """Raised by exception_throwing_generator() at the requested index."""


def exception_throwing_generator(total, when):
    """Yield 0, 1, ..., total - 1, but raise SayWhenError instead of *when*.

    If *when* is not in ``range(total)`` the generator finishes normally.
    """
    current = 0
    while current < total:
        if current == when:
            raise SayWhenError("Somebody said when")
        yield current
        current += 1
1133
class _TestPool(BaseTestCase):
    # Tests for the Pool API (apply, map, imap and their async variants),
    # run against the shared self.pool and against pools created on demand.

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), [sqr(i) for i in range(10)])
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         [sqr(i) for i in range(100)])

    def test_map_unplicklable(self):
        # Issue #19425 -- failure to pickle should not cause a hang
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        class A(object):
            def __reduce__(self):
                raise RuntimeError('cannot pickle')
        with self.assertRaises(RuntimeError):
            self.pool.map(sqr, [A()]*10)

    def test_map_chunksize(self):
        # An empty iterable with an explicit chunksize must complete
        # instead of stalling until the timeout.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the get() timeout, so get() must
        # give up after roughly TIMEOUT2.
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), [sqr(i) for i in range(10)])

        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, next, it)

        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, next, it)

    def test_imap_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
        for i in range(3):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, next, it)

        # SayWhenError seen at start of problematic chunk's results
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
        for i in range(6):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, next, it)
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
        for i in range(4):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, next, it)

    def test_imap_unordered(self):
        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), [sqr(i) for i in range(1000)])

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), [sqr(i) for i in range(1000)])

    def test_imap_unordered_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # imap_unordered may deliver results in any order, so each result
        # is only checked for membership in the not-yet-seen squares.
        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(10, 3),
                                      1)
        expected_values = [sqr(i) for i in range(10)]
        with self.assertRaises(SayWhenError):
            # imap_unordered makes it difficult to anticipate the SayWhenError
            for _ in range(10):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(20, 7),
                                      2)
        expected_values = [sqr(i) for i in range(20)]
        with self.assertRaises(SayWhenError):
            for _ in range(20):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)

    def test_make_pool(self):
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            # Release the workers even if the assertion above fails.
            p.close()
            p.join()

    def test_terminate(self):
        p = self.Pool(4)
        # Queue far more work than can finish; terminate() must not wait
        # for it.  The result object is kept referenced until the test ends.
        result = p.map_async(
            time.sleep, [0.1] * 10000, chunksize=1
            )
        p.terminate()
        join = TimingWrapper(p.join)
        join()
        self.assertLess(join.elapsed, 0.2)

    def test_empty_iterable(self):
        # See Issue 12157
        p = self.Pool(1)
        try:
            self.assertEqual(p.map(sqr, []), [])
            self.assertEqual(list(p.imap(sqr, [])), [])
            self.assertEqual(list(p.imap_unordered(sqr, [])), [])
            self.assertEqual(p.map_async(sqr, []).get(), [])
        finally:
            # Release the worker even if an assertion above fails.
            p.close()
            p.join()
1264
def unpickleable_result():
    """Return a lambda (which evaluates to 42) as the task result.

    Used by _TestPoolWorkerErrors, which expects fetching this result
    from a pool to raise MaybeEncodingError.
    """
    answer = lambda: 42
    return answer
1267
class _TestPoolWorkerErrors(BaseTestCase):
    # Checks that a pool keeps working when task results cannot be sent
    # back to the parent.
    ALLOWED_TYPES = ('processes', )

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        p = multiprocessing.Pool(2)
        try:
            # Make sure we don't lose pool processes because of encoding
            # errors: with only two workers, twenty failing tasks can only
            # all be answered if the workers survive each failure.
            for iteration in range(20):
                res = p.apply_async(unpickleable_result)
                self.assertRaises(MaybeEncodingError, res.get)
        finally:
            # Release the workers even if an assertion above fails.
            p.close()
            p.join()
1282
class _TestPoolWorkerLifetime(BaseTestCase):
    # Tests for pools whose workers are replaced after a fixed number of
    # tasks (the maxtasksperchild argument).

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        try:
            self.assertEqual(3, len(p._pool))
            origworkerpids = [w.pid for w in p._pool]
            # Run many tasks so each worker gets replaced (hopefully)
            results = [p.apply_async(sqr, (i, )) for i in range(100)]
            # Fetch the results and verify we got the right answers,
            # also ensuring all the tasks have completed.
            for (j, res) in enumerate(results):
                self.assertEqual(res.get(), sqr(j))
            # Refill the pool
            p._repopulate_pool()
            # Wait until all workers are alive
            # (countdown * DELTA = 5 seconds max startup process time)
            countdown = 50
            while countdown and not all(w.is_alive() for w in p._pool):
                countdown -= 1
                time.sleep(DELTA)
            finalworkerpids = [w.pid for w in p._pool]
            # All pids should be assigned.  See issue #7805.
            self.assertNotIn(None, origworkerpids)
            self.assertNotIn(None, finalworkerpids)
            # Finally, check that the worker pids have changed
            self.assertNotEqual(sorted(origworkerpids),
                                sorted(finalworkerpids))
        finally:
            # Release the workers even if an assertion above fails.
            p.close()
            p.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        p = multiprocessing.Pool(3, maxtasksperchild=1)
        results = [p.apply_async(sqr, (i, 0.3)) for i in range(6)]
        # Close while tasks are still pending; join() must still return.
        p.close()
        p.join()
        # check the results
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
1327
1328
Benjamin Petersondfd79492008-06-13 19:13:39 +00001329#
1330# Test that manager has expected number of shared objects left
1331#
1332
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Fetch the debug info right after the count so that what gets
        # printed on failure is as close as possible to the counted state.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1351
1352#
1353# Test of creating a customized manager class
1354#
1355
1356from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1357
class FooBar(object):
    """Plain object served through MyManager as typeids 'Foo' and 'Bar'."""

    def f(self):
        # Public method that returns normally.
        return 'f()'

    def g(self):
        # Public method that always fails.
        raise ValueError()

    def _h(self):
        # Underscore-prefixed method.
        return '_h()'
1365
def baz():
    """Generate the squares of the integers 0 through 9."""
    for n in range(10):
        yield n * n
1369
class IteratorProxy(BaseProxy):
    """Proxy that forwards iteration to the referent via _callmethod()."""

    _exposed_ = ('next', '__next__')

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def next(self):
        # Python 2 spelling of the iterator protocol.
        value = self._callmethod('next')
        return value

    def __next__(self):
        # Python 3 spelling of the iterator protocol.
        value = self._callmethod('__next__')
        return value
1378
class MyManager(BaseManager):
    """Manager subclass that carries the custom typeids registered below."""

# 'Foo' uses the default set of exposed methods, 'Bar' names its exposed
# methods explicitly, and 'baz' is served through the custom IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1385
1386
class _TestMyManager(BaseTestCase):
    # Tests the typeids registered on MyManager: which methods each proxy
    # exposes and how calls and errors are forwarded.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            baz = manager.baz()

            candidates = ('f', 'g', '_h')
            foo_methods = [name for name in candidates if hasattr(foo, name)]
            bar_methods = [name for name in candidates if hasattr(bar, name)]

            # 'Foo' was registered without an explicit exposed list,
            # 'Bar' with exposed=('f', '_h').
            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            # '_h' is not exposed on foo, so the server refuses the call.
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(baz), [i*i for i in range(10)])
        finally:
            # Stop the manager process even if an assertion above fails.
            manager.shutdown()
1418
1419#
1420# Test of connecting to a remote server and using xmlrpclib for serialization
1421#
1422
# The one queue object handed out by get_queue().
_queue = Queue.Queue()


def get_queue():
    """Return the module-level queue (always the same object)."""
    return _queue
1426
class QueueManager(BaseManager):
    '''Manager used on the server side; it supplies the real queue.'''

# 'get_queue' is backed by the module-level get_queue() function.
QueueManager.register('get_queue', callable=get_queue)
1430
class QueueManager2(BaseManager):
    '''Manager with the same interface as QueueManager but no callable.'''

# Registered without a callable: only the typeid name is declared here.
QueueManager2.register('get_queue')
1434
1435
# Serializer name passed as ``serializer=`` to the QueueManager and
# QueueManager2 instances created by the tests below.
SERIALIZER = 'xmlrpclib'
1437
class _TestRemoteManager(BaseTestCase):
    # A child process puts values on a queue served by a QueueManager;
    # the parent reads them back through a second, connected manager.

    ALLOWED_TYPES = ('manager',)
    # `values` is what the child sends, `result` what the parent expects.
    values = ['hello world', None, True, 2.25]
    result = values[:]
    if test_support.have_unicode:
        uvalue = test_support.u(r'\u043f\u0440\u0438\u0432\u0456\u0442 '
                                r'\u0441\u0432\u0456\u0442')
        values.append(uvalue)
        result.append(uvalue)

    @classmethod
    def _putter(cls, address, authkey):
        """Child-process body: connect to *address* and enqueue the values."""
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        # Note that xmlrpclib will deserialize object as a list not a tuple
        queue.put(tuple(cls.values))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=(test_support.HOST, 0), authkey=authkey,
            serializer=SERIALIZER
            )
        manager.start()
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.daemon = True
            p.start()

            manager2 = QueueManager2(
                address=manager.address, authkey=authkey,
                serializer=SERIALIZER
                )
            manager2.connect()
            queue = manager2.get_queue()

            self.assertEqual(queue.get(), self.result)

            # Because we are using xmlrpclib for serialization instead of
            # pickle this will cause a serialization error.
            self.assertRaises(Exception, queue.put, time.sleep)

            # Make queue finalizer run before the server is stopped
            del queue
        finally:
            # Stop the server process even if an assertion above fails.
            manager.shutdown()
1489
class _TestManagerRestart(BaseTestCase):
    # Checks that a manager can be started again on an address that a
    # previous manager has only just released.

    @classmethod
    def _putter(cls, address, authkey):
        """Child-process body: connect to *address* and enqueue a greeting."""
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=(test_support.HOST, 0), authkey=authkey,
            serializer=SERIALIZER)
        # get_server() is used only to learn the concrete address, which is
        # reused for the second manager below.
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # Drop the proxy before stopping the server it refers to.
        del queue
        manager.shutdown()
        # Restart immediately on the address recorded above.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001522
Benjamin Petersondfd79492008-06-13 19:13:39 +00001523#
1524#
1525#
1526
# Message that ends the echo loop in _TestConnection._echo(); the tests
# send it to tell the child to quit.
SENTINEL = latin('')
1528
class _TestConnection(BaseTestCase):
    # Tests for the connection objects returned by self.Pipe(): object and
    # byte transfer, polling, one-way pipes and passing file descriptors.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        """Echo every byte message read from *conn* until SENTINEL arrives."""
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', range(4))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zero-filled array.
            buf = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buf),
                             len(arr) * buf.itemsize)
            self.assertEqual(list(buf), expected)

            # Receive at an offset of three items.
            buf = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buf, 3 * buf.itemsize),
                             len(arr) * buf.itemsize)
            self.assertEqual(list(buf), expected)

            # A buffer that is too small must raise BufferTooShort, with
            # the complete message available as the exception argument.
            buf = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            with self.assertRaises(multiprocessing.BufferTooShort) as cm:
                conn.recv_bytes_into(buf)
            self.assertEqual(cm.exception.args, (longmsg,))

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)
        # Give the child time to echo the message back.
        time.sleep(.1)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16 MiB
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            # Using an end in the wrong direction must fail.
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # Second argument: offset into msg.
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        # Third argument: number of bytes to send from that offset.
        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the length sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Offsets and sizes outside the message are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        """Return True if *fd* is open, False if fstat() reports EBADF.

        Any other OSError from fstat() is propagated.
        """
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        """Child-process body: receive a handle over *conn*, write *data*.

        With *create_dummy_fds* true, every unassigned descriptor below 256
        is first filled with a duplicate of the connection's descriptor.
        """
        if create_dummy_fds:
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        # Do not leave the scratch file behind.
        self.addCleanup(test_support.unlink, test_support.TESTFN)
        with open(test_support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        with open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        # Do not leave the scratch file behind.
        self.addCleanup(test_support.unlink, test_support.TESTFN)
        with open(test_support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Pick the first free descriptor number at or above 256.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        """Child-process body: write one raw NUL byte to *conn*'s descriptor."""
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001772
class _TestListenerClient(BaseTestCase):
    # Tests a Listener accepting a connection from a Client that runs in
    # another process or thread.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        """Client side: connect to *address*, send 'hello' and disconnect."""
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        # One round trip for every address family the module lists.
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            client_proc = self.Process(target=self._test,
                                       args=(listener.address,))
            client_proc.daemon = True
            client_proc.start()
            server_conn = listener.accept()
            self.assertEqual(server_conn.recv(), 'hello')
            client_proc.join()
            listener.close()

    def test_issue14725(self):
        listener = self.connection.Listener()
        client_proc = self.Process(target=self._test,
                                   args=(listener.address,))
        client_proc.daemon = True
        client_proc.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        server_conn = listener.accept()
        self.assertEqual(server_conn.recv(), 'hello')
        server_conn.close()
        client_proc.join()
        listener.close()
1809
Benjamin Petersondfd79492008-06-13 19:13:39 +00001810#
1811# Test of sending connection and socket objects between processes
1812#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001813"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001814class _TestPicklingConnections(BaseTestCase):
1815
1816 ALLOWED_TYPES = ('processes',)
1817
1818 def _listener(self, conn, families):
1819 for fam in families:
1820 l = self.connection.Listener(family=fam)
1821 conn.send(l.address)
1822 new_conn = l.accept()
1823 conn.send(new_conn)
1824
1825 if self.TYPE == 'processes':
1826 l = socket.socket()
1827 l.bind(('localhost', 0))
1828 conn.send(l.getsockname())
1829 l.listen(1)
1830 new_conn, addr = l.accept()
1831 conn.send(new_conn)
1832
1833 conn.recv()
1834
1835 def _remote(self, conn):
1836 for (address, msg) in iter(conn.recv, None):
1837 client = self.connection.Client(address)
1838 client.send(msg.upper())
1839 client.close()
1840
1841 if self.TYPE == 'processes':
1842 address, msg = conn.recv()
1843 client = socket.socket()
1844 client.connect(address)
1845 client.sendall(msg.upper())
1846 client.close()
1847
1848 conn.close()
1849
1850 def test_pickling(self):
1851 try:
1852 multiprocessing.allow_connection_pickling()
1853 except ImportError:
1854 return
1855
1856 families = self.connection.families
1857
1858 lconn, lconn0 = self.Pipe()
1859 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001860 lp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001861 lp.start()
1862 lconn0.close()
1863
1864 rconn, rconn0 = self.Pipe()
1865 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001866 rp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001867 rp.start()
1868 rconn0.close()
1869
1870 for fam in families:
1871 msg = ('This connection uses family %s' % fam).encode('ascii')
1872 address = lconn.recv()
1873 rconn.send((address, msg))
1874 new_conn = lconn.recv()
1875 self.assertEqual(new_conn.recv(), msg.upper())
1876
1877 rconn.send(None)
1878
1879 if self.TYPE == 'processes':
1880 msg = latin('This connection uses a normal socket')
1881 address = lconn.recv()
1882 rconn.send((address, msg))
1883 if hasattr(socket, 'fromfd'):
1884 new_conn = lconn.recv()
1885 self.assertEqual(new_conn.recv(100), msg.upper())
1886 else:
1887 # XXX On Windows with Py2.6 need to backport fromfd()
1888 discard = lconn.recv_bytes()
1889
1890 lconn.send(None)
1891
1892 rconn.close()
1893 lconn.close()
1894
1895 lp.join()
1896 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001897"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001898#
1899#
1900#
1901
class _TestHeap(BaseTestCase):
    # Exercises the heap object used by multiprocessing.heap.BufferWrapper.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        live = []

        # Churn the heap: allocate blocks of random size and, whenever more
        # than `maxblocks` are alive, drop one chosen at random.
        for _ in xrange(iterations):
            nbytes = int(random.lognormvariate(0, 1) * 1000)
            wrapper = multiprocessing.heap.BufferWrapper(nbytes)
            live.append(wrapper)
            if len(live) > maxblocks:
                victim = random.randrange(maxblocks)
                del live[victim]

        # The heap object stored on the BufferWrapper class.
        heap = multiprocessing.heap.BufferWrapper._heap

        # Record every free and allocated segment while holding the heap
        # lock; the lock is released by a registered cleanup.
        segments = []
        occupied = 0
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for free_list in heap._len_to_seq.values():
            for arena, start, stop in free_list:
                entry = (heap._arenas.index(arena), start, stop,
                         stop - start, 'free')
                segments.append(entry)
        for arena, start, stop in heap._allocated_blocks:
            entry = (heap._arenas.index(arena), start, stop,
                     stop - start, 'occupied')
            segments.append(entry)
            occupied += (stop - start)

        segments.sort()

        # Each segment must end where the next begins, unless the next
        # segment starts at offset 0 of a different arena.
        for current, following in zip(segments, segments[1:]):
            arena, start, stop = current[:3]
            narena, nstart, nstop = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't
        # deadlock (issue #12352).
        # Enable the GC if needed and lower the collection threshold so
        # that collections happen more often; both settings are restored
        # by cleanups.
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        saved_thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *saved_thresholds)
        gc.set_threshold(10)

        # Allocate many pairs of blocks that reference each other, so that
        # they can only be reclaimed by the cyclic garbage collector.
        for _ in range(5000):
            left = multiprocessing.heap.BufferWrapper(1)
            right = multiprocessing.heap.BufferWrapper(1)
            left.buddy = right
            right.buddy = left
1966
Benjamin Petersondfd79492008-06-13 19:13:39 +00001967#
1968#
1969#
1970
Benjamin Petersondfd79492008-06-13 19:13:39 +00001971class _Foo(Structure):
1972 _fields_ = [
1973 ('x', c_int),
1974 ('y', c_double)
1975 ]
1976
class _TestSharedCTypes(BaseTestCase):
    # Shared ctypes objects modified in a child must show the new values
    # in the parent.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Runs in the child: double every shared object in place.
        for shared in (x, y):
            shared.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        number = Value('i', 7, lock=lock)
        fraction = Value(c_double, 1.0/3.0, lock=lock)
        struct = Value(_Foo, 3, 2, lock=lock)
        numbers = self.Array('d', range(10), lock=lock)
        text = self.Array('c', 20, lock=lock)
        text.value = latin('hello')

        worker = self.Process(
            target=self._double,
            args=(number, fraction, struct, numbers, text))
        worker.daemon = True
        worker.start()
        worker.join()

        # Everything must have been doubled by the child.
        self.assertEqual(number.value, 14)
        self.assertAlmostEqual(fraction.value, 2.0/3.0)
        self.assertEqual(struct.x, 6)
        self.assertAlmostEqual(struct.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(numbers[index], index*2)
        self.assertEqual(text.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario as test_sharedctypes, but with lock=True.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # copy() must produce an object that is unaffected by later
        # changes to the source structure.
        source = _Foo(2, 5.0)
        duplicate = copy(source)
        source.x = 0
        source.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
2026
2027#
2028#
2029#
2030
class _TestFinalize(BaseTestCase):
    # Checks which util.Finalize callbacks run, and in what order.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Runs in the child.  Every finalizer reports a short tag through
        # conn.send so that the parent can observe the calling order.
        class Marker(object):
            pass

        obj_a = Marker()
        util.Finalize(obj_a, conn.send, args=('a',))
        del obj_a           # triggers callback for a

        obj_b = Marker()
        finalize_b = util.Finalize(obj_b, conn.send, args=('b',))
        finalize_b()        # triggers callback for b
        finalize_b()        # does nothing because callback has already been called
        del obj_b           # does nothing because callback has already been called

        # registered without an exitpriority
        obj_c = Marker()
        util.Finalize(obj_c, conn.send, args=('c',))

        obj_d10 = Marker()
        util.Finalize(obj_d10, conn.send, args=('d10',), exitpriority=1)

        # three finalizers sharing exitpriority 0
        obj_d01 = Marker()
        util.Finalize(obj_d01, conn.send, args=('d01',), exitpriority=0)
        obj_d02 = Marker()
        util.Finalize(obj_d02, conn.send, args=('d02',), exitpriority=0)
        obj_d03 = Marker()
        util.Finalize(obj_d03, conn.send, args=('d03',), exitpriority=0)

        # finalizers that are not attached to any object
        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        parent_end, child_end = self.Pipe()

        worker = self.Process(target=self._test_finalize, args=(child_end,))
        worker.daemon = True
        worker.start()
        worker.join()

        # Read tags until the 'STOP' sentinel arrives.
        received = list(iter(parent_end.recv, 'STOP'))
        self.assertEqual(received,
                         ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2083
2084#
2085# Test that from ... import * works for each module
2086#
2087
class _TestImportStar(BaseTestCase):
    # Every name listed in a module's __all__ must exist on that module.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        module_names = [
            'multiprocessing',
            'multiprocessing.connection',
            'multiprocessing.heap',
            'multiprocessing.managers',
            'multiprocessing.pool',
            'multiprocessing.process',
            'multiprocessing.synchronize',
            'multiprocessing.util',
        ]

        if HAS_REDUCTION:
            module_names.append('multiprocessing.reduction')

        if c_int is not None:
            # This module requires _ctypes
            module_names.append('multiprocessing.sharedctypes')

        for module_name in module_names:
            __import__(module_name)
            module = sys.modules[module_name]

            exported = getattr(module, '__all__', ())
            for attr in exported:
                message = '%r does not have attribute %r' % (module, attr)
                self.assertTrue(hasattr(module, attr), message)
2116
2117#
2118# Quick test that logging works -- does not test logging output
2119#
2120
class _TestLogging(BaseTestCase):
    # Quick checks of multiprocessing's logger; log output is not examined.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        # Check the logger before using it, not after.
        self.assertTrue(logger is not None)
        logger.setLevel(util.SUBWARNING)
        try:
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Always put the level back, even if logging raised.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Child side: report the effective level of multiprocessing's logger.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        try:
            # A level set directly on the multiprocessing logger must be
            # what the child reports.
            logger.setLevel(LEVEL1)
            p = self.Process(target=self._test_level, args=(writer,))
            p.daemon = True
            p.start()
            self.assertEqual(LEVEL1, reader.recv())
            p.join()

            # With the multiprocessing logger at NOTSET, the child must
            # report the level set on the root logger.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            p = self.Process(target=self._test_level, args=(writer,))
            p.daemon = True
            p.start()
            self.assertEqual(LEVEL2, reader.recv())
            p.join()
        finally:
            # Restore the global logging state and release the pipe even
            # when an assertion above fails, so that later tests are not
            # affected by the levels set here.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
2163
Jesse Noller814d02d2009-11-21 14:38:23 +00002164
Jesse Noller9a03f2f2009-11-24 14:17:29 +00002165# class _TestLoggingProcessName(BaseTestCase):
2166#
2167# def handle(self, record):
2168# assert record.processName == multiprocessing.current_process().name
2169# self.__handled = True
2170#
2171# def test_logging(self):
2172# handler = logging.Handler()
2173# handler.handle = self.handle
2174# self.__handled = False
2175# # Bypass getLogger() and side-effects
2176# logger = logging.getLoggerClass()(
2177# 'multiprocessing.test.TestLoggingProcessName')
2178# logger.addHandler(handler)
2179# logger.propagate = False
2180#
2181# logger.warn('foo')
2182# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00002183
Benjamin Petersondfd79492008-06-13 19:13:39 +00002184#
Richard Oudkerkba482642013-02-26 12:37:07 +00002185# Check that Process.join() retries if os.waitpid() fails with EINTR
2186#
2187
class _TestPollEintr(BaseTestCase):
    # Process.join() must complete normally when SIGUSR1 is delivered to
    # the joining process in the meantime.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _killer(cls, pid):
        # Wait half a second, then send SIGUSR1 to the given pid.
        time.sleep(0.5)
        os.kill(pid, signal.SIGUSR1)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_poll_eintr(self):
        # One-element list so that the handler can flip the flag in place.
        signal_seen = [False]

        def on_signal(*args):
            signal_seen[0] = True

        own_pid = os.getpid()
        previous_handler = signal.signal(signal.SIGUSR1, on_signal)
        try:
            killer = self.Process(target=self._killer, args=(own_pid,))
            killer.start()
            sleeper = self.Process(target=time.sleep, args=(1,))
            sleeper.start()
            sleeper.join()
            self.assertTrue(signal_seen[0])
            self.assertEqual(sleeper.exitcode, 0)
            killer.join()
        finally:
            signal.signal(signal.SIGUSR1, previous_handler)
2215
2216#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002217# Test to verify handle verification, see issue 3321
2218#
2219
class TestInvalidHandle(unittest.TestCase):
    # Handle verification in _multiprocessing.Connection, see issue 3321.

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        make_connection = _multiprocessing.Connection
        bogus = make_connection(44977608)
        # Polling the bogus handle must fail ...
        self.assertRaises(IOError, bogus.poll)
        # ... and a handle of -1 must be rejected at construction time.
        self.assertRaises(IOError, make_connection, -1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002227
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002228#
Benjamin Petersondfd79492008-06-13 19:13:39 +00002229# Functions used to create test cases from the base ones in this module
2230#
2231
def get_attributes(Source, names):
    """Return a dict mapping each name to the attribute of that name on Source.

    Attributes that are plain Python functions are wrapped in staticmethod();
    every other attribute is stored unchanged.
    """
    function_type = type(get_attributes)
    collected = {}
    for attr_name in names:
        value = getattr(Source, attr_name)
        collected[attr_name] = (staticmethod(value)
                                if type(value) == function_type
                                else value)
    return collected
2240
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes from the module's '_Test*' bases.

    For every module-level name starting with '_Test' whose ALLOWED_TYPES
    contains `type`, a class deriving from that base, unittest.TestCase and
    Mixin is created.  It is named 'With' + capitalized type + the base name
    without its leading underscore, and its __module__ is copied from Mixin.
    Returns a dict mapping the new names to the new classes.
    """
    prefix = 'With' + type.capitalize()
    module_globals = globals()
    created = {}

    for name, base in list(module_globals.items()):
        if not name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        newname = prefix + name[1:]
        created[newname] = Temp
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
    return created
2257
2258#
2259# Create test cases
2260#
2261
class ProcessesMixin(object):
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Names copied from the multiprocessing module into this class body.
    # The temporary tuple is deleted again so that it does not become a
    # class attribute.
    _exported = (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue', 'Pool'
        )
    locals().update(get_attributes(multiprocessing, _exported))
    del _exported

# Create the 'processes' test cases and publish them as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2274
2275
class ManagerMixin(object):
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Created with object.__new__, so SyncManager.__init__ is not run here.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Names copied from the manager instance into this class body.  The
    # temporary tuple is deleted again so that it does not become a class
    # attribute.
    _exported = (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue', 'Pool'
        )
    locals().update(get_attributes(manager, _exported))
    del _exported

# Create the 'manager' test cases and publish them as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
2288
2289
class ThreadsMixin(object):
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Names copied from multiprocessing.dummy into this class body.  The
    # temporary tuple is deleted again so that it does not become a class
    # attribute.
    _exported = (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue', 'Pool'
        )
    locals().update(get_attributes(multiprocessing.dummy, _exported))
    del _exported

# Create the 'threads' test cases and publish them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2302
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        # A peer that answers the challenge with junk must be rejected.
        class _FakeConnection(object):
            def send_bytes(self, data):
                pass
            def recv_bytes(self, size):
                return b'something bogus'

        bogus_peer = _FakeConnection()
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.deliver_challenge,
                          bogus_peer, b'abc')

    def test_answer_challenge_auth_failure(self):
        # The fake peer replies with CHALLENGE on the first read, junk on
        # the second read and an empty string afterwards.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0
            def send_bytes(self, data):
                pass
            def recv_bytes(self, size):
                self.count += 1
                if self.count > 2:
                    return b''
                if self.count == 2:
                    return b'something bogus'
                return multiprocessing.connection.CHALLENGE

        bogus_peer = _FakeConnection()
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.answer_challenge,
                          bogus_peer, b'abc')
2331
Jesse Noller7152f6d2009-04-02 05:17:26 +00002332#
2333# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2334#
2335
def initializer(ns):
    """Increment ns.test by one."""
    ns.test = ns.test + 1
2338
class TestInitializers(unittest.TestCase):
    # Manager.start() and Pool() must run the supplied initializer, and
    # must reject an initializer of 1 with TypeError.

    def setUp(self):
        # A manager-hosted namespace whose `test` attribute starts at 0.
        manager = multiprocessing.Manager()
        self.mgr = manager
        self.ns = manager.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        second_manager = multiprocessing.managers.SyncManager()
        self.assertRaises(TypeError, second_manager.start, 1)
        second_manager.start(initializer, (self.ns,))
        self.assertEqual(self.ns.test, 1)
        second_manager.shutdown()

    def test_pool_initializer(self):
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        # A single worker, so the initializer is expected to run once.
        pool = multiprocessing.Pool(1, initializer, (self.ns,))
        pool.close()
        pool.join()
        self.assertEqual(self.ns.test, 1)
2361
Jesse Noller1b90efb2009-06-30 17:11:52 +00002362#
2363# Issue 5155, 5313, 5331: Test process in processes
2364# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2365#
2366
Richard Oudkerkc5496072013-09-29 17:10:40 +01002367def _this_sub_process(q):
Jesse Noller1b90efb2009-06-30 17:11:52 +00002368 try:
2369 item = q.get(block=False)
2370 except Queue.Empty:
2371 pass
2372
def _test_process(q):
    """Start a daemonic child running _this_sub_process and wait for it.

    The child is given a newly created multiprocessing.Queue; the `q`
    argument itself is not used.
    """
    fresh_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_this_sub_process,
                                    args=(fresh_queue,))
    child.daemon = True
    child.start()
    child.join()
2379
Jesse Noller1b90efb2009-06-30 17:11:52 +00002380def _afunc(x):
2381 return x*x
2382
def pool_in_process():
    """Map _afunc over a short list using a four-worker pool.

    The result of the map is discarded.  The pool is closed and joined
    before returning, so its worker processes are not left behind.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Shut the workers down whether or not map() succeeded.
        pool.close()
        pool.join()
2386
2387class _file_like(object):
2388 def __init__(self, delegate):
2389 self._delegate = delegate
2390 self._pid = None
2391
2392 @property
2393 def cache(self):
2394 pid = os.getpid()
2395 # There are no race conditions since fork keeps only the running thread
2396 if pid != self._pid:
2397 self._pid = pid
2398 self._cache = []
2399 return self._cache
2400
2401 def write(self, data):
2402 self.cache.append(data)
2403
2404 def flush(self):
2405 self._delegate.write(''.join(self.cache))
2406 self._cache = []
2407
class TestStdinBadfiledescriptor(unittest.TestCase):
    # Issues 5155, 5313, 5331: starting processes from within processes.

    def test_queue_in_process(self):
        # A child that itself creates a queue and a grandchild must run
        # to completion.
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_test_process, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # A child that creates its own pool must run to completion.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this Process is created but never started; kept
        # as in the original test -- confirm whether it is still needed.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # Use assertEqual instead of a bare assert: a bare assert is
        # removed when Python runs with -O and reports no values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
2428
Richard Oudkerke4b99382012-07-27 14:05:46 +01002429#
2430# Test interaction with socket timeouts - see Issue #6056
2431#
2432
class TestTimeouts(unittest.TestCase):
    # Interaction with socket timeouts - see Issue #6056.

    @classmethod
    def _test_timeout(cls, child, address):
        # Child side: wait one second, send 123 over the pipe, then send
        # 456 over a new client connection to the listener.
        time.sleep(1)
        child.send(123)
        child.close()
        client = multiprocessing.connection.Client(address)
        client.send(456)
        client.close()

    def test_timeout(self):
        saved_timeout = socket.getdefaulttimeout()
        try:
            # Default socket timeout of 0.1s, shorter than the child's
            # one-second sleep.
            socket.setdefaulttimeout(0.1)
            parent_end, child_end = multiprocessing.Pipe(duplex=True)
            listener = multiprocessing.connection.Listener(family='AF_INET')
            worker = multiprocessing.Process(
                target=self._test_timeout,
                args=(child_end, listener.address))
            worker.start()
            child_end.close()
            self.assertEqual(parent_end.recv(), 123)
            parent_end.close()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 456)
            accepted.close()
            listener.close()
            worker.join(10)
        finally:
            socket.setdefaulttimeout(saved_timeout)
2462
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002463#
2464# Test what happens with no "if __name__ == '__main__'"
2465#
2466
class TestNoForkBomb(unittest.TestCase):
    # Runs mp_fork_bomb.py, a script without an
    # "if __name__ == '__main__'" guard.

    def test_noforkbomb(self):
        script_dir = os.path.dirname(__file__)
        name = os.path.join(script_dir, 'mp_fork_bomb.py')
        if not WIN32:
            # Elsewhere the script must succeed and print '123'.
            rc, out, err = test.script_helper.assert_python_ok(name)
            self.assertEqual(out.rstrip(), '123')
            self.assertEqual(err, '')
        else:
            # On Windows the script must fail with a RuntimeError and
            # print nothing on stdout.
            rc, out, err = test.script_helper.assert_python_failure(name)
            self.assertEqual(out, '')
            self.assertIn('RuntimeError', err)
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002478
2479#
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002480# Issue 12098: check sys.flags of child matches that for parent
2481#
2482
class TestFlags(unittest.TestCase):
    # Issue 12098: sys.flags of a child must match that of its parent.

    @classmethod
    def run_in_grandchild(cls, conn):
        # Send this process's sys.flags, as a tuple, over conn.
        own_flags = tuple(sys.flags)
        conn.send(own_flags)

    @classmethod
    def run_in_child(cls):
        # Start a grandchild, collect its flags and print them together
        # with this process's flags as a JSON document.
        import json
        recv_end, send_end = multiprocessing.Pipe(duplex=False)
        grandchild = multiprocessing.Process(target=cls.run_in_grandchild,
                                             args=(send_end,))
        grandchild.start()
        grandchild_flags = recv_end.recv()
        grandchild.join()
        recv_end.close()
        send_end.close()
        report = json.dumps((tuple(sys.flags), grandchild_flags))
        print(report)

    @test_support.requires_unicode  # XXX json needs unicode support
    def test_flags(self):
        import json
        import subprocess
        # start child process using unusual flags
        prog = ('from test.test_multiprocessing import TestFlags; ' +
                'TestFlags.run_in_child()')
        command = [sys.executable, '-E', '-B', '-O', '-c', prog]
        output = subprocess.check_output(command)
        child_flags, grandchild_flags = json.loads(output.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)
Richard Oudkerk7bdd93c2013-04-17 19:15:52 +01002511
2512#
2513# Issue #17555: ForkAwareThreadLock
2514#
2515
class TestForkAwareThreadLock(unittest.TestCase):
    # Processes are started recursively.  Issue #17555 meant that the
    # after-fork registry would get duplicate entries for the same
    # lock.  The size of the registry at generation n was ~2**n.

    @classmethod
    def child(cls, n, conn):
        if not n > 1:
            # Last generation: report the size of the after-fork registry.
            conn.send(len(util._afterfork_registry))
        else:
            # Otherwise start the next generation and wait for it.
            descendant = multiprocessing.Process(target=cls.child,
                                                 args=(n-1, conn))
            descendant.start()
            descendant.join()
        conn.close()

    def test_lock(self):
        recv_end, send_end = multiprocessing.Pipe(False)
        l = util.ForkAwareThreadLock()
        size_before = len(util._afterfork_registry)
        first_child = multiprocessing.Process(target=self.child,
                                              args=(5, send_end))
        first_child.start()
        size_after = recv_end.recv()
        first_child.join()
        # The registry seen five generations down must not have grown.
        self.assertLessEqual(size_after, size_before)
2539 self.assertLessEqual(new_size, old_size)
2540
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002541#
Richard Oudkerk41072db2013-07-01 18:45:28 +01002542# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
2543#
2544
class TestIgnoreEINTR(unittest.TestCase):
    # Issue #17097: EINTR should be ignored by recv(), send(), accept() etc.

    @classmethod
    def _test_ignore(cls, conn):
        # Child side: install a do-nothing SIGUSR1 handler, announce
        # readiness, echo one object back and then send a 1 MB payload.
        def noop_handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, noop_handler)
        conn.send('ready')
        echoed = conn.recv()
        conn.send(echoed)
        conn.send_bytes(b'x'*(1024*1024))   # sending 1 MB should block

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore(self):
        parent_end, child_end = multiprocessing.Pipe()
        try:
            worker = multiprocessing.Process(target=self._test_ignore,
                                             args=(child_end,))
            worker.daemon = True
            worker.start()
            child_end.close()
            self.assertEqual(parent_end.recv(), 'ready')
            # Signal the child, then exchange one object with it.
            time.sleep(0.1)
            os.kill(worker.pid, signal.SIGUSR1)
            time.sleep(0.1)
            parent_end.send(1234)
            self.assertEqual(parent_end.recv(), 1234)
            # Signal the child again before reading the 1 MB payload.
            time.sleep(0.1)
            os.kill(worker.pid, signal.SIGUSR1)
            expected = b'x'*(1024*1024)
            self.assertEqual(parent_end.recv_bytes(), expected)
            time.sleep(0.1)
            worker.join()
        finally:
            parent_end.close()

    @classmethod
    def _test_ignore_listener(cls, conn):
        # Child side: install a do-nothing SIGUSR1 handler, publish the
        # listener's address, accept one connection and greet it.
        def noop_handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, noop_handler)
        listener = multiprocessing.connection.Listener()
        conn.send(listener.address)
        accepted = listener.accept()
        accepted.send('welcome')

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore_listener(self):
        parent_end, child_end = multiprocessing.Pipe()
        try:
            worker = multiprocessing.Process(
                target=self._test_ignore_listener, args=(child_end,))
            worker.daemon = True
            worker.start()
            child_end.close()
            address = parent_end.recv()
            # Signal the child before connecting to its listener.
            time.sleep(0.1)
            os.kill(worker.pid, signal.SIGUSR1)
            time.sleep(0.1)
            client = multiprocessing.connection.Client(address)
            self.assertEqual(client.recv(), 'welcome')
            worker.join()
        finally:
            parent_end.close()
2608
2609#
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002610#
2611#
2612
# Test cases that are not generated from the '_Test*' base classes.
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
    TestTimeouts,
    TestNoForkBomb,
    TestFlags,
    TestForkAwareThreadLock,
    TestIgnoreEINTR,
]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002616
Benjamin Petersondfd79492008-06-13 19:13:39 +00002617#
2618#
2619#
2620
def test_main(run=None):
    """Build the whole multiprocessing test suite and pass it to `run`.

    `run` is a callable taking a unittest suite; when it is None,
    test.test_support.run_unittest is used.  The shared pools and the
    manager stored on the mixin classes are created before the suite runs
    and are torn down afterwards, even when `run` raises.

    Raises unittest.SkipTest on Linux when creating an RLock fails with
    OSError (see issue 3111).
    """
    if sys.platform.startswith("linux"):
        try:
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
        # module during these tests is at least platform dependent and possibly
        # non-deterministic on any given platform. So we don't mind if the listed
        # warnings aren't actually raised.
        with test_support.check_py3k_warnings(
                (".+__(get|set)slice__ has been removed", DeprecationWarning),
                (r"sys.exc_clear\(\) not supported", DeprecationWarning),
                quiet=True):
            run(suite)
    finally:
        # Tear the shared resources down even when run() raises, so that
        # the worker and manager processes are not left running after a
        # failed run.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002668
def main():
    """Run the whole suite through a TextTestRunner with verbosity 2."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

if __name__ == '__main__':
    main()