blob: b1e75b566325b07cac458ccd2983a45d5f890c43 [file] [log] [blame]
Benjamin Petersondfd79492008-06-13 19:13:39 +00001#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00006import Queue
7import time
8import sys
9import os
10import gc
11import signal
12import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000013import socket
14import random
15import logging
Antoine Pitroua1a8da82011-08-23 19:54:20 +020016import errno
Richard Oudkerkfaee75c2012-08-14 11:41:19 +010017import test.script_helper
Mark Dickinsonc4920e82009-11-20 19:30:22 +000018from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000019from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000020_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020021# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000022# message: "No module named _multiprocessing". _multiprocessing is not compiled
23# without thread support.
24import threading
R. David Murray3db8a342009-03-30 23:05:48 +000025
Jesse Noller37040cd2008-09-30 00:15:45 +000026# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000027test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000028
Benjamin Petersondfd79492008-06-13 19:13:39 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000033import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000034
Charles-François Natalif8413b22011-09-21 18:44:49 +020035from multiprocessing import util
36
37try:
38 from multiprocessing import reduction
39 HAS_REDUCTION = True
40except ImportError:
41 HAS_REDUCTION = False
Benjamin Petersondfd79492008-06-13 19:13:39 +000042
Brian Curtina06e9b82010-10-07 02:27:41 +000043try:
44 from multiprocessing.sharedctypes import Value, copy
45 HAS_SHAREDCTYPES = True
46except ImportError:
47 HAS_SHAREDCTYPES = False
48
Antoine Pitroua1a8da82011-08-23 19:54:20 +020049try:
50 import msvcrt
51except ImportError:
52 msvcrt = None
53
Benjamin Petersondfd79492008-06-13 19:13:39 +000054#
55#
56#
57
Benjamin Petersone79edf52008-07-13 18:34:58 +000058latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000059
Benjamin Petersondfd79492008-06-13 19:13:39 +000060#
61# Constants
62#
63
64LOG_LEVEL = util.SUBWARNING
Jesse Noller654ade32010-01-27 03:05:57 +000065#LOG_LEVEL = logging.DEBUG
Benjamin Petersondfd79492008-06-13 19:13:39 +000066
67DELTA = 0.1
68CHECK_TIMINGS = False # making true makes tests take a lot longer
69 # and can sometimes cause some non-serious
70 # failures because some calls block a bit
71 # longer than expected
72if CHECK_TIMINGS:
73 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
74else:
75 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
76
77HAVE_GETVALUE = not getattr(_multiprocessing,
78 'HAVE_BROKEN_SEM_GETVALUE', False)
79
Jesse Noller9a5b2ad2009-01-19 15:12:22 +000080WIN32 = (sys.platform == "win32")
81
# MAXFD is the per-process open-file limit reported by the OS.  Fall back to
# 256 when os.sysconf() is missing (AttributeError), does not know the
# setting (ValueError) or fails (OSError).  Only these are caught so that
# KeyboardInterrupt/SystemExit are not swallowed at import time.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    MAXFD = 256
86
Benjamin Petersondfd79492008-06-13 19:13:39 +000087#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000088# Some tests require ctypes
89#
90
91try:
Nick Coghlan13623662010-04-10 14:24:36 +000092 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000093except ImportError:
94 Structure = object
95 c_int = c_double = None
96
Charles-François Natali6392d7f2011-11-22 18:35:18 +010097
def check_enough_semaphores():
    """Raise unittest.SkipTest if the OS reports too few semaphores.

    Returns None when the limit cannot be queried, when it is reported
    as -1, or when it is at least the POSIX minimum of 256.
    """
    # minimum number of semaphores available according to POSIX
    required = 256
    try:
        available = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # os.sysconf itself, or this particular setting, is unavailable
        return
    if available != -1 and available < required:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % required)
111
112
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000113#
Benjamin Petersondfd79492008-06-13 19:13:39 +0000114# Creates a wrapper for a function which records the time it takes to finish
115#
116
class TimingWrapper(object):
    """Callable proxy that remembers how long its last call took.

    ``elapsed`` is None until the first call; afterwards it holds the
    wall-clock duration (seconds, from time.time()) of the most recent
    call, whether that call returned or raised.
    """

    def __init__(self, func):
        self.elapsed = None
        self.func = func

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # recorded on both the normal and the exceptional path
            self.elapsed = time.time() - started
        return result
129
130#
131# Base class for test cases
132#
133
class BaseTestCase(object):
    """Helpers shared by the test case classes in this file.

    The methods call assertEqual/assertAlmostEqual on ``self`` without
    defining them, so they must be supplied by whatever this class is
    combined with (presumably unittest.TestCase -- confirm at the point
    where the concrete test classes are built).
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timings are only compared when CHECK_TIMINGS is switched on.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Compare func(*args) with value, unless func reports that it is
        # not implemented, in which case there is nothing to check.
        try:
            outcome = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, outcome)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
156
Benjamin Petersondfd79492008-06-13 19:13:39 +0000157#
158# Return the value of a semaphore
159#
160
def get_value(self):
    """Return the current counter of the semaphore-like object ``self``.

    Tries, in order, the ``get_value()`` method, the ``_Semaphore__value``
    attribute and the ``_value`` attribute.  Raises NotImplementedError
    when every attempt ends in AttributeError.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attribute in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attribute)
        except AttributeError:
            continue
    raise NotImplementedError
