blob: c66727cf3ae52e7c09de2cedafd7a6f45e15a17f [file] [log] [blame]
Benjamin Petersondfd79492008-06-13 19:13:39 +00001#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00006import Queue
7import time
8import sys
9import os
10import gc
11import signal
12import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000013import socket
14import random
15import logging
Antoine Pitroua1a8da82011-08-23 19:54:20 +020016import errno
Richard Oudkerkfaee75c2012-08-14 11:41:19 +010017import test.script_helper
Mark Dickinsonc4920e82009-11-20 19:30:22 +000018from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000019from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000020_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020021# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000022# message: "No module named _multiprocessing". _multiprocessing is not compiled
23# without thread support.
24import threading
R. David Murray3db8a342009-03-30 23:05:48 +000025
Jesse Noller37040cd2008-09-30 00:15:45 +000026# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000027test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000028
Benjamin Petersondfd79492008-06-13 19:13:39 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000033import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000034
Charles-François Natalif8413b22011-09-21 18:44:49 +020035from multiprocessing import util
36
37try:
38 from multiprocessing import reduction
39 HAS_REDUCTION = True
40except ImportError:
41 HAS_REDUCTION = False
Benjamin Petersondfd79492008-06-13 19:13:39 +000042
Brian Curtina06e9b82010-10-07 02:27:41 +000043try:
44 from multiprocessing.sharedctypes import Value, copy
45 HAS_SHAREDCTYPES = True
46except ImportError:
47 HAS_SHAREDCTYPES = False
48
Antoine Pitroua1a8da82011-08-23 19:54:20 +020049try:
50 import msvcrt
51except ImportError:
52 msvcrt = None
53
Benjamin Petersondfd79492008-06-13 19:13:39 +000054#
55#
56#
57
Benjamin Petersone79edf52008-07-13 18:34:58 +000058latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000059
Benjamin Petersondfd79492008-06-13 19:13:39 +000060#
61# Constants
62#
63
64LOG_LEVEL = util.SUBWARNING
Jesse Noller654ade32010-01-27 03:05:57 +000065#LOG_LEVEL = logging.DEBUG
Benjamin Petersondfd79492008-06-13 19:13:39 +000066
67DELTA = 0.1
68CHECK_TIMINGS = False # making true makes tests take a lot longer
69 # and can sometimes cause some non-serious
70 # failures because some calls block a bit
71 # longer than expected
72if CHECK_TIMINGS:
73 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
74else:
75 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
76
77HAVE_GETVALUE = not getattr(_multiprocessing,
78 'HAVE_BROKEN_SEM_GETVALUE', False)
79
Jesse Noller9a5b2ad2009-01-19 15:12:22 +000080WIN32 = (sys.platform == "win32")
81
Antoine Pitroua1a8da82011-08-23 19:54:20 +020082try:
83 MAXFD = os.sysconf("SC_OPEN_MAX")
84except:
85 MAXFD = 256
86
Benjamin Petersondfd79492008-06-13 19:13:39 +000087#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000088# Some tests require ctypes
89#
90
91try:
Nick Coghlan13623662010-04-10 14:24:36 +000092 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000093except ImportError:
94 Structure = object
95 c_int = c_double = None
96
Charles-François Natali6392d7f2011-11-22 18:35:18 +010097
98def check_enough_semaphores():
99 """Check that the system supports enough semaphores to run the test."""
100 # minimum number of semaphores available according to POSIX
101 nsems_min = 256
102 try:
103 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
104 except (AttributeError, ValueError):
105 # sysconf not available or setting not available
106 return
107 if nsems == -1 or nsems >= nsems_min:
108 return
109 raise unittest.SkipTest("The OS doesn't support enough semaphores "
110 "to run the test (required: %d)." % nsems_min)
111
112
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000113#
Benjamin Petersondfd79492008-06-13 19:13:39 +0000114# Creates a wrapper for a function which records the time it takes to finish
115#
116
117class TimingWrapper(object):
118
119 def __init__(self, func):
120 self.func = func
121 self.elapsed = None
122
123 def __call__(self, *args, **kwds):
124 t = time.time()
125 try:
126 return self.func(*args, **kwds)
127 finally:
128 self.elapsed = time.time() - t
129
130#
131# Base class for test cases
132#
133
134class BaseTestCase(object):
135
136 ALLOWED_TYPES = ('processes', 'manager', 'threads')
137
138 def assertTimingAlmostEqual(self, a, b):
139 if CHECK_TIMINGS:
140 self.assertAlmostEqual(a, b, 1)
141
142 def assertReturnsIfImplemented(self, value, func, *args):
143 try:
144 res = func(*args)
145 except NotImplementedError:
146 pass
147 else:
148 return self.assertEqual(value, res)
149
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000150 # For the sanity of Windows users, rather than crashing or freezing in
151 # multiple ways.
152 def __reduce__(self, *args):
153 raise NotImplementedError("shouldn't try to pickle a test case")
154
155 __reduce_ex__ = __reduce__
156
Benjamin Petersondfd79492008-06-13 19:13:39 +0000157#
158# Return the value of a semaphore
159#
160
161def get_value(self):
162 try:
163 return self.get_value()
164 except AttributeError:
165 try:
166 return self._Semaphore__value
167 except AttributeError:
168 try:
169 return self._value
170 except AttributeError:
171 raise NotImplementedError
172
173#
174# Testcases
175#
176
177class _TestProcess(BaseTestCase):
178
179 ALLOWED_TYPES = ('processes', 'threads')
180
181 def test_current(self):
182 if self.TYPE == 'threads':
Zachary Ware1f702212013-12-10 14:09:20 -0600183 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000184
185 current = self.current_process()
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000186 authkey = current.authkey
Benjamin Petersondfd79492008-06-13 19:13:39 +0000187
188 self.assertTrue(current.is_alive())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000189 self.assertTrue(not current.daemon)
Ezio Melottib0f5adc2010-01-24 16:58:36 +0000190 self.assertIsInstance(authkey, bytes)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000191 self.assertTrue(len(authkey) > 0)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000192 self.assertEqual(current.ident, os.getpid())
193 self.assertEqual(current.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000194
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000195 @classmethod
196 def _test(cls, q, *args, **kwds):
197 current = cls.current_process()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000198 q.put(args)
199 q.put(kwds)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000200 q.put(current.name)
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000201 if cls.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000202 q.put(bytes(current.authkey))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000203 q.put(current.pid)
204
205 def test_process(self):
206 q = self.Queue(1)
207 e = self.Event()
208 args = (q, 1, 2)
209 kwargs = {'hello':23, 'bye':2.54}
210 name = 'SomeProcess'
211 p = self.Process(
212 target=self._test, args=args, kwargs=kwargs, name=name
213 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000214 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000215 current = self.current_process()
216
217 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000218 self.assertEqual(p.authkey, current.authkey)
219 self.assertEqual(p.is_alive(), False)
220 self.assertEqual(p.daemon, True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000221 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000222 self.assertTrue(type(self.active_children()) is list)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000223 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000224
225 p.start()
226
Ezio Melotti2623a372010-11-21 13:34:58 +0000227 self.assertEqual(p.exitcode, None)
228 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000229 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000230
Ezio Melotti2623a372010-11-21 13:34:58 +0000231 self.assertEqual(q.get(), args[1:])
232 self.assertEqual(q.get(), kwargs)
233 self.assertEqual(q.get(), p.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000234 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000235 self.assertEqual(q.get(), current.authkey)
236 self.assertEqual(q.get(), p.pid)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000237
238 p.join()
239
Ezio Melotti2623a372010-11-21 13:34:58 +0000240 self.assertEqual(p.exitcode, 0)
241 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000242 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000243
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000244 @classmethod
245 def _test_terminate(cls):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000246 time.sleep(1000)
247
248 def test_terminate(self):
249 if self.TYPE == 'threads':
Zachary Ware1f702212013-12-10 14:09:20 -0600250 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000251
252 p = self.Process(target=self._test_terminate)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000253 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000254 p.start()
255
256 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000257 self.assertIn(p, self.active_children())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000258 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000259
260 p.terminate()
261
262 join = TimingWrapper(p.join)
263 self.assertEqual(join(), None)
264 self.assertTimingAlmostEqual(join.elapsed, 0.0)
265
266 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000267 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000268
269 p.join()
270
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000271 # XXX sometimes get p.exitcode == 0 on Windows ...
272 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000273
274 def test_cpu_count(self):
275 try:
276 cpus = multiprocessing.cpu_count()
277 except NotImplementedError:
278 cpus = 1
279 self.assertTrue(type(cpus) is int)
280 self.assertTrue(cpus >= 1)
281
282 def test_active_children(self):
283 self.assertEqual(type(self.active_children()), list)
284
285 p = self.Process(target=time.sleep, args=(DELTA,))
Ezio Melottiaa980582010-01-23 23:04:36 +0000286 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000287
Jesus Cea6f6016b2011-09-09 20:26:57 +0200288 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000289 p.start()
Ezio Melottiaa980582010-01-23 23:04:36 +0000290 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000291
292 p.join()
Ezio Melottiaa980582010-01-23 23:04:36 +0000293 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000294
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000295 @classmethod
296 def _test_recursion(cls, wconn, id):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000297 from multiprocessing import forking
298 wconn.send(id)
299 if len(id) < 2:
300 for i in range(2):
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000301 p = cls.Process(
302 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersondfd79492008-06-13 19:13:39 +0000303 )
304 p.start()
305 p.join()
306
307 def test_recursion(self):
308 rconn, wconn = self.Pipe(duplex=False)
309 self._test_recursion(wconn, [])
310
311 time.sleep(DELTA)
312 result = []
313 while rconn.poll():
314 result.append(rconn.recv())
315
316 expected = [
317 [],
318 [0],
319 [0, 0],
320 [0, 1],
321 [1],
322 [1, 0],
323 [1, 1]
324 ]
325 self.assertEqual(result, expected)
326
Richard Oudkerk2182e052012-06-06 19:01:14 +0100327 @classmethod
328 def _test_sys_exit(cls, reason, testfn):
329 sys.stderr = open(testfn, 'w')
330 sys.exit(reason)
331
332 def test_sys_exit(self):
333 # See Issue 13854
334 if self.TYPE == 'threads':
Zachary Ware1f702212013-12-10 14:09:20 -0600335 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerk2182e052012-06-06 19:01:14 +0100336
337 testfn = test_support.TESTFN
338 self.addCleanup(test_support.unlink, testfn)
339
Richard Oudkerk3f8376e2013-11-17 17:24:11 +0000340 for reason, code in (([1, 2, 3], 1), ('ignore this', 1)):
Richard Oudkerk2182e052012-06-06 19:01:14 +0100341 p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
342 p.daemon = True
343 p.start()
344 p.join(5)
345 self.assertEqual(p.exitcode, code)
346
347 with open(testfn, 'r') as f:
348 self.assertEqual(f.read().rstrip(), str(reason))
349
350 for reason in (True, False, 8):
351 p = self.Process(target=sys.exit, args=(reason,))
352 p.daemon = True
353 p.start()
354 p.join(5)
355 self.assertEqual(p.exitcode, reason)
356
Benjamin Petersondfd79492008-06-13 19:13:39 +0000357#
358#
359#
360
361class _UpperCaser(multiprocessing.Process):
362
363 def __init__(self):
364 multiprocessing.Process.__init__(self)
365 self.child_conn, self.parent_conn = multiprocessing.Pipe()
366
367 def run(self):
368 self.parent_conn.close()
369 for s in iter(self.child_conn.recv, None):
370 self.child_conn.send(s.upper())
371 self.child_conn.close()
372
373 def submit(self, s):
374 assert type(s) is str
375 self.parent_conn.send(s)
376 return self.parent_conn.recv()
377
378 def stop(self):
379 self.parent_conn.send(None)
380 self.parent_conn.close()
381 self.child_conn.close()
382
383class _TestSubclassingProcess(BaseTestCase):
384
385 ALLOWED_TYPES = ('processes',)
386
387 def test_subclassing(self):
388 uppercaser = _UpperCaser()
Jesus Cea6f6016b2011-09-09 20:26:57 +0200389 uppercaser.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000390 uppercaser.start()
391 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
392 self.assertEqual(uppercaser.submit('world'), 'WORLD')
393 uppercaser.stop()
394 uppercaser.join()
395
396#
397#
398#
399
400def queue_empty(q):
401 if hasattr(q, 'empty'):
402 return q.empty()
403 else:
404 return q.qsize() == 0
405
406def queue_full(q, maxsize):
407 if hasattr(q, 'full'):
408 return q.full()
409 else:
410 return q.qsize() == maxsize
411
412
413class _TestQueue(BaseTestCase):
414
415
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000416 @classmethod
417 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000418 child_can_start.wait()
419 for i in range(6):
420 queue.get()
421 parent_can_continue.set()
422
423 def test_put(self):
424 MAXSIZE = 6
425 queue = self.Queue(maxsize=MAXSIZE)
426 child_can_start = self.Event()
427 parent_can_continue = self.Event()
428
429 proc = self.Process(
430 target=self._test_put,
431 args=(queue, child_can_start, parent_can_continue)
432 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000433 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000434 proc.start()
435
436 self.assertEqual(queue_empty(queue), True)
437 self.assertEqual(queue_full(queue, MAXSIZE), False)
438
439 queue.put(1)
440 queue.put(2, True)
441 queue.put(3, True, None)
442 queue.put(4, False)
443 queue.put(5, False, None)
444 queue.put_nowait(6)
445
446 # the values may be in buffer but not yet in pipe so sleep a bit
447 time.sleep(DELTA)
448
449 self.assertEqual(queue_empty(queue), False)
450 self.assertEqual(queue_full(queue, MAXSIZE), True)
451
452 put = TimingWrapper(queue.put)
453 put_nowait = TimingWrapper(queue.put_nowait)
454
455 self.assertRaises(Queue.Full, put, 7, False)
456 self.assertTimingAlmostEqual(put.elapsed, 0)
457
458 self.assertRaises(Queue.Full, put, 7, False, None)
459 self.assertTimingAlmostEqual(put.elapsed, 0)
460
461 self.assertRaises(Queue.Full, put_nowait, 7)
462 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
463
464 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
465 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
466
467 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
468 self.assertTimingAlmostEqual(put.elapsed, 0)
469
470 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
471 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
472
473 child_can_start.set()
474 parent_can_continue.wait()
475
476 self.assertEqual(queue_empty(queue), True)
477 self.assertEqual(queue_full(queue, MAXSIZE), False)
478
479 proc.join()
480
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000481 @classmethod
482 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000483 child_can_start.wait()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000484 #queue.put(1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000485 queue.put(2)
486 queue.put(3)
487 queue.put(4)
488 queue.put(5)
489 parent_can_continue.set()
490
491 def test_get(self):
492 queue = self.Queue()
493 child_can_start = self.Event()
494 parent_can_continue = self.Event()
495
496 proc = self.Process(
497 target=self._test_get,
498 args=(queue, child_can_start, parent_can_continue)
499 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000500 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000501 proc.start()
502
503 self.assertEqual(queue_empty(queue), True)
504
505 child_can_start.set()
506 parent_can_continue.wait()
507
508 time.sleep(DELTA)
509 self.assertEqual(queue_empty(queue), False)
510
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000511 # Hangs unexpectedly, remove for now
512 #self.assertEqual(queue.get(), 1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000513 self.assertEqual(queue.get(True, None), 2)
514 self.assertEqual(queue.get(True), 3)
515 self.assertEqual(queue.get(timeout=1), 4)
516 self.assertEqual(queue.get_nowait(), 5)
517
518 self.assertEqual(queue_empty(queue), True)
519
520 get = TimingWrapper(queue.get)
521 get_nowait = TimingWrapper(queue.get_nowait)
522
523 self.assertRaises(Queue.Empty, get, False)
524 self.assertTimingAlmostEqual(get.elapsed, 0)
525
526 self.assertRaises(Queue.Empty, get, False, None)
527 self.assertTimingAlmostEqual(get.elapsed, 0)
528
529 self.assertRaises(Queue.Empty, get_nowait)
530 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
531
532 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
533 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
534
535 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
536 self.assertTimingAlmostEqual(get.elapsed, 0)
537
538 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
539 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
540
541 proc.join()
542
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000543 @classmethod
544 def _test_fork(cls, queue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000545 for i in range(10, 20):
546 queue.put(i)
547 # note that at this point the items may only be buffered, so the
548 # process cannot shutdown until the feeder thread has finished
549 # pushing items onto the pipe.
550
551 def test_fork(self):
552 # Old versions of Queue would fail to create a new feeder
553 # thread for a forked process if the original process had its
554 # own feeder thread. This test checks that this no longer
555 # happens.
556
557 queue = self.Queue()
558
559 # put items on queue so that main process starts a feeder thread
560 for i in range(10):
561 queue.put(i)
562
563 # wait to make sure thread starts before we fork a new process
564 time.sleep(DELTA)
565
566 # fork process
567 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200568 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000569 p.start()
570
571 # check that all expected items are in the queue
572 for i in range(20):
573 self.assertEqual(queue.get(), i)
574 self.assertRaises(Queue.Empty, queue.get, False)
575
576 p.join()
577
578 def test_qsize(self):
579 q = self.Queue()
580 try:
581 self.assertEqual(q.qsize(), 0)
582 except NotImplementedError:
Zachary Ware1f702212013-12-10 14:09:20 -0600583 self.skipTest('qsize method not implemented')
Benjamin Petersondfd79492008-06-13 19:13:39 +0000584 q.put(1)
585 self.assertEqual(q.qsize(), 1)
586 q.put(5)
587 self.assertEqual(q.qsize(), 2)
588 q.get()
589 self.assertEqual(q.qsize(), 1)
590 q.get()
591 self.assertEqual(q.qsize(), 0)
592
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000593 @classmethod
594 def _test_task_done(cls, q):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000595 for obj in iter(q.get, None):
596 time.sleep(DELTA)
597 q.task_done()
598
599 def test_task_done(self):
600 queue = self.JoinableQueue()
601
602 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000603 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000604
605 workers = [self.Process(target=self._test_task_done, args=(queue,))
606 for i in xrange(4)]
607
608 for p in workers:
Jesus Cea6f6016b2011-09-09 20:26:57 +0200609 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000610 p.start()
611
612 for i in xrange(10):
613 queue.put(i)
614
615 queue.join()
616
617 for p in workers:
618 queue.put(None)
619
620 for p in workers:
621 p.join()
622
Serhiy Storchaka233e6982015-03-06 22:17:25 +0200623 def test_no_import_lock_contention(self):
624 with test_support.temp_cwd():
625 module_name = 'imported_by_an_imported_module'
626 with open(module_name + '.py', 'w') as f:
627 f.write("""if 1:
628 import multiprocessing
629
630 q = multiprocessing.Queue()
631 q.put('knock knock')
632 q.get(timeout=3)
633 q.close()
634 """)
635
636 with test_support.DirsOnSysPath(os.getcwd()):
637 try:
638 __import__(module_name)
639 except Queue.Empty:
640 self.fail("Probable regression on import lock contention;"
641 " see Issue #22853")
642
Benjamin Petersondfd79492008-06-13 19:13:39 +0000643#
644#
645#
646
647class _TestLock(BaseTestCase):
648
649 def test_lock(self):
650 lock = self.Lock()
651 self.assertEqual(lock.acquire(), True)
652 self.assertEqual(lock.acquire(False), False)
653 self.assertEqual(lock.release(), None)
654 self.assertRaises((ValueError, threading.ThreadError), lock.release)
655
656 def test_rlock(self):
657 lock = self.RLock()
658 self.assertEqual(lock.acquire(), True)
659 self.assertEqual(lock.acquire(), True)
660 self.assertEqual(lock.acquire(), True)
661 self.assertEqual(lock.release(), None)
662 self.assertEqual(lock.release(), None)
663 self.assertEqual(lock.release(), None)
664 self.assertRaises((AssertionError, RuntimeError), lock.release)
665
Jesse Noller82eb5902009-03-30 23:29:31 +0000666 def test_lock_context(self):
667 with self.Lock():
668 pass
669
Benjamin Petersondfd79492008-06-13 19:13:39 +0000670
671class _TestSemaphore(BaseTestCase):
672
673 def _test_semaphore(self, sem):
674 self.assertReturnsIfImplemented(2, get_value, sem)
675 self.assertEqual(sem.acquire(), True)
676 self.assertReturnsIfImplemented(1, get_value, sem)
677 self.assertEqual(sem.acquire(), True)
678 self.assertReturnsIfImplemented(0, get_value, sem)
679 self.assertEqual(sem.acquire(False), False)
680 self.assertReturnsIfImplemented(0, get_value, sem)
681 self.assertEqual(sem.release(), None)
682 self.assertReturnsIfImplemented(1, get_value, sem)
683 self.assertEqual(sem.release(), None)
684 self.assertReturnsIfImplemented(2, get_value, sem)
685
686 def test_semaphore(self):
687 sem = self.Semaphore(2)
688 self._test_semaphore(sem)
689 self.assertEqual(sem.release(), None)
690 self.assertReturnsIfImplemented(3, get_value, sem)
691 self.assertEqual(sem.release(), None)
692 self.assertReturnsIfImplemented(4, get_value, sem)
693
694 def test_bounded_semaphore(self):
695 sem = self.BoundedSemaphore(2)
696 self._test_semaphore(sem)
697 # Currently fails on OS/X
698 #if HAVE_GETVALUE:
699 # self.assertRaises(ValueError, sem.release)
700 # self.assertReturnsIfImplemented(2, get_value, sem)
701
702 def test_timeout(self):
703 if self.TYPE != 'processes':
Zachary Ware1f702212013-12-10 14:09:20 -0600704 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000705
706 sem = self.Semaphore(0)
707 acquire = TimingWrapper(sem.acquire)
708
709 self.assertEqual(acquire(False), False)
710 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
711
712 self.assertEqual(acquire(False, None), False)
713 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
714
715 self.assertEqual(acquire(False, TIMEOUT1), False)
716 self.assertTimingAlmostEqual(acquire.elapsed, 0)
717
718 self.assertEqual(acquire(True, TIMEOUT2), False)
719 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
720
721 self.assertEqual(acquire(timeout=TIMEOUT3), False)
722 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
723
724
725class _TestCondition(BaseTestCase):
726
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000727 @classmethod
728 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000729 cond.acquire()
730 sleeping.release()
731 cond.wait(timeout)
732 woken.release()
733 cond.release()
734
735 def check_invariant(self, cond):
736 # this is only supposed to succeed when there are no sleepers
737 if self.TYPE == 'processes':
738 try:
739 sleepers = (cond._sleeping_count.get_value() -
740 cond._woken_count.get_value())
741 self.assertEqual(sleepers, 0)
742 self.assertEqual(cond._wait_semaphore.get_value(), 0)
743 except NotImplementedError:
744 pass
745
746 def test_notify(self):
747 cond = self.Condition()
748 sleeping = self.Semaphore(0)
749 woken = self.Semaphore(0)
750
751 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000752 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000753 p.start()
754
755 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000756 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000757 p.start()
758
759 # wait for both children to start sleeping
760 sleeping.acquire()
761 sleeping.acquire()
762
763 # check no process/thread has woken up
764 time.sleep(DELTA)
765 self.assertReturnsIfImplemented(0, get_value, woken)
766
767 # wake up one process/thread
768 cond.acquire()
769 cond.notify()
770 cond.release()
771
772 # check one process/thread has woken up
773 time.sleep(DELTA)
774 self.assertReturnsIfImplemented(1, get_value, woken)
775
776 # wake up another
777 cond.acquire()
778 cond.notify()
779 cond.release()
780
781 # check other has woken up
782 time.sleep(DELTA)
783 self.assertReturnsIfImplemented(2, get_value, woken)
784
785 # check state is not mucked up
786 self.check_invariant(cond)
787 p.join()
788
789 def test_notify_all(self):
790 cond = self.Condition()
791 sleeping = self.Semaphore(0)
792 woken = self.Semaphore(0)
793
794 # start some threads/processes which will timeout
795 for i in range(3):
796 p = self.Process(target=self.f,
797 args=(cond, sleeping, woken, TIMEOUT1))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000798 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000799 p.start()
800
801 t = threading.Thread(target=self.f,
802 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000803 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000804 t.start()
805
806 # wait for them all to sleep
807 for i in xrange(6):
808 sleeping.acquire()
809
810 # check they have all timed out
811 for i in xrange(6):
812 woken.acquire()
813 self.assertReturnsIfImplemented(0, get_value, woken)
814
815 # check state is not mucked up
816 self.check_invariant(cond)
817
818 # start some more threads/processes
819 for i in range(3):
820 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000821 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000822 p.start()
823
824 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000825 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000826 t.start()
827
828 # wait for them to all sleep
829 for i in xrange(6):
830 sleeping.acquire()
831
832 # check no process/thread has woken up
833 time.sleep(DELTA)
834 self.assertReturnsIfImplemented(0, get_value, woken)
835
836 # wake them all up
837 cond.acquire()
838 cond.notify_all()
839 cond.release()
840
841 # check they have all woken
842 time.sleep(DELTA)
843 self.assertReturnsIfImplemented(6, get_value, woken)
844
845 # check state is not mucked up
846 self.check_invariant(cond)
847
848 def test_timeout(self):
849 cond = self.Condition()
850 wait = TimingWrapper(cond.wait)
851 cond.acquire()
852 res = wait(TIMEOUT1)
853 cond.release()
854 self.assertEqual(res, None)
855 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
856
857
858class _TestEvent(BaseTestCase):
859
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000860 @classmethod
861 def _test_event(cls, event):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000862 time.sleep(TIMEOUT2)
863 event.set()
864
865 def test_event(self):
866 event = self.Event()
867 wait = TimingWrapper(event.wait)
868
Ezio Melottic2077b02011-03-16 12:34:31 +0200869 # Removed temporarily, due to API shear, this does not
Benjamin Petersondfd79492008-06-13 19:13:39 +0000870 # work with threading._Event objects. is_set == isSet
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000871 self.assertEqual(event.is_set(), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000872
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000873 # Removed, threading.Event.wait() will return the value of the __flag
874 # instead of None. API Shear with the semaphore backed mp.Event
875 self.assertEqual(wait(0.0), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000876 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000877 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000878 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
879
880 event.set()
881
882 # See note above on the API differences
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000883 self.assertEqual(event.is_set(), True)
884 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000885 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000886 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000887 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
888 # self.assertEqual(event.is_set(), True)
889
890 event.clear()
891
892 #self.assertEqual(event.is_set(), False)
893
Jesus Cea6f6016b2011-09-09 20:26:57 +0200894 p = self.Process(target=self._test_event, args=(event,))
895 p.daemon = True
896 p.start()
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000897 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000898
899#
900#
901#
902
903class _TestValue(BaseTestCase):
904
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000905 ALLOWED_TYPES = ('processes',)
906
Benjamin Petersondfd79492008-06-13 19:13:39 +0000907 codes_values = [
908 ('i', 4343, 24234),
909 ('d', 3.625, -4.25),
910 ('h', -232, 234),
911 ('c', latin('x'), latin('y'))
912 ]
913
Antoine Pitrou55d935a2010-11-22 16:35:57 +0000914 def setUp(self):
915 if not HAS_SHAREDCTYPES:
916 self.skipTest("requires multiprocessing.sharedctypes")
917
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000918 @classmethod
919 def _test(cls, values):
920 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000921 sv.value = cv[2]
922
923
924 def test_value(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000925 if raw:
926 values = [self.RawValue(code, value)
927 for code, value, _ in self.codes_values]
928 else:
929 values = [self.Value(code, value)
930 for code, value, _ in self.codes_values]
931
932 for sv, cv in zip(values, self.codes_values):
933 self.assertEqual(sv.value, cv[1])
934
935 proc = self.Process(target=self._test, args=(values,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200936 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000937 proc.start()
938 proc.join()
939
940 for sv, cv in zip(values, self.codes_values):
941 self.assertEqual(sv.value, cv[2])
942
943 def test_rawvalue(self):
944 self.test_value(raw=True)
945
946 def test_getobj_getlock(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000947 val1 = self.Value('i', 5)
948 lock1 = val1.get_lock()
949 obj1 = val1.get_obj()
950
951 val2 = self.Value('i', 5, lock=None)
952 lock2 = val2.get_lock()
953 obj2 = val2.get_obj()
954
955 lock = self.Lock()
956 val3 = self.Value('i', 5, lock=lock)
957 lock3 = val3.get_lock()
958 obj3 = val3.get_obj()
959 self.assertEqual(lock, lock3)
960
Jesse Noller6ab22152009-01-18 02:45:38 +0000961 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000962 self.assertFalse(hasattr(arr4, 'get_lock'))
963 self.assertFalse(hasattr(arr4, 'get_obj'))
964
Jesse Noller6ab22152009-01-18 02:45:38 +0000965 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
966
967 arr5 = self.RawValue('i', 5)
968 self.assertFalse(hasattr(arr5, 'get_lock'))
969 self.assertFalse(hasattr(arr5, 'get_obj'))
970
Benjamin Petersondfd79492008-06-13 19:13:39 +0000971
972class _TestArray(BaseTestCase):
973
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000974 ALLOWED_TYPES = ('processes',)
975
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000976 @classmethod
977 def f(cls, seq):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000978 for i in range(1, len(seq)):
979 seq[i] += seq[i-1]
980
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000981 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000982 def test_array(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000983 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
984 if raw:
985 arr = self.RawArray('i', seq)
986 else:
987 arr = self.Array('i', seq)
988
989 self.assertEqual(len(arr), len(seq))
990 self.assertEqual(arr[3], seq[3])
991 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
992
993 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
994
995 self.assertEqual(list(arr[:]), seq)
996
997 self.f(seq)
998
999 p = self.Process(target=self.f, args=(arr,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001000 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001001 p.start()
1002 p.join()
1003
1004 self.assertEqual(list(arr[:]), seq)
1005
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001006 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsond3cb2f62011-03-26 10:02:37 +00001007 def test_array_from_size(self):
1008 size = 10
1009 # Test for zeroing (see issue #11675).
1010 # The repetition below strengthens the test by increasing the chances
1011 # of previously allocated non-zero memory being used for the new array
1012 # on the 2nd and 3rd loops.
1013 for _ in range(3):
1014 arr = self.Array('i', size)
1015 self.assertEqual(len(arr), size)
1016 self.assertEqual(list(arr), [0] * size)
1017 arr[:] = range(10)
1018 self.assertEqual(list(arr), range(10))
1019 del arr
1020
1021 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001022 def test_rawarray(self):
1023 self.test_array(raw=True)
1024
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001025 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsonf9e9a6f2011-03-25 22:01:06 +00001026 def test_array_accepts_long(self):
1027 arr = self.Array('i', 10L)
1028 self.assertEqual(len(arr), 10)
1029 raw_arr = self.RawArray('i', 10L)
1030 self.assertEqual(len(raw_arr), 10)
1031
1032 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001033 def test_getobj_getlock_obj(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001034 arr1 = self.Array('i', range(10))
1035 lock1 = arr1.get_lock()
1036 obj1 = arr1.get_obj()
1037
1038 arr2 = self.Array('i', range(10), lock=None)
1039 lock2 = arr2.get_lock()
1040 obj2 = arr2.get_obj()
1041
1042 lock = self.Lock()
1043 arr3 = self.Array('i', range(10), lock=lock)
1044 lock3 = arr3.get_lock()
1045 obj3 = arr3.get_obj()
1046 self.assertEqual(lock, lock3)
1047
Jesse Noller6ab22152009-01-18 02:45:38 +00001048 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001049 self.assertFalse(hasattr(arr4, 'get_lock'))
1050 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Noller6ab22152009-01-18 02:45:38 +00001051 self.assertRaises(AttributeError,
1052 self.Array, 'i', range(10), lock='notalock')
1053
1054 arr5 = self.RawArray('i', range(10))
1055 self.assertFalse(hasattr(arr5, 'get_lock'))
1056 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001057
1058#
1059#
1060#
1061
1062class _TestContainers(BaseTestCase):
1063
1064 ALLOWED_TYPES = ('manager',)
1065
1066 def test_list(self):
1067 a = self.list(range(10))
1068 self.assertEqual(a[:], range(10))
1069
1070 b = self.list()
1071 self.assertEqual(b[:], [])
1072
1073 b.extend(range(5))
1074 self.assertEqual(b[:], range(5))
1075
1076 self.assertEqual(b[2], 2)
1077 self.assertEqual(b[2:10], [2,3,4])
1078
1079 b *= 2
1080 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1081
1082 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1083
1084 self.assertEqual(a[:], range(10))
1085
1086 d = [a, b]
1087 e = self.list(d)
1088 self.assertEqual(
1089 e[:],
1090 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1091 )
1092
1093 f = self.list([a])
1094 a.append('hello')
1095 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1096
1097 def test_dict(self):
1098 d = self.dict()
1099 indices = range(65, 70)
1100 for i in indices:
1101 d[i] = chr(i)
1102 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1103 self.assertEqual(sorted(d.keys()), indices)
1104 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1105 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1106
1107 def test_namespace(self):
1108 n = self.Namespace()
1109 n.name = 'Bob'
1110 n.job = 'Builder'
1111 n._hidden = 'hidden'
1112 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1113 del n.job
1114 self.assertEqual(str(n), "Namespace(name='Bob')")
1115 self.assertTrue(hasattr(n, 'name'))
1116 self.assertTrue(not hasattr(n, 'job'))
1117
1118#
1119#
1120#
1121
1122def sqr(x, wait=0.0):
1123 time.sleep(wait)
1124 return x*x
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001125
1126class SayWhenError(ValueError): pass
1127
1128def exception_throwing_generator(total, when):
1129 for i in range(total):
1130 if i == when:
1131 raise SayWhenError("Somebody said when")
1132 yield i
1133
Benjamin Petersondfd79492008-06-13 19:13:39 +00001134class _TestPool(BaseTestCase):
1135
1136 def test_apply(self):
1137 papply = self.pool.apply
1138 self.assertEqual(papply(sqr, (5,)), sqr(5))
1139 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1140
1141 def test_map(self):
1142 pmap = self.pool.map
1143 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
1144 self.assertEqual(pmap(sqr, range(100), chunksize=20),
1145 map(sqr, range(100)))
1146
Richard Oudkerk21aad972013-10-28 23:02:22 +00001147 def test_map_unplicklable(self):
1148 # Issue #19425 -- failure to pickle should not cause a hang
1149 if self.TYPE == 'threads':
Zachary Ware1f702212013-12-10 14:09:20 -06001150 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerk21aad972013-10-28 23:02:22 +00001151 class A(object):
1152 def __reduce__(self):
1153 raise RuntimeError('cannot pickle')
1154 with self.assertRaises(RuntimeError):
1155 self.pool.map(sqr, [A()]*10)
1156
Jesse Noller7530e472009-07-16 14:23:04 +00001157 def test_map_chunksize(self):
1158 try:
1159 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1160 except multiprocessing.TimeoutError:
1161 self.fail("pool.map_async with chunksize stalled on null list")
1162
Benjamin Petersondfd79492008-06-13 19:13:39 +00001163 def test_async(self):
1164 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1165 get = TimingWrapper(res.get)
1166 self.assertEqual(get(), 49)
1167 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1168
1169 def test_async_timeout(self):
Richard Oudkerk65162a72013-11-17 17:45:16 +00001170 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001171 get = TimingWrapper(res.get)
1172 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1173 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1174
1175 def test_imap(self):
1176 it = self.pool.imap(sqr, range(10))
1177 self.assertEqual(list(it), map(sqr, range(10)))
1178
1179 it = self.pool.imap(sqr, range(10))
1180 for i in range(10):
1181 self.assertEqual(it.next(), i*i)
1182 self.assertRaises(StopIteration, it.next)
1183
1184 it = self.pool.imap(sqr, range(1000), chunksize=100)
1185 for i in range(1000):
1186 self.assertEqual(it.next(), i*i)
1187 self.assertRaises(StopIteration, it.next)
1188
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001189 def test_imap_handle_iterable_exception(self):
1190 if self.TYPE == 'manager':
1191 self.skipTest('test not appropriate for {}'.format(self.TYPE))
1192
1193 it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
1194 for i in range(3):
1195 self.assertEqual(next(it), i*i)
1196 self.assertRaises(SayWhenError, it.next)
1197
1198 # SayWhenError seen at start of problematic chunk's results
1199 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
1200 for i in range(6):
1201 self.assertEqual(next(it), i*i)
1202 self.assertRaises(SayWhenError, it.next)
1203 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
1204 for i in range(4):
1205 self.assertEqual(next(it), i*i)
1206 self.assertRaises(SayWhenError, it.next)
1207
Benjamin Petersondfd79492008-06-13 19:13:39 +00001208 def test_imap_unordered(self):
1209 it = self.pool.imap_unordered(sqr, range(1000))
1210 self.assertEqual(sorted(it), map(sqr, range(1000)))
1211
1212 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1213 self.assertEqual(sorted(it), map(sqr, range(1000)))
1214
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001215 def test_imap_unordered_handle_iterable_exception(self):
1216 if self.TYPE == 'manager':
1217 self.skipTest('test not appropriate for {}'.format(self.TYPE))
1218
1219 it = self.pool.imap_unordered(sqr,
1220 exception_throwing_generator(10, 3),
1221 1)
Serhiy Storchaka89c3b8e2015-04-23 11:35:43 +03001222 expected_values = map(sqr, range(10))
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001223 with self.assertRaises(SayWhenError):
1224 # imap_unordered makes it difficult to anticipate the SayWhenError
1225 for i in range(10):
Serhiy Storchaka89c3b8e2015-04-23 11:35:43 +03001226 value = next(it)
1227 self.assertIn(value, expected_values)
1228 expected_values.remove(value)
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001229
1230 it = self.pool.imap_unordered(sqr,
1231 exception_throwing_generator(20, 7),
1232 2)
Serhiy Storchaka89c3b8e2015-04-23 11:35:43 +03001233 expected_values = map(sqr, range(20))
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001234 with self.assertRaises(SayWhenError):
1235 for i in range(20):
Serhiy Storchaka89c3b8e2015-04-23 11:35:43 +03001236 value = next(it)
1237 self.assertIn(value, expected_values)
1238 expected_values.remove(value)
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001239
Benjamin Petersondfd79492008-06-13 19:13:39 +00001240 def test_make_pool(self):
Victor Stinnerf64a0cf2011-06-20 17:54:33 +02001241 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1242 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1243
Benjamin Petersondfd79492008-06-13 19:13:39 +00001244 p = multiprocessing.Pool(3)
1245 self.assertEqual(3, len(p._pool))
1246 p.close()
1247 p.join()
1248
1249 def test_terminate(self):
Richard Oudkerk6d24a6e2013-11-21 16:35:12 +00001250 p = self.Pool(4)
1251 result = p.map_async(
Benjamin Petersondfd79492008-06-13 19:13:39 +00001252 time.sleep, [0.1 for i in range(10000)], chunksize=1
1253 )
Richard Oudkerk6d24a6e2013-11-21 16:35:12 +00001254 p.terminate()
1255 join = TimingWrapper(p.join)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001256 join()
1257 self.assertTrue(join.elapsed < 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001258
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01001259 def test_empty_iterable(self):
1260 # See Issue 12157
1261 p = self.Pool(1)
1262
1263 self.assertEqual(p.map(sqr, []), [])
1264 self.assertEqual(list(p.imap(sqr, [])), [])
1265 self.assertEqual(list(p.imap_unordered(sqr, [])), [])
1266 self.assertEqual(p.map_async(sqr, []).get(), [])
1267
1268 p.close()
1269 p.join()
1270
Richard Oudkerk0c200c22012-05-02 16:36:26 +01001271def unpickleable_result():
1272 return lambda: 42
1273
1274class _TestPoolWorkerErrors(BaseTestCase):
1275 ALLOWED_TYPES = ('processes', )
1276
1277 def test_unpickleable_result(self):
1278 from multiprocessing.pool import MaybeEncodingError
1279 p = multiprocessing.Pool(2)
1280
1281 # Make sure we don't lose pool processes because of encoding errors.
1282 for iteration in range(20):
1283 res = p.apply_async(unpickleable_result)
1284 self.assertRaises(MaybeEncodingError, res.get)
1285
1286 p.close()
1287 p.join()
1288
Jesse Noller654ade32010-01-27 03:05:57 +00001289class _TestPoolWorkerLifetime(BaseTestCase):
1290
1291 ALLOWED_TYPES = ('processes', )
1292 def test_pool_worker_lifetime(self):
1293 p = multiprocessing.Pool(3, maxtasksperchild=10)
1294 self.assertEqual(3, len(p._pool))
1295 origworkerpids = [w.pid for w in p._pool]
1296 # Run many tasks so each worker gets replaced (hopefully)
1297 results = []
1298 for i in range(100):
1299 results.append(p.apply_async(sqr, (i, )))
1300 # Fetch the results and verify we got the right answers,
1301 # also ensuring all the tasks have completed.
1302 for (j, res) in enumerate(results):
1303 self.assertEqual(res.get(), sqr(j))
1304 # Refill the pool
1305 p._repopulate_pool()
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001306 # Wait until all workers are alive
Antoine Pitrouc2b0d762011-04-06 22:54:14 +02001307 # (countdown * DELTA = 5 seconds max startup process time)
1308 countdown = 50
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001309 while countdown and not all(w.is_alive() for w in p._pool):
1310 countdown -= 1
1311 time.sleep(DELTA)
Jesse Noller654ade32010-01-27 03:05:57 +00001312 finalworkerpids = [w.pid for w in p._pool]
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001313 # All pids should be assigned. See issue #7805.
1314 self.assertNotIn(None, origworkerpids)
1315 self.assertNotIn(None, finalworkerpids)
1316 # Finally, check that the worker pids have changed
Jesse Noller654ade32010-01-27 03:05:57 +00001317 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1318 p.close()
1319 p.join()
1320
Charles-François Natali46f990e2011-10-24 18:43:51 +02001321 def test_pool_worker_lifetime_early_close(self):
1322 # Issue #10332: closing a pool whose workers have limited lifetimes
1323 # before all the tasks completed would make join() hang.
1324 p = multiprocessing.Pool(3, maxtasksperchild=1)
1325 results = []
1326 for i in range(6):
1327 results.append(p.apply_async(sqr, (i, 0.3)))
1328 p.close()
1329 p.join()
1330 # check the results
1331 for (j, res) in enumerate(results):
1332 self.assertEqual(res.get(), sqr(j))
1333
1334
Benjamin Petersondfd79492008-06-13 19:13:39 +00001335#
1336# Test that manager has expected number of shared objects left
1337#
1338
1339class _TestZZZNumberOfObjects(BaseTestCase):
1340 # Because test cases are sorted alphabetically, this one will get
1341 # run after all the other tests for the manager. It tests that
1342 # there have been no "reference leaks" for the manager's shared
1343 # objects. Note the comment in _TestPool.test_terminate().
1344 ALLOWED_TYPES = ('manager',)
1345
1346 def test_number_of_objects(self):
1347 EXPECTED_NUMBER = 1 # the pool object is still alive
1348 multiprocessing.active_children() # discard dead process objs
1349 gc.collect() # do garbage collection
1350 refs = self.manager._number_of_objects()
Jesse Noller7314b382009-01-21 02:08:17 +00001351 debug_info = self.manager._debug_info()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001352 if refs != EXPECTED_NUMBER:
Jesse Noller7fb96402008-07-17 21:01:05 +00001353 print self.manager._debug_info()
Jesse Noller7314b382009-01-21 02:08:17 +00001354 print debug_info
Benjamin Petersondfd79492008-06-13 19:13:39 +00001355
1356 self.assertEqual(refs, EXPECTED_NUMBER)
1357
1358#
1359# Test of creating a customized manager class
1360#
1361
1362from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1363
1364class FooBar(object):
1365 def f(self):
1366 return 'f()'
1367 def g(self):
1368 raise ValueError
1369 def _h(self):
1370 return '_h()'
1371
1372def baz():
1373 for i in xrange(10):
1374 yield i*i
1375
1376class IteratorProxy(BaseProxy):
1377 _exposed_ = ('next', '__next__')
1378 def __iter__(self):
1379 return self
1380 def next(self):
1381 return self._callmethod('next')
1382 def __next__(self):
1383 return self._callmethod('__next__')
1384
1385class MyManager(BaseManager):
1386 pass
1387
1388MyManager.register('Foo', callable=FooBar)
1389MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1390MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1391
1392
1393class _TestMyManager(BaseTestCase):
1394
1395 ALLOWED_TYPES = ('manager',)
1396
1397 def test_mymanager(self):
1398 manager = MyManager()
1399 manager.start()
1400
1401 foo = manager.Foo()
1402 bar = manager.Bar()
1403 baz = manager.baz()
1404
1405 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1406 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1407
1408 self.assertEqual(foo_methods, ['f', 'g'])
1409 self.assertEqual(bar_methods, ['f', '_h'])
1410
1411 self.assertEqual(foo.f(), 'f()')
1412 self.assertRaises(ValueError, foo.g)
1413 self.assertEqual(foo._callmethod('f'), 'f()')
1414 self.assertRaises(RemoteError, foo._callmethod, '_h')
1415
1416 self.assertEqual(bar.f(), 'f()')
1417 self.assertEqual(bar._h(), '_h()')
1418 self.assertEqual(bar._callmethod('f'), 'f()')
1419 self.assertEqual(bar._callmethod('_h'), '_h()')
1420
1421 self.assertEqual(list(baz), [i*i for i in range(10)])
1422
1423 manager.shutdown()
1424
1425#
1426# Test of connecting to a remote server and using xmlrpclib for serialization
1427#
1428
1429_queue = Queue.Queue()
1430def get_queue():
1431 return _queue
1432
1433class QueueManager(BaseManager):
1434 '''manager class used by server process'''
1435QueueManager.register('get_queue', callable=get_queue)
1436
1437class QueueManager2(BaseManager):
1438 '''manager class which specifies the same interface as QueueManager'''
1439QueueManager2.register('get_queue')
1440
1441
1442SERIALIZER = 'xmlrpclib'
1443
1444class _TestRemoteManager(BaseTestCase):
1445
1446 ALLOWED_TYPES = ('manager',)
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02001447 values = ['hello world', None, True, 2.25,
1448 #'hall\xc3\xa5 v\xc3\xa4rlden'] # UTF-8
1449 ]
1450 result = values[:]
1451 if test_support.have_unicode:
1452 #result[-1] = u'hall\xe5 v\xe4rlden'
1453 uvalue = test_support.u(r'\u043f\u0440\u0438\u0432\u0456\u0442 '
1454 r'\u0441\u0432\u0456\u0442')
1455 values.append(uvalue)
1456 result.append(uvalue)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001457
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001458 @classmethod
1459 def _putter(cls, address, authkey):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001460 manager = QueueManager2(
1461 address=address, authkey=authkey, serializer=SERIALIZER
1462 )
1463 manager.connect()
1464 queue = manager.get_queue()
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02001465 # Note that xmlrpclib will deserialize object as a list not a tuple
1466 queue.put(tuple(cls.values))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001467
1468 def test_remote(self):
1469 authkey = os.urandom(32)
1470
1471 manager = QueueManager(
Antoine Pitrou78254dc2013-08-22 00:39:46 +02001472 address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER
Benjamin Petersondfd79492008-06-13 19:13:39 +00001473 )
1474 manager.start()
1475
1476 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001477 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001478 p.start()
1479
1480 manager2 = QueueManager2(
1481 address=manager.address, authkey=authkey, serializer=SERIALIZER
1482 )
1483 manager2.connect()
1484 queue = manager2.get_queue()
1485
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02001486 self.assertEqual(queue.get(), self.result)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001487
1488 # Because we are using xmlrpclib for serialization instead of
1489 # pickle this will cause a serialization error.
1490 self.assertRaises(Exception, queue.put, time.sleep)
1491
1492 # Make queue finalizer run before the server is stopped
1493 del queue
1494 manager.shutdown()
1495
Jesse Noller459a6482009-03-30 15:50:42 +00001496class _TestManagerRestart(BaseTestCase):
1497
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001498 @classmethod
1499 def _putter(cls, address, authkey):
Jesse Noller459a6482009-03-30 15:50:42 +00001500 manager = QueueManager(
1501 address=address, authkey=authkey, serializer=SERIALIZER)
1502 manager.connect()
1503 queue = manager.get_queue()
1504 queue.put('hello world')
1505
1506 def test_rapid_restart(self):
1507 authkey = os.urandom(32)
1508 manager = QueueManager(
Antoine Pitrou78254dc2013-08-22 00:39:46 +02001509 address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin87d86e02010-11-01 05:15:55 +00001510 srvr = manager.get_server()
1511 addr = srvr.address
1512 # Close the connection.Listener socket which gets opened as a part
1513 # of manager.get_server(). It's not needed for the test.
1514 srvr.listener.close()
Jesse Noller459a6482009-03-30 15:50:42 +00001515 manager.start()
1516
1517 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001518 p.daemon = True
Jesse Noller459a6482009-03-30 15:50:42 +00001519 p.start()
1520 queue = manager.get_queue()
1521 self.assertEqual(queue.get(), 'hello world')
Jesse Noller019ce772009-03-30 21:53:29 +00001522 del queue
Jesse Noller459a6482009-03-30 15:50:42 +00001523 manager.shutdown()
1524 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001525 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller459a6482009-03-30 15:50:42 +00001526 manager.start()
Jesse Noller019ce772009-03-30 21:53:29 +00001527 manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001528
Benjamin Petersondfd79492008-06-13 19:13:39 +00001529#
1530#
1531#
1532
1533SENTINEL = latin('')
1534
1535class _TestConnection(BaseTestCase):
1536
1537 ALLOWED_TYPES = ('processes', 'threads')
1538
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001539 @classmethod
1540 def _echo(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001541 for msg in iter(conn.recv_bytes, SENTINEL):
1542 conn.send_bytes(msg)
1543 conn.close()
1544
1545 def test_connection(self):
1546 conn, child_conn = self.Pipe()
1547
1548 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001549 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001550 p.start()
1551
1552 seq = [1, 2.25, None]
1553 msg = latin('hello world')
1554 longmsg = msg * 10
1555 arr = array.array('i', range(4))
1556
1557 if self.TYPE == 'processes':
1558 self.assertEqual(type(conn.fileno()), int)
1559
1560 self.assertEqual(conn.send(seq), None)
1561 self.assertEqual(conn.recv(), seq)
1562
1563 self.assertEqual(conn.send_bytes(msg), None)
1564 self.assertEqual(conn.recv_bytes(), msg)
1565
1566 if self.TYPE == 'processes':
1567 buffer = array.array('i', [0]*10)
1568 expected = list(arr) + [0] * (10 - len(arr))
1569 self.assertEqual(conn.send_bytes(arr), None)
1570 self.assertEqual(conn.recv_bytes_into(buffer),
1571 len(arr) * buffer.itemsize)
1572 self.assertEqual(list(buffer), expected)
1573
1574 buffer = array.array('i', [0]*10)
1575 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1576 self.assertEqual(conn.send_bytes(arr), None)
1577 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1578 len(arr) * buffer.itemsize)
1579 self.assertEqual(list(buffer), expected)
1580
1581 buffer = bytearray(latin(' ' * 40))
1582 self.assertEqual(conn.send_bytes(longmsg), None)
1583 try:
1584 res = conn.recv_bytes_into(buffer)
1585 except multiprocessing.BufferTooShort, e:
1586 self.assertEqual(e.args, (longmsg,))
1587 else:
1588 self.fail('expected BufferTooShort, got %s' % res)
1589
1590 poll = TimingWrapper(conn.poll)
1591
1592 self.assertEqual(poll(), False)
1593 self.assertTimingAlmostEqual(poll.elapsed, 0)
1594
1595 self.assertEqual(poll(TIMEOUT1), False)
1596 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1597
1598 conn.send(None)
Giampaolo Rodola'cef20062012-12-31 17:23:09 +01001599 time.sleep(.1)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001600
1601 self.assertEqual(poll(TIMEOUT1), True)
1602 self.assertTimingAlmostEqual(poll.elapsed, 0)
1603
1604 self.assertEqual(conn.recv(), None)
1605
1606 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1607 conn.send_bytes(really_big_msg)
1608 self.assertEqual(conn.recv_bytes(), really_big_msg)
1609
1610 conn.send_bytes(SENTINEL) # tell child to quit
1611 child_conn.close()
1612
1613 if self.TYPE == 'processes':
1614 self.assertEqual(conn.readable, True)
1615 self.assertEqual(conn.writable, True)
1616 self.assertRaises(EOFError, conn.recv)
1617 self.assertRaises(EOFError, conn.recv_bytes)
1618
1619 p.join()
1620
1621 def test_duplex_false(self):
1622 reader, writer = self.Pipe(duplex=False)
1623 self.assertEqual(writer.send(1), None)
1624 self.assertEqual(reader.recv(), 1)
1625 if self.TYPE == 'processes':
1626 self.assertEqual(reader.readable, True)
1627 self.assertEqual(reader.writable, False)
1628 self.assertEqual(writer.readable, False)
1629 self.assertEqual(writer.writable, True)
1630 self.assertRaises(IOError, reader.send, 2)
1631 self.assertRaises(IOError, writer.recv)
1632 self.assertRaises(IOError, writer.poll)
1633
1634 def test_spawn_close(self):
1635 # We test that a pipe connection can be closed by parent
1636 # process immediately after child is spawned. On Windows this
1637 # would have sometimes failed on old versions because
1638 # child_conn would be closed before the child got a chance to
1639 # duplicate it.
1640 conn, child_conn = self.Pipe()
1641
1642 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001643 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001644 p.start()
1645 child_conn.close() # this might complete before child initializes
1646
1647 msg = latin('hello')
1648 conn.send_bytes(msg)
1649 self.assertEqual(conn.recv_bytes(), msg)
1650
1651 conn.send_bytes(SENTINEL)
1652 conn.close()
1653 p.join()
1654
1655 def test_sendbytes(self):
1656 if self.TYPE != 'processes':
Zachary Ware1f702212013-12-10 14:09:20 -06001657 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001658
1659 msg = latin('abcdefghijklmnopqrstuvwxyz')
1660 a, b = self.Pipe()
1661
1662 a.send_bytes(msg)
1663 self.assertEqual(b.recv_bytes(), msg)
1664
1665 a.send_bytes(msg, 5)
1666 self.assertEqual(b.recv_bytes(), msg[5:])
1667
1668 a.send_bytes(msg, 7, 8)
1669 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1670
1671 a.send_bytes(msg, 26)
1672 self.assertEqual(b.recv_bytes(), latin(''))
1673
1674 a.send_bytes(msg, 26, 0)
1675 self.assertEqual(b.recv_bytes(), latin(''))
1676
1677 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1678
1679 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1680
1681 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1682
1683 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1684
1685 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1686
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001687 @classmethod
1688 def _is_fd_assigned(cls, fd):
1689 try:
1690 os.fstat(fd)
1691 except OSError as e:
1692 if e.errno == errno.EBADF:
1693 return False
1694 raise
1695 else:
1696 return True
1697
1698 @classmethod
1699 def _writefd(cls, conn, data, create_dummy_fds=False):
1700 if create_dummy_fds:
1701 for i in range(0, 256):
1702 if not cls._is_fd_assigned(i):
1703 os.dup2(conn.fileno(), i)
1704 fd = reduction.recv_handle(conn)
1705 if msvcrt:
1706 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1707 os.write(fd, data)
1708 os.close(fd)
1709
Charles-François Natalif8413b22011-09-21 18:44:49 +02001710 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001711 def test_fd_transfer(self):
1712 if self.TYPE != 'processes':
1713 self.skipTest("only makes sense with processes")
1714 conn, child_conn = self.Pipe(duplex=True)
1715
1716 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001717 p.daemon = True
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001718 p.start()
1719 with open(test_support.TESTFN, "wb") as f:
1720 fd = f.fileno()
1721 if msvcrt:
1722 fd = msvcrt.get_osfhandle(fd)
1723 reduction.send_handle(conn, fd, p.pid)
1724 p.join()
1725 with open(test_support.TESTFN, "rb") as f:
1726 self.assertEqual(f.read(), b"foo")
1727
Charles-François Natalif8413b22011-09-21 18:44:49 +02001728 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001729 @unittest.skipIf(sys.platform == "win32",
1730 "test semantics don't make sense on Windows")
1731 @unittest.skipIf(MAXFD <= 256,
1732 "largest assignable fd number is too small")
1733 @unittest.skipUnless(hasattr(os, "dup2"),
1734 "test needs os.dup2()")
1735 def test_large_fd_transfer(self):
1736 # With fd > 256 (issue #11657)
1737 if self.TYPE != 'processes':
1738 self.skipTest("only makes sense with processes")
1739 conn, child_conn = self.Pipe(duplex=True)
1740
1741 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001742 p.daemon = True
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001743 p.start()
1744 with open(test_support.TESTFN, "wb") as f:
1745 fd = f.fileno()
1746 for newfd in range(256, MAXFD):
1747 if not self._is_fd_assigned(newfd):
1748 break
1749 else:
1750 self.fail("could not find an unassigned large file descriptor")
1751 os.dup2(fd, newfd)
1752 try:
1753 reduction.send_handle(conn, newfd, p.pid)
1754 finally:
1755 os.close(newfd)
1756 p.join()
1757 with open(test_support.TESTFN, "rb") as f:
1758 self.assertEqual(f.read(), b"bar")
1759
Jesus Ceac23484b2011-09-21 03:47:39 +02001760 @classmethod
1761 def _send_data_without_fd(self, conn):
1762 os.write(conn.fileno(), b"\0")
1763
Charles-François Natalif8413b22011-09-21 18:44:49 +02001764 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Ceac23484b2011-09-21 03:47:39 +02001765 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
1766 def test_missing_fd_transfer(self):
1767 # Check that exception is raised when received data is not
1768 # accompanied by a file descriptor in ancillary data.
1769 if self.TYPE != 'processes':
1770 self.skipTest("only makes sense with processes")
1771 conn, child_conn = self.Pipe(duplex=True)
1772
1773 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
1774 p.daemon = True
1775 p.start()
1776 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
1777 p.join()
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001778
Benjamin Petersondfd79492008-06-13 19:13:39 +00001779class _TestListenerClient(BaseTestCase):
1780
1781 ALLOWED_TYPES = ('processes', 'threads')
1782
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001783 @classmethod
1784 def _test(cls, address):
1785 conn = cls.connection.Client(address)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001786 conn.send('hello')
1787 conn.close()
1788
1789 def test_listener_client(self):
1790 for family in self.connection.families:
1791 l = self.connection.Listener(family=family)
1792 p = self.Process(target=self._test, args=(l.address,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001793 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001794 p.start()
1795 conn = l.accept()
1796 self.assertEqual(conn.recv(), 'hello')
1797 p.join()
1798 l.close()
Richard Oudkerk9a16fa62012-05-05 20:41:08 +01001799
1800 def test_issue14725(self):
1801 l = self.connection.Listener()
1802 p = self.Process(target=self._test, args=(l.address,))
1803 p.daemon = True
1804 p.start()
1805 time.sleep(1)
1806 # On Windows the client process should by now have connected,
1807 # written data and closed the pipe handle by now. This causes
1808 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
1809 # 14725.
1810 conn = l.accept()
1811 self.assertEqual(conn.recv(), 'hello')
1812 conn.close()
1813 p.join()
1814 l.close()
1815
Benjamin Petersondfd79492008-06-13 19:13:39 +00001816#
1817# Test of sending connection and socket objects between processes
1818#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001819"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001820class _TestPicklingConnections(BaseTestCase):
1821
1822 ALLOWED_TYPES = ('processes',)
1823
1824 def _listener(self, conn, families):
1825 for fam in families:
1826 l = self.connection.Listener(family=fam)
1827 conn.send(l.address)
1828 new_conn = l.accept()
1829 conn.send(new_conn)
1830
1831 if self.TYPE == 'processes':
1832 l = socket.socket()
1833 l.bind(('localhost', 0))
1834 conn.send(l.getsockname())
1835 l.listen(1)
1836 new_conn, addr = l.accept()
1837 conn.send(new_conn)
1838
1839 conn.recv()
1840
1841 def _remote(self, conn):
1842 for (address, msg) in iter(conn.recv, None):
1843 client = self.connection.Client(address)
1844 client.send(msg.upper())
1845 client.close()
1846
1847 if self.TYPE == 'processes':
1848 address, msg = conn.recv()
1849 client = socket.socket()
1850 client.connect(address)
1851 client.sendall(msg.upper())
1852 client.close()
1853
1854 conn.close()
1855
1856 def test_pickling(self):
1857 try:
1858 multiprocessing.allow_connection_pickling()
1859 except ImportError:
1860 return
1861
1862 families = self.connection.families
1863
1864 lconn, lconn0 = self.Pipe()
1865 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001866 lp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001867 lp.start()
1868 lconn0.close()
1869
1870 rconn, rconn0 = self.Pipe()
1871 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001872 rp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001873 rp.start()
1874 rconn0.close()
1875
1876 for fam in families:
1877 msg = ('This connection uses family %s' % fam).encode('ascii')
1878 address = lconn.recv()
1879 rconn.send((address, msg))
1880 new_conn = lconn.recv()
1881 self.assertEqual(new_conn.recv(), msg.upper())
1882
1883 rconn.send(None)
1884
1885 if self.TYPE == 'processes':
1886 msg = latin('This connection uses a normal socket')
1887 address = lconn.recv()
1888 rconn.send((address, msg))
1889 if hasattr(socket, 'fromfd'):
1890 new_conn = lconn.recv()
1891 self.assertEqual(new_conn.recv(100), msg.upper())
1892 else:
1893 # XXX On Windows with Py2.6 need to backport fromfd()
1894 discard = lconn.recv_bytes()
1895
1896 lconn.send(None)
1897
1898 rconn.close()
1899 lconn.close()
1900
1901 lp.join()
1902 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001903"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001904#
1905#
1906#
1907
1908class _TestHeap(BaseTestCase):
1909
1910 ALLOWED_TYPES = ('processes',)
1911
1912 def test_heap(self):
1913 iterations = 5000
1914 maxblocks = 50
1915 blocks = []
1916
1917 # create and destroy lots of blocks of different sizes
1918 for i in xrange(iterations):
1919 size = int(random.lognormvariate(0, 1) * 1000)
1920 b = multiprocessing.heap.BufferWrapper(size)
1921 blocks.append(b)
1922 if len(blocks) > maxblocks:
1923 i = random.randrange(maxblocks)
1924 del blocks[i]
1925
1926 # get the heap object
1927 heap = multiprocessing.heap.BufferWrapper._heap
1928
1929 # verify the state of the heap
1930 all = []
1931 occupied = 0
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001932 heap._lock.acquire()
1933 self.addCleanup(heap._lock.release)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001934 for L in heap._len_to_seq.values():
1935 for arena, start, stop in L:
1936 all.append((heap._arenas.index(arena), start, stop,
1937 stop-start, 'free'))
1938 for arena, start, stop in heap._allocated_blocks:
1939 all.append((heap._arenas.index(arena), start, stop,
1940 stop-start, 'occupied'))
1941 occupied += (stop-start)
1942
1943 all.sort()
1944
1945 for i in range(len(all)-1):
1946 (arena, start, stop) = all[i][:3]
1947 (narena, nstart, nstop) = all[i+1][:3]
1948 self.assertTrue((arena != narena and nstart == 0) or
1949 (stop == nstart))
1950
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001951 def test_free_from_gc(self):
1952 # Check that freeing of blocks by the garbage collector doesn't deadlock
1953 # (issue #12352).
1954 # Make sure the GC is enabled, and set lower collection thresholds to
1955 # make collections more frequent (and increase the probability of
1956 # deadlock).
Charles-François Natali7c20ad32011-07-02 14:08:27 +02001957 if not gc.isenabled():
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001958 gc.enable()
1959 self.addCleanup(gc.disable)
Charles-François Natali7c20ad32011-07-02 14:08:27 +02001960 thresholds = gc.get_threshold()
1961 self.addCleanup(gc.set_threshold, *thresholds)
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001962 gc.set_threshold(10)
1963
1964 # perform numerous block allocations, with cyclic references to make
1965 # sure objects are collected asynchronously by the gc
1966 for i in range(5000):
1967 a = multiprocessing.heap.BufferWrapper(1)
1968 b = multiprocessing.heap.BufferWrapper(1)
1969 # circular references
1970 a.buddy = b
1971 b.buddy = a
1972
Benjamin Petersondfd79492008-06-13 19:13:39 +00001973#
1974#
1975#
1976
Benjamin Petersondfd79492008-06-13 19:13:39 +00001977class _Foo(Structure):
1978 _fields_ = [
1979 ('x', c_int),
1980 ('y', c_double)
1981 ]
1982
1983class _TestSharedCTypes(BaseTestCase):
1984
1985 ALLOWED_TYPES = ('processes',)
1986
Antoine Pitrou55d935a2010-11-22 16:35:57 +00001987 def setUp(self):
1988 if not HAS_SHAREDCTYPES:
1989 self.skipTest("requires multiprocessing.sharedctypes")
1990
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001991 @classmethod
1992 def _double(cls, x, y, foo, arr, string):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001993 x.value *= 2
1994 y.value *= 2
1995 foo.x *= 2
1996 foo.y *= 2
1997 string.value *= 2
1998 for i in range(len(arr)):
1999 arr[i] *= 2
2000
2001 def test_sharedctypes(self, lock=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002002 x = Value('i', 7, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00002003 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002004 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00002005 arr = self.Array('d', range(10), lock=lock)
2006 string = self.Array('c', 20, lock=lock)
Brian Curtina06e9b82010-10-07 02:27:41 +00002007 string.value = latin('hello')
Benjamin Petersondfd79492008-06-13 19:13:39 +00002008
2009 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea6f6016b2011-09-09 20:26:57 +02002010 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00002011 p.start()
2012 p.join()
2013
2014 self.assertEqual(x.value, 14)
2015 self.assertAlmostEqual(y.value, 2.0/3.0)
2016 self.assertEqual(foo.x, 6)
2017 self.assertAlmostEqual(foo.y, 4.0)
2018 for i in range(10):
2019 self.assertAlmostEqual(arr[i], i*2)
2020 self.assertEqual(string.value, latin('hellohello'))
2021
2022 def test_synchronize(self):
2023 self.test_sharedctypes(lock=True)
2024
2025 def test_copy(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002026 foo = _Foo(2, 5.0)
Brian Curtina06e9b82010-10-07 02:27:41 +00002027 bar = copy(foo)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002028 foo.x = 0
2029 foo.y = 0
2030 self.assertEqual(bar.x, 2)
2031 self.assertAlmostEqual(bar.y, 5.0)
2032
2033#
2034#
2035#
2036
2037class _TestFinalize(BaseTestCase):
2038
2039 ALLOWED_TYPES = ('processes',)
2040
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00002041 @classmethod
2042 def _test_finalize(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002043 class Foo(object):
2044 pass
2045
2046 a = Foo()
2047 util.Finalize(a, conn.send, args=('a',))
2048 del a # triggers callback for a
2049
2050 b = Foo()
2051 close_b = util.Finalize(b, conn.send, args=('b',))
2052 close_b() # triggers callback for b
2053 close_b() # does nothing because callback has already been called
2054 del b # does nothing because callback has already been called
2055
2056 c = Foo()
2057 util.Finalize(c, conn.send, args=('c',))
2058
2059 d10 = Foo()
2060 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
2061
2062 d01 = Foo()
2063 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
2064 d02 = Foo()
2065 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
2066 d03 = Foo()
2067 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
2068
2069 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
2070
2071 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
2072
Ezio Melottic2077b02011-03-16 12:34:31 +02002073 # call multiprocessing's cleanup function then exit process without
Benjamin Petersondfd79492008-06-13 19:13:39 +00002074 # garbage collecting locals
2075 util._exit_function()
2076 conn.close()
2077 os._exit(0)
2078
2079 def test_finalize(self):
2080 conn, child_conn = self.Pipe()
2081
2082 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02002083 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00002084 p.start()
2085 p.join()
2086
2087 result = [obj for obj in iter(conn.recv, 'STOP')]
2088 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2089
2090#
2091# Test that from ... import * works for each module
2092#
2093
2094class _TestImportStar(BaseTestCase):
2095
2096 ALLOWED_TYPES = ('processes',)
2097
2098 def test_import(self):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002099 modules = [
Benjamin Petersondfd79492008-06-13 19:13:39 +00002100 'multiprocessing', 'multiprocessing.connection',
2101 'multiprocessing.heap', 'multiprocessing.managers',
2102 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersondfd79492008-06-13 19:13:39 +00002103 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002104 ]
2105
Charles-François Natalif8413b22011-09-21 18:44:49 +02002106 if HAS_REDUCTION:
2107 modules.append('multiprocessing.reduction')
2108
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002109 if c_int is not None:
2110 # This module requires _ctypes
2111 modules.append('multiprocessing.sharedctypes')
Benjamin Petersondfd79492008-06-13 19:13:39 +00002112
2113 for name in modules:
2114 __import__(name)
2115 mod = sys.modules[name]
2116
2117 for attr in getattr(mod, '__all__', ()):
2118 self.assertTrue(
2119 hasattr(mod, attr),
2120 '%r does not have attribute %r' % (mod, attr)
2121 )
2122
2123#
2124# Quick test that logging works -- does not test logging output
2125#
2126
2127class _TestLogging(BaseTestCase):
2128
2129 ALLOWED_TYPES = ('processes',)
2130
2131 def test_enable_logging(self):
2132 logger = multiprocessing.get_logger()
2133 logger.setLevel(util.SUBWARNING)
2134 self.assertTrue(logger is not None)
2135 logger.debug('this will not be printed')
2136 logger.info('nor will this')
2137 logger.setLevel(LOG_LEVEL)
2138
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00002139 @classmethod
2140 def _test_level(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002141 logger = multiprocessing.get_logger()
2142 conn.send(logger.getEffectiveLevel())
2143
2144 def test_level(self):
2145 LEVEL1 = 32
2146 LEVEL2 = 37
2147
2148 logger = multiprocessing.get_logger()
2149 root_logger = logging.getLogger()
2150 root_level = root_logger.level
2151
2152 reader, writer = multiprocessing.Pipe(duplex=False)
2153
2154 logger.setLevel(LEVEL1)
Jesus Cea6f6016b2011-09-09 20:26:57 +02002155 p = self.Process(target=self._test_level, args=(writer,))
2156 p.daemon = True
2157 p.start()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002158 self.assertEqual(LEVEL1, reader.recv())
2159
2160 logger.setLevel(logging.NOTSET)
2161 root_logger.setLevel(LEVEL2)
Jesus Cea6f6016b2011-09-09 20:26:57 +02002162 p = self.Process(target=self._test_level, args=(writer,))
2163 p.daemon = True
2164 p.start()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002165 self.assertEqual(LEVEL2, reader.recv())
2166
2167 root_logger.setLevel(root_level)
2168 logger.setLevel(level=LOG_LEVEL)
2169
Jesse Noller814d02d2009-11-21 14:38:23 +00002170
Jesse Noller9a03f2f2009-11-24 14:17:29 +00002171# class _TestLoggingProcessName(BaseTestCase):
2172#
2173# def handle(self, record):
2174# assert record.processName == multiprocessing.current_process().name
2175# self.__handled = True
2176#
2177# def test_logging(self):
2178# handler = logging.Handler()
2179# handler.handle = self.handle
2180# self.__handled = False
2181# # Bypass getLogger() and side-effects
2182# logger = logging.getLoggerClass()(
2183# 'multiprocessing.test.TestLoggingProcessName')
2184# logger.addHandler(handler)
2185# logger.propagate = False
2186#
2187# logger.warn('foo')
2188# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00002189
Benjamin Petersondfd79492008-06-13 19:13:39 +00002190#
Richard Oudkerkba482642013-02-26 12:37:07 +00002191# Check that Process.join() retries if os.waitpid() fails with EINTR
2192#
2193
2194class _TestPollEintr(BaseTestCase):
2195
2196 ALLOWED_TYPES = ('processes',)
2197
2198 @classmethod
2199 def _killer(cls, pid):
2200 time.sleep(0.5)
2201 os.kill(pid, signal.SIGUSR1)
2202
2203 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2204 def test_poll_eintr(self):
2205 got_signal = [False]
2206 def record(*args):
2207 got_signal[0] = True
2208 pid = os.getpid()
2209 oldhandler = signal.signal(signal.SIGUSR1, record)
2210 try:
2211 killer = self.Process(target=self._killer, args=(pid,))
2212 killer.start()
2213 p = self.Process(target=time.sleep, args=(1,))
2214 p.start()
2215 p.join()
2216 self.assertTrue(got_signal[0])
2217 self.assertEqual(p.exitcode, 0)
2218 killer.join()
2219 finally:
2220 signal.signal(signal.SIGUSR1, oldhandler)
2221
2222#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002223# Test to verify handle verification, see issue 3321
2224#
2225
2226class TestInvalidHandle(unittest.TestCase):
2227
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002228 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002229 def test_invalid_handles(self):
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002230 conn = _multiprocessing.Connection(44977608)
2231 self.assertRaises(IOError, conn.poll)
2232 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002233
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002234#
Benjamin Petersondfd79492008-06-13 19:13:39 +00002235# Functions used to create test cases from the base ones in this module
2236#
2237
2238def get_attributes(Source, names):
2239 d = {}
2240 for name in names:
2241 obj = getattr(Source, name)
2242 if type(obj) == type(get_attributes):
2243 obj = staticmethod(obj)
2244 d[name] = obj
2245 return d
2246
2247def create_test_cases(Mixin, type):
2248 result = {}
2249 glob = globals()
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002250 Type = type.capitalize()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002251
2252 for name in glob.keys():
2253 if name.startswith('_Test'):
2254 base = glob[name]
2255 if type in base.ALLOWED_TYPES:
2256 newname = 'With' + Type + name[1:]
2257 class Temp(base, unittest.TestCase, Mixin):
2258 pass
2259 result[newname] = Temp
2260 Temp.__name__ = newname
2261 Temp.__module__ = Mixin.__module__
2262 return result
2263
2264#
2265# Create test cases
2266#
2267
2268class ProcessesMixin(object):
2269 TYPE = 'processes'
2270 Process = multiprocessing.Process
2271 locals().update(get_attributes(multiprocessing, (
2272 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2273 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2274 'RawArray', 'current_process', 'active_children', 'Pipe',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002275 'connection', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002276 )))
2277
2278testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2279globals().update(testcases_processes)
2280
2281
2282class ManagerMixin(object):
2283 TYPE = 'manager'
2284 Process = multiprocessing.Process
2285 manager = object.__new__(multiprocessing.managers.SyncManager)
2286 locals().update(get_attributes(manager, (
2287 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2288 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002289 'Namespace', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002290 )))
2291
2292testcases_manager = create_test_cases(ManagerMixin, type='manager')
2293globals().update(testcases_manager)
2294
2295
2296class ThreadsMixin(object):
2297 TYPE = 'threads'
2298 Process = multiprocessing.dummy.Process
2299 locals().update(get_attributes(multiprocessing.dummy, (
2300 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2301 'Condition', 'Event', 'Value', 'Array', 'current_process',
2302 'active_children', 'Pipe', 'connection', 'dict', 'list',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002303 'Namespace', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002304 )))
2305
2306testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2307globals().update(testcases_threads)
2308
Neal Norwitz0c519b32008-08-25 01:50:24 +00002309class OtherTest(unittest.TestCase):
2310 # TODO: add more tests for deliver/answer challenge.
2311 def test_deliver_challenge_auth_failure(self):
2312 class _FakeConnection(object):
2313 def recv_bytes(self, size):
Neal Norwitz2a7767a2008-08-25 03:03:25 +00002314 return b'something bogus'
Neal Norwitz0c519b32008-08-25 01:50:24 +00002315 def send_bytes(self, data):
2316 pass
2317 self.assertRaises(multiprocessing.AuthenticationError,
2318 multiprocessing.connection.deliver_challenge,
2319 _FakeConnection(), b'abc')
2320
2321 def test_answer_challenge_auth_failure(self):
2322 class _FakeConnection(object):
2323 def __init__(self):
2324 self.count = 0
2325 def recv_bytes(self, size):
2326 self.count += 1
2327 if self.count == 1:
2328 return multiprocessing.connection.CHALLENGE
2329 elif self.count == 2:
Neal Norwitz2a7767a2008-08-25 03:03:25 +00002330 return b'something bogus'
2331 return b''
Neal Norwitz0c519b32008-08-25 01:50:24 +00002332 def send_bytes(self, data):
2333 pass
2334 self.assertRaises(multiprocessing.AuthenticationError,
2335 multiprocessing.connection.answer_challenge,
2336 _FakeConnection(), b'abc')
2337
Jesse Noller7152f6d2009-04-02 05:17:26 +00002338#
2339# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2340#
2341
2342def initializer(ns):
2343 ns.test += 1
2344
2345class TestInitializers(unittest.TestCase):
2346 def setUp(self):
2347 self.mgr = multiprocessing.Manager()
2348 self.ns = self.mgr.Namespace()
2349 self.ns.test = 0
2350
2351 def tearDown(self):
2352 self.mgr.shutdown()
2353
2354 def test_manager_initializer(self):
2355 m = multiprocessing.managers.SyncManager()
2356 self.assertRaises(TypeError, m.start, 1)
2357 m.start(initializer, (self.ns,))
2358 self.assertEqual(self.ns.test, 1)
2359 m.shutdown()
2360
2361 def test_pool_initializer(self):
2362 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2363 p = multiprocessing.Pool(1, initializer, (self.ns,))
2364 p.close()
2365 p.join()
2366 self.assertEqual(self.ns.test, 1)
2367
Jesse Noller1b90efb2009-06-30 17:11:52 +00002368#
2369# Issue 5155, 5313, 5331: Test process in processes
2370# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2371#
2372
Richard Oudkerkc5496072013-09-29 17:10:40 +01002373def _this_sub_process(q):
Jesse Noller1b90efb2009-06-30 17:11:52 +00002374 try:
2375 item = q.get(block=False)
2376 except Queue.Empty:
2377 pass
2378
Richard Oudkerkc5496072013-09-29 17:10:40 +01002379def _test_process(q):
2380 queue = multiprocessing.Queue()
2381 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
2382 subProc.daemon = True
2383 subProc.start()
2384 subProc.join()
2385
Jesse Noller1b90efb2009-06-30 17:11:52 +00002386def _afunc(x):
2387 return x*x
2388
2389def pool_in_process():
2390 pool = multiprocessing.Pool(processes=4)
2391 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2392
2393class _file_like(object):
2394 def __init__(self, delegate):
2395 self._delegate = delegate
2396 self._pid = None
2397
2398 @property
2399 def cache(self):
2400 pid = os.getpid()
2401 # There are no race conditions since fork keeps only the running thread
2402 if pid != self._pid:
2403 self._pid = pid
2404 self._cache = []
2405 return self._cache
2406
2407 def write(self, data):
2408 self.cache.append(data)
2409
2410 def flush(self):
2411 self._delegate.write(''.join(self.cache))
2412 self._cache = []
2413
2414class TestStdinBadfiledescriptor(unittest.TestCase):
2415
2416 def test_queue_in_process(self):
2417 queue = multiprocessing.Queue()
Richard Oudkerkc5496072013-09-29 17:10:40 +01002418 proc = multiprocessing.Process(target=_test_process, args=(queue,))
Jesse Noller1b90efb2009-06-30 17:11:52 +00002419 proc.start()
2420 proc.join()
2421
2422 def test_pool_in_process(self):
2423 p = multiprocessing.Process(target=pool_in_process)
2424 p.start()
2425 p.join()
2426
2427 def test_flushing(self):
2428 sio = StringIO()
2429 flike = _file_like(sio)
2430 flike.write('foo')
2431 proc = multiprocessing.Process(target=lambda: flike.flush())
2432 flike.flush()
2433 assert sio.getvalue() == 'foo'
2434
Richard Oudkerke4b99382012-07-27 14:05:46 +01002435#
2436# Test interaction with socket timeouts - see Issue #6056
2437#
2438
2439class TestTimeouts(unittest.TestCase):
2440 @classmethod
2441 def _test_timeout(cls, child, address):
2442 time.sleep(1)
2443 child.send(123)
2444 child.close()
2445 conn = multiprocessing.connection.Client(address)
2446 conn.send(456)
2447 conn.close()
2448
2449 def test_timeout(self):
2450 old_timeout = socket.getdefaulttimeout()
2451 try:
2452 socket.setdefaulttimeout(0.1)
2453 parent, child = multiprocessing.Pipe(duplex=True)
2454 l = multiprocessing.connection.Listener(family='AF_INET')
2455 p = multiprocessing.Process(target=self._test_timeout,
2456 args=(child, l.address))
2457 p.start()
2458 child.close()
2459 self.assertEqual(parent.recv(), 123)
2460 parent.close()
2461 conn = l.accept()
2462 self.assertEqual(conn.recv(), 456)
2463 conn.close()
2464 l.close()
2465 p.join(10)
2466 finally:
2467 socket.setdefaulttimeout(old_timeout)
2468
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002469#
2470# Test what happens with no "if __name__ == '__main__'"
2471#
2472
2473class TestNoForkBomb(unittest.TestCase):
2474 def test_noforkbomb(self):
2475 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
2476 if WIN32:
2477 rc, out, err = test.script_helper.assert_python_failure(name)
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02002478 self.assertEqual(out, '')
2479 self.assertIn('RuntimeError', err)
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002480 else:
2481 rc, out, err = test.script_helper.assert_python_ok(name)
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02002482 self.assertEqual(out.rstrip(), '123')
2483 self.assertEqual(err, '')
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002484
2485#
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002486# Issue 12098: check sys.flags of child matches that for parent
2487#
2488
2489class TestFlags(unittest.TestCase):
2490 @classmethod
2491 def run_in_grandchild(cls, conn):
2492 conn.send(tuple(sys.flags))
2493
2494 @classmethod
2495 def run_in_child(cls):
2496 import json
2497 r, w = multiprocessing.Pipe(duplex=False)
2498 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
2499 p.start()
2500 grandchild_flags = r.recv()
2501 p.join()
2502 r.close()
2503 w.close()
2504 flags = (tuple(sys.flags), grandchild_flags)
2505 print(json.dumps(flags))
2506
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02002507 @test_support.requires_unicode # XXX json needs unicode support
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002508 def test_flags(self):
2509 import json, subprocess
2510 # start child process using unusual flags
2511 prog = ('from test.test_multiprocessing import TestFlags; ' +
2512 'TestFlags.run_in_child()')
2513 data = subprocess.check_output(
Benjamin Peterson625af8e2013-03-20 12:47:57 -05002514 [sys.executable, '-E', '-B', '-O', '-c', prog])
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002515 child_flags, grandchild_flags = json.loads(data.decode('ascii'))
2516 self.assertEqual(child_flags, grandchild_flags)
Richard Oudkerk7bdd93c2013-04-17 19:15:52 +01002517
2518#
2519# Issue #17555: ForkAwareThreadLock
2520#
2521
2522class TestForkAwareThreadLock(unittest.TestCase):
2523 # We recurisvely start processes. Issue #17555 meant that the
2524 # after fork registry would get duplicate entries for the same
2525 # lock. The size of the registry at generation n was ~2**n.
2526
2527 @classmethod
2528 def child(cls, n, conn):
2529 if n > 1:
2530 p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
2531 p.start()
2532 p.join()
2533 else:
2534 conn.send(len(util._afterfork_registry))
2535 conn.close()
2536
2537 def test_lock(self):
2538 r, w = multiprocessing.Pipe(False)
2539 l = util.ForkAwareThreadLock()
2540 old_size = len(util._afterfork_registry)
2541 p = multiprocessing.Process(target=self.child, args=(5, w))
2542 p.start()
2543 new_size = r.recv()
2544 p.join()
2545 self.assertLessEqual(new_size, old_size)
2546
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002547#
Richard Oudkerk41072db2013-07-01 18:45:28 +01002548# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
2549#
2550
2551class TestIgnoreEINTR(unittest.TestCase):
2552
2553 @classmethod
2554 def _test_ignore(cls, conn):
2555 def handler(signum, frame):
2556 pass
2557 signal.signal(signal.SIGUSR1, handler)
2558 conn.send('ready')
2559 x = conn.recv()
2560 conn.send(x)
2561 conn.send_bytes(b'x'*(1024*1024)) # sending 1 MB should block
2562
2563 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2564 def test_ignore(self):
2565 conn, child_conn = multiprocessing.Pipe()
2566 try:
2567 p = multiprocessing.Process(target=self._test_ignore,
2568 args=(child_conn,))
2569 p.daemon = True
2570 p.start()
2571 child_conn.close()
2572 self.assertEqual(conn.recv(), 'ready')
2573 time.sleep(0.1)
2574 os.kill(p.pid, signal.SIGUSR1)
2575 time.sleep(0.1)
2576 conn.send(1234)
2577 self.assertEqual(conn.recv(), 1234)
2578 time.sleep(0.1)
2579 os.kill(p.pid, signal.SIGUSR1)
2580 self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
2581 time.sleep(0.1)
2582 p.join()
2583 finally:
2584 conn.close()
2585
2586 @classmethod
2587 def _test_ignore_listener(cls, conn):
2588 def handler(signum, frame):
2589 pass
2590 signal.signal(signal.SIGUSR1, handler)
2591 l = multiprocessing.connection.Listener()
2592 conn.send(l.address)
2593 a = l.accept()
2594 a.send('welcome')
2595
2596 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2597 def test_ignore_listener(self):
2598 conn, child_conn = multiprocessing.Pipe()
2599 try:
2600 p = multiprocessing.Process(target=self._test_ignore_listener,
2601 args=(child_conn,))
2602 p.daemon = True
2603 p.start()
2604 child_conn.close()
2605 address = conn.recv()
2606 time.sleep(0.1)
2607 os.kill(p.pid, signal.SIGUSR1)
2608 time.sleep(0.1)
2609 client = multiprocessing.connection.Client(address)
2610 self.assertEqual(client.recv(), 'welcome')
2611 p.join()
2612 finally:
2613 conn.close()
2614
2615#
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002616#
2617#
2618
Jesse Noller1b90efb2009-06-30 17:11:52 +00002619testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002620 TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb,
Richard Oudkerk41072db2013-07-01 18:45:28 +01002621 TestFlags, TestForkAwareThreadLock, TestIgnoreEINTR]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002622
Benjamin Petersondfd79492008-06-13 19:13:39 +00002623#
2624#
2625#
2626
2627def test_main(run=None):
Jesse Noller18623822008-06-18 13:29:52 +00002628 if sys.platform.startswith("linux"):
2629 try:
2630 lock = multiprocessing.RLock()
2631 except OSError:
Benjamin Petersonbec087f2009-03-26 21:10:30 +00002632 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Petersoned77f2e2008-06-17 22:40:44 +00002633
Charles-François Natali6392d7f2011-11-22 18:35:18 +01002634 check_enough_semaphores()
2635
Benjamin Petersondfd79492008-06-13 19:13:39 +00002636 if run is None:
2637 from test.test_support import run_unittest as run
2638
2639 util.get_temp_dir() # creates temp directory for use by all processes
2640
2641 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2642
Jesse Noller146b7ab2008-07-02 16:44:09 +00002643 ProcessesMixin.pool = multiprocessing.Pool(4)
2644 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2645 ManagerMixin.manager.__init__()
2646 ManagerMixin.manager.start()
2647 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002648
2649 testcases = (
Jesse Noller146b7ab2008-07-02 16:44:09 +00002650 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2651 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz0c519b32008-08-25 01:50:24 +00002652 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2653 testcases_other
Benjamin Petersondfd79492008-06-13 19:13:39 +00002654 )
2655
2656 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2657 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Nick Coghlan13623662010-04-10 14:24:36 +00002658 # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
2659 # module during these tests is at least platform dependent and possibly
Nick Coghlan14459d52010-04-10 15:01:54 +00002660 # non-deterministic on any given platform. So we don't mind if the listed
Nick Coghlan13623662010-04-10 14:24:36 +00002661 # warnings aren't actually raised.
Florent Xicluna07627882010-03-21 01:14:24 +00002662 with test_support.check_py3k_warnings(
Nick Coghlan13623662010-04-10 14:24:36 +00002663 (".+__(get|set)slice__ has been removed", DeprecationWarning),
2664 (r"sys.exc_clear\(\) not supported", DeprecationWarning),
2665 quiet=True):
Florent Xicluna07627882010-03-21 01:14:24 +00002666 run(suite)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002667
Jesse Noller146b7ab2008-07-02 16:44:09 +00002668 ThreadsMixin.pool.terminate()
2669 ProcessesMixin.pool.terminate()
2670 ManagerMixin.pool.terminate()
2671 ManagerMixin.manager.shutdown()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002672
Jesse Noller146b7ab2008-07-02 16:44:09 +00002673 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002674
2675def main():
2676 test_main(unittest.TextTestRunner(verbosity=2).run)
2677
2678if __name__ == '__main__':
2679 main()