172
173#
174# Testcases
175#
176
class _TestProcess(BaseTestCase):
    # Covers Process basics: the current process object, start/join,
    # terminate(), cpu_count(), active_children(), nested spawning and
    # the exit codes produced by sys.exit().

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # Properties of the object returned by current_process().
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        me = self.current_process()
        key = me.authkey

        self.assertTrue(me.is_alive())
        self.assertTrue(not me.daemon)
        self.assertIsInstance(key, bytes)
        self.assertTrue(len(key) > 0)
        self.assertEqual(me.ident, os.getpid())
        self.assertEqual(me.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Target for test_process: report what the child sees through q.
        child = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(child.name)
        if cls.TYPE != 'threads':
            q.put(bytes(child.authkey))
            q.put(child.pid)

    def test_process(self):
        # Follow a child through its not-started, running and finished
        # states and check the values it sends back.
        results = self.Queue(1)
        e = self.Event()
        call_args = (results, 1, 2)
        call_kwargs = {'hello':23, 'bye':2.54}
        child_name = 'SomeProcess'
        child = self.Process(
            target=self._test, args=call_args, kwargs=call_kwargs,
            name=child_name
            )
        child.daemon = True
        parent = self.current_process()

        # before start()
        if self.TYPE != 'threads':
            self.assertEqual(child.authkey, parent.authkey)
        self.assertEqual(child.is_alive(), False)
        self.assertEqual(child.daemon, True)
        self.assertNotIn(child, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(child.exitcode, None)

        child.start()

        # while running
        self.assertEqual(child.exitcode, None)
        self.assertEqual(child.is_alive(), True)
        self.assertIn(child, self.active_children())

        # the queue argument itself is not echoed back, hence [1:]
        self.assertEqual(results.get(), call_args[1:])
        self.assertEqual(results.get(), call_kwargs)
        self.assertEqual(results.get(), child.name)
        if self.TYPE != 'threads':
            self.assertEqual(results.get(), parent.authkey)
            self.assertEqual(results.get(), child.pid)

        child.join()

        # after join()
        self.assertEqual(child.exitcode, 0)
        self.assertEqual(child.is_alive(), False)
        self.assertNotIn(child, self.active_children())

    @classmethod
    def _test_terminate(cls):
        # Sleep long enough that only terminate() can end the child.
        time.sleep(1000)

    def test_terminate(self):
        # terminate() must stop a sleeping child so that join() returns.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        victim = self.Process(target=self._test_terminate)
        victim.daemon = True
        victim.start()

        self.assertEqual(victim.is_alive(), True)
        self.assertIn(victim, self.active_children())
        self.assertEqual(victim.exitcode, None)

        victim.terminate()

        timed_join = TimingWrapper(victim.join)
        self.assertEqual(timed_join(), None)
        self.assertTimingAlmostEqual(timed_join.elapsed, 0.0)

        self.assertEqual(victim.is_alive(), False)
        self.assertNotIn(victim, self.active_children())

        victim.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        # cpu_count() yields a positive int, or is not implemented.
        try:
            count = multiprocessing.cpu_count()
        except NotImplementedError:
            count = 1
        self.assertTrue(type(count) is int)
        self.assertTrue(count >= 1)

    def test_active_children(self):
        # A child is listed by active_children() only while it runs.
        self.assertEqual(type(self.active_children()), list)

        sleeper = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(sleeper, self.active_children())

        sleeper.daemon = True
        sleeper.start()
        self.assertIn(sleeper, self.active_children())

        sleeper.join()
        self.assertNotIn(sleeper, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # Send our own id, then spawn two children one after the other
        # until ids reach length 2.
        from multiprocessing import forking
        wconn.send(id)
        if len(id) < 2:
            for branch in range(2):
                child = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[branch])
                    )
                child.start()
                child.join()

    def test_recursion(self):
        # Ids sent through the pipe must arrive in depth-first order.
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        received = []
        while rconn.poll():
            received.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(received, expected)

    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        # Point stderr at testfn so the parent can read what sys.exit()
        # printed.
        sys.stderr = open(testfn, 'w')
        sys.exit(reason)

    def test_sys_exit(self):
        # See Issue 13854
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test_support.TESTFN
        self.addCleanup(test_support.unlink, testfn)

        # non-integer reasons: exit code 1 and str(reason) on stderr
        for reason, code in (([1, 2, 3], 1), ('ignore this', 1)):
            child = self.Process(target=self._test_sys_exit,
                                 args=(reason, testfn))
            child.daemon = True
            child.start()
            child.join(5)
            self.assertEqual(child.exitcode, code)

            with open(testfn, 'r') as captured:
                self.assertEqual(captured.read().rstrip(), str(reason))

        # bool/int reasons are compared directly with the exit code
        for reason in (True, False, 8):
            child = self.Process(target=sys.exit, args=(reason,))
            child.daemon = True
            child.start()
            child.join(5)
            self.assertEqual(child.exitcode, reason)
356
Benjamin Petersondfd79492008-06-13 19:13:39 +0000357#
358#
359#
360
class _UpperCaser(multiprocessing.Process):
    """Process subclass that upper-cases strings sent to it over a pipe.

    The parent talks through ``parent_conn``; the child's run() loop
    serves ``child_conn`` until it receives None.
    """

    def __init__(self):
        multiprocessing.Process.__init__(self)
        ends = multiprocessing.Pipe()
        self.child_conn = ends[0]
        self.parent_conn = ends[1]

    def run(self):
        # Runs in the child: drop the parent's end, then answer every
        # request until the None sentinel arrives.
        self.parent_conn.close()
        while True:
            request = self.child_conn.recv()
            if request is None:
                break
            self.child_conn.send(request.upper())
        self.child_conn.close()

    def submit(self, s):
        # Send one string to the child and return its reply.
        assert type(s) is str
        self.parent_conn.send(s)
        reply = self.parent_conn.recv()
        return reply

    def stop(self):
        # Send the None sentinel, then close both pipe ends held by the
        # caller.
        self.parent_conn.send(None)
        self.parent_conn.close()
        self.child_conn.close()
382
class _TestSubclassingProcess(BaseTestCase):
    # Checks that a user-defined Process subclass can be started, used
    # and shut down.

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.daemon = True
        worker.start()
        for text, shouted in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), shouted)
        worker.stop()
        worker.join()
395
396#
397#
398#
399
def queue_empty(q):
    """Return whether q is empty, via q.empty() if present, else q.qsize()."""
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
405
def queue_full(q, maxsize):
    """Return whether q is full, via q.full() if present, else by
    comparing q.qsize() with maxsize."""
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
411
412
class _TestQueue(BaseTestCase):
    # Covers Queue/JoinableQueue: blocking and non-blocking put/get,
    # behaviour across a fork, qsize(), task_done()/join() and use of a
    # queue while a module is being imported.

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_put: once released, drain the six items.
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        consumer = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        consumer.daemon = True
        consumer.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # fill the queue using every calling convention of put()
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        timed_put = TimingWrapper(queue.put)
        timed_put_nowait = TimingWrapper(queue.put_nowait)

        # non-blocking attempts on a full queue fail immediately
        self.assertRaises(Queue.Full, timed_put, 7, False)
        self.assertTimingAlmostEqual(timed_put.elapsed, 0)

        self.assertRaises(Queue.Full, timed_put, 7, False, None)
        self.assertTimingAlmostEqual(timed_put.elapsed, 0)

        self.assertRaises(Queue.Full, timed_put_nowait, 7)
        self.assertTimingAlmostEqual(timed_put_nowait.elapsed, 0)

        # blocking attempts fail once their timeout has passed
        self.assertRaises(Queue.Full, timed_put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(timed_put.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Full, timed_put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(timed_put.elapsed, 0)

        self.assertRaises(Queue.Full, timed_put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(timed_put.elapsed, TIMEOUT3)

        # let the child drain the queue
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        consumer.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_get: once released, enqueue 2..5.
        child_can_start.wait()
        #queue.put(1)
        for item in (2, 3, 4, 5):
            queue.put(item)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        producer = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        producer.daemon = True
        producer.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        # fetch the items using every calling convention of get()
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        timed_get = TimingWrapper(queue.get)
        timed_get_nowait = TimingWrapper(queue.get_nowait)

        # non-blocking attempts on an empty queue fail immediately
        self.assertRaises(Queue.Empty, timed_get, False)
        self.assertTimingAlmostEqual(timed_get.elapsed, 0)

        self.assertRaises(Queue.Empty, timed_get, False, None)
        self.assertTimingAlmostEqual(timed_get.elapsed, 0)

        self.assertRaises(Queue.Empty, timed_get_nowait)
        self.assertTimingAlmostEqual(timed_get_nowait.elapsed, 0)

        # blocking attempts fail once their timeout has passed
        self.assertRaises(Queue.Empty, timed_get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Empty, timed_get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(timed_get.elapsed, 0)

        self.assertRaises(Queue.Empty, timed_get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT3)

        producer.join()

    @classmethod
    def _test_fork(cls, queue):
        # Child side of test_fork: enqueue 10..19.
        for number in range(10, 20):
            queue.put(number)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for number in range(10):
            queue.put(number)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        child = self.Process(target=self._test_fork, args=(queue,))
        child.daemon = True
        child.start()

        # check that all expected items are in the queue
        for expected in range(20):
            self.assertEqual(queue.get(), expected)
        self.assertRaises(Queue.Empty, queue.get, False)

        child.join()

    def test_qsize(self):
        # qsize() must follow each put() and get(), where implemented.
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            self.skipTest('qsize method not implemented')
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        # Worker: acknowledge each item after a short delay; None stops it.
        while True:
            obj = q.get()
            if obj is None:
                break
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            self.skipTest("requires 'queue.task_done()' method")

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for _ in xrange(4)]

        for worker in workers:
            worker.daemon = True
            worker.start()

        for item in xrange(10):
            queue.put(item)

        # returns once every item has been acknowledged with task_done()
        queue.join()

        # one None sentinel per worker
        for worker in workers:
            queue.put(None)

        for worker in workers:
            worker.join()

    def test_no_import_lock_contention(self):
        # Importing a module that uses a Queue at import time must not
        # time out (Issue #22853).
        with test_support.temp_cwd():
            module_name = 'imported_by_an_imported_module'
            with open(module_name + '.py', 'w') as f:
                f.write("""if 1:
                    import multiprocessing

                    q = multiprocessing.Queue()
                    q.put('knock knock')
                    q.get(timeout=3)
                    q.close()
                """)

            with test_support.DirsOnSysPath(os.getcwd()):
                try:
                    __import__(module_name)
                except Queue.Empty:
                    self.fail("Probable regression on import lock contention;"
                              " see Issue #22853")
642
Benjamin Petersondfd79492008-06-13 19:13:39 +0000643#
644#
645#
646
class _TestLock(BaseTestCase):
    # Return values and error behaviour of Lock and RLock.

    def test_lock(self):
        # A held Lock refuses a non-blocking acquire; releasing an
        # unheld Lock is an error.
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        # An RLock can be taken three times and must be released three
        # times; a fourth release is an error.
        lock = self.RLock()
        for _ in range(3):
            self.assertEqual(lock.acquire(), True)
        for _ in range(3):
            self.assertEqual(lock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        # A Lock must work as a context manager.
        with self.Lock():
            pass
669
Benjamin Petersondfd79492008-06-13 19:13:39 +0000670
class _TestSemaphore(BaseTestCase):
    # Counter behaviour of Semaphore/BoundedSemaphore and acquire()
    # timeouts.

    def _test_semaphore(self, sem):
        # Drive a semaphore created with value 2 down to 0 and back up,
        # checking the result of each call and the counter after it.
        self.assertReturnsIfImplemented(2, get_value, sem)
        steps = (
            (sem.acquire, (), True, 1),
            (sem.acquire, (), True, 0),
            (sem.acquire, (False,), False, 0),
            (sem.release, (), None, 1),
            (sem.release, (), None, 2),
            )
        for operation, op_args, outcome, counter in steps:
            self.assertEqual(operation(*op_args), outcome)
            self.assertReturnsIfImplemented(counter, get_value, sem)

    def test_semaphore(self):
        # A plain Semaphore may be released past its initial value.
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        for counter in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(counter, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        # acquire() on a semaphore at 0 returns False: at once when
        # non-blocking, after the timeout when blocking.
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        timed_acquire = TimingWrapper(sem.acquire)

        self.assertEqual(timed_acquire(False), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, 0.0)

        self.assertEqual(timed_acquire(False, None), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, 0.0)

        self.assertEqual(timed_acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, 0)

        self.assertEqual(timed_acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, TIMEOUT2)

        self.assertEqual(timed_acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, TIMEOUT3)
723
724
class _TestCondition(BaseTestCase):
    # Covers Condition: notify(), notify_all() and wait() timeouts, with
    # waiters running both as self.Process workers and as threads.

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Waiter body: signal "sleeping", wait on cond (optionally with a
        # timeout), then signal "woken".
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Keep the two waiters under separate names so that both can be
        # joined at the end of the test.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # both waiters have been notified by now; reap each of them
        proc.join()
        thread.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # every waiter started below, joined at the end of the test
        workers = []

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them all to sleep
        for i in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for i in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them to all sleep
        for i in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # the first batch timed out and the second was notified, so every
        # waiter is finishing; reap them all
        for worker in workers:
            worker.join()

    def test_timeout(self):
        # wait() with a timeout and no notification returns None.
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
856
857
class _TestEvent(BaseTestCase):
    # Covers Event: is_set(), set(), clear() and the value returned by
    # wait() with and without a timeout.

    @classmethod
    def _test_event(cls, event):
        # Child body: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # a new event starts cleared
        self.assertEqual(event.is_set(), False)

        # wait() reports the flag's state, so a timed wait on a cleared
        # event returns False once the timeout has passed
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # once set, wait() returns True without blocking
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # a child sets the event after TIMEOUT2; the untimed wait() must
        # then return True
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)

        # the child's last action is event.set(), so it is about to
        # finish; reap it
        p.join()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000898
899#
900#
901#
902
class _TestValue(BaseTestCase):
    # Covers shared Value/RawValue objects: initial values, updates made
    # by a child process, and the get_lock()/get_obj() accessors.

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Child body: overwrite each shared value with its third entry.
        for shared, (_, _, updated) in zip(values, cls.codes_values):
            shared.value = updated

    def test_value(self, raw=False):
        # Values hold their initial contents and show the child's writes.
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, (_, initial, _) in zip(values, self.codes_values):
            self.assertEqual(shared.value, initial)

        child = self.Process(target=self._test, args=(values,))
        child.daemon = True
        child.start()
        child.join()

        for shared, (_, _, updated) in zip(values, self.codes_values):
            self.assertEqual(shared.value, updated)

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # default lock
        val1 = self.Value('i', 5)
        lock1 = val1.get_lock()
        obj1 = val1.get_obj()

        # lock=None
        val2 = self.Value('i', 5, lock=None)
        lock2 = val2.get_lock()
        obj2 = val2.get_obj()

        # a caller-supplied lock is handed back by get_lock()
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        obj3 = val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False gives an object without the accessors
        unlocked = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))

        # anything that is not a lock is rejected
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue has no accessors either
        raw = self.RawValue('i', 5)
        self.assertFalse(hasattr(raw, 'get_lock'))
        self.assertFalse(hasattr(raw, 'get_obj'))
970
Benjamin Petersondfd79492008-06-13 19:13:39 +0000971
972class _TestArray(BaseTestCase):
973
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000974 ALLOWED_TYPES = ('processes',)
975
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000976 @classmethod
977 def f(cls, seq):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000978 for i in range(1, len(seq)):
979 seq[i] += seq[i-1]
980
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000981 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000982 def test_array(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000983 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
984 if raw:
985 arr = self.RawArray('i', seq)
986 else:
987 arr = self.Array('i', seq)
988
989 self.assertEqual(len(arr), len(seq))
990 self.assertEqual(arr[3], seq[3])
991 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
992
993 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
994
995 self.assertEqual(list(arr[:]), seq)
996
997 self.f(seq)
998
999 p = self.Process(target=self.f, args=(arr,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001000 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001001 p.start()
1002 p.join()
1003
1004 self.assertEqual(list(arr[:]), seq)
1005
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001006 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsond3cb2f62011-03-26 10:02:37 +00001007 def test_array_from_size(self):
1008 size = 10
1009 # Test for zeroing (see issue #11675).
1010 # The repetition below strengthens the test by increasing the chances
1011 # of previously allocated non-zero memory being used for the new array
1012 # on the 2nd and 3rd loops.
1013 for _ in range(3):
1014 arr = self.Array('i', size)
1015 self.assertEqual(len(arr), size)
1016 self.assertEqual(list(arr), [0] * size)
1017 arr[:] = range(10)
1018 self.assertEqual(list(arr), range(10))
1019 del arr
1020
1021 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001022 def test_rawarray(self):
1023 self.test_array(raw=True)
1024
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001025 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsonf9e9a6f2011-03-25 22:01:06 +00001026 def test_array_accepts_long(self):
1027 arr = self.Array('i', 10L)
1028 self.assertEqual(len(arr), 10)
1029 raw_arr = self.RawArray('i', 10L)
1030 self.assertEqual(len(raw_arr), 10)
1031
1032 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001033 def test_getobj_getlock_obj(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001034 arr1 = self.Array('i', range(10))
1035 lock1 = arr1.get_lock()
1036 obj1 = arr1.get_obj()
1037
1038 arr2 = self.Array('i', range(10), lock=None)
1039 lock2 = arr2.get_lock()
1040 obj2 = arr2.get_obj()
1041
1042 lock = self.Lock()
1043 arr3 = self.Array('i', range(10), lock=lock)
1044 lock3 = arr3.get_lock()
1045 obj3 = arr3.get_obj()
1046 self.assertEqual(lock, lock3)
1047
Jesse Noller6ab22152009-01-18 02:45:38 +00001048 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001049 self.assertFalse(hasattr(arr4, 'get_lock'))
1050 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Noller6ab22152009-01-18 02:45:38 +00001051 self.assertRaises(AttributeError,
1052 self.Array, 'i', range(10), lock='notalock')
1053
1054 arr5 = self.RawArray('i', range(10))
1055 self.assertFalse(hasattr(arr5, 'get_lock'))
1056 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001057
1058#
1059#
1060#
1061
class _TestContainers(BaseTestCase):
    """Checks for the manager-provided list, dict and Namespace objects."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        digits = range(10)
        a = self.list(digits)
        self.assertEqual(a[:], range(10))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(range(5))
        self.assertEqual(b[:], range(5))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        # In-place repetition and concatenation with a plain list.
        b *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        # The first list was not touched by any of the above.
        self.assertEqual(a[:], range(10))

        # A shared list built from a list of shared lists.
        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], doubled]
            )

        # An append made through `a` is seen when reading `f`.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = range(65, 70)
        pairs = [(code, chr(code)) for code in indices]
        for code, char in pairs:
            d[code] = char
        self.assertEqual(d.copy(), dict(pairs))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [char for _, char in pairs])
        self.assertEqual(sorted(d.items()), pairs)

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The expected str() shows only `name`: `job` was deleted and
        # `_hidden` is not listed.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1117
1118#
1119#
1120#
1121
def sqr(x, wait=0.0):
    """Sleep for `wait` seconds, then return `x` multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    """Checks for the ``Pool`` API (apply, map, imap, terminate, ...)."""

    def test_apply(self):
        apply_sync = self.pool.apply
        self.assertEqual(apply_sync(sqr, (5,)), sqr(5))
        self.assertEqual(apply_sync(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pool_map = self.pool.map
        self.assertEqual(pool_map(sqr, range(10)),
                         [sqr(i) for i in range(10)])
        self.assertEqual(pool_map(sqr, range(100), chunksize=20),
                         [sqr(i) for i in range(100)])

    def test_map_unplicklable(self):
        # Issue #19425 -- failure to pickle should not cause a hang
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        class A(object):
            # Any attempt to pickle an instance fails.
            def __reduce__(self):
                raise RuntimeError('cannot pickle')

        with self.assertRaises(RuntimeError):
            self.pool.map(sqr, [A()]*10)

    def test_map_chunksize(self):
        # An empty iterable combined with an explicit chunksize must
        # complete rather than time out.
        pending = self.pool.map_async(sqr, [], chunksize=1)
        try:
            pending.get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        timed_get = TimingWrapper(res.get)
        self.assertEqual(timed_get(), 49)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the timeout passed to get().
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
        timed_get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError,
                          timed_get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), [sqr(i) for i in range(10)])

        # Step through the iterator by hand and check it ends cleanly.
        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

        # Same again with an explicit chunksize.
        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

    def test_imap_unordered(self):
        expected = [sqr(i) for i in range(1000)]

        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), expected)

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), expected)

    def test_make_pool(self):
        # Non-positive worker counts are rejected.
        for bad_size in (-1, 0):
            self.assertRaises(ValueError, multiprocessing.Pool, bad_size)

        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        p = self.Pool(4)
        # Queue far more work than can finish, then terminate at once.
        result = p.map_async(
            time.sleep, [0.1] * 10000, chunksize=1
            )
        p.terminate()
        timed_join = TimingWrapper(p.join)
        timed_join()
        self.assertTrue(timed_join.elapsed < 0.2)

    def test_empty_iterable(self):
        # See Issue 12157
        p = self.Pool(1)

        self.assertEqual(p.map(sqr, []), [])
        self.assertEqual(list(p.imap(sqr, [])), [])
        self.assertEqual(list(p.imap_unordered(sqr, [])), [])
        self.assertEqual(p.map_async(sqr, []).get(), [])

        p.close()
        p.join()
1217
def unpickleable_result():
    """Return a lambda (which yields 42 when called) as the task result."""
    result = lambda: 42
    return result
1220
class _TestPoolWorkerErrors(BaseTestCase):
    """Checks that result-encoding failures are reported to the caller."""

    ALLOWED_TYPES = ('processes', )

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        pool = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        attempts = 20
        for _ in range(attempts):
            pending = pool.apply_async(unpickleable_result)
            self.assertRaises(MaybeEncodingError, pending.get)

        pool.close()
        pool.join()
1235
class _TestPoolWorkerLifetime(BaseTestCase):
    """Checks for pools created with ``maxtasksperchild``."""

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(p._pool))
        origworkerpids = [w.pid for w in p._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        results = [p.apply_async(sqr, (i, )) for i in range(100)]
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for j, res in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
        # Refill the pool
        p._repopulate_pool()
        # Wait until all workers are alive
        # (50 polls * DELTA = 5 seconds max startup process time)
        for _ in range(50):
            if all(w.is_alive() for w in p._pool):
                break
            time.sleep(DELTA)
        finalworkerpids = [w.pid for w in p._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        p.close()
        p.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        p = multiprocessing.Pool(3, maxtasksperchild=1)
        results = [p.apply_async(sqr, (i, 0.3)) for i in range(6)]
        p.close()
        p.join()
        # check the results
        for j, res in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
1280
1281
Benjamin Petersondfd79492008-06-13 19:13:39 +00001282#
1283# Test that manager has expected number of shared objects left
1284#
1285
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Fetch the debug info right after the count so that what gets
        # printed on failure was taken next to the number being asserted.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print it once; the previous code also issued a second
            # _debug_info() request and printed that as well.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1304
1305#
1306# Test of creating a customized manager class
1307#
1308
1309from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1310
class FooBar(object):
    """Plain object registered with MyManager under 'Foo' and 'Bar'."""

    def f(self):
        """Return the fixed string 'f()'."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError()

    def _h(self):
        """Return the fixed string '_h()'."""
        return '_h()'
1318
def baz():
    """Return a generator over the squares of 0 through 9."""
    return (n * n for n in range(10))
1322
class IteratorProxy(BaseProxy):
    """Proxy that forwards the iterator protocol to its referent."""

    _exposed_ = ('next', '__next__')

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def next(self):
        result = self._callmethod('next')
        return result

    def __next__(self):
        result = self._callmethod('__next__')
        return result
1331
class MyManager(BaseManager):
    """Customized manager; its shared types are registered just below."""

# 'Foo' uses the default exposure, 'Bar' exposes exactly f and _h, and
# 'baz' hands out its result through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1338
1339
class _TestMyManager(BaseTestCase):
    """Checks the customized manager class ``MyManager``."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        # Shut the manager down even when an assertion below fails, so a
        # failing run does not leave its server process behind.
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            baz = manager.baz()

            candidates = ('f', 'g', '_h')
            foo_methods = [name for name in candidates if hasattr(foo, name)]
            bar_methods = [name for name in candidates if hasattr(bar, name)]

            # 'Foo' was registered without an explicit `exposed` tuple,
            # 'Bar' with exposed=('f', '_h').
            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            # '_h' is not exposed on 'Foo', so the server refuses the call.
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(baz), [i*i for i in range(10)])
        finally:
            manager.shutdown()
1371
1372#
1373# Test of connecting to a remote server and using xmlrpclib for serialization
1374#
1375
1376_queue = Queue.Queue()
1377def get_queue():
1378 return _queue
1379
class QueueManager(BaseManager):
    """manager class used by server process"""

# Registered with a callable, so this manager can create the queue itself.
QueueManager.register('get_queue', callable=get_queue)
1383
class QueueManager2(BaseManager):
    """manager class which specifies the same interface as QueueManager"""

# Registered by name only: no callable is supplied on this side.
QueueManager2.register('get_queue')
1387
1388
SERIALIZER = 'xmlrpclib'

class _TestRemoteManager(BaseTestCase):
    """Checks connecting to a manager server using the xmlrpclib serializer."""

    ALLOWED_TYPES = ('manager',)

    # `values` is what the child puts on the queue (as a tuple); `result`
    # is what the parent expects to read back (a list, see _putter).
    values = ['hello world', None, True, 2.25]
    result = values[:]
    if test_support.have_unicode:
        uvalue = test_support.u(r'\u043f\u0440\u0438\u0432\u0456\u0442 '
                                r'\u0441\u0432\u0456\u0442')
        values.append(uvalue)
        result.append(uvalue)

    @classmethod
    def _putter(cls, address, authkey):
        # Child-process target: connect to the server at `address` and put
        # the test values on its queue.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        # Note that xmlrpclib will deserialize object as a list not a tuple
        queue.put(tuple(cls.values))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=(test_support.HOST, 0), authkey=authkey,
            serializer=SERIALIZER
            )
        manager.start()
        # Stop the server even if an assertion below fails.
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.daemon = True
            p.start()

            manager2 = QueueManager2(
                address=manager.address, authkey=authkey,
                serializer=SERIALIZER
                )
            manager2.connect()
            queue = manager2.get_queue()

            self.assertEqual(queue.get(), self.result)
            # The item has arrived, so the child is done (or about to be);
            # reap it instead of leaving it to outlive the test.
            p.join()

            # Because we are using xmlrpclib for serialization instead of
            # pickle this will cause a serialization error.
            self.assertRaises(Exception, queue.put, time.sleep)

            # Make queue finalizer run before the server is stopped
            del queue
        finally:
            manager.shutdown()
1442
class _TestManagerRestart(BaseTestCase):
    """Checks that a manager can be restarted at once on the same address."""

    @classmethod
    def _putter(cls, address, authkey):
        # Child-process target: connect to the server at `address` and put
        # one string on its queue.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=(test_support.HOST, 0), authkey=authkey,
            serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()
        # Stop the first server even if an assertion below fails.
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.daemon = True
            p.start()
            queue = manager.get_queue()
            self.assertEqual(queue.get(), 'hello world')
            # The item has arrived, so the child is done (or about to be);
            # reap it instead of leaving it to outlive the test.
            p.join()
            del queue
        finally:
            manager.shutdown()

        # Start a second manager straight away on the address recorded
        # from get_server() above.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001475
Benjamin Petersondfd79492008-06-13 19:13:39 +00001476#
1477#
1478#
1479
SENTINEL = latin('')

class _TestConnection(BaseTestCase):
    """Checks for the connection objects returned by ``Pipe()``."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Child target: send every received message straight back until
        # SENTINEL arrives, then close the connection.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', range(4))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zeroed buffer.
            buf = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buf),
                             len(arr) * buf.itemsize)
            self.assertEqual(list(buf), expected)

            # Receive at an offset of three items.
            buf = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buf, 3 * buf.itemsize),
                             len(arr) * buf.itemsize)
            self.assertEqual(list(buf), expected)

            # A message longer than the buffer raises BufferTooShort with
            # the complete message as its only argument.
            buf = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buf)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)
        # Give the echo a moment to come back before polling again.
        time.sleep(.1)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            # Using an end in the wrong direction is an error.
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # Offset only.
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        # Offset and size.
        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the message length sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Offsets/sizes outside the message, or negative, are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        # Return True if os.fstat(fd) succeeds and False if it fails with
        # EBADF; any other OSError is re-raised.
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        # Child target: receive a handle over `conn`, write `data` to it
        # and close it.
        if create_dummy_fds:
            # Occupy every unassigned descriptor below 256 before the
            # handle is received.
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        # Do not leave the scratch file behind once the test is over.
        self.addCleanup(test_support.unlink, test_support.TESTFN)
        with open(test_support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        with open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        # Do not leave the scratch file behind once the test is over.
        self.addCleanup(test_support.unlink, test_support.TESTFN)
        with open(test_support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Find the first unassigned descriptor number >= 256.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        # Child target: write one raw byte to the connection's descriptor.
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001725
class _TestListenerClient(BaseTestCase):
    """Checks ``Listener``/``Client`` round trips for each address family."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child target: connect to the listener, send one message, close.
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(l.address,))
            p.daemon = True
            p.start()
            conn = l.accept()
            self.assertEqual(conn.recv(), 'hello')
            # Close the accepted connection, as test_issue14725 does,
            # rather than leaving one open per address family.
            conn.close()
            p.join()
            l.close()

    def test_issue14725(self):
        l = self.connection.Listener()
        p = self.Process(target=self._test, args=(l.address,))
        p.daemon = True
        p.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        conn = l.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        p.join()
        l.close()
1762
Benjamin Petersondfd79492008-06-13 19:13:39 +00001763#
1764# Test of sending connection and socket objects between processes
1765#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001766"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001767class _TestPicklingConnections(BaseTestCase):
1768
1769 ALLOWED_TYPES = ('processes',)
1770
1771 def _listener(self, conn, families):
1772 for fam in families:
1773 l = self.connection.Listener(family=fam)
1774 conn.send(l.address)
1775 new_conn = l.accept()
1776 conn.send(new_conn)
1777
1778 if self.TYPE == 'processes':
1779 l = socket.socket()
1780 l.bind(('localhost', 0))
1781 conn.send(l.getsockname())
1782 l.listen(1)
1783 new_conn, addr = l.accept()
1784 conn.send(new_conn)
1785
1786 conn.recv()
1787
1788 def _remote(self, conn):
1789 for (address, msg) in iter(conn.recv, None):
1790 client = self.connection.Client(address)
1791 client.send(msg.upper())
1792 client.close()
1793
1794 if self.TYPE == 'processes':
1795 address, msg = conn.recv()
1796 client = socket.socket()
1797 client.connect(address)
1798 client.sendall(msg.upper())
1799 client.close()
1800
1801 conn.close()
1802
1803 def test_pickling(self):
1804 try:
1805 multiprocessing.allow_connection_pickling()
1806 except ImportError:
1807 return
1808
1809 families = self.connection.families
1810
1811 lconn, lconn0 = self.Pipe()
1812 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001813 lp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001814 lp.start()
1815 lconn0.close()
1816
1817 rconn, rconn0 = self.Pipe()
1818 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001819 rp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001820 rp.start()
1821 rconn0.close()
1822
1823 for fam in families:
1824 msg = ('This connection uses family %s' % fam).encode('ascii')
1825 address = lconn.recv()
1826 rconn.send((address, msg))
1827 new_conn = lconn.recv()
1828 self.assertEqual(new_conn.recv(), msg.upper())
1829
1830 rconn.send(None)
1831
1832 if self.TYPE == 'processes':
1833 msg = latin('This connection uses a normal socket')
1834 address = lconn.recv()
1835 rconn.send((address, msg))
1836 if hasattr(socket, 'fromfd'):
1837 new_conn = lconn.recv()
1838 self.assertEqual(new_conn.recv(100), msg.upper())
1839 else:
1840 # XXX On Windows with Py2.6 need to backport fromfd()
1841 discard = lconn.recv_bytes()
1842
1843 lconn.send(None)
1844
1845 rconn.close()
1846 lconn.close()
1847
1848 lp.join()
1849 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001850"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001851#
1852#
1853#
1854
class _TestHeap(BaseTestCase):
    """Consistency checks for the multiprocessing.heap allocator."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in xrange(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            block = multiprocessing.heap.BufferWrapper(size)
            blocks.append(block)
            if len(blocks) > maxblocks:
                # Drop a random older block; the index is always below
                # maxblocks, so the block just appended survives.
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap: collect every free and occupied
        # segment as (arena index, start, stop, length, state)
        segments = []
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for free_list in heap._len_to_seq.values():
            for arena, start, stop in free_list:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Consecutive segments must either touch inside one arena or the
        # next one must begin a different arena at offset 0.
        for current, following in zip(segments, segments[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, _) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for _ in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a
1919
Benjamin Petersondfd79492008-06-13 19:13:39 +00001920#
1921#
1922#
1923
class _Foo(Structure):
    # Small two-field structure (int ``x``, double ``y``) shared with the
    # child process by the shared-ctypes tests.
    _fields_ = [('x', c_int), ('y', c_double)]
1929
class _TestSharedCTypes(BaseTestCase):
    """Shared ctypes objects must reflect changes made by a child process."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        """Child side: double every shared value in place."""
        for scalar in (x, y):
            scalar.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        # Elements are doubled through indexing so the write lands in the
        # shared array itself.
        for idx in range(len(arr)):
            arr[idx] *= 2

    def test_sharedctypes(self, lock=False):
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        shared = (x, y, foo, arr, string)
        worker = self.Process(target=self._double, args=shared)
        worker.daemon = True
        worker.start()
        worker.join()

        # Every value must now be twice what the parent stored.
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for idx in range(10):
            self.assertAlmostEqual(arr[idx], idx*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario, but with lock-protected wrappers.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        # Clearing the original must not affect the copy.
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1979
1980#
1981#
1982#
1983
class _TestFinalize(BaseTestCase):
    """Check the order in which util.Finalize callbacks fire."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        """Child side: register finalizers that report their tag on *conn*.

        The locals below are deliberately kept alive until the explicit
        cleanup call at the end; do not collapse them into a loop.
        """
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # dropping the last reference fires the callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # explicit call fires the callback for b
        close_b()       # no-op: the callback has already run
        del b           # no-op: the callback has already run

        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # run multiprocessing's cleanup function, then leave the process
        # without garbage collecting the locals above
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.daemon = True
        p.start()
        p.join()

        # Read tags until the 'STOP' sentinel sent by the last finalizer.
        result = list(iter(conn.recv, 'STOP'))
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2036
2037#
2038# Test that from ... import * works for each module
2039#
2040
class _TestImportStar(BaseTestCase):
    """Every name listed in a module's __all__ must exist on the module."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.synchronize', 'multiprocessing.util',
        ]

        # Optional modules are only checked when they are usable here.
        if HAS_REDUCTION:
            modules += ['multiprocessing.reduction']
        if c_int is not None:
            # This module requires _ctypes
            modules += ['multiprocessing.sharedctypes']

        for name in modules:
            __import__(name)
            mod = sys.modules[name]

            for attr in getattr(mod, '__all__', ()):
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
2069
2070#
2071# Quick test that logging works -- does not test logging output
2072#
2073
class _TestLogging(BaseTestCase):
    """Quick checks that multiprocessing's logger can be used at all."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        self.assertTrue(logger is not None)
        logger.setLevel(util.SUBWARNING)
        try:
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Put the level back even if one of the calls above raises.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        """Child side: report the logger's effective level on *conn*."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            # A level set directly on the multiprocessing logger must be
            # what the child sees.
            logger.setLevel(LEVEL1)
            p = self.Process(target=self._test_level, args=(writer,))
            p.daemon = True
            p.start()
            self.assertEqual(LEVEL1, reader.recv())
            p.join()

            # With NOTSET on the multiprocessing logger the child must
            # report the root logger's level instead.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            p = self.Process(target=self._test_level, args=(writer,))
            p.daemon = True
            p.start()
            self.assertEqual(LEVEL2, reader.recv())
            p.join()
        finally:
            # Restore global logging state and release the pipe even when
            # an assertion above fails, so later tests are unaffected.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
2116
Jesse Noller814d02d2009-11-21 14:38:23 +00002117
Jesse Noller9a03f2f2009-11-24 14:17:29 +00002118# class _TestLoggingProcessName(BaseTestCase):
2119#
2120# def handle(self, record):
2121# assert record.processName == multiprocessing.current_process().name
2122# self.__handled = True
2123#
2124# def test_logging(self):
2125# handler = logging.Handler()
2126# handler.handle = self.handle
2127# self.__handled = False
2128# # Bypass getLogger() and side-effects
2129# logger = logging.getLoggerClass()(
2130# 'multiprocessing.test.TestLoggingProcessName')
2131# logger.addHandler(handler)
2132# logger.propagate = False
2133#
2134# logger.warn('foo')
2135# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00002136
Benjamin Petersondfd79492008-06-13 19:13:39 +00002137#
Richard Oudkerkba482642013-02-26 12:37:07 +00002138# Check that Process.join() retries if os.waitpid() fails with EINTR
2139#
2140
class _TestPollEintr(BaseTestCase):
    """Process.join() must survive a signal arriving while it waits."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _killer(cls, pid):
        """Child side: wait briefly, then send SIGUSR1 to *pid*."""
        time.sleep(0.5)
        os.kill(pid, signal.SIGUSR1)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_poll_eintr(self):
        got_signal = [False]
        def record(*args):
            got_signal[0] = True
        pid = os.getpid()
        oldhandler = signal.signal(signal.SIGUSR1, record)
        try:
            killer = self.Process(target=self._killer, args=(pid,))
            killer.start()
            try:
                # The sleeper outlives the killer's delay, so the signal
                # arrives while this process is blocked in p.join().
                p = self.Process(target=time.sleep, args=(1,))
                p.start()
                p.join()
            finally:
                # Always reap the killer, even if the join above raises.
                killer.join()
            self.assertTrue(got_signal[0])
            self.assertEqual(p.exitcode, 0)
        finally:
            signal.signal(signal.SIGUSR1, oldhandler)
2168
2169#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002170# Test to verify handle verification, see issue 3321
2171#
2172
class TestInvalidHandle(unittest.TestCase):
    """Connection objects must reject bogus handles (see issue 3321)."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # An arbitrary handle number is accepted at construction time but
        # must fail as soon as it is polled.
        bogus = _multiprocessing.Connection(44977608)
        with self.assertRaises(IOError):
            bogus.poll()
        # A negative handle must be refused outright.
        with self.assertRaises(IOError):
            _multiprocessing.Connection(-1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002180
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002181#
Benjamin Petersondfd79492008-06-13 19:13:39 +00002182# Functions used to create test cases from the base ones in this module
2183#
2184
def get_attributes(Source, names):
    """Return a dict mapping each of *names* to ``getattr(Source, name)``.

    Plain Python functions are wrapped in ``staticmethod`` so that, once the
    dict is merged into a class namespace, they are not turned into methods
    that receive ``self``.  Every other object is stored unchanged.
    """
    function_type = type(get_attributes)
    d = {}
    for name in names:
        obj = getattr(Source, name)
        if isinstance(obj, function_type):
            obj = staticmethod(obj)
        d[name] = obj
    return d
2193
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes for one flavour of the test suite.

    Every module-level name starting with ``_Test`` whose ``ALLOWED_TYPES``
    contains *type* is combined with ``unittest.TestCase`` and *Mixin*.  The
    result maps ``'With' + type.capitalize() + name[1:]`` to the new class.
    """
    prefix = 'With' + type.capitalize()
    cases = {}

    for name, base in globals().items():
        if not name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        # Make the generated class report itself under its public name and
        # as belonging to the mixin's module.
        Temp.__name__ = prefix + name[1:]
        Temp.__module__ = Mixin.__module__
        cases[Temp.__name__] = Temp

    return cases
2210
2211#
2212# Create test cases
2213#
2214
class ProcessesMixin(object):
    # Flavour of the suite that runs against real processes.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed multiprocessing attributes into this class body;
    # vars() with no argument is the namespace currently being built.
    vars().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue', 'Pool',
        )))

# Generate the process-based test cases and publish them at module level.
testcases_processes = create_test_cases(ProcessesMixin, 'processes')
globals().update(testcases_processes)
2227
2228
class ManagerMixin(object):
    # Flavour of the suite that runs against manager proxies.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # The manager is allocated without running __init__ here; test_main()
    # calls __init__() and start() on it before the tests run.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed manager attributes into this class body; vars() with
    # no argument is the namespace currently being built.
    vars().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue', 'Pool',
        )))

# Generate the manager-based test cases and publish them at module level.
testcases_manager = create_test_cases(ManagerMixin, 'manager')
globals().update(testcases_manager)
2241
2242
class ThreadsMixin(object):
    # Flavour of the suite that runs against multiprocessing.dummy.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed multiprocessing.dummy attributes into this class body;
    # vars() with no argument is the namespace currently being built.
    vars().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue', 'Pool',
        )))

# Generate the thread-based test cases and publish them at module level.
testcases_threads = create_test_cases(ThreadsMixin, 'threads')
globals().update(testcases_threads)
2255
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        # A peer that answers the challenge with garbage must be rejected.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # The fake peer sends a challenge prefix first, then garbage, then
        # nothing but empty strings.
        class _FakeConnection(object):
            def __init__(self):
                self.replies = [multiprocessing.connection.CHALLENGE,
                                b'something bogus']
            def recv_bytes(self, size):
                if self.replies:
                    return self.replies.pop(0)
                return b''
            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
2284
Jesse Noller7152f6d2009-04-02 05:17:26 +00002285#
2286# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2287#
2288
def initializer(ns):
    """Add one to the ``test`` attribute of *ns*."""
    ns.test = ns.test + 1
2291
class TestInitializers(unittest.TestCase):
    """Manager.start() and Pool() must run a user supplied initializer."""

    def setUp(self):
        # A shared namespace whose counter the initializer increments.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Shut the second manager down even when the assertion fails,
            # so its server process is not left behind.
            m.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
2314
Jesse Noller1b90efb2009-06-30 17:11:52 +00002315#
2316# Issue 5155, 5313, 5331: Test process in processes
2317# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2318#
2319
def _this_sub_process(q):
    """Poll *q* once without blocking; an empty queue is not an error."""
    try:
        q.get(block=False)
    except Queue.Empty:
        # Nothing was queued, which is fine here.
        return
2325
def _test_process(q):
    """Start a daemonic grandchild that polls a fresh queue, then wait.

    The *q* argument is accepted but not used.
    """
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_this_sub_process,
                                    args=(inner_queue,))
    child.daemon = True
    child.start()
    child.join()
2332
def _afunc(x):
    """Return *x* multiplied by itself."""
    squared = x * x
    return squared
2335
def pool_in_process():
    """Create a four-worker pool, run one map() through it and tear it down."""
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Release the workers explicitly instead of relying on process exit.
        pool.close()
        pool.join()
2339
class _file_like(object):
    """Buffer writes per process and hand them to *delegate* on flush()."""

    def __init__(self, delegate):
        self._pid = None
        self._delegate = delegate

    @property
    def cache(self):
        """Return the pending-write list belonging to the current process."""
        current = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid != current:
            # First access in this process: start from an empty buffer.
            self._pid, self._cache = current, []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
2360
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Processes started from within child processes (issues 5155, 5313, 5331)."""

    def test_queue_in_process(self):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_test_process, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): the Process object is created but never started;
        # kept as in the original -- confirm whether it is still needed.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # Use a unittest assertion: a bare ``assert`` disappears under -O.
        self.assertEqual(sio.getvalue(), 'foo')
2381
Richard Oudkerke4b99382012-07-27 14:05:46 +01002382#
2383# Test interaction with socket timeouts - see Issue #6056
2384#
2385
class TestTimeouts(unittest.TestCase):
    """A default socket timeout must not break connections (Issue #6056)."""

    @classmethod
    def _test_timeout(cls, child, address):
        """Child side: answer on the pipe after a delay, then on *address*."""
        time.sleep(1)
        child.send(123)
        child.close()
        client = multiprocessing.connection.Client(address)
        client.send(456)
        client.close()

    def test_timeout(self):
        saved_timeout = socket.getdefaulttimeout()
        try:
            # The default timeout is much shorter than the child's delay.
            socket.setdefaulttimeout(0.1)
            parent, child = multiprocessing.Pipe(duplex=True)
            listener = multiprocessing.connection.Listener(family='AF_INET')
            proc = multiprocessing.Process(target=self._test_timeout,
                                           args=(child, listener.address))
            proc.start()
            child.close()
            self.assertEqual(parent.recv(), 123)
            parent.close()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 456)
            accepted.close()
            listener.close()
            proc.join(10)
        finally:
            socket.setdefaulttimeout(saved_timeout)
2415
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002416#
2417# Test what happens with no "if __name__ == '__main__'"
2418#
2419
class TestNoForkBomb(unittest.TestCase):
    """Run a script that lacks the ``if __name__ == '__main__'`` guard."""

    def test_noforkbomb(self):
        script = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
        if not WIN32:
            # Elsewhere the script is expected to succeed and print 123.
            rc, out, err = test.script_helper.assert_python_ok(script)
            self.assertEqual(out.rstrip(), '123')
            self.assertEqual(err, '')
            return
        # On Windows the script is expected to fail with a RuntimeError.
        rc, out, err = test.script_helper.assert_python_failure(script)
        self.assertEqual(out, '')
        self.assertIn('RuntimeError', err)
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002431
2432#
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002433# Issue 12098: check sys.flags of child matches that for parent
2434#
2435
class TestFlags(unittest.TestCase):
    """sys.flags of a child must match those of its parent (Issue 12098)."""

    @classmethod
    def run_in_grandchild(cls, conn):
        """Send this process's sys.flags, as a tuple, over *conn*."""
        conn.send(tuple(sys.flags))

    @classmethod
    def run_in_child(cls):
        """Print JSON holding this process's flags and its child's flags."""
        import json
        reader, writer = multiprocessing.Pipe(duplex=False)
        grandchild = multiprocessing.Process(target=cls.run_in_grandchild,
                                             args=(writer,))
        grandchild.start()
        grandchild_flags = reader.recv()
        grandchild.join()
        reader.close()
        writer.close()
        print(json.dumps((tuple(sys.flags), grandchild_flags)))

    @test_support.requires_unicode  # XXX json needs unicode support
    def test_flags(self):
        import json
        import subprocess
        # start child process using unusual flags
        prog = ('from test.test_multiprocessing import TestFlags; '
                'TestFlags.run_in_child()')
        argv = [sys.executable, '-E', '-B', '-O', '-c', prog]
        data = subprocess.check_output(argv)
        child_flags, grandchild_flags = json.loads(data.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)
Richard Oudkerk7bdd93c2013-04-17 19:15:52 +01002464
2465#
2466# Issue #17555: ForkAwareThreadLock
2467#
2468
class TestForkAwareThreadLock(unittest.TestCase):
    # We recursively start processes.  Issue #17555 meant that the
    # after fork registry would get duplicate entries for the same
    # lock.  The size of the registry at generation n was ~2**n.

    @classmethod
    def child(cls, n, conn):
        """Recurse *n* generations deep; the last one reports the registry size."""
        if n <= 1:
            conn.send(len(util._afterfork_registry))
        else:
            descendant = multiprocessing.Process(target=cls.child,
                                                 args=(n-1, conn))
            descendant.start()
            descendant.join()
        conn.close()

    def test_lock(self):
        reader, writer = multiprocessing.Pipe(False)
        # Keep a reference to the lock for the whole test.
        lock = util.ForkAwareThreadLock()
        old_size = len(util._afterfork_registry)
        proc = multiprocessing.Process(target=self.child, args=(5, writer))
        proc.start()
        new_size = reader.recv()
        proc.join()
        # The registry must not have grown across the generations.
        self.assertLessEqual(new_size, old_size)
2493
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002494#
Richard Oudkerk41072db2013-07-01 18:45:28 +01002495# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
2496#
2497
class TestIgnoreEINTR(unittest.TestCase):
    """recv(), send(), accept() etc. must ignore EINTR (Issue #17097)."""

    @classmethod
    def _test_ignore(cls, conn):
        """Child side: install a no-op SIGUSR1 handler, then echo and send."""
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        conn.send('ready')
        conn.send(conn.recv())
        conn.send_bytes(b'x'*(1024*1024))   # sending 1 MB should block

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            proc = multiprocessing.Process(target=self._test_ignore,
                                           args=(child_conn,))
            proc.daemon = True
            proc.start()
            child_conn.close()
            self.assertEqual(conn.recv(), 'ready')
            # First signal: sent after the child reported 'ready' and
            # before anything is sent to it.
            time.sleep(0.1)
            os.kill(proc.pid, signal.SIGUSR1)
            time.sleep(0.1)
            conn.send(1234)
            self.assertEqual(conn.recv(), 1234)
            # Second signal: sent after the echo, before the 1 MB payload
            # is read here.
            time.sleep(0.1)
            os.kill(proc.pid, signal.SIGUSR1)
            self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
            time.sleep(0.1)
            proc.join()
        finally:
            conn.close()

    @classmethod
    def _test_ignore_listener(cls, conn):
        """Child side: report a listener address, accept once and greet."""
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        listener = multiprocessing.connection.Listener()
        conn.send(listener.address)
        accepted = listener.accept()
        accepted.send('welcome')

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore_listener(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            proc = multiprocessing.Process(target=self._test_ignore_listener,
                                           args=(child_conn,))
            proc.daemon = True
            proc.start()
            child_conn.close()
            address = conn.recv()
            # Signal the child before any client connects.
            time.sleep(0.1)
            os.kill(proc.pid, signal.SIGUSR1)
            time.sleep(0.1)
            client = multiprocessing.connection.Client(address)
            self.assertEqual(client.recv(), 'welcome')
            proc.join()
        finally:
            conn.close()
2561
2562#
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002563#
2564#
2565
# Stand-alone test cases that are not generated from the _Test* bases.
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
    TestTimeouts,
    TestNoForkBomb,
    TestFlags,
    TestForkAwareThreadLock,
    TestIgnoreEINTR,
]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002569
Benjamin Petersondfd79492008-06-13 19:13:39 +00002570#
2571#
2572#
2573
def test_main(run=None):
    """Set up the shared pools and manager, run every test case, clean up.

    *run* is a callable taking the assembled test suite; when it is None,
    ``test.test_support.run_unittest`` is used.

    Raises ``unittest.SkipTest`` on Linux when an RLock cannot be created.
    """
    if sys.platform.startswith("linux"):
        try:
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
        # module during these tests is at least platform dependent and possibly
        # non-deterministic on any given platform. So we don't mind if the listed
        # warnings aren't actually raised.
        with test_support.check_py3k_warnings(
                (".+__(get|set)slice__ has been removed", DeprecationWarning),
                (r"sys.exc_clear\(\) not supported", DeprecationWarning),
                quiet=True):
            run(suite)
    finally:
        # Tear the shared workers down even if the run itself raises, so no
        # pool or manager process outlives the test run.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002621
def main():
    """Run the whole suite through a verbose text test runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

if __name__ == '__main__':
    main()